Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// Copyright 2016-2018 Proyectos y Sistemas de Mantenimiento SL (eProsima).
// Copyright 2020 Robert Bosch GmbH
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -94,6 +95,7 @@ class ClientListener : public eprosima::fastrtps::SubscriberListener
if (conditionMutex_ != nullptr) {
std::unique_lock<std::mutex> clock(*conditionMutex_);
list.emplace_back(std::move(response));
sampleInfos_.push(sinfo);
// the change to list_has_data_ needs to be mutually exclusive with
// rmw_wait() which checks hasData() and decides if wait() needs to
// be called
Expand All @@ -102,6 +104,7 @@ class ClientListener : public eprosima::fastrtps::SubscriberListener
conditionVariable_->notify_one();
} else {
list.emplace_back(std::move(response));
sampleInfos_.push(sinfo);
list_has_data_.store(true);
}
}
Expand All @@ -121,6 +124,16 @@ class ClientListener : public eprosima::fastrtps::SubscriberListener
return popResponse(response);
}

/**
* Returns the SampleInfo_t at the front of the queue -- this corresponds
* to the next CustomServiceRequest to be returned on callinge getRequest().
*/
const eprosima::fastrtps::SampleInfo_t&
peekSampleInfo() const
{
return sampleInfos_.front();
}

void
attachCondition(std::mutex * conditionMutex, std::condition_variable * conditionVariable)
{
Expand Down Expand Up @@ -167,6 +180,7 @@ class ClientListener : public eprosima::fastrtps::SubscriberListener
if (!list.empty()) {
response = std::move(list.front());
list.pop_front();
sampleInfos_.pop();
list_has_data_.store(!list.empty());
return true;
}
Expand All @@ -180,6 +194,7 @@ class ClientListener : public eprosima::fastrtps::SubscriberListener
std::mutex * conditionMutex_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
std::set<eprosima::fastrtps::rtps::GUID_t> publishers_;
std::queue<eprosima::fastrtps::SampleInfo_t> sampleInfos_;
};

class ClientPubListener : public eprosima::fastrtps::PublisherListener
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// Copyright 2016-2018 Proyectos y Sistemas de Mantenimiento SL (eProsima).
// Copyright 2020 Robert Bosch GmbH
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -90,6 +91,7 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener
if (conditionMutex_ != nullptr) {
std::unique_lock<std::mutex> clock(*conditionMutex_);
list.push_back(request);
sampleInfos_.push(sinfo);
// the change to list_has_data_ needs to be mutually exclusive with
// rmw_wait() which checks hasData() and decides if wait() needs to
// be called
Expand All @@ -98,6 +100,7 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener
conditionVariable_->notify_one();
} else {
list.push_back(request);
sampleInfos_.push(sinfo);
list_has_data_.store(true);
}
}
Expand All @@ -115,19 +118,31 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener
if (!list.empty()) {
request = list.front();
list.pop_front();
sampleInfos_.pop();
list_has_data_.store(!list.empty());
}
} else {
if (!list.empty()) {
request = list.front();
list.pop_front();
sampleInfos_.pop();
list_has_data_.store(!list.empty());
}
}

return request;
}

/**
* Returns the SampleInfo_t at the front of the queue -- this corresponds
* to the next CustomServiceRequest to be returned on callinge getRequest().
*/
const eprosima::fastrtps::SampleInfo_t&
peekSampleInfo() const
{
return sampleInfos_.front();
}

void
attachCondition(std::mutex * conditionMutex, std::condition_variable * conditionVariable)
{
Expand Down Expand Up @@ -157,6 +172,7 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener
std::atomic_bool list_has_data_;
std::mutex * conditionMutex_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
std::queue<eprosima::fastrtps::SampleInfo_t> sampleInfos_;
};

#endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_SERVICE_INFO_HPP_
2 changes: 2 additions & 0 deletions rmw_fastrtps_shared_cpp/src/listener_thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ node_listener(rmw_context_t * context)
rmw_guard_conditions_t guard_conditions;
subscriptions.subscriber_count = 1;
subscriptions.subscribers = subscriptions_buffer;
rcutils_time_point_value_t sub_timestamps[1] = { 0 };
subscriptions.timestamps = sub_timestamps;
guard_conditions.guard_condition_count = 1;
guard_conditions.guard_conditions = guard_conditions_buffer;
// number of conditions of a subscription is 2
Expand Down
15 changes: 15 additions & 0 deletions rmw_fastrtps_shared_cpp/src/rmw_wait.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// Copyright 2016-2018 Proyectos y Sistemas de Mantenimiento SL (eProsima).
// Coypyright 2020 Robert Bosch GmbH
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -13,6 +14,7 @@
// limitations under the License.

#include "fastrtps/subscriber/Subscriber.h"
#include "fastrtps/subscriber/SampleInfo.h"

#include "rmw/error_handling.h"
#include "rmw/rmw.h"
Expand Down Expand Up @@ -191,13 +193,18 @@ __rmw_wait(
// after we check, it will be caught on the next call to this function).
lock.unlock();

eprosima::fastrtps::SampleInfo_t si;
if (subscriptions) {
for (size_t i = 0; i < subscriptions->subscriber_count; ++i) {
void * data = subscriptions->subscribers[i];
auto custom_subscriber_info = static_cast<CustomSubscriberInfo *>(data);
custom_subscriber_info->listener_->detachCondition();
if (!custom_subscriber_info->listener_->hasData()) {
subscriptions->subscribers[i] = 0;
subscriptions->timestamps[i] = 0;
} else {
subscriptions->timestamps[i] = custom_subscriber_info->subscriber_->\
get_first_untaken_info(&si) ? si.receptionTimestamp.to_ns() : 0;
}
}
}
Expand All @@ -209,6 +216,10 @@ __rmw_wait(
custom_client_info->listener_->detachCondition();
if (!custom_client_info->listener_->hasData()) {
clients->clients[i] = 0;
clients->timestamps[i] = 0;
} else {
clients->timestamps[i] = custom_client_info->listener_->\
peekSampleInfo().receptionTimestamp.to_ns();
}
}
}
Expand All @@ -220,6 +231,10 @@ __rmw_wait(
custom_service_info->listener_->detachCondition();
if (!custom_service_info->listener_->hasData()) {
services->services[i] = 0;
services->timestamps[i] = 0;
} else {
services->timestamps[i] = custom_service_info->listener_->\
peekSampleInfo().receptionTimestamp.to_ns();
}
}
}
Expand Down