Skip to content

Commit 3c877ac

Browse files
hidmicahcorde
authored andcommitted
Ensure compliant subscription API. (#419)
Signed-off-by: Michel Hidalgo <[email protected]>
1 parent 064a975 commit 3c877ac

File tree

7 files changed

+245
-181
lines changed

7 files changed

+245
-181
lines changed

rmw_fastrtps_cpp/src/rmw_subscription.cpp

Lines changed: 39 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -61,15 +61,12 @@ rmw_create_subscription(
6161
const rmw_qos_profile_t * qos_policies,
6262
const rmw_subscription_options_t * subscription_options)
6363
{
64-
if (!node) {
65-
RMW_SET_ERROR_MSG("node handle is null");
66-
return nullptr;
67-
}
68-
69-
if (node->implementation_identifier != eprosima_fastrtps_identifier) {
70-
RMW_SET_ERROR_MSG("node handle not from this implementation");
71-
return nullptr;
72-
}
64+
RMW_CHECK_ARGUMENT_FOR_NULL(node, nullptr);
65+
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
66+
node,
67+
node->implementation_identifier,
68+
eprosima_fastrtps_identifier,
69+
return nullptr);
7370

7471
auto participant_info =
7572
static_cast<CustomParticipantInfo *>(node->context->impl->participant_info);
@@ -99,8 +96,18 @@ rmw_create_subscription(
9996
static_cast<void *>(&msg),
10097
nullptr);
10198
if (RMW_RET_OK != rmw_ret) {
102-
rmw_fastrtps_shared_cpp::__rmw_destroy_subscription(
99+
rmw_error_state_t error_state = *rmw_get_error_state();
100+
rmw_reset_error();
101+
static_cast<void>(common_context->graph_cache.dissociate_writer(
102+
info->subscription_gid_, common_context->gid, node->name, node->namespace_));
103+
rmw_ret = rmw_fastrtps_shared_cpp::__rmw_destroy_subscription(
103104
eprosima_fastrtps_identifier, node, subscription);
105+
if (RMW_RET_OK != rmw_ret) {
106+
RMW_SAFE_FWRITE_TO_STDERR(rmw_get_error_string().str);
107+
RMW_SAFE_FWRITE_TO_STDERR(" during '" RCUTILS_STRINGIFY(__function__) "' cleanup\n");
108+
rmw_reset_error();
109+
}
110+
rmw_set_error_state(error_state.message, error_state.file, error_state.line_number);
104111
return nullptr;
105112
}
106113
}
@@ -121,13 +128,33 @@ rmw_subscription_get_actual_qos(
121128
const rmw_subscription_t * subscription,
122129
rmw_qos_profile_t * qos)
123130
{
124-
return rmw_fastrtps_shared_cpp::__rmw_subscription_get_actual_qos(
125-
subscription, qos);
131+
RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT);
132+
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
133+
subscription,
134+
subscription->implementation_identifier,
135+
eprosima_fastrtps_identifier,
136+
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
137+
RMW_CHECK_ARGUMENT_FOR_NULL(qos, RMW_RET_INVALID_ARGUMENT);
138+
139+
return rmw_fastrtps_shared_cpp::__rmw_subscription_get_actual_qos(subscription, qos);
126140
}
127141

128142
rmw_ret_t
129143
rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription)
130144
{
145+
RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT);
146+
RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT);
147+
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
148+
node,
149+
node->implementation_identifier,
150+
eprosima_fastrtps_identifier,
151+
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
152+
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
153+
subscription,
154+
subscription->implementation_identifier,
155+
eprosima_fastrtps_identifier,
156+
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
157+
131158
return rmw_fastrtps_shared_cpp::__rmw_destroy_subscription(
132159
eprosima_fastrtps_identifier, node, subscription);
133160
}

rmw_fastrtps_cpp/src/subscription.cpp

