Skip to content

Commit 84cac59

Browse files
Michael Carrolljacobperron
andauthored
Add support for taking a sequence of messages (#614)
* Add support for taking a sequence of messages Signed-off-by: Michael Carroll <[email protected]> * Fix uncrustify Signed-off-by: Michael Carroll <[email protected]> * Apply suggestions from code review Co-Authored-By: Jacob Perron <[email protected]> Signed-off-by: Michael Carroll <[email protected]> * Address reviewer feedback. Signed-off-by: Michael Carroll <[email protected]> * Remove vertical whitespace Signed-off-by: Michael Carroll <[email protected]> * Address reviewer feedback Signed-off-by: Michael Carroll <[email protected]> * Remove blank line Signed-off-by: Michael Carroll <[email protected]> Co-authored-by: Jacob Perron <[email protected]>
1 parent 643f6ec commit 84cac59

3 files changed

Lines changed: 255 additions & 3 deletions

File tree

rcl/include/rcl/subscription.h

Lines changed: 57 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ extern "C"
2626
#include "rcl/node.h"
2727
#include "rcl/visibility_control.h"
2828

29+
#include "rmw/message_sequence.h"
30+
2931
/// Internal rcl implementation struct.
3032
struct rcl_subscription_impl_t;
3133

@@ -203,7 +205,7 @@ rcl_subscription_get_default_options(void);
203205
/// Take a ROS message from a topic using a rcl subscription.
204206
/**
205207
* It is the job of the caller to ensure that the type of the ros_message
206-
* argument and the type associate with the subscription, via the type
208+
* argument and the type associated with the subscription, via the type
207209
* support, match.
208210
* Passing a different type to rcl_take produces undefined behavior and cannot
209211
* be checked by this function and therefore no deliberate error will occur.
@@ -227,7 +229,7 @@ rcl_subscription_get_default_options(void);
227229
* be allocated for a dynamically sized array in the target message, then the
228230
* allocator given in the subscription options is used.
229231
*
230-
* The rmw message_info struct contains meta information about this particular
232+
* The rmw_message_info struct contains meta information about this particular
231233
* message instance, like what the GUID of the publisher which published it
232234
* originally or whether or not the message received from within the same
233235
* process.
@@ -248,7 +250,7 @@ rcl_subscription_get_default_options(void);
248250
* \param[inout] ros_message type-erased ptr to a allocated ROS message
249251
* \param[out] message_info rmw struct which contains meta-data for the message
250252
* \param[in] allocation structure pointer used for memory preallocation (may be NULL)
251-
* \return `RCL_RET_OK` if the message was published, or
253+
* \return `RCL_RET_OK` if the message was taken, or
252254
* \return `RCL_RET_INVALID_ARGUMENT` if any arguments are invalid, or
253255
* \return `RCL_RET_SUBSCRIPTION_INVALID` if the subscription is invalid, or
254256
* \return `RCL_RET_BAD_ALLOC` if allocating memory failed, or
@@ -266,6 +268,58 @@ rcl_take(
266268
rmw_subscription_allocation_t * allocation
267269
);
268270

271+
/// Take a sequence of messages from a topic using a rcl subscription.
272+
/**
273+
* In contrast to `rcl_take`, this function can take multiple messages at
274+
* the same time.
275+
* It is the job of the caller to ensure that the type of the message_sequence
276+
* argument and the type associated with the subscription, via the type
277+
* support, match.
278+
*
279+
* The message_sequence pointer should point to an already allocated sequence
280+
* of ROS messages of the correct type, into which the taken ROS messages will
281+
* be copied if messages are available.
282+
* The message_sequence `size` member will be set to the number of messages
283+
* correctly taken.
284+
*
285+
* The rmw_message_info_sequence struct contains meta information about the
286+
* corresponding message instance index.
287+
* The message_info_sequence argument should be an already allocated
288+
* rmw_message_info_sequence_t structure.
289+
*
290+
* <hr>
291+
* Attribute | Adherence
292+
* ------------------ | -------------
293+
* Allocates Memory | Maybe [1]
294+
* Thread-Safe | No
295+
* Uses Atomics | No
296+
* Lock-Free | Yes
297+
* <i>[1] only if storage in the serialized_message is insufficient</i>
298+
*
299+
* \param[in] subscription the handle to the subscription from which to take.
300+
* \param[in] count number of messages to attempt to take.
301+
* \param[inout] message_sequence pointer to a (pre-allocated) message sequence.
302+
* \param[inout] message_info_sequence pointer to a (pre-allocated) message info sequence.
303+
* \param[in] allocation structure pointer used for memory preallocation (may be NULL)
304+
* \return `RCL_RET_OK` if one or more messages was taken, or
305+
* \return `RCL_RET_INVALID_ARGUMENT` if any arguments are invalid, or
306+
* \return `RCL_RET_SUBSCRIPTION_INVALID` if the subscription is invalid, or
307+
* \return `RCL_RET_BAD_ALLOC` if allocating memory failed, or
308+
* \return `RCL_RET_SUBSCRIPTION_TAKE_FAILED` if take failed but no error
309+
* occurred in the middleware, or
310+
* \return `RCL_RET_ERROR` if an unspecified error occurs.
311+
*/
312+
RCL_PUBLIC
313+
RCL_WARN_UNUSED
314+
rcl_ret_t
315+
rcl_take_sequence(
316+
const rcl_subscription_t * subscription,
317+
size_t count,
318+
rmw_message_sequence_t * message_sequence,
319+
rmw_message_info_sequence_t * message_info_sequence,
320+
rmw_subscription_allocation_t * allocation
321+
);
322+
269323
/// Take a serialized raw message from a topic using a rcl subscription.
270324
/**
271325
* In contrast to `rcl_take`, this function stores the taken message in

rcl/src/rcl/subscription.c

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,52 @@ rcl_take(
283283
return RCL_RET_OK;
284284
}
285285

286+
rcl_ret_t
287+
rcl_take_sequence(
288+
const rcl_subscription_t * subscription,
289+
size_t count,
290+
rmw_message_sequence_t * message_sequence,
291+
rmw_message_info_sequence_t * message_info_sequence,
292+
rmw_subscription_allocation_t * allocation
293+
)
294+
{
295+
// Set the sizes to zero to indicate that there are no valid messages
296+
message_sequence->size = 0u;
297+
message_info_sequence->size = 0u;
298+
299+
RCUTILS_LOG_DEBUG_NAMED(ROS_PACKAGE_NAME, "Subscription taking %zu messages", count);
300+
if (!rcl_subscription_is_valid(subscription)) {
301+
return RCL_RET_SUBSCRIPTION_INVALID; // error message already set
302+
}
303+
RCL_CHECK_ARGUMENT_FOR_NULL(message_sequence, RCL_RET_INVALID_ARGUMENT);
304+
RCL_CHECK_ARGUMENT_FOR_NULL(message_info_sequence, RCL_RET_INVALID_ARGUMENT);
305+
306+
if (message_sequence->capacity < count) {
307+
RCL_SET_ERROR_MSG("Insufficient message sequence capacity for requested count");
308+
return RCL_RET_INVALID_ARGUMENT;
309+
}
310+
311+
if (message_info_sequence->capacity < count) {
312+
RCL_SET_ERROR_MSG("Insufficient message info sequence capacity for requested count");
313+
return RCL_RET_INVALID_ARGUMENT;
314+
}
315+
316+
size_t taken = 0u;
317+
rmw_ret_t ret = rmw_take_sequence(
318+
subscription->impl->rmw_handle, count, message_sequence, message_info_sequence, &taken,
319+
allocation);
320+
if (ret != RMW_RET_OK) {
321+
RCL_SET_ERROR_MSG(rmw_get_error_string().str);
322+
return rcl_convert_rmw_ret_to_rcl_ret(ret);
323+
}
324+
RCUTILS_LOG_DEBUG_NAMED(
325+
ROS_PACKAGE_NAME, "Subscription took %zu messages", taken);
326+
if (0u == taken) {
327+
return RCL_RET_SUBSCRIPTION_TAKE_FAILED;
328+
}
329+
return RCL_RET_OK;
330+
}
331+
286332
rcl_ret_t
287333
rcl_take_serialized_message(
288334
const rcl_subscription_t * subscription,

rcl/test/rcl/test_subscription.cpp

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,3 +278,155 @@ TEST_F(CLASSNAME(TestSubscriptionFixture, RMW_IMPLEMENTATION), test_subscription
278278
ASSERT_EQ(std::string(test_string), std::string(msg.string_value.data, msg.string_value.size));
279279
}
280280
}
281+
282+
/* Basic nominal test of a subscription taking a sequence.
283+
*/
284+
TEST_F(
285+
CLASSNAME(
286+
TestSubscriptionFixture,
287+
RMW_IMPLEMENTATION), test_subscription_nominal_string_sequence) {
288+
rcl_ret_t ret;
289+
rcl_publisher_t publisher = rcl_get_zero_initialized_publisher();
290+
const rosidl_message_type_support_t * ts =
291+
ROSIDL_GET_MSG_TYPE_SUPPORT(test_msgs, msg, Strings);
292+
const char * topic = "rcl_test_subscription_nominal_string_sequence_chatter";
293+
rcl_publisher_options_t publisher_options = rcl_publisher_get_default_options();
294+
ret = rcl_publisher_init(&publisher, this->node_ptr, ts, topic, &publisher_options);
295+
ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
296+
OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT(
297+
{
298+
rcl_ret_t ret = rcl_publisher_fini(&publisher, this->node_ptr);
299+
EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
300+
});
301+
rcl_subscription_t subscription = rcl_get_zero_initialized_subscription();
302+
rcl_subscription_options_t subscription_options = rcl_subscription_get_default_options();
303+
ret = rcl_subscription_init(&subscription, this->node_ptr, ts, topic, &subscription_options);
304+
ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
305+
OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT(
306+
{
307+
rcl_ret_t ret = rcl_subscription_fini(&subscription, this->node_ptr);
308+
EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
309+
});
310+
// TODO(wjwwood): add logic to wait for the connection to be established
311+
// probably using the count_subscriptions busy wait mechanism
312+
// until then we will sleep for a short period of time
313+
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
314+
const char * test_string = "testing";
315+
{
316+
test_msgs__msg__Strings msg;
317+
test_msgs__msg__Strings__init(&msg);
318+
ASSERT_TRUE(rosidl_runtime_c__String__assign(&msg.string_value, test_string));
319+
ret = rcl_publish(&publisher, &msg, nullptr);
320+
ret = rcl_publish(&publisher, &msg, nullptr);
321+
ret = rcl_publish(&publisher, &msg, nullptr);
322+
test_msgs__msg__Strings__fini(&msg);
323+
ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
324+
}
325+
bool success;
326+
wait_for_subscription_to_be_ready(&subscription, context_ptr, 10, 100, success);
327+
ASSERT_TRUE(success);
328+
auto allocator = rcutils_get_default_allocator();
329+
{
330+
size_t size = 1;
331+
rmw_message_info_sequence_t message_infos;
332+
rmw_message_info_sequence_init(&message_infos, size, &allocator);
333+
334+
rmw_message_sequence_t messages;
335+
rmw_message_sequence_init(&messages, size, &allocator);
336+
337+
auto seq = test_msgs__msg__Strings__Sequence__create(size);
338+
339+
for (size_t ii = 0; ii < size; ++ii) {
340+
messages.data[ii] = &seq->data[ii];
341+
}
342+
343+
OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT(
344+
{
345+
rmw_message_info_sequence_fini(&message_infos);
346+
rmw_message_sequence_fini(&messages);
347+
test_msgs__msg__Strings__Sequence__destroy(seq);
348+
});
349+
350+
// Attempt to take more than capacity allows.
351+
ret = rcl_take_sequence(&subscription, 5, &messages, &message_infos, nullptr);
352+
ASSERT_EQ(RCL_RET_INVALID_ARGUMENT, ret) << rcl_get_error_string().str;
353+
354+
ASSERT_EQ(0u, messages.size);
355+
ASSERT_EQ(0u, message_infos.size);
356+
}
357+
358+
{
359+
size_t size = 5;
360+
rmw_message_info_sequence_t message_infos;
361+
rmw_message_info_sequence_init(&message_infos, size, &allocator);
362+
363+
rmw_message_sequence_t messages;
364+
rmw_message_sequence_init(&messages, size, &allocator);
365+
366+
auto seq = test_msgs__msg__Strings__Sequence__create(size);
367+
368+
for (size_t ii = 0; ii < size; ++ii) {
369+
messages.data[ii] = &seq->data[ii];
370+
}
371+
372+
OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT(
373+
{
374+
rmw_message_info_sequence_fini(&message_infos);
375+
rmw_message_sequence_fini(&messages);
376+
test_msgs__msg__Strings__Sequence__destroy(seq);
377+
});
378+
379+
ret = rcl_take_sequence(&subscription, 5, &messages, &message_infos, nullptr);
380+
ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
381+
ASSERT_EQ(3u, messages.size);
382+
ASSERT_EQ(3u, message_infos.size);
383+
}
384+
385+
{
386+
test_msgs__msg__Strings msg;
387+
test_msgs__msg__Strings__init(&msg);
388+
ASSERT_TRUE(rosidl_runtime_c__String__assign(&msg.string_value, test_string));
389+
ret = rcl_publish(&publisher, &msg, nullptr);
390+
ret = rcl_publish(&publisher, &msg, nullptr);
391+
ret = rcl_publish(&publisher, &msg, nullptr);
392+
ret = rcl_publish(&publisher, &msg, nullptr);
393+
ret = rcl_publish(&publisher, &msg, nullptr);
394+
test_msgs__msg__Strings__fini(&msg);
395+
ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
396+
}
397+
398+
// Give a brief moment for publications to go through.
399+
std::this_thread::sleep_for(std::chrono::milliseconds(500));
400+
// Take fewer messages than are available in the subscription
401+
{
402+
size_t size = 3;
403+
rmw_message_info_sequence_t message_infos;
404+
rmw_message_info_sequence_init(&message_infos, size, &allocator);
405+
406+
rmw_message_sequence_t messages;
407+
rmw_message_sequence_init(&messages, size, &allocator);
408+
409+
auto seq = test_msgs__msg__Strings__Sequence__create(size);
410+
411+
for (size_t ii = 0; ii < size; ++ii) {
412+
messages.data[ii] = &seq->data[ii];
413+
}
414+
415+
OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT(
416+
{
417+
rmw_message_info_sequence_fini(&message_infos);
418+
rmw_message_sequence_fini(&messages);
419+
test_msgs__msg__Strings__Sequence__destroy(seq);
420+
});
421+
422+
ret = rcl_take_sequence(&subscription, 3, &messages, &message_infos, nullptr);
423+
424+
ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
425+
ASSERT_EQ(3u, messages.size);
426+
ASSERT_EQ(3u, message_infos.size);
427+
428+
ASSERT_EQ(
429+
std::string(test_string),
430+
std::string(seq->data[0].string_value.data, seq->data[0].string_value.size));
431+
}
432+
}

0 commit comments

Comments
 (0)