diff --git a/rclpy/CMakeLists.txt b/rclpy/CMakeLists.txt index 1c7d396b7..c9339adb7 100644 --- a/rclpy/CMakeLists.txt +++ b/rclpy/CMakeLists.txt @@ -233,6 +233,7 @@ if(BUILD_TESTING) test/test_publisher.py test/test_qos.py test/test_qos_event.py + test/test_qos_overriding_options.py test/test_rate.py test/test_serialization.py test/test_subscription.py diff --git a/rclpy/rclpy/node.py b/rclpy/rclpy/node.py index 2c28053d0..9535d9e71 100644 --- a/rclpy/rclpy/node.py +++ b/rclpy/rclpy/node.py @@ -20,6 +20,7 @@ from typing import Iterator from typing import List from typing import Optional +from typing import Sequence from typing import Tuple from typing import TypeVar from typing import Union @@ -43,6 +44,7 @@ from rclpy.constants import S_TO_NS from rclpy.context import Context from rclpy.exceptions import InvalidParameterValueException +from rclpy.exceptions import InvalidTopicNameException from rclpy.exceptions import NotInitializedException from rclpy.exceptions import ParameterAlreadyDeclaredException from rclpy.exceptions import ParameterImmutableException @@ -62,6 +64,8 @@ from rclpy.qos import QoSProfile from rclpy.qos_event import PublisherEventCallbacks from rclpy.qos_event import SubscriptionEventCallbacks +from rclpy.qos_overriding_options import _declare_qos_parameters +from rclpy.qos_overriding_options import QoSOverridingOptions from rclpy.service import Service from rclpy.subscription import Subscription from rclpy.time_source import TimeSource @@ -530,7 +534,10 @@ def get_parameter_or( return self._parameters.get(name, alternative_value) - def get_parameters_by_prefix(self, prefix: str) -> List[Parameter]: + def get_parameters_by_prefix(self, prefix: str) -> Dict[str, Optional[Union[ + bool, int, float, str, bytes, + Sequence[bool], Sequence[int], Sequence[float], Sequence[str] + ]]]: """ Get parameters that have a given prefix in their names as a dictionary. @@ -547,16 +554,14 @@ def get_parameters_by_prefix(self, prefix: str) -> List[Parameter]: :param prefix: The prefix of the parameters to get. :return: Dict of parameters with the given prefix. """ - parameters_with_prefix = {} if prefix: prefix = prefix + PARAMETER_SEPARATOR_STRING prefix_len = len(prefix) - for parameter_name in self._parameters: - if parameter_name.startswith(prefix): - parameters_with_prefix.update( - {parameter_name[prefix_len:]: self._parameters.get(parameter_name)}) - - return parameters_with_prefix + return { + param_name[prefix_len:]: param_value + for param_name, param_value in self._parameters.items() + if param_name.startswith(prefix) + } def set_parameters(self, parameter_list: List[Parameter]) -> List[SetParametersResult]: """ @@ -1125,6 +1130,7 @@ def create_publisher( *, callback_group: Optional[CallbackGroup] = None, event_callbacks: Optional[PublisherEventCallbacks] = None, + qos_overriding_options: Optional[QoSOverridingOptions] = None, ) -> Publisher: """ Create a new publisher. @@ -1144,9 +1150,26 @@ def create_publisher( callback_group = callback_group or self.default_callback_group + failed = False + try: + final_topic = self.resolve_topic_name(topic) + except RuntimeError: + # if it's name validation error, raise a more appropriate exception. + try: + self._validate_topic_or_service_name(topic) + except InvalidTopicNameException as ex: + raise ex from None + # else reraise the previous exception + raise + + if qos_overriding_options is None: + qos_overriding_options = QoSOverridingOptions([]) + _declare_qos_parameters( + Publisher, self, final_topic, qos_profile, qos_overriding_options) + # this line imports the typesupport for the message module if not already done - check_for_type_support(msg_type) failed = False + check_for_type_support(msg_type) try: with self.handle as node_capsule: publisher_capsule = _rclpy.rclpy_create_publisher( @@ -1182,6 +1205,7 @@ def create_subscription( *, callback_group: Optional[CallbackGroup] = None, event_callbacks: Optional[SubscriptionEventCallbacks] = None, + qos_overriding_options: Optional[QoSOverridingOptions] = None, raw: bool = False ) -> Subscription: """ @@ -1205,9 +1229,25 @@ def create_subscription( callback_group = callback_group or self.default_callback_group + try: + final_topic = self.resolve_topic_name(topic) + except RuntimeError: + # if it's name validation error, raise a more appropriate exception. + try: + self._validate_topic_or_service_name(topic) + except InvalidTopicNameException as ex: + raise ex from None + # else reraise the previous exception + raise + + if qos_overriding_options is None: + qos_overriding_options = QoSOverridingOptions([]) + _declare_qos_parameters( + Subscription, self, final_topic, qos_profile, qos_overriding_options) + # this line imports the typesupport for the message module if not already done - check_for_type_support(msg_type) failed = False + check_for_type_support(msg_type) try: with self.handle as capsule: subscription_capsule = _rclpy.rclpy_create_subscription( diff --git a/rclpy/rclpy/qos_overriding_options.py b/rclpy/rclpy/qos_overriding_options.py new file mode 100644 index 000000000..1db48cfd9 --- /dev/null +++ b/rclpy/rclpy/qos_overriding_options.py @@ -0,0 +1,190 @@ +# Copyright 2020 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Callable +from typing import Iterable +from typing import Optional +from typing import Text +from typing import Type +from typing import TYPE_CHECKING +from typing import Union + +from rcl_interfaces.msg import ParameterDescriptor +from rcl_interfaces.msg import SetParametersResult + +import rclpy +from rclpy.duration import Duration +from rclpy.exceptions import ParameterAlreadyDeclaredException +from rclpy.parameter import Parameter +from rclpy.publisher import Publisher +from rclpy.qos import QoSPolicyKind +from rclpy.qos import QoSProfile +from rclpy.subscription import Subscription + +if TYPE_CHECKING: + from rclpy.node import Node + + +class InvalidQosOverridesError(Exception): + pass + + +# Return type of qos validation callbacks +QosCallbackResult = SetParametersResult +# Qos callback type annotation +QosCallbackType = Callable[[QoSProfile], QosCallbackResult] + + +class QoSOverridingOptions: + """Options to customize QoS parameter overrides.""" + + def __init__( + self, + policy_kinds: Iterable[QoSPolicyKind], + *, + callback: Optional[QosCallbackType] = None, + entity_id: Optional[Text] = None + ): + """ + Construct a QoSOverridingOptions object. + + :param policy_kinds: QoS kinds that will have a declared parameter. + :param callback: Callback that will be used to validate the QoS profile + after the paramter overrides get applied. + :param entity_id: Optional identifier, to disambiguate in the case that different QoS + policies for the same topic are desired. + """ + self._policy_kinds = policy_kinds + self._callback = callback + self._entity_id = entity_id + + @property + def policy_kinds(self) -> Iterable[QoSPolicyKind]: + """Get QoS policy kinds that will have a parameter override.""" + return self._policy_kinds + + @property + def callback(self) -> Optional[QosCallbackType]: + """Get the validation callback.""" + return self._callback + + @property + def entity_id(self) -> Optional[Text]: + """Get the optional entity ID.""" + return self._entity_id + + @classmethod + def with_default_policies( + cls, *, + callback: Optional[QosCallbackType] = None, + entity_id: Optional[Text] = None + ) -> 'QoSOverridingOptions': + return cls( + policy_kinds=(QoSPolicyKind.HISTORY, QoSPolicyKind.DEPTH, QoSPolicyKind.RELIABILITY), + callback=callback, + entity_id=entity_id, + ) + + +def _declare_qos_parameters( + entity_type: Union[Type[Publisher], Type[Subscription]], + node: 'Node', + topic_name: Text, + qos: QoSProfile, + options: QoSOverridingOptions +) -> QoSProfile: + """ + Declare QoS parameters for a Publisher or a Subscription. + + :param entity_type: Either `rclpy.node.Publisher` or `rclpy.node.Subscription`. + :param node: Node used to declare the parameters. + :param topic_name: Topic name of the entity being created. + :param qos: Default QoS settings of the entity being created, that will be overridden + with the user provided QoS parameter overrides. + :param options: Options that indicates which parameters are going to be declared. + """ + if not issubclass(entity_type, (Publisher, Subscription)): + raise TypeError('Argument `entity_type` should be a subclass of Publisher or Subscription') + entity_type_str = 'publisher' if issubclass(entity_type, Publisher) else 'subscription' + id_suffix = '' if options.entity_id is None else f'_{options.entity_id}' + name = f'qos_overrides.{topic_name}.{entity_type_str}{id_suffix}.' '{}' + description = '{}' f' for {entity_type_str} `{topic_name}` with id `{options.entity_id}`' + allowed_policies = _get_allowed_policies(entity_type) + for policy in options.policy_kinds: + if policy not in allowed_policies: + continue + policy_name = policy.name.lower() + descriptor = ParameterDescriptor() + descriptor.description = description.format(policy_name) + descriptor.read_only = True + try: + param = node.declare_parameter( + name.format(policy_name), + _get_qos_policy_parameter(qos, policy), + descriptor) + except ParameterAlreadyDeclaredException: + param = node.get_parameter(name.format(policy_name)) + _override_qos_policy_with_param(qos, policy, param) + if options.callback is not None: + result = options.callback(qos) + if not result.successful: + raise InvalidQosOverridesError( + f"{description.format('Provided QoS overrides')}, are not valid: {result.reason}") + + +def _get_allowed_policies(entity_type: Union[Type[Publisher], Type[Subscription]]): + allowed_policies = list(QoSPolicyKind.__members__.values()) + if issubclass(entity_type, Subscription): + allowed_policies.remove(QoSPolicyKind.LIFESPAN) + return allowed_policies + + +def _get_qos_policy_parameter(qos: QoSProfile, policy: QoSPolicyKind) -> Union[str, int, bool]: + value = getattr(qos, policy.name.lower()) + if policy in ( + QoSPolicyKind.LIVELINESS, QoSPolicyKind.RELIABILITY, + QoSPolicyKind.HISTORY, QoSPolicyKind.DURABILITY + ): + value = value.name.lower() + if value == 'unknown': + raise ValueError('User provided QoS profile is invalid') + if policy in ( + QoSPolicyKind.LIFESPAN, QoSPolicyKind.DEADLINE, QoSPolicyKind.LIVELINESS_LEASE_DURATION + ): + value = value.nanoseconds() + return value + + +def _override_qos_policy_with_param(qos: QoSProfile, policy: QoSPolicyKind, param: Parameter): + value = param.value + policy_name = policy.name.lower() + if policy in ( + QoSPolicyKind.LIVELINESS, QoSPolicyKind.RELIABILITY, + QoSPolicyKind.HISTORY, QoSPolicyKind.DURABILITY + ): + def capitalize_first_letter(x): + return x[0].upper() + x[1:] + # e.g. `policy=QosPolicyKind.LIVELINESS` -> `policy_enum_class=rclpy.qos.LivelinessPolicy` + policy_enum_class = getattr( + rclpy.qos, f'{capitalize_first_letter(policy_name)}Policy') + try: + value = policy_enum_class[value.upper()] + except KeyError: + raise RuntimeError( + f'Unexpected QoS override for policy `{policy.name.lower()}`: `{value}`') + if policy in ( + QoSPolicyKind.LIFESPAN, QoSPolicyKind.DEADLINE, QoSPolicyKind.LIVELINESS_LEASE_DURATION + ): + value = Duration(nanoseconds=value) + setattr(qos, policy.name.lower(), value) diff --git a/rclpy/test/test_qos_overriding_options.py b/rclpy/test/test_qos_overriding_options.py new file mode 100644 index 000000000..933dc6be7 --- /dev/null +++ b/rclpy/test/test_qos_overriding_options.py @@ -0,0 +1,138 @@ +# Copyright 2020 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +import rclpy +from rclpy.node import Node +from rclpy.parameter import Parameter +from rclpy.publisher import Publisher +from rclpy.qos import QoSProfile +from rclpy.qos_overriding_options import _declare_qos_parameters +from rclpy.qos_overriding_options import InvalidQosOverridesError +from rclpy.qos_overriding_options import QosCallbackResult +from rclpy.qos_overriding_options import QoSOverridingOptions + + +@pytest.fixture(autouse=True) +def init_shutdown(): + rclpy.init() + yield + rclpy.shutdown() + + +def test_declare_qos_parameters(): + node = Node('my_node') + _declare_qos_parameters( + Publisher, node, '/my_topic', QoSProfile(depth=10), + QoSOverridingOptions.with_default_policies() + ) + qos_overrides = node.get_parameters_by_prefix('qos_overrides') + assert len(qos_overrides) == 3 + expected_params = ( + ('/my_topic.publisher.depth', 10), + ('/my_topic.publisher.history', 'keep_last'), + ('/my_topic.publisher.reliability', 'reliable'), + ) + for actual, expected in zip( + sorted(qos_overrides.items(), key=lambda x: x[0]), expected_params + ): + assert actual[0] == expected[0] # same param name + assert actual[1].value == expected[1] # same param value + + +def test_declare_qos_parameters_with_overrides(): + node = Node('my_node', parameter_overrides=[ + Parameter('qos_overrides./my_topic.publisher.depth', value=100), + Parameter('qos_overrides./my_topic.publisher.reliability', value='best_effort'), + ]) + for i in range(2): # try twice, the second time the parameters will be get and not declared + _declare_qos_parameters( + Publisher, node, '/my_topic', QoSProfile(depth=10), + QoSOverridingOptions.with_default_policies() + ) + qos_overrides = node.get_parameters_by_prefix('qos_overrides') + assert len(qos_overrides) == 3 + expected_params = ( + ('/my_topic.publisher.depth', 100), + ('/my_topic.publisher.history', 'keep_last'), + ('/my_topic.publisher.reliability', 'best_effort'), + ) + for actual, expected in zip( + sorted(qos_overrides.items(), key=lambda x: x[0]), expected_params + ): + assert actual[0] == expected[0] # same param name + assert actual[1].value == expected[1] # same param value + + +def test_declare_qos_parameters_with_happy_callback(): + def qos_validation_callback(qos): + result = QosCallbackResult() + result.successful = True + return result + + node = Node('my_node') + _declare_qos_parameters( + Publisher, node, '/my_topic', QoSProfile(depth=10), + QoSOverridingOptions.with_default_policies(callback=qos_validation_callback) + ) + qos_overrides = node.get_parameters_by_prefix('qos_overrides') + assert len(qos_overrides) == 3 + expected_params = ( + ('/my_topic.publisher.depth', 10), + ('/my_topic.publisher.history', 'keep_last'), + ('/my_topic.publisher.reliability', 'reliable'), + ) + for actual, expected in zip( + sorted(qos_overrides.items(), key=lambda x: x[0]), expected_params + ): + assert actual[0] == expected[0] # same param name + assert actual[1].value == expected[1] # same param value + + +def test_declare_qos_parameters_with_unhappy_callback(): + def qos_validation_callback(qos): + result = QosCallbackResult() + result.successful = False + result.reason = 'my_custom_error_message' + return result + + node = Node('my_node') + + with pytest.raises(InvalidQosOverridesError) as err: + _declare_qos_parameters( + Publisher, node, '/my_topic', QoSProfile(depth=10), + QoSOverridingOptions.with_default_policies(callback=qos_validation_callback) + ) + assert 'my_custom_error_message' in str(err.value) + + +def test_declare_qos_parameters_with_id(): + node = Node('my_node') + _declare_qos_parameters( + Publisher, node, '/my_topic', QoSProfile(depth=10), + QoSOverridingOptions.with_default_policies(entity_id='i_have_an_id') + ) + qos_overrides = node.get_parameters_by_prefix('qos_overrides') + assert len(qos_overrides) == 3 + expected_params = ( + ('/my_topic.publisher_i_have_an_id.depth', 10), + ('/my_topic.publisher_i_have_an_id.history', 'keep_last'), + ('/my_topic.publisher_i_have_an_id.reliability', 'reliable'), + ) + for actual, expected in zip( + sorted(qos_overrides.items(), key=lambda x: x[0]), expected_params + ): + assert actual[0] == expected[0] # same param name + assert actual[1].value == expected[1] # same param value