3030
3131#include " rclcpp/executor.hpp"
3232#include " rclcpp/expand_topic_or_service_name.hpp"
33- #include " rclcpp/generic_publisher.hpp"
34- #include " rclcpp/generic_subscription.hpp"
3533#include " rclcpp/rclcpp.hpp"
3634#include " rclcpp/serialization.hpp"
3735#include " rosbag2_cpp/typesupport_helpers.hpp"
4442#include " domain_bridge/topic_bridge_options.hpp"
4543#include " domain_bridge/msg/compressed_msg.hpp"
4644
45+ #include " generic_publisher.hpp"
46+ #include " generic_subscription.hpp"
4747#include " wait_for_graph_events.hpp"
4848
4949namespace domain_bridge
5050{
5151
52- // / \internal
53- /* *
54- * A hack, because PublisherBase doesn't support publishing serialized messages and because
55- * GenericPublisher cannot be created with a typesupport handle :/
56- */
57- class SerializedPublisher
52+ [[noreturn]] static void unreachable ()
5853{
59- public:
60- template <typename MessageT, typename AllocatorT>
61- explicit SerializedPublisher (std::shared_ptr<rclcpp::Publisher<MessageT, AllocatorT>> impl)
62- : impl_(std::move(impl)),
63- publish_method_pointer_(static_cast <decltype (publish_method_pointer_)>(
64- &rclcpp::Publisher<MessageT, AllocatorT>::publish))
65- {}
66-
67- explicit SerializedPublisher (std::shared_ptr<rclcpp::GenericPublisher> impl)
68- : impl_(std::move(impl)),
69- publish_method_pointer_(static_cast <decltype (publish_method_pointer_)>(
70- &rclcpp::GenericPublisher::publish))
71- {}
72-
73- void publish (const rclcpp::SerializedMessage & message)
74- {
75- ((*impl_).*publish_method_pointer_)(message); // this is a bit horrible, but it works ...
76- }
77-
78- private:
79- std::shared_ptr<rclcpp::PublisherBase> impl_;
80- using PointerToMemberMethod =
81- void (rclcpp::PublisherBase::*)(const rclcpp::SerializedMessage & message);
82- PointerToMemberMethod publish_method_pointer_;
83- };
54+ #if defined(__has_builtin)
55+ #if __has_builtin(__builtin_unreachable)
56+ __builtin_unreachable ();
57+ #endif
58+ #elif (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 5))
59+ __builtin_unreachable ();
60+ #endif
61+ throw std::logic_error (" This code should be unreachable." );
62+ }
8463
8564// / Implementation of \ref DomainBridge.
8665class DomainBridgeImpl
@@ -90,8 +69,8 @@ class DomainBridgeImpl
9069 using TopicBridgeMap = std::map<
9170 TopicBridge,
9271 std::pair<
93- std::shared_ptr<SerializedPublisher >,
94- std::shared_ptr<rclcpp::SubscriptionBase >>>;
72+ std::shared_ptr<GenericPublisher >,
73+ std::shared_ptr<GenericSubscription >>>;
9574 using ServiceBridgeMap = std::map<
9675 detail::ServiceBridge,
9776 std::pair<std::shared_ptr<rclcpp::ServiceBase>, std::shared_ptr<rclcpp::ClientBase>>>;
@@ -195,32 +174,39 @@ class DomainBridgeImpl
195174 type, " rosidl_typesupport_cpp" );
196175 }
197176
198- std::shared_ptr<SerializedPublisher >
177+ std::shared_ptr<GenericPublisher >
199178 create_publisher (
200179 rclcpp::Node::SharedPtr node,
201180 const std::string & topic_name,
202- const std::string & type,
203181 const rclcpp::QoS & qos,
204- rclcpp::PublisherOptionsWithAllocator<std::allocator<void >> & options)
182+ const rosidl_message_type_support_t & typesupport_handle,
183+ rclcpp::CallbackGroup::SharedPtr group)
205184 {
206- std::shared_ptr<SerializedPublisher> publisher;
207185 if (options_.mode () != DomainBridgeOptions::Mode::Compress) {
208- publisher = std::make_shared<SerializedPublisher>(
209- node->create_generic_publisher (topic_name, type, qos, options));
210- } else {
211- publisher = std::make_shared<SerializedPublisher>(
212- node->create_publisher <domain_bridge::msg::CompressedMsg>(topic_name, qos, options));
186+ auto publisher = std::make_shared<GenericPublisher>(
187+ node->get_node_base_interface ().get (),
188+ typesupport_handle,
189+ topic_name,
190+ qos);
191+ node->get_node_topics_interface ()->add_publisher (publisher, std::move (group));
192+ return publisher;
213193 }
194+ auto publisher = std::make_shared<GenericPublisher>(
195+ node->get_node_base_interface ().get (),
196+ *rosidl_typesupport_cpp::get_message_type_support_handle<domain_bridge::msg::CompressedMsg>(),
197+ topic_name,
198+ qos);
199+ node->get_node_topics_interface ()->add_publisher (publisher, std::move (group));
214200 return publisher;
215201 }
216202
217- std::shared_ptr<rclcpp::SubscriptionBase > create_subscription (
203+ std::shared_ptr<GenericSubscription > create_subscription (
218204 rclcpp::Node::SharedPtr node,
219- std::shared_ptr<SerializedPublisher > publisher,
205+ std::shared_ptr<GenericPublisher > publisher,
220206 const std::string & topic_name,
221- const std::string & type,
222207 const rclcpp::QoS & qos,
223- rclcpp::SubscriptionOptionsWithAllocator<std::allocator<void >> & options)
208+ const rosidl_message_type_support_t & typesupport_handle,
209+ rclcpp::CallbackGroup::SharedPtr group)
224210 {
225211 std::function<void (std::shared_ptr<rclcpp::SerializedMessage>)> callback;
226212 switch (options_.mode ()) {
@@ -236,7 +222,7 @@ class DomainBridgeImpl
236222 compressed_msg.data = domain_bridge::compress_message (cctx, std::move (*msg));
237223 rclcpp::SerializedMessage serialized_compressed_msg;
238224 serializer.serialize_message (&compressed_msg, &serialized_compressed_msg);
239- publisher->publish (serialized_compressed_msg);
225+ publisher->publish (serialized_compressed_msg. get_rcl_serialized_message () );
240226 };
241227 break ;
242228 case DomainBridgeOptions::Mode::Decompress:
@@ -251,31 +237,39 @@ class DomainBridgeImpl
251237 serializer.deserialize_message (serialized_compressed_msg.get (), &compressed_msg);
252238 rclcpp::SerializedMessage msg = domain_bridge::decompress_message (
253239 dctx, std::move (compressed_msg.data ));
254- publisher->publish (msg);
240+ publisher->publish (msg. get_rcl_serialized_message () );
255241 };
256242 break ;
257243 default : // fallthrough
258244 case DomainBridgeOptions::Mode::Normal:
259245 callback = [publisher](std::shared_ptr<rclcpp::SerializedMessage> msg) {
260246 // Publish message into the other domain
261- publisher->publish (* msg);
247+ publisher->publish (msg-> get_rcl_serialized_message () );
262248 };
263249 break ;
264250 }
265251 if (options_.mode () != DomainBridgeOptions::Mode::Decompress) {
266252 // Create subscription
267- return node->create_generic_subscription (
253+ auto subscription = std::make_shared<GenericSubscription>(
254+ node->get_node_base_interface ().get (),
255+ typesupport_handle,
268256 topic_name,
269- type,
270257 qos,
271- callback,
272- options);
258+ callback);
259+ node->get_node_topics_interface ()->add_subscription (subscription, std::move (group));
260+ return subscription;
273261 }
274- return node->create_subscription <domain_bridge::msg::CompressedMsg>(
262+ auto subscription = std::make_shared<GenericSubscription>(
263+ node->get_node_base_interface ().get (),
264+ *rosidl_typesupport_cpp::get_message_type_support_handle<domain_bridge::msg::CompressedMsg>(),
275265 topic_name,
276266 qos,
277- callback,
278- options);
267+ [publisher](std::shared_ptr<rclcpp::SerializedMessage> msg) {
268+ // Publish message into the other domain
269+ publisher->publish (msg->get_rcl_serialized_message ());
270+ });
271+ node->get_node_topics_interface ()->add_subscription (subscription, std::move (group));
272+ return subscription;
279273 }
280274
281275 void bridge_topic (
@@ -378,29 +372,26 @@ class DomainBridgeImpl
378372 std::cerr << warning << std::endl;
379373 }
380374
381- rclcpp::PublisherOptionsWithAllocator<std::allocator<void >> publisher_options;
382- rclcpp::SubscriptionOptionsWithAllocator<std::allocator<void >> subscription_options;
383-
384- publisher_options.callback_group = topic_options.callback_group ();
385- subscription_options.callback_group = topic_options.callback_group ();
375+ auto typesupport_handle = rosbag2_cpp::get_typesupport_handle (
376+ type, " rosidl_typesupport_cpp" , loaded_typesupports_.at (type));
386377
387378 // Create publisher for the 'to_domain'
388379 // The publisher should be created first so it is available to the subscription callback
389380 auto publisher = this ->create_publisher (
390381 to_domain_node,
391382 topic_remapped,
392- type,
393383 qos,
394- publisher_options);
384+ *typesupport_handle,
385+ topic_options.callback_group ());
395386
396387 // Create subscription for the 'from_domain'
397388 auto subscription = this ->create_subscription (
398389 from_domain_node,
399390 publisher,
400391 topic,
401- type,
402392 qos,
403- subscription_options);
393+ *typesupport_handle,
394+ topic_options.callback_group ());
404395
405396 this ->bridged_topics_ [topic_bridge] = {publisher, subscription};
406397 };
0 commit comments