Lines changed: 62 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@
2121
#include "rmw/allocators.h"
2222
#include "rmw/error_handling.h"
2323
#include "rmw/rmw.h"
24+
#include "rmw/validate_full_topic_name.h"
25+
26+
#include "rcpputils/scope_exit.hpp"
2427

2528
#include "rmw_fastrtps_shared_cpp/custom_participant_info.hpp"
2629
#include "rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp"
@@ -55,27 +58,30 @@ create_subscription(
5558
bool keyed,
5659
bool create_subscription_listener)
5760
{
58-
if (!topic_name || strlen(topic_name) == 0) {
59-
RMW_SET_ERROR_MSG("subscription topic is null or empty string");
60-
return nullptr;
61-
}
62-
if (!qos_policies) {
63-
RMW_SET_ERROR_MSG("qos_policies is null");
64-
return nullptr;
65-
}
66-
if (!subscription_options) {
67-
RMW_SET_ERROR_MSG("subscription_options is null");
61+
RMW_CHECK_ARGUMENT_FOR_NULL(participant_info, nullptr);
62+
RMW_CHECK_ARGUMENT_FOR_NULL(type_supports, nullptr);
63+
RMW_CHECK_ARGUMENT_FOR_NULL(topic_name, nullptr);
64+
if (0 == strlen(topic_name)) {
65+
RMW_SET_ERROR_MSG("topic_name argument is an empty string");
6866
return nullptr;
6967
}
70-
if (!participant_info) {
71-
RMW_SET_ERROR_MSG("participant_info is null");
72-
return nullptr;
68+
RMW_CHECK_ARGUMENT_FOR_NULL(qos_policies, nullptr);
69+
if (!qos_policies->avoid_ros_namespace_conventions) {
70+
int validation_result = RMW_TOPIC_VALID;
71+
rmw_ret_t ret = rmw_validate_full_topic_name(topic_name, &validation_result, nullptr);
72+
if (RMW_RET_OK != ret) {
73+
return nullptr;
74+
}
75+
if (RMW_TOPIC_VALID != validation_result) {
76+
const char * reason = rmw_full_topic_name_validation_result_string(validation_result);
77+
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("invalid topic_name argument: %s", reason);
78+
return nullptr;
79+
}
7380
}
81+
RMW_CHECK_ARGUMENT_FOR_NULL(subscription_options, nullptr);
7482
Participant * participant = participant_info->participant;
75-
if (!participant) {
76-
RMW_SET_ERROR_MSG("participant handle is null");
77-
return nullptr;
78-
}
83+
RMW_CHECK_FOR_NULL_WITH_MSG(participant, "participant handle is null", return nullptr);
84+
7985
const rosidl_message_type_support_t * type_support = get_message_typesupport_handle(
8086
type_supports, RMW_FASTRTPS_CPP_TYPESUPPORT_C);
8187
if (!type_support) {
@@ -89,17 +95,31 @@ create_subscription(
8995
if (!is_valid_qos(*qos_policies)) {
9096
return nullptr;
9197
}
92-
CustomSubscriberInfo * info = nullptr;
93-
rmw_subscription_t * rmw_subscription = nullptr;
94-
eprosima::fastrtps::SubscriberAttributes subscriberParam;
9598

9699
// Load default XML profile.
100+
eprosima::fastrtps::SubscriberAttributes subscriberParam;
97101
Domain::getDefaultSubscriberAttributes(subscriberParam);
98-
info = new (std::nothrow) CustomSubscriberInfo();
102+
103+
CustomSubscriberInfo * info = new (std::nothrow) CustomSubscriberInfo();
99104
if (!info) {
100105
RMW_SET_ERROR_MSG("failed to allocate CustomSubscriberInfo");
101106
return nullptr;
102107
}
108+
auto cleanup_info = rcpputils::make_scope_exit(
109+
[info, participant]() {
110+
if (info->type_support_) {
111+
_unregister_type(participant, info->type_support_);
112+
}
113+
if (info->subscriber_) {
114+
if (!Domain::removeSubscriber(info->subscriber_)) {
115+
RMW_SAFE_FWRITE_TO_STDERR(
116+
"Failed to remove subscriber after '"
117+
RCUTILS_STRINGIFY(__function__) "' failed.\n");
118+
}
119+
}
120+
delete info->listener_;
121+
delete info;
122+
});
103123
info->typesupport_identifier_ = type_support->typesupport_identifier;
104124
info->type_support_impl_ = type_support->data;
105125

@@ -113,7 +133,7 @@ create_subscription(
113133
info->type_support_ = new (std::nothrow) MessageTypeSupport_cpp(callbacks);
114134
if (!info->type_support_) {
115135
RMW_SET_ERROR_MSG("failed to allocate MessageTypeSupport_cpp");
116-
goto fail;
136+
return nullptr;
117137
}
118138
_register_type(participant, info->type_support_);
119139
}
@@ -128,47 +148,49 @@ create_subscription(
128148
subscriberParam.topic.topicName = _create_topic_name(qos_policies, ros_topic_prefix, topic_name);
129149

130150
if (!get_datareader_qos(*qos_policies, subscriberParam)) {
131-
RMW_SET_ERROR_MSG("failed to get datareader qos");
132-
goto fail;
151+
return nullptr;
133152
}
134-
info->listener_ = nullptr;
153+
135154
if (create_subscription_listener) {
136155
info->listener_ = new (std::nothrow) SubListener(info);
137156
if (!info->listener_) {
138157
RMW_SET_ERROR_MSG("create_subscriber() could not create subscriber listener");
139-
goto fail;
158+
return nullptr;
140159
}
141160
}
161+
142162
info->subscriber_ = Domain::createSubscriber(participant, subscriberParam, info->listener_);
143163
if (!info->subscriber_) {
144164
RMW_SET_ERROR_MSG("create_subscriber() could not create subscriber");
145-
goto fail;
165+
return nullptr;
146166
}
147167
info->subscription_gid_ = rmw_fastrtps_shared_cpp::create_rmw_gid(
148168
eprosima_fastrtps_identifier, info->subscriber_->getGuid());
149-
rmw_subscription = rmw_subscription_allocate();
169+
170+
rmw_subscription_t * rmw_subscription = rmw_subscription_allocate();
150171
if (!rmw_subscription) {
151172
RMW_SET_ERROR_MSG("failed to allocate subscription");
152-
goto fail;
173+
return nullptr;
153174
}
175+
auto cleanup_subscription = rcpputils::make_scope_exit(
176+
[rmw_subscription]() {
177+
rmw_free(const_cast<char *>(rmw_subscription->topic_name));
178+
rmw_subscription_free(rmw_subscription);
179+
});
180+
154181
rmw_subscription->implementation_identifier = eprosima_fastrtps_identifier;
155182
rmw_subscription->data = info;
183+
156184
rmw_subscription->topic_name = rcutils_strdup(topic_name, rcutils_get_default_allocator());
157185
if (!rmw_subscription->topic_name) {
158186
RMW_SET_ERROR_MSG("failed to allocate memory for subscription topic name");
159-
goto fail;
187+
return nullptr;
160188
}
161-
162189
rmw_subscription->options = *subscription_options;
163-
return rmw_subscription;
190+
rmw_subscription->can_loan_messages = false;
164191

165-
fail:
166-
if (info != nullptr) {
167-
delete info->type_support_;
168-
delete info->listener_;
169-
delete info;
170-
}
171-
rmw_subscription_free(rmw_subscription);
172-
return nullptr;
192+
cleanup_subscription.cancel();
193+
cleanup_info.cancel();
194+
return rmw_subscription;
173195
}
174196
} // namespace rmw_fastrtps_cpp

rmw_fastrtps_dynamic_cpp/src/rmw_subscription.cpp

Lines changed: 39 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -63,15 +63,12 @@ rmw_create_subscription(
6363
const rmw_qos_profile_t * qos_policies,
6464
const rmw_subscription_options_t * subscription_options)
6565
{
66-
if (!node) {
67-
RMW_SET_ERROR_MSG("node handle is null");
68-
return nullptr;
69-
}
70-
71-
if (node->implementation_identifier != eprosima_fastrtps_identifier) {
72-
RMW_SET_ERROR_MSG("node handle not from this implementation");
73-
return nullptr;
74-
}
66+
RMW_CHECK_ARGUMENT_FOR_NULL(node, nullptr);
67+
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
68+
node,
69+
node->implementation_identifier,
70+
eprosima_fastrtps_identifier,
71+
return nullptr);
7572

7673
auto participant_info =
7774
static_cast<CustomParticipantInfo *>(node->context->impl->participant_info);
@@ -102,8 +99,18 @@ rmw_create_subscription(
10299
static_cast<void *>(&msg),
103100
nullptr);
104101
if (RMW_RET_OK != rmw_ret) {
105-
rmw_fastrtps_shared_cpp::__rmw_destroy_subscription(
102+
rmw_error_state_t error_state = *rmw_get_error_state();
103+
rmw_reset_error();
104+
static_cast<void>(common_context->graph_cache.dissociate_writer(
105+
info->subscription_gid_, common_context->gid, node->name, node->namespace_));
106+
rmw_ret = rmw_fastrtps_shared_cpp::__rmw_destroy_subscription(
106107
eprosima_fastrtps_identifier, node, subscription);
108+
if (RMW_RET_OK != rmw_ret) {
109+
RMW_SAFE_FWRITE_TO_STDERR(rmw_get_error_string().str);
110+
RMW_SAFE_FWRITE_TO_STDERR(" during '" RCUTILS_STRINGIFY(__function__) "' cleanup\n");
111+
rmw_reset_error();
112+
}
113+
rmw_set_error_state(error_state.message, error_state.file, error_state.line_number);
107114
return nullptr;
108115
}
109116
}
@@ -124,21 +131,37 @@ rmw_subscription_get_actual_qos(
124131
const rmw_subscription_t * subscription,
125132
rmw_qos_profile_t * qos)
126133
{
127-
return rmw_fastrtps_shared_cpp::__rmw_subscription_get_actual_qos(
128-
subscription, qos);
134+
RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT);
135+
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
136+
subscription,
137+
subscription->implementation_identifier,
138+
eprosima_fastrtps_identifier,
139+
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
140+
RMW_CHECK_ARGUMENT_FOR_NULL(qos, RMW_RET_INVALID_ARGUMENT);
141+
142+
return rmw_fastrtps_shared_cpp::__rmw_subscription_get_actual_qos(subscription, qos);
129143
}
130144

131145
using BaseTypeSupport = rmw_fastrtps_dynamic_cpp::BaseTypeSupport;
132146

133147
rmw_ret_t
134148
rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription)
135149
{
136-
auto info = static_cast<CustomSubscriberInfo *>(subscription->data);
137-
RCUTILS_CHECK_FOR_NULL_WITH_MSG(info, "subscription info pointer is null", return RMW_RET_ERROR);
150+
RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT);
151+
RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT);
152+
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
153+
node,
154+
node->implementation_identifier,
155+
eprosima_fastrtps_identifier,
156+
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
157+
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
158+
subscription,
159+
subscription->implementation_identifier,
160+
eprosima_fastrtps_identifier,
161+
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
138162

163+
auto info = static_cast<CustomSubscriberInfo *>(subscription->data);
139164
auto impl = static_cast<const BaseTypeSupport *>(info->type_support_impl_);
140-
RCUTILS_CHECK_FOR_NULL_WITH_MSG(impl, "publisher type support is null", return RMW_RET_ERROR);
141-
142165
auto ros_type_support = static_cast<const rosidl_message_type_support_t *>(
143166
impl->ros_type_support());
144167

0 commit comments

Comments
 (0)