Skip to content

Commit 8d42eaa

Browse files
Add content-filtered-topic interfaces (#1506)
Signed-off-by: Barry Xu <[email protected]>
1 parent 30cc567 commit 8d42eaa

File tree

7 files changed

+517
-9
lines changed

7 files changed

+517
-9
lines changed

rclpy/rclpy/impl/_rclpy_pybind11.pyi

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ from rclpy.impl import service_introspection as service_introspection
3131
from rclpy.node import Node as RCLPyNode
3232
from rclpy.parameter import Parameter
3333
from rclpy.subscription import MessageInfo
34+
from rclpy.subscription_content_filter_options import ContentFilterOptions
3435
from rclpy.task import Future
3536
from rclpy.task import Task
3637
from rclpy.type_support import (Action, FeedbackMessage, FeedbackT, GetResultServiceRequest,
@@ -600,7 +601,8 @@ class Timer(Destroyable):
600601
class Subscription(Destroyable, Generic[MsgT]):
601602

602603
def __init__(self, node: Node, pymsg_type: type[MsgT], topic: str,
603-
pyqos_profile: rmw_qos_profile_t) -> None: ...
604+
pyqos_profile: rmw_qos_profile_t,
605+
content_filter_options: Optional[ContentFilterOptions] = None) -> None: ...
604606

605607
@property
606608
def pointer(self) -> int:
@@ -624,6 +626,15 @@ class Subscription(Destroyable, Generic[MsgT]):
624626
def clear_on_new_message_callback(self) -> None:
625627
"""Clear the on new message callback function for the subscription."""
626628

629+
def is_cft_enabled(self) -> bool:
630+
"""Check if content filtering is enabled for this subscription."""
631+
632+
def set_content_filter(self, filter_expression: str, expression_parameters: list[str]) -> None:
633+
"""Set the filter expression and expression parameters for the subscription."""
634+
635+
def get_content_filter(self) -> ContentFilterOptions:
636+
"""Get the filter expression and expression parameters for the subscription."""
637+
627638

628639
class rcl_time_point_t:
629640

rclpy/rclpy/node.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@
8686
from rclpy.subscription import GenericSubscriptionCallback
8787
from rclpy.subscription import Subscription
8888
from rclpy.subscription import SubscriptionCallbackUnion
89+
from rclpy.subscription_content_filter_options import ContentFilterOptions
8990
from rclpy.time_source import TimeSource
9091
from rclpy.timer import Rate
9192
from rclpy.timer import Timer
@@ -1642,7 +1643,8 @@ def create_subscription(
16421643
callback_group: Optional[CallbackGroup] = None,
16431644
event_callbacks: Optional[SubscriptionEventCallbacks] = None,
16441645
qos_overriding_options: Optional[QoSOverridingOptions] = None,
1645-
raw: Literal[True]
1646+
raw: Literal[True],
1647+
content_filter_options: Optional[ContentFilterOptions] = None
16461648
) -> Subscription[MsgT]: ...
16471649

16481650
@overload
@@ -1656,7 +1658,8 @@ def create_subscription(
16561658
callback_group: Optional[CallbackGroup] = None,
16571659
event_callbacks: Optional[SubscriptionEventCallbacks] = None,
16581660
qos_overriding_options: Optional[QoSOverridingOptions] = None,
1659-
raw: bool = False
1661+
raw: bool = False,
1662+
content_filter_options: Optional[ContentFilterOptions] = None
16601663
) -> Subscription[MsgT]: ...
16611664

16621665
def create_subscription(
@@ -1669,7 +1672,8 @@ def create_subscription(
16691672
callback_group: Optional[CallbackGroup] = None,
16701673
event_callbacks: Optional[SubscriptionEventCallbacks] = None,
16711674
qos_overriding_options: Optional[QoSOverridingOptions] = None,
1672-
raw: bool = False
1675+
raw: bool = False,
1676+
content_filter_options: Optional[ContentFilterOptions] = None
16731677
) -> Subscription[MsgT]:
16741678
"""
16751679
Create a new subscription.
@@ -1687,6 +1691,7 @@ def create_subscription(
16871691
:param event_callbacks: User-defined callbacks for middleware events.
16881692
:param raw: If ``True``, then received messages will be stored in raw binary
16891693
representation.
1694+
:param content_filter_options: The filter expression and parameters for content filtering.
16901695
"""
16911696
qos_profile = self._validate_qos_or_depth_parameter(qos_profile)
16921697

@@ -1714,7 +1719,8 @@ def create_subscription(
17141719
try:
17151720
with self.handle:
17161721
subscription_object = _rclpy.Subscription(
1717-
self.handle, msg_type, topic, qos_profile.get_c_qos_profile())
1722+
self.handle, msg_type, topic, qos_profile.get_c_qos_profile(),
1723+
content_filter_options)
17181724
except ValueError:
17191725
failed = True
17201726
if failed:

rclpy/rclpy/subscription.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
from rclpy.event_handler import SubscriptionEventCallbacks
3232
from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy
3333
from rclpy.qos import QoSProfile
34+
from rclpy.subscription_content_filter_options import ContentFilterOptions
3435
from rclpy.type_support import MsgT
3536
from typing_extensions import TypeAlias
3637

@@ -48,6 +49,9 @@ class MessageInfo(TypedDict):
4849
publisher_gid: Optional[PublisherGID]
4950

5051

52+
# Re-export exception defined in _rclpy C extension.
53+
RCLError = _rclpy.RCLError
54+
5155
# Left to support Legacy TypeVars.
5256
MsgType = TypeVar('MsgType')
5357

@@ -200,6 +204,34 @@ def logger_name(self) -> str:
200204
with self.handle:
201205
return self.__subscription.get_logger_name()
202206

207+
@property
208+
def is_cft_enabled(self) -> bool:
209+
"""Check if content filtering is enabled for the subscription."""
210+
with self.handle:
211+
return self.__subscription.is_cft_enabled()
212+
213+
def set_content_filter(self, filter_expression: str, expression_parameters: list[str]) -> None:
214+
"""
215+
Set the filter expression and expression parameters for the subscription.
216+
217+
:param filter_expression: The filter expression to set.
218+
:param expression_parameters: The expression parameters to set.
219+
:raises: RCLError if internal error occurred when calling the rcl function.
220+
"""
221+
with self.handle:
222+
self.__subscription.set_content_filter(filter_expression, expression_parameters)
223+
224+
def get_content_filter(self) -> ContentFilterOptions:
225+
"""
226+
Get the filter expression and expression parameters for the subscription.
227+
228+
:return: ContentFilterOptions object containing the filter expression and expression
229+
parameters.
230+
:raises: RCLError if internal error occurred when calling the rcl function.
231+
"""
232+
with self.handle:
233+
return self.__subscription.get_content_filter()
234+
203235
def __enter__(self) -> 'Subscription[MsgT]':
204236
return self
205237

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
# Copyright 2025 Sony Group Corporation.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from typing import NamedTuple
16+
17+
18+
class ContentFilterOptions(NamedTuple):
19+
"""Options to configure content filtered topic in the subscription."""
20+
21+
filter_expression: str
22+
expression_parameters: list[str]

rclpy/src/rclpy/subscription.cpp

Lines changed: 137 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
#include <pybind11/pybind11.h>
16+
#include <pybind11/stl.h>
1617

1718
#include <rcl/error_handling.h>
1819
#include <rcl/node.h>
@@ -21,10 +22,14 @@
2122
#include <rosidl_runtime_c/message_type_support_struct.h>
2223
#include <rmw/types.h>
2324

25+
#include <cstddef>
2426
#include <memory>
2527
#include <stdexcept>
2628
#include <string>
2729
#include <utility>
30+
#include <vector>
31+
32+
#include <rcpputils/scope_exit.hpp>
2833

2934
#include "exceptions.hpp"
3035
#include "node.hpp"
@@ -39,9 +44,25 @@ namespace rclpy
3944
{
4045
using events_executor::RclEventCallbackTrampoline;
4146

47+
namespace
48+
{
49+
std::vector<const char *>
50+
get_c_vector_string(const std::vector<std::string> & strings_in)
51+
{
52+
std::vector<const char *> cstrings;
53+
cstrings.reserve(strings_in.size());
54+
55+
for (size_t i = 0; i < strings_in.size(); ++i) {
56+
cstrings.push_back(strings_in[i].c_str());
57+
}
58+
59+
return cstrings;
60+
}
61+
} // namespace
62+
4263
Subscription::Subscription(
4364
Node & node, py::object pymsg_type, std::string topic,
44-
py::object pyqos_profile)
65+
py::object pyqos_profile, py::object content_filter_options)
4566
: node_(node)
4667
{
4768
auto msg_type = static_cast<rosidl_message_type_support_t *>(
@@ -75,6 +96,24 @@ Subscription::Subscription(
7596

7697
*rcl_subscription_ = rcl_get_zero_initialized_subscription();
7798

99+
std::string filter_expression;
100+
std::vector<std::string> expression_parameters;
101+
if (!content_filter_options.is_none()) {
102+
filter_expression = content_filter_options.attr("filter_expression").cast<std::string>();
103+
expression_parameters =
104+
content_filter_options.attr("expression_parameters").cast<std::vector<std::string>>();
105+
std::vector<const char *> cstrings =
106+
get_c_vector_string(expression_parameters);
107+
rcl_ret_t ret = rcl_subscription_options_set_content_filter_options(
108+
filter_expression.c_str(),
109+
cstrings.size(),
110+
cstrings.data(),
111+
&subscription_ops);
112+
if (RCL_RET_OK != ret) {
113+
throw rclpy::RCLError("Failed to set content_filter_options");
114+
}
115+
}
116+
78117
rcl_ret_t ret = rcl_subscription_init(
79118
rcl_subscription_.get(), node_.rcl_ptr(), msg_type,
80119
topic.c_str(), &subscription_ops);
@@ -237,11 +276,98 @@ Subscription::clear_on_new_message_callback()
237276
}
238277
}
239278

279+
bool
280+
Subscription::is_cft_enabled() const
281+
{
282+
return rcl_subscription_is_cft_enabled(rcl_subscription_.get());
283+
}
284+
285+
void
286+
Subscription::set_content_filter(
287+
const std::string & filter_expression,
288+
const std::vector<std::string> & expression_parameters)
289+
{
290+
rcl_subscription_content_filter_options_t options =
291+
rcl_get_zero_initialized_subscription_content_filter_options();
292+
std::vector<const char *> cstrings = get_c_vector_string(expression_parameters);
293+
rcl_ret_t ret = rcl_subscription_content_filter_options_init(
294+
rcl_subscription_.get(),
295+
filter_expression.c_str(),
296+
cstrings.size(),
297+
cstrings.data(),
298+
&options);
299+
if (RCL_RET_OK != ret) {
300+
throw RCLError("Failed to init subscription content_filtered_topic option");
301+
}
302+
303+
RCPPUTILS_SCOPE_EXIT(
304+
{
305+
rcl_ret_t ret = rcl_subscription_content_filter_options_fini(
306+
rcl_subscription_.get(), &options);
307+
if (RCL_RET_OK != ret) {
308+
throw RCLError(
309+
"Failed to fini subscription content_filtered_topic option: " +
310+
std::string(rcl_get_error_string().str));
311+
}
312+
});
313+
314+
ret = rcl_subscription_set_content_filter(
315+
rcl_subscription_.get(),
316+
&options);
317+
if (RCL_RET_OK != ret) {
318+
throw RCLError("Failed to set cft expression parameters");
319+
}
320+
}
321+
322+
py::object
323+
Subscription::get_content_filter() const
324+
{
325+
rcl_subscription_content_filter_options_t options =
326+
rcl_get_zero_initialized_subscription_content_filter_options();
327+
328+
rcl_ret_t ret = rcl_subscription_get_content_filter(
329+
rcl_subscription_.get(),
330+
&options);
331+
if (RCL_RET_OK != ret) {
332+
throw RCLError("Failed to get cft expression parameters");
333+
}
334+
335+
RCPPUTILS_SCOPE_EXIT(
336+
{
337+
rcl_ret_t ret = rcl_subscription_content_filter_options_fini(
338+
rcl_subscription_.get(), &options);
339+
if (RCL_RET_OK != ret) {
340+
throw RCLError(
341+
"Failed to fini subscription content_filtered_topic option: " +
342+
std::string(rcl_get_error_string().str));
343+
}
344+
});
345+
346+
rmw_subscription_content_filter_options_t & content_filter_options =
347+
options.rmw_subscription_content_filter_options;
348+
std::vector<std::string> expression_parameters;
349+
for (size_t i = 0; i < content_filter_options.expression_parameters.size; ++i) {
350+
expression_parameters.push_back(content_filter_options.expression_parameters.data[i]);
351+
}
352+
353+
py::object content_filter_options_class =
354+
py::module_::import("rclpy.subscription_content_filter_options").attr("ContentFilterOptions");
355+
356+
return content_filter_options_class(
357+
std::string(content_filter_options.filter_expression),
358+
expression_parameters);
359+
}
360+
240361
void
241362
define_subscription(py::object module)
242363
{
243364
py::class_<Subscription, Destroyable, std::shared_ptr<Subscription>>(module, "Subscription")
244-
.def(py::init<Node &, py::object, std::string, py::object>())
365+
.def(py::init<Node &, py::object, std::string, py::object, py::object>(),
366+
py::arg("node"),
367+
py::arg("msg_type"),
368+
py::arg("topic"),
369+
py::arg("qos_profile"),
370+
py::arg("content_filter_options") = py::none())
245371
.def_property_readonly(
246372
"pointer", [](const Subscription & subscription) {
247373
return reinterpret_cast<size_t>(subscription.rcl_ptr());
@@ -262,6 +388,14 @@ define_subscription(py::object module)
262388
.def(
263389
"set_on_new_message_callback", &Subscription::set_on_new_message_callback,
264390
py::arg("callback"))
265-
.def("clear_on_new_message_callback", &Subscription::clear_on_new_message_callback);
391+
.def("clear_on_new_message_callback", &Subscription::clear_on_new_message_callback)
392+
.def("is_cft_enabled", &Subscription::is_cft_enabled,
393+
"Check if content filtering is enabled for this subscription.")
394+
.def(
395+
"set_content_filter", &Subscription::set_content_filter,
396+
"Set the filter expression and expression parameters for the subscription.")
397+
.def(
398+
"get_content_filter", &Subscription::get_content_filter,
399+
"Get the filter expression and expression parameters for the subscription.");
266400
}
267401
} // namespace rclpy

0 commit comments

Comments
 (0)