Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions rcl/include/rcl/wait.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ extern "C"
#include <stdbool.h>
#include <stddef.h>

#include <rcutils/time.h>

#include "rcl/client.h"
#include "rcl/guard_condition.h"
#include "rcl/macros.h"
Expand All @@ -40,18 +42,22 @@ typedef struct rcl_wait_set_t
{
/// Storage for subscription pointers.
const rcl_subscription_t ** subscriptions;
rcutils_time_point_value_t * subscriptions_timestamps;
size_t size_of_subscriptions;
/// Storage for guard condition pointers.
const rcl_guard_condition_t ** guard_conditions;
size_t size_of_guard_conditions;
/// Storage for timer pointers.
const rcl_timer_t ** timers;
rcutils_time_point_value_t * timers_timestamps;
size_t size_of_timers;
/// Storage for client pointers.
const rcl_client_t ** clients;
rcutils_time_point_value_t * clients_timestamps;
size_t size_of_clients;
/// Storage for service pointers.
const rcl_service_t ** services;
rcutils_time_point_value_t * services_timestamps;
size_t size_of_services;
/// Storage for event pointers.
const rcl_event_t ** events;
Expand Down Expand Up @@ -266,6 +272,10 @@ rcl_wait_set_clear(rcl_wait_set_t * wait_set);
*
* This can be called on an uninitialized (zero initialized) wait set.
*
* If RCL_RET_BAD_ALLOC is returned, i.e. allocation failed, the wait_set
* may be *partially* allocated and therefore, it must still be cleaned up
* with _fini.
*
* <hr>
* Attribute | Adherence
* ------------------ | -------------
Expand Down
177 changes: 128 additions & 49 deletions rcl/src/rcl/wait.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,46 +33,25 @@ extern "C"

#include "./context_impl.h"

typedef struct rcl_wait_set_impl_t
{
// number of subscriptions that have been added to the wait set
size_t subscription_index;
rmw_subscriptions_t rmw_subscriptions;
// number of guard_conditions that have been added to the wait set
size_t guard_condition_index;
rmw_guard_conditions_t rmw_guard_conditions;
// number of clients that have been added to the wait set
size_t client_index;
rmw_clients_t rmw_clients;
// number of services that have been added to the wait set
size_t service_index;
rmw_services_t rmw_services;
// number of events that have been added to the wait set
size_t event_index;
rmw_events_t rmw_events;

rmw_wait_set_t * rmw_wait_set;
// number of timers that have been added to the wait set
size_t timer_index;
// context with which the wait set is associated
rcl_context_t * context;
// allocator used in the wait set
rcl_allocator_t allocator;
} rcl_wait_set_impl_t;
#include "./wait_set_impl.h"

rcl_wait_set_t
rcl_get_zero_initialized_wait_set()
{
static rcl_wait_set_t null_wait_set = {
.subscriptions = NULL,
.subscriptions_timestamps = NULL,
.size_of_subscriptions = 0,
.guard_conditions = NULL,
.size_of_guard_conditions = 0,
.clients = NULL,
.clients_timestamps = NULL,
.size_of_clients = 0,
.services = NULL,
.services_timestamps = NULL,
.size_of_services = 0,
.timers = NULL,
.timers_timestamps = NULL,
.size_of_timers = 0,
.impl = NULL,
};
Expand Down Expand Up @@ -137,12 +116,15 @@ rcl_wait_set_init(
wait_set->impl, "allocating memory failed", return RCL_RET_BAD_ALLOC);
memset(wait_set->impl, 0, sizeof(rcl_wait_set_impl_t));
wait_set->impl->rmw_subscriptions.subscribers = NULL;
wait_set->impl->rmw_subscriptions.timestamps = NULL;
wait_set->impl->rmw_subscriptions.subscriber_count = 0;
wait_set->impl->rmw_guard_conditions.guard_conditions = NULL;
wait_set->impl->rmw_guard_conditions.guard_condition_count = 0;
wait_set->impl->rmw_clients.clients = NULL;
wait_set->impl->rmw_clients.timestamps = NULL;
wait_set->impl->rmw_clients.client_count = 0;
wait_set->impl->rmw_services.services = NULL;
wait_set->impl->rmw_services.timestamps = NULL;
wait_set->impl->rmw_services.service_count = 0;
wait_set->impl->rmw_events.events = NULL;
wait_set->impl->rmw_events.event_count = 0;
Expand Down Expand Up @@ -250,6 +232,18 @@ rcl_wait_set_get_allocator(const rcl_wait_set_t * wait_set, rcl_allocator_t * al
} \
} while (false)

/* This is for the case where the list has an associated timestamp list. */
#define SET_CLEAR_TIMESTAMP(Type) \
do { \
if (NULL != wait_set->Type ## s_timestamps) { \
memset( \
(void *)wait_set->Type ## s_timestamps, \
0, \
sizeof(rcutils_time_point_value_t *) * wait_set->size_of_ ## Type ## s); \
} \
SET_CLEAR(Type); \
} while (false)

#define SET_CLEAR_RMW(Type, RMWStorage, RMWCount) \
do { \
if (NULL != wait_set->impl->RMWStorage) { \
Expand All @@ -262,6 +256,19 @@ rcl_wait_set_get_allocator(const rcl_wait_set_t * wait_set, rcl_allocator_t * al
} \
} while (false)

#define SET_CLEAR_RMW_TIMESTAMP(Type, RMWStorage, RMWTimestamp, RMWCount) \
do { \
if (NULL != wait_set->impl->RMWTimestamp) { \
/* Also clear the rmw timestamp. */ \
memset( \
wait_set->impl->RMWTimestamp, \
0, \
sizeof(rcutils_time_point_value_t) * wait_set->impl->RMWCount); \
} \
/* Clear the rest. */ \
SET_CLEAR_RMW(Type, RMWStorage, RMWCount); \
} while (false)

#define SET_RESIZE(Type, ExtraDealloc, ExtraRealloc) \
do { \
rcl_allocator_t allocator = wait_set->impl->allocator; \
Expand All @@ -285,6 +292,36 @@ rcl_wait_set_get_allocator(const rcl_wait_set_t * wait_set, rcl_allocator_t * al
} \
} while (false)

/* First resizes the common fields that all types have, then resizes the timestamp array,
then calls the "extra" functions. */
#define SET_RESIZE_TIMESTAMP(Type, ExtraDealloc, ExtraRealloc) \
do { \
SET_RESIZE(Type,;,;); /* NOLINT */ \
rcl_allocator_t allocator = wait_set->impl->allocator; \
if (0 == Type ## s_size) { \
if (wait_set->Type ## s_timestamps) { \
allocator.deallocate((void *)wait_set->Type ## s_timestamps, allocator.state); \
wait_set->Type ## s_timestamps = NULL; \
} \
ExtraDealloc \
} else { \
rcutils_time_point_value_t * timestamps = \
(rcutils_time_point_value_t *)allocator.reallocate( \
(void *)wait_set->Type ## s_timestamps, \
sizeof(rcutils_time_point_value_t *) * Type ## s_size, \
allocator.state); \
RCL_CHECK_FOR_NULL_WITH_MSG( \
timestamps, "allocating memory failed", { \
allocator.deallocate((void *)wait_set->Type ## s_timestamps, allocator.state); \
wait_set->Type ## s_timestamps = NULL; return RCL_RET_BAD_ALLOC;}); \
wait_set->Type ## s_timestamps = timestamps; \
memset( \
(void *)wait_set->Type ## s_timestamps, 0, \
sizeof(rcutils_time_point_value_t) * Type ## s_size); \
ExtraRealloc \
} \
} while (false)

#define SET_RESIZE_RMW_DEALLOC(RMWStorage, RMWCount) \
/* Also deallocate the rmw storage. */ \
if (wait_set->impl->RMWStorage) { \
Expand All @@ -293,6 +330,14 @@ rcl_wait_set_get_allocator(const rcl_wait_set_t * wait_set, rcl_allocator_t * al
wait_set->impl->RMWCount = 0; \
}

#define SET_RESIZE_RMW_DEALLOC_TIMESTAMP(RMWStorage, RMWTimestamps, RMWCount) \
SET_RESIZE_RMW_DEALLOC(RMWStorage, RMWCount); \
/* Also deallocate the timestamp storage. */ \
if (wait_set->impl->RMWTimestamps) { \
allocator.deallocate((void *)wait_set->impl->RMWTimestamps, allocator.state); \
wait_set->impl->RMWTimestamps = NULL; \
}

#define SET_RESIZE_RMW_REALLOC(Type, RMWStorage, RMWCount) \
/* Also resize the rmw storage. */ \
wait_set->impl->RMWCount = 0; \
Expand All @@ -306,6 +351,22 @@ rcl_wait_set_get_allocator(const rcl_wait_set_t * wait_set, rcl_allocator_t * al
} \
memset(wait_set->impl->RMWStorage, 0, sizeof(void *) * Type ## s_size);

/* The RMW structures with a timestamp are twice as large. */
#define SET_RESIZE_RMW_REALLOC_TIMESTAMP(Type, RMWStorage, RMWTimestamps, RMWCount) \
SET_RESIZE_RMW_REALLOC(Type, RMWStorage, RMWCount); \
/* Also resize the rmw timestamp storage. */ \
wait_set->impl->RMWTimestamps = (rcutils_time_point_value_t *)allocator.reallocate( \
wait_set->impl->RMWTimestamps, sizeof(rcutils_time_point_value_t) * Type ## s_size, \
allocator.state); \
if (!wait_set->impl->RMWTimestamps) { \
allocator.deallocate((void *)wait_set->impl->RMWStorage, allocator.state); \
allocator.deallocate((void *)wait_set->Type ## s, allocator.state); \
wait_set->size_of_ ## Type ## s = 0; \
RCL_SET_ERROR_MSG("allocating memory failed"); \
return RCL_RET_BAD_ALLOC; \
} \
memset(wait_set->impl->RMWTimestamps, 0, sizeof(rcutils_time_point_value_t) * Type ## s_size);

/* Implementation-specific notes:
*
* Add the rmw representation to the underlying rmw array and increment
Expand Down Expand Up @@ -333,28 +394,31 @@ rcl_wait_set_clear(rcl_wait_set_t * wait_set)
RCL_CHECK_ARGUMENT_FOR_NULL(wait_set, RCL_RET_INVALID_ARGUMENT);
RCL_CHECK_ARGUMENT_FOR_NULL(wait_set->impl, RCL_RET_WAIT_SET_INVALID);

SET_CLEAR(subscription);
SET_CLEAR_TIMESTAMP(subscription);
SET_CLEAR(guard_condition);
SET_CLEAR(client);
SET_CLEAR(service);
SET_CLEAR_TIMESTAMP(client);
SET_CLEAR_TIMESTAMP(service);
SET_CLEAR(event);
SET_CLEAR(timer);
SET_CLEAR_TIMESTAMP(timer);

SET_CLEAR_RMW(
SET_CLEAR_RMW_TIMESTAMP(
subscription,
rmw_subscriptions.subscribers,
rmw_subscriptions.timestamps,
rmw_subscriptions.subscriber_count);
SET_CLEAR_RMW(
guard_condition,
rmw_guard_conditions.guard_conditions,
rmw_guard_conditions.guard_condition_count);
SET_CLEAR_RMW(
SET_CLEAR_RMW_TIMESTAMP(
clients,
rmw_clients.clients,
rmw_clients.timestamps,
rmw_clients.client_count);
SET_CLEAR_RMW(
SET_CLEAR_RMW_TIMESTAMP(
services,
rmw_services.services,
rmw_services.timestamps,
rmw_services.service_count);
SET_CLEAR_RMW(
events,
Expand All @@ -381,12 +445,14 @@ rcl_wait_set_resize(
{
RCL_CHECK_ARGUMENT_FOR_NULL(wait_set, RCL_RET_INVALID_ARGUMENT);
RCL_CHECK_ARGUMENT_FOR_NULL(wait_set->impl, RCL_RET_WAIT_SET_INVALID);
SET_RESIZE(
SET_RESIZE_TIMESTAMP(
subscription,
SET_RESIZE_RMW_DEALLOC(
rmw_subscriptions.subscribers, rmw_subscriptions.subscriber_count),
SET_RESIZE_RMW_REALLOC(
subscription, rmw_subscriptions.subscribers, rmw_subscriptions.subscriber_count)
SET_RESIZE_RMW_DEALLOC_TIMESTAMP(
rmw_subscriptions.subscribers, rmw_subscriptions.timestamps,
rmw_subscriptions.subscriber_count),
SET_RESIZE_RMW_REALLOC_TIMESTAMP(
subscription, rmw_subscriptions.subscribers, rmw_subscriptions.timestamps,
rmw_subscriptions.subscriber_count)
);
// Guard condition RCL size is the resize amount given
SET_RESIZE(guard_condition,;,;); // NOLINT
Expand Down Expand Up @@ -421,20 +487,20 @@ rcl_wait_set_resize(
memset(rmw_gcs->guard_conditions, 0, sizeof(void *) * num_rmw_gc);
}

SET_RESIZE(timer,;,;); // NOLINT
SET_RESIZE(
SET_RESIZE_TIMESTAMP(timer,;,;); // NOLINT
SET_RESIZE_TIMESTAMP(
client,
SET_RESIZE_RMW_DEALLOC(
rmw_clients.clients, rmw_clients.client_count),
SET_RESIZE_RMW_REALLOC(
client, rmw_clients.clients, rmw_clients.client_count)
SET_RESIZE_RMW_DEALLOC_TIMESTAMP(
rmw_clients.clients, rmw_clients.timestamps, rmw_clients.client_count),
SET_RESIZE_RMW_REALLOC_TIMESTAMP(
client, rmw_clients.clients, rmw_clients.timestamps, rmw_clients.client_count)
);
SET_RESIZE(
SET_RESIZE_TIMESTAMP(
service,
SET_RESIZE_RMW_DEALLOC(
rmw_services.services, rmw_services.service_count),
SET_RESIZE_RMW_REALLOC(
service, rmw_services.services, rmw_services.service_count)
SET_RESIZE_RMW_DEALLOC_TIMESTAMP(
rmw_services.services, rmw_services.timestamps, rmw_services.service_count),
SET_RESIZE_RMW_REALLOC_TIMESTAMP(
service, rmw_services.services, rmw_services.timestamps, rmw_services.service_count)
);
SET_RESIZE(
event,
Expand Down Expand Up @@ -630,6 +696,13 @@ rcl_wait(rcl_wait_set_t * wait_set, int64_t timeout)
RCUTILS_LOG_DEBUG_EXPRESSION_NAMED(is_ready, ROS_PACKAGE_NAME, "Timer in wait set is ready");
if (!is_ready) {
wait_set->timers[i] = NULL;
wait_set->timers_timestamps[i] = 0;
} else if(wait_set->timers_timestamps[i] == 0) {
ret = rcutils_system_time_now(&(wait_set->timers_timestamps[i]));
if (ret != RCL_RET_OK) {
wait_set->timers_timestamps[i] = 0;
// TODO(iluetkeb) should we report this as an error?
}
}
}
// Check for timeout, return RCL_RET_TIMEOUT only if it wasn't a timer.
Expand All @@ -644,6 +717,8 @@ rcl_wait(rcl_wait_set_t * wait_set, int64_t timeout)
is_ready, ROS_PACKAGE_NAME, "Subscription in wait set is ready");
if (!is_ready) {
wait_set->subscriptions[i] = NULL;
} else {
wait_set->subscriptions_timestamps[i] = wait_set->impl->rmw_subscriptions.timestamps[i];
}
}
// Set corresponding rcl guard_condition handles NULL.
Expand All @@ -661,6 +736,8 @@ rcl_wait(rcl_wait_set_t * wait_set, int64_t timeout)
RCUTILS_LOG_DEBUG_EXPRESSION_NAMED(is_ready, ROS_PACKAGE_NAME, "Client in wait set is ready");
if (!is_ready) {
wait_set->clients[i] = NULL;
} else {
wait_set->clients_timestamps[i] = wait_set->impl->rmw_clients.timestamps[i];
}
}
// Set corresponding rcl service handles NULL.
Expand All @@ -669,6 +746,8 @@ rcl_wait(rcl_wait_set_t * wait_set, int64_t timeout)
RCUTILS_LOG_DEBUG_EXPRESSION_NAMED(is_ready, ROS_PACKAGE_NAME, "Service in wait set is ready");
if (!is_ready) {
wait_set->services[i] = NULL;
} else {
wait_set->services_timestamps[i] = wait_set->impl->rmw_services.timestamps[i];
}
}
// Set corresponding rcl event handles NULL.
Expand Down
Loading