Skip to content

Commit

Permalink
Add types to qos_overriding_options.py (#1248)
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Carlstrom <[email protected]>
  • Loading branch information
InvincibleRMC authored Mar 24, 2024
1 parent 7a7f23e commit 7d52cfa
Showing 1 changed file with 28 additions and 17 deletions.
45 changes: 28 additions & 17 deletions rclpy/rclpy/qos_overriding_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from typing import Callable
from typing import Iterable
from typing import List
from typing import Optional
from typing import Text
from typing import Type
Expand All @@ -28,11 +29,16 @@
from rclpy.exceptions import ParameterAlreadyDeclaredException
from rclpy.parameter import Parameter
from rclpy.publisher import Publisher
from rclpy.qos import QoSDurabilityPolicy
from rclpy.qos import QoSHistoryPolicy
from rclpy.qos import QoSLivelinessPolicy
from rclpy.qos import QoSPolicyKind
from rclpy.qos import QoSProfile
from rclpy.qos import QoSReliabilityPolicy
from rclpy.subscription import Subscription

if TYPE_CHECKING:
from typing import TypeAlias
from rclpy.node import Node


Expand All @@ -41,7 +47,7 @@ class InvalidQosOverridesError(Exception):


# Return type of qos validation callbacks
QosCallbackResult = SetParametersResult
QosCallbackResult: 'TypeAlias' = SetParametersResult
# Qos callback type annotation
QosCallbackType = Callable[[QoSProfile], QosCallbackResult]

Expand Down Expand Up @@ -103,7 +109,7 @@ def _declare_qos_parameters(
topic_name: Text,
qos: QoSProfile,
options: QoSOverridingOptions
) -> QoSProfile:
) -> None:
"""
Declare QoS parameters for a Publisher or a Subscription.
Expand Down Expand Up @@ -143,37 +149,42 @@ def _declare_qos_parameters(
f"{description.format('Provided QoS overrides')}, are not valid: {result.reason}")


def _get_allowed_policies(entity_type: Union[Type[Publisher], Type[Subscription]]):
def _get_allowed_policies(entity_type: Union[Type[Publisher],
Type[Subscription]]) -> List[QoSPolicyKind]:
allowed_policies = list(QoSPolicyKind.__members__.values())
if issubclass(entity_type, Subscription):
allowed_policies.remove(QoSPolicyKind.LIFESPAN)
return allowed_policies


QoSProfileAttributes = Union[QoSHistoryPolicy, int, QoSReliabilityPolicy, QoSDurabilityPolicy,
Duration, QoSLivelinessPolicy, bool]


def _get_qos_policy_parameter(qos: QoSProfile, policy: QoSPolicyKind) -> Union[str, int, bool]:
value = getattr(qos, policy.name.lower())
if policy in (
QoSPolicyKind.LIVELINESS, QoSPolicyKind.RELIABILITY,
QoSPolicyKind.HISTORY, QoSPolicyKind.DURABILITY
):
value = value.name.lower()
if value == 'unknown':
value: QoSProfileAttributes = getattr(qos, policy.name.lower())
if isinstance(value, (QoSHistoryPolicy, QoSReliabilityPolicy,
QoSDurabilityPolicy, QoSLivelinessPolicy)):
return_value: Union[str, int, bool] = value.name.lower()
if return_value == 'unknown':
raise ValueError('User provided QoS profile is invalid')
if policy in (
QoSPolicyKind.LIFESPAN, QoSPolicyKind.DEADLINE, QoSPolicyKind.LIVELINESS_LEASE_DURATION
):
value = value.nanoseconds
return value
elif isinstance(value, Duration):
return_value = value.nanoseconds
else:
return_value = value
return return_value


def _override_qos_policy_with_param(qos: QoSProfile, policy: QoSPolicyKind, param: Parameter):
def _override_qos_policy_with_param(qos: QoSProfile,
policy: QoSPolicyKind,
param: Parameter) -> None:
value = param.value
policy_name = policy.name.lower()
if policy in (
QoSPolicyKind.LIVELINESS, QoSPolicyKind.RELIABILITY,
QoSPolicyKind.HISTORY, QoSPolicyKind.DURABILITY
):
def capitalize_first_letter(x):
def capitalize_first_letter(x: str) -> str:
return x[0].upper() + x[1:]
# e.g. `policy=QosPolicyKind.LIVELINESS` -> `policy_enum_class=rclpy.qos.LivelinessPolicy`
policy_enum_class = getattr(
Expand Down

0 comments on commit 7d52cfa

Please sign in to comment.