Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 26 additions & 25 deletions rclpy/rclpy/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
from typing import Any
from typing import Callable
from typing import Dict
from typing import Final
from typing import Iterator
from typing import List
from typing import Optional
from typing import overload
from typing import Sequence
from typing import Tuple
from typing import Type
from typing import TypeVar
Expand Down Expand Up @@ -100,9 +102,10 @@
from rclpy.validate_topic_name import validate_topic_name
from rclpy.waitable import Waitable
from typing_extensions import deprecated
from typing_extensions import TypeAlias


HIDDEN_NODE_PREFIX = '_'
HIDDEN_NODE_PREFIX: Final = '_'

# Left to support Legacy TypeVar.
MsgType = TypeVar('MsgType')
Expand All @@ -113,7 +116,7 @@

# Re-export exception defined in _rclpy C extension.
# `Node.get_*_names_and_types_by_node` methods may raise this error.
NodeNameNonExistentError = _rclpy.NodeNameNonExistentError
NodeNameNonExistentError: TypeAlias = _rclpy.NodeNameNonExistentError


class Node:
Expand Down Expand Up @@ -439,49 +442,47 @@ def declare_parameter(
args = (name, value, descriptor)
return self.declare_parameters('', [args], ignore_override)[0]

ParameterInput: TypeAlias = Union[AllowableParameterValue, Parameter.Type, ParameterValue]

@overload
@deprecated('when declaring a parameter only providing its name is deprecated. '
'You have to either:\n'
'\t- Pass a name and a default value different to "PARAMETER NOT SET"'
' (and optionally a descriptor).\n'
'\t- Pass a name and a parameter type.\n'
'\t- Pass a name and a descriptor with `dynamic_typing=True')
def declare_parameters(
self,
namespace: str,
parameters: List[Union[
Tuple[str],
Tuple[str, Parameter.Type],
Tuple[str, Union[AllowableParameterValue, Parameter.Type, ParameterValue],
ParameterDescriptor],
parameters: Sequence[Union[
Tuple[str, ParameterInput],
Tuple[str, ParameterInput, ParameterDescriptor],
]],
ignore_override: bool = False
) -> List[Parameter[Any]]: ...

@overload
@deprecated('when declaring a parameter only providing its name is deprecated. '
'You have to either:\n'
'\t- Pass a name and a default value different to "PARAMETER NOT SET"'
' (and optionally a descriptor).\n'
'\t- Pass a name and a parameter type.\n'
'\t- Pass a name and a descriptor with `dynamic_typing=True')
def declare_parameters(
self,
namespace: str,
parameters: List[Union[
Tuple[str, Parameter.Type],
Tuple[str, Union[AllowableParameterValue, Parameter.Type, ParameterValue],
ParameterDescriptor],
parameters: Sequence[Union[
Tuple[str],
Tuple[str, ParameterInput],
Tuple[str, ParameterInput, ParameterDescriptor],
]],
ignore_override: bool = False
) -> List[Parameter[Any]]: ...

def declare_parameters(
self,
namespace: str,
parameters: Union[List[Union[
parameters: Union[Sequence[Union[
Tuple[str],
Tuple[str, Parameter.Type],
Tuple[str, Union[AllowableParameterValue, Parameter.Type, ParameterValue],
ParameterDescriptor]]],
List[Union[
Tuple[str, Parameter.Type],
Tuple[str, Union[AllowableParameterValue, Parameter.Type, ParameterValue],
ParameterDescriptor]]]],
Tuple[str, ParameterInput],
Tuple[str, ParameterInput, ParameterDescriptor]]],
Sequence[Union[
Tuple[str, ParameterInput],
Tuple[str, ParameterInput, ParameterDescriptor]]]],
ignore_override: bool = False
) -> List[Parameter[Any]]:
"""
Expand Down
64 changes: 57 additions & 7 deletions rclpy/rclpy/parameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations

import array
from enum import IntEnum
Expand All @@ -20,6 +21,7 @@
from typing import Final
from typing import Generic
from typing import List
from typing import Literal
from typing import Optional
from typing import overload
from typing import Tuple
Expand Down Expand Up @@ -78,7 +80,7 @@ class Type(IntEnum):
@classmethod
def from_parameter_value(cls,
parameter_value: AllowableParameterValue
) -> 'Parameter.Type':
) -> Parameter.Type:
"""
Get a Parameter.Type from a given variable.

Expand Down Expand Up @@ -143,7 +145,7 @@ def check(self, parameter_value: AllowableParameterValue) -> bool:
return False

@classmethod
def from_parameter_msg(cls, param_msg: ParameterMsg) -> 'Parameter[Any]':
def from_parameter_msg(cls, param_msg: ParameterMsg) -> Parameter[Any]:
value = None
type_ = Parameter.Type(value=param_msg.value.type)
if Parameter.Type.BOOL == type_:
Expand All @@ -167,17 +169,65 @@ def from_parameter_msg(cls, param_msg: ParameterMsg) -> 'Parameter[Any]':
return cls(param_msg.name, type_, value)

@overload
def __init__(self, name: str, type_: Optional['Parameter.Type'] = None,
value: None = None) -> None: ...
def __init__(self: Parameter[None], name: str) -> None: ...

@overload
def __init__(self, name: str, type_: 'Parameter.Type',
value: AllowableParameterValueT) -> None: ...
def __init__(self: Parameter[None], name: str, type_: Literal[Parameter.Type.NOT_SET]
) -> None: ...

@overload
def __init__(self: Parameter[bool], name: str, type_: Literal[Parameter.Type.BOOL]
) -> None: ...

@overload
def __init__(self: Parameter[int], name: str, type_: Literal[Parameter.Type.INTEGER]
) -> None: ...

@overload
def __init__(self: Parameter[float], name: str, type_: Literal[Parameter.Type.DOUBLE]
) -> None: ...

@overload
def __init__(self: Parameter[str], name: str, type_: Literal[Parameter.Type.STRING]
) -> None: ...

@overload
def __init__(self: Parameter[Union[list[bytes], Tuple[bytes, ...]]],
name: str,
type_: Literal[Parameter.Type.BYTE_ARRAY]) -> None: ...

@overload
def __init__(self: Parameter[Union[list[bool], Tuple[bool, ...]]],
name: str,
type_: Literal[Parameter.Type.BOOL_ARRAY]) -> None: ...

@overload
def __init__(self: Parameter[Union[list[int], Tuple[int, ...], array.array[int]]],
name: str,
type_: Literal[Parameter.Type.INTEGER_ARRAY]) -> None: ...

@overload
def __init__(self: Parameter[Union[list[float], Tuple[float, ...], array.array[float]]],
name: str,
type_: Literal[Parameter.Type.DOUBLE_ARRAY]) -> None: ...

@overload
def __init__(self: Parameter[Union[list[str], Tuple[str, ...], array.array[str]]],
name: str,
type_: Literal[Parameter.Type.STRING_ARRAY]) -> None: ...

@overload
def __init__(self, name: str, *, value: AllowableParameterValueT) -> None: ...

def __init__(self, name: str, type_: Optional['Parameter.Type'] = None, value=None) -> None:
@overload
def __init__(self, name: str, type_: Optional[Parameter.Type] = None,
value: None = None) -> None: ...

@overload
def __init__(self, name: str, type_: Parameter.Type,
value: AllowableParameterValueT) -> None: ...

def __init__(self, name: str, type_: Optional[Parameter.Type] = None, value=None) -> None:
if type_ is None:
# This will raise a TypeError if it is not possible to get a type from the value.
type_ = Parameter.Type.from_parameter_value(value)
Expand Down
7 changes: 4 additions & 3 deletions rclpy/test/test_events_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import rclpy.executors
import rclpy.experimental
import rclpy.node
import rclpy.parameter
import rclpy.qos
import rclpy.time
import rclpy.timer
Expand Down Expand Up @@ -183,7 +184,7 @@ class TimerTestNode(rclpy.node.Node):
def __init__(
self,
index: int = 0,
parameter_overrides: typing.Optional[list[rclpy.Parameter]] = None,
parameter_overrides: typing.Optional[list[rclpy.parameter.Parameter[bool]]] = None,
) -> None:
super().__init__(f'test_timer{index}', parameter_overrides=parameter_overrides)
self._timer_events = 0
Expand Down Expand Up @@ -233,7 +234,7 @@ class ActionServerTestNode(rclpy.node.Node):
def __init__(self) -> None:
super().__init__(
'test_action_server_node',
parameter_overrides=[rclpy.Parameter('use_sim_time', value=True)],
parameter_overrides=[rclpy.parameter.Parameter('use_sim_time', value=True)],
)
self._got_goal_future: typing.Optional[rclpy.Future[test_msgs.action.Fibonacci.Goal]] = (
None
Expand Down Expand Up @@ -565,7 +566,7 @@ def test_service(self) -> None:
def test_timers(self) -> None:
realtime_node = TimerTestNode(index=0)
rostime_node = TimerTestNode(
index=1, parameter_overrides=[rclpy.Parameter('use_sim_time', value=True)]
index=1, parameter_overrides=[rclpy.parameter.Parameter('use_sim_time', value=True)]
)
clock_node = ClockPublisherNode()
for node in [realtime_node, rostime_node, clock_node]:
Expand Down
Loading