Skip to content

Commit ff56a94

Browse files
authored
Qos configurability (#635)
Signed-off-by: Ivan Santiago Paunovic <[email protected]>
1 parent 25c2d02 commit ff56a94

File tree

4 files changed

+379
-10
lines changed

4 files changed

+379
-10
lines changed

rclpy/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,7 @@ if(BUILD_TESTING)
233233
test/test_publisher.py
234234
test/test_qos.py
235235
test/test_qos_event.py
236+
test/test_qos_overriding_options.py
236237
test/test_rate.py
237238
test/test_serialization.py
238239
test/test_subscription.py

rclpy/rclpy/node.py

Lines changed: 50 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from typing import Iterator
2121
from typing import List
2222
from typing import Optional
23+
from typing import Sequence
2324
from typing import Tuple
2425
from typing import TypeVar
2526
from typing import Union
@@ -43,6 +44,7 @@
4344
from rclpy.constants import S_TO_NS
4445
from rclpy.context import Context
4546
from rclpy.exceptions import InvalidParameterValueException
47+
from rclpy.exceptions import InvalidTopicNameException
4648
from rclpy.exceptions import NotInitializedException
4749
from rclpy.exceptions import ParameterAlreadyDeclaredException
4850
from rclpy.exceptions import ParameterImmutableException
@@ -62,6 +64,8 @@
6264
from rclpy.qos import QoSProfile
6365
from rclpy.qos_event import PublisherEventCallbacks
6466
from rclpy.qos_event import SubscriptionEventCallbacks
67+
from rclpy.qos_overriding_options import _declare_qos_parameters
68+
from rclpy.qos_overriding_options import QoSOverridingOptions
6569
from rclpy.service import Service
6670
from rclpy.subscription import Subscription
6771
from rclpy.time_source import TimeSource
@@ -530,7 +534,10 @@ def get_parameter_or(
530534

531535
return self._parameters.get(name, alternative_value)
532536

533-
def get_parameters_by_prefix(self, prefix: str) -> List[Parameter]:
537+
def get_parameters_by_prefix(self, prefix: str) -> Dict[str, Optional[Union[
538+
bool, int, float, str, bytes,
539+
Sequence[bool], Sequence[int], Sequence[float], Sequence[str]
540+
]]]:
534541
"""
535542
Get parameters that have a given prefix in their names as a dictionary.
536543
@@ -547,16 +554,14 @@ def get_parameters_by_prefix(self, prefix: str) -> List[Parameter]:
547554
:param prefix: The prefix of the parameters to get.
548555
:return: Dict of parameters with the given prefix.
549556
"""
550-
parameters_with_prefix = {}
551557
if prefix:
552558
prefix = prefix + PARAMETER_SEPARATOR_STRING
553559
prefix_len = len(prefix)
554-
for parameter_name in self._parameters:
555-
if parameter_name.startswith(prefix):
556-
parameters_with_prefix.update(
557-
{parameter_name[prefix_len:]: self._parameters.get(parameter_name)})
558-
559-
return parameters_with_prefix
560+
return {
561+
param_name[prefix_len:]: param_value
562+
for param_name, param_value in self._parameters.items()
563+
if param_name.startswith(prefix)
564+
}
560565

561566
def set_parameters(self, parameter_list: List[Parameter]) -> List[SetParametersResult]:
562567
"""
@@ -1125,6 +1130,7 @@ def create_publisher(
11251130
*,
11261131
callback_group: Optional[CallbackGroup] = None,
11271132
event_callbacks: Optional[PublisherEventCallbacks] = None,
1133+
qos_overriding_options: Optional[QoSOverridingOptions] = None,
11281134
) -> Publisher:
11291135
"""
11301136
Create a new publisher.
@@ -1144,9 +1150,26 @@ def create_publisher(
11441150

11451151
callback_group = callback_group or self.default_callback_group
11461152

1153+
failed = False
1154+
try:
1155+
final_topic = self.resolve_topic_name(topic)
1156+
except RuntimeError:
1157+
# if it's name validation error, raise a more appropriate exception.
1158+
try:
1159+
self._validate_topic_or_service_name(topic)
1160+
except InvalidTopicNameException as ex:
1161+
raise ex from None
1162+
# else reraise the previous exception
1163+
raise
1164+
1165+
if qos_overriding_options is None:
1166+
qos_overriding_options = QoSOverridingOptions([])
1167+
_declare_qos_parameters(
1168+
Publisher, self, final_topic, qos_profile, qos_overriding_options)
1169+
11471170
# this line imports the typesupport for the message module if not already done
1148-
check_for_type_support(msg_type)
11491171
failed = False
1172+
check_for_type_support(msg_type)
11501173
try:
11511174
with self.handle as node_capsule:
11521175
publisher_capsule = _rclpy.rclpy_create_publisher(
@@ -1182,6 +1205,7 @@ def create_subscription(
11821205
*,
11831206
callback_group: Optional[CallbackGroup] = None,
11841207
event_callbacks: Optional[SubscriptionEventCallbacks] = None,
1208+
qos_overriding_options: Optional[QoSOverridingOptions] = None,
11851209
raw: bool = False
11861210
) -> Subscription:
11871211
"""
@@ -1205,9 +1229,25 @@ def create_subscription(
12051229

12061230
callback_group = callback_group or self.default_callback_group
12071231

1232+
try:
1233+
final_topic = self.resolve_topic_name(topic)
1234+
except RuntimeError:
1235+
# if it's name validation error, raise a more appropriate exception.
1236+
try:
1237+
self._validate_topic_or_service_name(topic)
1238+
except InvalidTopicNameException as ex:
1239+
raise ex from None
1240+
# else reraise the previous exception
1241+
raise
1242+
1243+
if qos_overriding_options is None:
1244+
qos_overriding_options = QoSOverridingOptions([])
1245+
_declare_qos_parameters(
1246+
Subscription, self, final_topic, qos_profile, qos_overriding_options)
1247+
12081248
# this line imports the typesupport for the message module if not already done
1209-
check_for_type_support(msg_type)
12101249
failed = False
1250+
check_for_type_support(msg_type)
12111251
try:
12121252
with self.handle as capsule:
12131253
subscription_capsule = _rclpy.rclpy_create_subscription(
Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
# Copyright 2020 Open Source Robotics Foundation, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from typing import Callable
16+
from typing import Iterable
17+
from typing import Optional
18+
from typing import Text
19+
from typing import Type
20+
from typing import TYPE_CHECKING
21+
from typing import Union
22+
23+
from rcl_interfaces.msg import ParameterDescriptor
24+
from rcl_interfaces.msg import SetParametersResult
25+
26+
import rclpy
27+
from rclpy.duration import Duration
28+
from rclpy.exceptions import ParameterAlreadyDeclaredException
29+
from rclpy.parameter import Parameter
30+
from rclpy.publisher import Publisher
31+
from rclpy.qos import QoSPolicyKind
32+
from rclpy.qos import QoSProfile
33+
from rclpy.subscription import Subscription
34+
35+
if TYPE_CHECKING:
36+
from rclpy.node import Node
37+
38+
39+
class InvalidQosOverridesError(Exception):
40+
pass
41+
42+
43+
# Return type of qos validation callbacks
44+
QosCallbackResult = SetParametersResult
45+
# Qos callback type annotation
46+
QosCallbackType = Callable[[QoSProfile], QosCallbackResult]
47+
48+
49+
class QoSOverridingOptions:
50+
"""Options to customize QoS parameter overrides."""
51+
52+
def __init__(
53+
self,
54+
policy_kinds: Iterable[QoSPolicyKind],
55+
*,
56+
callback: Optional[QosCallbackType] = None,
57+
entity_id: Optional[Text] = None
58+
):
59+
"""
60+
Construct a QoSOverridingOptions object.
61+
62+
:param policy_kinds: QoS kinds that will have a declared parameter.
63+
:param callback: Callback that will be used to validate the QoS profile
64+
after the paramter overrides get applied.
65+
:param entity_id: Optional identifier, to disambiguate in the case that different QoS
66+
policies for the same topic are desired.
67+
"""
68+
self._policy_kinds = policy_kinds
69+
self._callback = callback
70+
self._entity_id = entity_id
71+
72+
@property
73+
def policy_kinds(self) -> Iterable[QoSPolicyKind]:
74+
"""Get QoS policy kinds that will have a parameter override."""
75+
return self._policy_kinds
76+
77+
@property
78+
def callback(self) -> Optional[QosCallbackType]:
79+
"""Get the validation callback."""
80+
return self._callback
81+
82+
@property
83+
def entity_id(self) -> Optional[Text]:
84+
"""Get the optional entity ID."""
85+
return self._entity_id
86+
87+
@classmethod
88+
def with_default_policies(
89+
cls, *,
90+
callback: Optional[QosCallbackType] = None,
91+
entity_id: Optional[Text] = None
92+
) -> 'QoSOverridingOptions':
93+
return cls(
94+
policy_kinds=(QoSPolicyKind.HISTORY, QoSPolicyKind.DEPTH, QoSPolicyKind.RELIABILITY),
95+
callback=callback,
96+
entity_id=entity_id,
97+
)
98+
99+
100+
def _declare_qos_parameters(
101+
entity_type: Union[Type[Publisher], Type[Subscription]],
102+
node: 'Node',
103+
topic_name: Text,
104+
qos: QoSProfile,
105+
options: QoSOverridingOptions
106+
) -> QoSProfile:
107+
"""
108+
Declare QoS parameters for a Publisher or a Subscription.
109+
110+
:param entity_type: Either `rclpy.node.Publisher` or `rclpy.node.Subscription`.
111+
:param node: Node used to declare the parameters.
112+
:param topic_name: Topic name of the entity being created.
113+
:param qos: Default QoS settings of the entity being created, that will be overridden
114+
with the user provided QoS parameter overrides.
115+
:param options: Options that indicates which parameters are going to be declared.
116+
"""
117+
if not issubclass(entity_type, (Publisher, Subscription)):
118+
raise TypeError('Argument `entity_type` should be a subclass of Publisher or Subscription')
119+
entity_type_str = 'publisher' if issubclass(entity_type, Publisher) else 'subscription'
120+
id_suffix = '' if options.entity_id is None else f'_{options.entity_id}'
121+
name = f'qos_overrides.{topic_name}.{entity_type_str}{id_suffix}.' '{}'
122+
description = '{}' f' for {entity_type_str} `{topic_name}` with id `{options.entity_id}`'
123+
allowed_policies = _get_allowed_policies(entity_type)
124+
for policy in options.policy_kinds:
125+
if policy not in allowed_policies:
126+
continue
127+
policy_name = policy.name.lower()
128+
descriptor = ParameterDescriptor()
129+
descriptor.description = description.format(policy_name)
130+
descriptor.read_only = True
131+
try:
132+
param = node.declare_parameter(
133+
name.format(policy_name),
134+
_get_qos_policy_parameter(qos, policy),
135+
descriptor)
136+
except ParameterAlreadyDeclaredException:
137+
param = node.get_parameter(name.format(policy_name))
138+
_override_qos_policy_with_param(qos, policy, param)
139+
if options.callback is not None:
140+
result = options.callback(qos)
141+
if not result.successful:
142+
raise InvalidQosOverridesError(
143+
f"{description.format('Provided QoS overrides')}, are not valid: {result.reason}")
144+
145+
146+
def _get_allowed_policies(entity_type: Union[Type[Publisher], Type[Subscription]]):
147+
allowed_policies = list(QoSPolicyKind.__members__.values())
148+
if issubclass(entity_type, Subscription):
149+
allowed_policies.remove(QoSPolicyKind.LIFESPAN)
150+
return allowed_policies
151+
152+
153+
def _get_qos_policy_parameter(qos: QoSProfile, policy: QoSPolicyKind) -> Union[str, int, bool]:
154+
value = getattr(qos, policy.name.lower())
155+
if policy in (
156+
QoSPolicyKind.LIVELINESS, QoSPolicyKind.RELIABILITY,
157+
QoSPolicyKind.HISTORY, QoSPolicyKind.DURABILITY
158+
):
159+
value = value.name.lower()
160+
if value == 'unknown':
161+
raise ValueError('User provided QoS profile is invalid')
162+
if policy in (
163+
QoSPolicyKind.LIFESPAN, QoSPolicyKind.DEADLINE, QoSPolicyKind.LIVELINESS_LEASE_DURATION
164+
):
165+
value = value.nanoseconds()
166+
return value
167+
168+
169+
def _override_qos_policy_with_param(qos: QoSProfile, policy: QoSPolicyKind, param: Parameter):
170+
value = param.value
171+
policy_name = policy.name.lower()
172+
if policy in (
173+
QoSPolicyKind.LIVELINESS, QoSPolicyKind.RELIABILITY,
174+
QoSPolicyKind.HISTORY, QoSPolicyKind.DURABILITY
175+
):
176+
def capitalize_first_letter(x):
177+
return x[0].upper() + x[1:]
178+
# e.g. `policy=QosPolicyKind.LIVELINESS` -> `policy_enum_class=rclpy.qos.LivelinessPolicy`
179+
policy_enum_class = getattr(
180+
rclpy.qos, f'{capitalize_first_letter(policy_name)}Policy')
181+
try:
182+
value = policy_enum_class[value.upper()]
183+
except KeyError:
184+
raise RuntimeError(
185+
f'Unexpected QoS override for policy `{policy.name.lower()}`: `{value}`')
186+
if policy in (
187+
QoSPolicyKind.LIFESPAN, QoSPolicyKind.DEADLINE, QoSPolicyKind.LIVELINESS_LEASE_DURATION
188+
):
189+
value = Duration(nanoseconds=value)
190+
setattr(qos, policy.name.lower(), value)

0 commit comments

Comments
 (0)