Skip to content

Commit ec17d68

Browse files
authored
*_raw function (ros2#388)
* publish_raw function * subscription traits * listener raw * rebased * cleanup and linters * explicit test for deleter in unique_ptr * add missing copyright * cleanup * add rmw_serialize functions * linters * explicit differentiation between take and take_raw * cleanup debug messages * rename to rmw_message_init` * address comments * address review comments * raw->serialized * use size_t (ros2#497)
1 parent 1556b6e commit ec17d68

18 files changed

+608
-76
lines changed

rclcpp/CMakeLists.txt

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,17 @@ if(BUILD_TESTING)
260260
${PROJECT_NAME}
261261
)
262262
endif()
263+
ament_add_gtest(test_serialized_message_allocator test/test_serialized_message_allocator.cpp
264+
ENV RCL_ASSERT_RMW_ID_MATCHES=${rmw_implementation})
265+
if(TARGET test_serialized_message_allocator)
266+
target_include_directories(test_serialized_message_allocator PUBLIC
267+
${test_msgs_INCLUDE_DIRS}
268+
)
269+
target_link_libraries(test_serialized_message_allocator
270+
${PROJECT_NAME}
271+
${test_msgs_LIBRARIES}
272+
)
273+
endif()
263274
ament_add_gtest(test_service test/test_service.cpp)
264275
if(TARGET test_service)
265276
target_include_directories(test_service PUBLIC
@@ -280,6 +291,16 @@ if(BUILD_TESTING)
280291
)
281292
target_link_libraries(test_subscription ${PROJECT_NAME})
282293
endif()
294+
find_package(test_msgs REQUIRED)
295+
ament_add_gtest(test_subscription_traits test/test_subscription_traits.cpp)
296+
if(TARGET test_subscription_traits)
297+
target_include_directories(test_subscription_traits PUBLIC
298+
${rcl_INCLUDE_DIRS}
299+
)
300+
ament_target_dependencies(test_subscription_traits
301+
"test_msgs"
302+
)
303+
endif()
283304
ament_add_gtest(test_find_weak_nodes test/test_find_weak_nodes.cpp)
284305
if(TARGET test_find_weak_nodes)
285306
target_include_directories(test_find_weak_nodes PUBLIC

rclcpp/include/rclcpp/create_subscription.hpp

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,13 @@
2626
namespace rclcpp
2727
{
2828

29-
template<typename MessageT, typename CallbackT, typename AllocatorT, typename SubscriptionT>
30-
typename rclcpp::Subscription<MessageT, AllocatorT>::SharedPtr
29+
template<
30+
typename MessageT,
31+
typename CallbackT,
32+
typename AllocatorT,
33+
typename CallbackMessageT,
34+
typename SubscriptionT = rclcpp::Subscription<CallbackMessageT, AllocatorT>>
35+
typename std::shared_ptr<SubscriptionT>
3136
create_subscription(
3237
rclcpp::node_interfaces::NodeTopicsInterface * node_topics,
3338
const std::string & topic_name,
@@ -36,16 +41,17 @@ create_subscription(
3641
rclcpp::callback_group::CallbackGroup::SharedPtr group,
3742
bool ignore_local_publications,
3843
bool use_intra_process_comms,
39-
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<MessageT, AllocatorT>::SharedPtr
44+
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<
45+
CallbackMessageT, AllocatorT>::SharedPtr
4046
msg_mem_strat,
4147
typename std::shared_ptr<AllocatorT> allocator)
4248
{
4349
auto subscription_options = rcl_subscription_get_default_options();
4450
subscription_options.qos = qos_profile;
4551
subscription_options.ignore_local_publications = ignore_local_publications;
4652

47-
auto factory =
48-
rclcpp::create_subscription_factory<MessageT, CallbackT, AllocatorT, SubscriptionT>(
53+
auto factory = rclcpp::create_subscription_factory
54+
<MessageT, CallbackT, AllocatorT, CallbackMessageT, SubscriptionT>(
4955
std::forward<CallbackT>(callback), msg_mem_strat, allocator);
5056

5157
auto sub = node_topics->create_subscription(

rclcpp/include/rclcpp/message_memory_strategy.hpp

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,15 @@
1818
#include <memory>
1919
#include <stdexcept>
2020

21+
#include "rcl/types.h"
22+
2123
#include "rclcpp/allocator/allocator_common.hpp"
24+
#include "rclcpp/exceptions.hpp"
2225
#include "rclcpp/macros.hpp"
2326
#include "rclcpp/visibility_control.hpp"
2427

28+
#include "rmw/serialized_message.h"
29+
2530
namespace rclcpp
2631
{
2732
namespace message_memory_strategy
@@ -39,14 +44,29 @@ class MessageMemoryStrategy
3944
using MessageAlloc = typename MessageAllocTraits::allocator_type;
4045
using MessageDeleter = allocator::Deleter<MessageAlloc, MessageT>;
4146

47+
using SerializedMessageAllocTraits = allocator::AllocRebind<rcl_serialized_message_t, Alloc>;
48+
using SerializedMessageAlloc = typename SerializedMessageAllocTraits::allocator_type;
49+
using SerializedMessageDeleter =
50+
allocator::Deleter<SerializedMessageAlloc, rcl_serialized_message_t>;
51+
52+
using BufferAllocTraits = allocator::AllocRebind<char, Alloc>;
53+
using BufferAlloc = typename BufferAllocTraits::allocator_type;
54+
using BufferDeleter = allocator::Deleter<BufferAlloc, char>;
55+
4256
MessageMemoryStrategy()
4357
{
4458
message_allocator_ = std::make_shared<MessageAlloc>();
59+
serialized_message_allocator_ = std::make_shared<SerializedMessageAlloc>();
60+
buffer_allocator_ = std::make_shared<BufferAlloc>();
61+
rcutils_allocator_ = allocator::get_rcl_allocator<char, BufferAlloc>(*buffer_allocator_.get());
4562
}
4663

4764
explicit MessageMemoryStrategy(std::shared_ptr<Alloc> allocator)
4865
{
4966
message_allocator_ = std::make_shared<MessageAlloc>(*allocator.get());
67+
serialized_message_allocator_ = std::make_shared<SerializedMessageAlloc>(*allocator.get());
68+
buffer_allocator_ = std::make_shared<BufferAlloc>(*allocator.get());
69+
rcutils_allocator_ = allocator::get_rcl_allocator<char, BufferAlloc>(*buffer_allocator_.get());
5070
}
5171

5272
/// Default factory method
@@ -62,15 +82,60 @@ class MessageMemoryStrategy
6282
return std::allocate_shared<MessageT, MessageAlloc>(*message_allocator_.get());
6383
}
6484

85+
virtual std::shared_ptr<rcl_serialized_message_t> borrow_serialized_message(size_t capacity)
86+
{
87+
auto msg = new rcl_serialized_message_t;
88+
*msg = rmw_get_zero_initialized_serialized_message();
89+
auto ret = rmw_serialized_message_init(msg, capacity, &rcutils_allocator_);
90+
if (ret != RCL_RET_OK) {
91+
rclcpp::exceptions::throw_from_rcl_error(ret);
92+
}
93+
94+
auto serialized_msg = std::shared_ptr<rcl_serialized_message_t>(msg,
95+
[](rmw_serialized_message_t * msg) {
96+
auto ret = rmw_serialized_message_fini(msg);
97+
delete msg;
98+
if (ret != RCL_RET_OK) {
99+
rclcpp::exceptions::throw_from_rcl_error(ret, "leaking memory");
100+
}
101+
});
102+
103+
return serialized_msg;
104+
}
105+
106+
virtual std::shared_ptr<rcl_serialized_message_t> borrow_serialized_message()
107+
{
108+
return borrow_serialized_message(default_buffer_capacity_);
109+
}
110+
111+
virtual void set_default_buffer_capacity(size_t capacity)
112+
{
113+
default_buffer_capacity_ = capacity;
114+
}
115+
65116
/// Release ownership of the message, which will deallocate it if it has no more owners.
66117
/** \param[in] msg Shared pointer to the message we are returning. */
67118
virtual void return_message(std::shared_ptr<MessageT> & msg)
68119
{
69120
msg.reset();
70121
}
71122

123+
virtual void return_serialized_message(std::shared_ptr<rcl_serialized_message_t> & serialized_msg)
124+
{
125+
serialized_msg.reset();
126+
}
127+
72128
std::shared_ptr<MessageAlloc> message_allocator_;
73129
MessageDeleter message_deleter_;
130+
131+
std::shared_ptr<SerializedMessageAlloc> serialized_message_allocator_;
132+
SerializedMessageDeleter serialized_message_deleter_;
133+
134+
std::shared_ptr<BufferAlloc> buffer_allocator_;
135+
BufferDeleter buffer_deleter_;
136+
size_t default_buffer_capacity_ = 0;
137+
138+
rcutils_allocator_t rcutils_allocator_;
74139
};
75140

76141
} // namespace message_memory_strategy

rclcpp/include/rclcpp/node.hpp

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
#include "rclcpp/publisher.hpp"
5454
#include "rclcpp/service.hpp"
5555
#include "rclcpp/subscription.hpp"
56+
#include "rclcpp/subscription_traits.hpp"
5657
#include "rclcpp/time.hpp"
5758
#include "rclcpp/timer.hpp"
5859
#include "rclcpp/visibility_control.hpp"
@@ -183,15 +184,17 @@ class Node : public std::enable_shared_from_this<Node>
183184
typename MessageT,
184185
typename CallbackT,
185186
typename Alloc = std::allocator<void>,
186-
typename SubscriptionT = rclcpp::Subscription<MessageT, Alloc>>
187+
typename SubscriptionT = rclcpp::Subscription<
188+
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type, Alloc>>
187189
std::shared_ptr<SubscriptionT>
188190
create_subscription(
189191
const std::string & topic_name,
190192
CallbackT && callback,
191193
const rmw_qos_profile_t & qos_profile = rmw_qos_profile_default,
192194
rclcpp::callback_group::CallbackGroup::SharedPtr group = nullptr,
193195
bool ignore_local_publications = false,
194-
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<MessageT, Alloc>::SharedPtr
196+
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<
197+
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type, Alloc>::SharedPtr
195198
msg_mem_strat = nullptr,
196199
std::shared_ptr<Alloc> allocator = nullptr);
197200

@@ -214,15 +217,17 @@ class Node : public std::enable_shared_from_this<Node>
214217
typename MessageT,
215218
typename CallbackT,
216219
typename Alloc = std::allocator<void>,
217-
typename SubscriptionT = rclcpp::Subscription<MessageT, Alloc>>
220+
typename SubscriptionT = rclcpp::Subscription<
221+
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type, Alloc>>
218222
std::shared_ptr<SubscriptionT>
219223
create_subscription(
220224
const std::string & topic_name,
221-
size_t qos_history_depth,
222225
CallbackT && callback,
226+
size_t qos_history_depth,
223227
rclcpp::callback_group::CallbackGroup::SharedPtr group = nullptr,
224228
bool ignore_local_publications = false,
225-
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<MessageT, Alloc>::SharedPtr
229+
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<
230+
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type, Alloc>::SharedPtr
226231
msg_mem_strat = nullptr,
227232
std::shared_ptr<Alloc> allocator = nullptr);
228233

rclcpp/include/rclcpp/node_impl.hpp

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -82,28 +82,35 @@ Node::create_publisher(
8282
allocator);
8383
}
8484

85-
template<typename MessageT, typename CallbackT, typename Alloc, typename SubscriptionT>
85+
template<
86+
typename MessageT,
87+
typename CallbackT,
88+
typename Alloc,
89+
typename SubscriptionT>
8690
std::shared_ptr<SubscriptionT>
8791
Node::create_subscription(
8892
const std::string & topic_name,
8993
CallbackT && callback,
9094
const rmw_qos_profile_t & qos_profile,
9195
rclcpp::callback_group::CallbackGroup::SharedPtr group,
9296
bool ignore_local_publications,
93-
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<MessageT, Alloc>::SharedPtr
97+
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<
98+
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type, Alloc>::SharedPtr
9499
msg_mem_strat,
95100
std::shared_ptr<Alloc> allocator)
96101
{
102+
using CallbackMessageT = typename rclcpp::subscription_traits::has_message_type<CallbackT>::type;
103+
97104
if (!allocator) {
98105
allocator = std::make_shared<Alloc>();
99106
}
100107

101108
if (!msg_mem_strat) {
102109
using rclcpp::message_memory_strategy::MessageMemoryStrategy;
103-
msg_mem_strat = MessageMemoryStrategy<MessageT, Alloc>::create_default();
110+
msg_mem_strat = MessageMemoryStrategy<CallbackMessageT, Alloc>::create_default();
104111
}
105112

106-
return rclcpp::create_subscription<MessageT, CallbackT, Alloc, SubscriptionT>(
113+
return rclcpp::create_subscription<MessageT, CallbackT, Alloc, CallbackMessageT, SubscriptionT>(
107114
this->node_topics_.get(),
108115
topic_name,
109116
std::forward<CallbackT>(callback),
@@ -115,21 +122,26 @@ Node::create_subscription(
115122
allocator);
116123
}
117124

118-
template<typename MessageT, typename CallbackT, typename Alloc, typename SubscriptionT>
125+
template<
126+
typename MessageT,
127+
typename CallbackT,
128+
typename Alloc,
129+
typename SubscriptionT>
119130
std::shared_ptr<SubscriptionT>
120131
Node::create_subscription(
121132
const std::string & topic_name,
122-
size_t qos_history_depth,
123133
CallbackT && callback,
134+
size_t qos_history_depth,
124135
rclcpp::callback_group::CallbackGroup::SharedPtr group,
125136
bool ignore_local_publications,
126-
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<MessageT, Alloc>::SharedPtr
137+
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<
138+
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type, Alloc>::SharedPtr
127139
msg_mem_strat,
128140
std::shared_ptr<Alloc> allocator)
129141
{
130142
rmw_qos_profile_t qos = rmw_qos_profile_default;
131143
qos.depth = qos_history_depth;
132-
return this->create_subscription<MessageT, CallbackT, Alloc, SubscriptionT>(
144+
return this->create_subscription<MessageT>(
133145
topic_name,
134146
std::forward<CallbackT>(callback),
135147
qos,

rclcpp/include/rclcpp/parameter_client.hpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,9 @@ class AsyncParametersClient
120120
auto msg_mem_strat =
121121
MessageMemoryStrategy<rcl_interfaces::msg::ParameterEvent, Alloc>::create_default();
122122

123+
using rcl_interfaces::msg::ParameterEvent;
123124
return rclcpp::create_subscription<
124-
rcl_interfaces::msg::ParameterEvent, CallbackT, Alloc, SubscriptionT>(
125+
ParameterEvent, CallbackT, Alloc, ParameterEvent, SubscriptionT>(
125126
this->node_topics_interface_.get(),
126127
"parameter_events",
127128
std::forward<CallbackT>(callback),

rclcpp/include/rclcpp/publisher.hpp

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -205,11 +205,8 @@ class Publisher : public PublisherBase
205205
ipm.publisher_id = intra_process_publisher_id_;
206206
ipm.message_sequence = message_seq;
207207
auto status = rcl_publish(&intra_process_publisher_handle_, &ipm);
208-
if (status != RCL_RET_OK) {
209-
// *INDENT-OFF* (prevent uncrustify from making unnecessary indents here)
210-
throw std::runtime_error(
211-
std::string("failed to publish intra process message: ") + rcl_get_error_string_safe());
212-
// *INDENT-ON*
208+
if (RCL_RET_OK != status) {
209+
rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish intra process message");
213210
}
214211
} else {
215212
// Always destroy the message, even if we don't consume it, for consistency.
@@ -279,6 +276,19 @@ class Publisher : public PublisherBase
279276
return this->publish(*msg);
280277
}
281278

279+
void
280+
publish(const rcl_serialized_message_t * serialized_msg)
281+
{
282+
if (store_intra_process_message_) {
283+
// TODO(Karsten1987): support serialized message passed by intraprocess
284+
throw std::runtime_error("storing serialized messages in intra process is not supported yet");
285+
}
286+
auto status = rcl_publish_serialized_message(&publisher_handle_, serialized_msg);
287+
if (RCL_RET_OK != status) {
288+
rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish serialized message");
289+
}
290+
}
291+
282292
std::shared_ptr<MessageAlloc> get_allocator() const
283293
{
284294
return message_allocator_;
@@ -289,11 +299,8 @@ class Publisher : public PublisherBase
289299
do_inter_process_publish(const MessageT * msg)
290300
{
291301
auto status = rcl_publish(&publisher_handle_, msg);
292-
if (status != RCL_RET_OK) {
293-
// *INDENT-OFF* (prevent uncrustify from making unnecessary indents here)
294-
throw std::runtime_error(
295-
std::string("failed to publish message: ") + rcl_get_error_string_safe());
296-
// *INDENT-ON*
302+
if (RCL_RET_OK != status) {
303+
rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish message");
297304
}
298305
}
299306

0 commit comments

Comments
 (0)