Skip to content
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
fdad6e3
Add QoSOverridingOptions
ivanpauno Oct 28, 2020
3300d10
Fix Node.get_parameters_by_prefix() type annotation
ivanpauno Oct 28, 2020
67ac124
Add helper classmethod for using default policies
ivanpauno Oct 28, 2020
80781c9
Add test cases
ivanpauno Oct 28, 2020
5fbc30c
Please linters + improve test setup and teardown
ivanpauno Oct 28, 2020
5e6235b
Declare qos overrides parameters in create_publisher/create_subscript…
ivanpauno Oct 28, 2020
b18d0ba
qos -> QoS
ivanpauno Oct 30, 2020
ef1e7bb
qos -> QoS
ivanpauno Oct 30, 2020
fa434f9
qos -> QoS
ivanpauno Oct 30, 2020
2e5297f
qos -> QoS
ivanpauno Oct 30, 2020
9b06ffa
qos -> QoS
ivanpauno Oct 30, 2020
b34a04c
style
ivanpauno Oct 30, 2020
51bddf4
qos -> QoS
ivanpauno Oct 30, 2020
fad4467
qos -> QoS
ivanpauno Oct 30, 2020
85a8cb7
grammar
ivanpauno Oct 30, 2020
589d2a2
qos -> QoS
ivanpauno Oct 30, 2020
0c5e7ce
bug
ivanpauno Oct 30, 2020
17c834a
qos -> QoS
ivanpauno Oct 30, 2020
50700ac
Fix _get_qos_policy_parameter type annotation
ivanpauno Oct 30, 2020
523dd59
qos -> QoS
ivanpauno Oct 30, 2020
812d44f
_declare_qos_parameteres -> _declare_qos_parameters
ivanpauno Oct 30, 2020
c4f109b
Fix after rebasing
ivanpauno Oct 30, 2020
751a601
Handle exception gracefully
ivanpauno Nov 2, 2020
59951a3
Suppress exception context
ivanpauno Nov 3, 2020
9fce94b
flake8
ivanpauno Nov 3, 2020
f8ba331
Add test to CMakeLists
ivanpauno Nov 5, 2020
0446d24
Use declare_or_get logic
ivanpauno Nov 5, 2020
a9fc10f
Improve testing
ivanpauno Nov 5, 2020
6b6f2c2
Add error message in validation callback result
ivanpauno Nov 9, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 40 additions & 10 deletions rclpy/rclpy/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from typing import Iterator
from typing import List
from typing import Optional
from typing import Sequence
from typing import Tuple
from typing import TypeVar
from typing import Union
Expand Down Expand Up @@ -62,6 +63,8 @@
from rclpy.qos import QoSProfile
from rclpy.qos_event import PublisherEventCallbacks
from rclpy.qos_event import SubscriptionEventCallbacks
from rclpy.qos_overriding_options import _declare_qos_parameters
from rclpy.qos_overriding_options import QoSOverridingOptions
from rclpy.service import Service
from rclpy.subscription import Subscription
from rclpy.time_source import TimeSource
Expand Down Expand Up @@ -530,7 +533,10 @@ def get_parameter_or(

return self._parameters.get(name, alternative_value)

def get_parameters_by_prefix(self, prefix: str) -> List[Parameter]:
def get_parameters_by_prefix(self, prefix: str) -> Dict[str, Optional[Union[
bool, int, float, str, bytes,
Sequence[bool], Sequence[int], Sequence[float], Sequence[str]
]]]:
"""
Get parameters that have a given prefix in their names as a dictionary.

Expand All @@ -547,16 +553,14 @@ def get_parameters_by_prefix(self, prefix: str) -> List[Parameter]:
:param prefix: The prefix of the parameters to get.
:return: Dict of parameters with the given prefix.
"""
parameters_with_prefix = {}
if prefix:
prefix = prefix + PARAMETER_SEPARATOR_STRING
prefix_len = len(prefix)
for parameter_name in self._parameters:
if parameter_name.startswith(prefix):
parameters_with_prefix.update(
{parameter_name[prefix_len:]: self._parameters.get(parameter_name)})

return parameters_with_prefix
return {
param_name[prefix_len:]: param_value
for param_name, param_value in self._parameters.items()
if param_name.startswith(prefix)
}

def set_parameters(self, parameter_list: List[Parameter]) -> List[SetParametersResult]:
"""
Expand Down Expand Up @@ -1125,6 +1129,7 @@ def create_publisher(
*,
callback_group: Optional[CallbackGroup] = None,
event_callbacks: Optional[PublisherEventCallbacks] = None,
qos_overriding_options: Optional[QoSOverridingOptions] = None,
) -> Publisher:
"""
Create a new publisher.
Expand All @@ -1144,9 +1149,21 @@ def create_publisher(

callback_group = callback_group or self.default_callback_group

failed = False
try:
final_topic = self.resolve_topic_name(topic)
except RuntimeError:
failed = True
if failed:
self._validate_topic_or_service_name(topic)

if qos_overriding_options is None:
qos_overriding_options = QoSOverridingOptions([])
_declare_qos_parameters(
Publisher, self, final_topic, qos_profile, qos_overriding_options)

# this line imports the typesupport for the message module if not already done
check_for_type_support(msg_type)
failed = False
try:
with self.handle as node_capsule:
publisher_capsule = _rclpy.rclpy_create_publisher(
Expand Down Expand Up @@ -1182,6 +1199,7 @@ def create_subscription(
*,
callback_group: Optional[CallbackGroup] = None,
event_callbacks: Optional[SubscriptionEventCallbacks] = None,
qos_overriding_options: Optional[QoSOverridingOptions] = None,
raw: bool = False
) -> Subscription:
"""
Expand All @@ -1205,9 +1223,21 @@ def create_subscription(

callback_group = callback_group or self.default_callback_group

failed = False
try:
final_topic = self.resolve_topic_name(topic)
except RuntimeError:
failed = True
if failed:
self._validate_topic_or_service_name(topic)

if qos_overriding_options is None:
qos_overriding_options = QoSOverridingOptions([])
_declare_qos_parameters(
Subscription, self, final_topic, qos_profile, qos_overriding_options)

# this line imports the typesupport for the message module if not already done
check_for_type_support(msg_type)
failed = False
try:
with self.handle as capsule:
subscription_capsule = _rclpy.rclpy_create_subscription(
Expand Down
177 changes: 177 additions & 0 deletions rclpy/rclpy/qos_overriding_options.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
# Copyright 2020 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Callable
from typing import Iterable
from typing import Optional
from typing import Text
from typing import Type
from typing import TYPE_CHECKING
from typing import Union

from rcl_interfaces.msg import ParameterDescriptor

import rclpy
from rclpy.duration import Duration
from rclpy.parameter import Parameter
from rclpy.publisher import Publisher
from rclpy.qos import QoSPolicyKind
from rclpy.qos import QoSProfile
from rclpy.subscription import Subscription

if TYPE_CHECKING:
from rclpy.node import Node


class InvalidQosOverridesError(Exception):
pass


class QoSOverridingOptions:
"""Options to customize QoS parameter overrides."""

def __init__(
self,
policy_kinds: Iterable[QoSPolicyKind],
*,
callback: Optional[Callable[[QoSProfile], bool]] = None,
entity_id: Optional[Text] = None
):
"""
Construct a QoSOverridingOptions object.

:param policy_kinds: QoS kinds that will have a declared parameter.
:param callback: Callback that will be used to validate the QoS profile
after the paramter overrides get applied.
:param entity_id: Optional identifier, to disambiguate in the case that different QoS
policies for the same topic are desired.
"""
self._policy_kinds = policy_kinds
self._callback = callback
self._entity_id = entity_id

@property
def policy_kinds(self) -> Iterable[QoSPolicyKind]:
"""Get QoS policy kinds that will have a parameter override."""
return self._policy_kinds

@property
def callback(self) -> Optional[Callable[[QoSProfile], bool]]:
"""Get the validation callback."""
return self._callback

@property
def entity_id(self) -> Optional[Text]:
"""Get the optional entity ID."""
return self._entity_id

@classmethod
def with_default_policies(
cls, *,
callback: Optional[Callable[[QoSProfile], bool]] = None,
entity_id: Optional[Text] = None
) -> 'QoSOverridingOptions':
return cls(
policy_kinds=(QoSPolicyKind.HISTORY, QoSPolicyKind.DEPTH, QoSPolicyKind.RELIABILITY),
callback=callback,
entity_id=entity_id,
)


def _declare_qos_parameters(
entity_type: Union[Type[Publisher], Type[Subscription]],
node: 'Node',
topic_name: Text,
qos: QoSProfile,
options: QoSOverridingOptions
) -> QoSProfile:
"""
Declare QoS parameters for a Publisher or a Subscription.

:param entity_type: Either `rclpy.node.Publisher` or `rclpy.node.Subscription`.
:param node: Node used to declare the parameters.
:param topic_name: Topic name of the entity being created.
:param qos: Default QoS settings of the entity being created, that will be overridden
with the user provided QoS parameter overrides.
:param options: Options that indicates which parameters are going to be declared.
"""
if not issubclass(entity_type, (Publisher, Subscription)):
raise TypeError('Argument `entity_type` should be a subclass of Publisher or Subscription')
entity_type_str = 'publisher' if issubclass(entity_type, Publisher) else 'subscription'
id_suffix = '' if options.entity_id is None else f'_{options.entity_id}'
name = f'qos_overrides.{topic_name}.{entity_type_str}{id_suffix}.' '{}'
description = '{}' f' for {entity_type_str} `{topic_name}` with id `{options.entity_id}`'
allowed_policies = _get_allowed_policies(entity_type)
for policy in options.policy_kinds:
if policy not in allowed_policies:
continue
policy_name = policy.name.lower()
descriptor = ParameterDescriptor()
descriptor.description = description.format(policy_name)
descriptor.read_only = True
param = node.declare_parameter(
name.format(policy_name),
_get_qos_policy_parameter(qos, policy),
descriptor)
_override_qos_policy_with_param(qos, policy, param)
if options.callback is not None and not options.callback(qos):
raise InvalidQosOverridesError(
description.format('Provided QoS overrides') + ', are not valid')


def _get_allowed_policies(entity_type: Union[Type[Publisher], Type[Subscription]]):
allowed_policies = list(QoSPolicyKind.__members__.values())
if issubclass(entity_type, Subscription):
allowed_policies.remove(QoSPolicyKind.LIFESPAN)
return allowed_policies


def _get_qos_policy_parameter(qos: QoSProfile, policy: QoSPolicyKind) -> Union[str, int, bool]:
value = getattr(qos, policy.name.lower())
if policy in (
QoSPolicyKind.LIVELINESS, QoSPolicyKind.RELIABILITY,
QoSPolicyKind.HISTORY, QoSPolicyKind.DURABILITY
):
value = value.name.lower()
if value == 'unknown':
raise ValueError('User provided QoS profile is invalid')
if policy in (
QoSPolicyKind.LIFESPAN, QoSPolicyKind.DEADLINE, QoSPolicyKind.LIVELINESS_LEASE_DURATION
):
value = value.nanoseconds()
return value


def _override_qos_policy_with_param(qos: QoSProfile, policy: QoSPolicyKind, param: Parameter):
value = param.value
policy_name = policy.name.lower()
if policy in (
QoSPolicyKind.LIVELINESS, QoSPolicyKind.RELIABILITY,
QoSPolicyKind.HISTORY, QoSPolicyKind.DURABILITY
):
def capitalize_first_letter(x):
return x[0].upper() + x[1:]
# e.g. `policy=QosPolicyKind.LIVELINESS` -> `policy_enum_class=rclpy.qos.LivelinessPolicy`
policy_enum_class = getattr(
rclpy.qos, f'{capitalize_first_letter(policy_name)}Policy')
try:
value = policy_enum_class[value.upper()]
except KeyError:
raise RuntimeError(
f'Unexpected QoS override for policy `{policy.name.lower()}`: `{value}`')
if policy in (
QoSPolicyKind.LIFESPAN, QoSPolicyKind.DEADLINE, QoSPolicyKind.LIVELINESS_LEASE_DURATION
):
value = Duration(nanoseconds=value)
setattr(qos, policy.name.lower(), value)
Loading