diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_client_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_client_info.hpp index 07318a095f..972773d992 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_client_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_client_info.hpp @@ -1,4 +1,5 @@ // Copyright 2016-2018 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// Copyright 2020 Robert Bosch GmbH // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -94,6 +95,7 @@ class ClientListener : public eprosima::fastrtps::SubscriberListener if (conditionMutex_ != nullptr) { std::unique_lock clock(*conditionMutex_); list.emplace_back(std::move(response)); + sampleInfos_.push(sinfo); // the change to list_has_data_ needs to be mutually exclusive with // rmw_wait() which checks hasData() and decides if wait() needs to // be called @@ -102,6 +104,7 @@ class ClientListener : public eprosima::fastrtps::SubscriberListener conditionVariable_->notify_one(); } else { list.emplace_back(std::move(response)); + sampleInfos_.push(sinfo); list_has_data_.store(true); } } @@ -121,6 +124,16 @@ class ClientListener : public eprosima::fastrtps::SubscriberListener return popResponse(response); } + /** + * Returns the SampleInfo_t at the front of the queue -- this corresponds + * to the next CustomServiceRequest to be returned on callinge getRequest(). + */ + const eprosima::fastrtps::SampleInfo_t& + peekSampleInfo() const + { + return sampleInfos_.front(); + } + void attachCondition(std::mutex * conditionMutex, std::condition_variable * conditionVariable) { @@ -167,6 +180,7 @@ class ClientListener : public eprosima::fastrtps::SubscriberListener if (!list.empty()) { response = std::move(list.front()); list.pop_front(); + sampleInfos_.pop(); list_has_data_.store(!list.empty()); return true; } @@ -180,6 +194,7 @@ class ClientListener : public eprosima::fastrtps::SubscriberListener std::mutex * conditionMutex_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); std::set publishers_; + std::queue sampleInfos_; }; class ClientPubListener : public eprosima::fastrtps::PublisherListener diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp index 34d37667a6..dfb76ff0e6 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp @@ -1,4 +1,5 @@ // Copyright 2016-2018 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// Copyright 2020 Robert Bosch GmbH // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -90,6 +91,7 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener if (conditionMutex_ != nullptr) { std::unique_lock clock(*conditionMutex_); list.push_back(request); + sampleInfos_.push(sinfo); // the change to list_has_data_ needs to be mutually exclusive with // rmw_wait() which checks hasData() and decides if wait() needs to // be called @@ -98,6 +100,7 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener conditionVariable_->notify_one(); } else { list.push_back(request); + sampleInfos_.push(sinfo); list_has_data_.store(true); } } @@ -115,12 +118,14 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener if (!list.empty()) { request = list.front(); list.pop_front(); + sampleInfos_.pop(); list_has_data_.store(!list.empty()); } } else { if (!list.empty()) { request = list.front(); list.pop_front(); + sampleInfos_.pop(); list_has_data_.store(!list.empty()); } } @@ -128,6 +133,16 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener return request; } + /** + * Returns the SampleInfo_t at the front of the queue -- this corresponds + * to the next CustomServiceRequest to be returned on callinge getRequest(). + */ + const eprosima::fastrtps::SampleInfo_t& + peekSampleInfo() const + { + return sampleInfos_.front(); + } + void attachCondition(std::mutex * conditionMutex, std::condition_variable * conditionVariable) { @@ -157,6 +172,7 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener std::atomic_bool list_has_data_; std::mutex * conditionMutex_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); + std::queue sampleInfos_; }; #endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_SERVICE_INFO_HPP_ diff --git a/rmw_fastrtps_shared_cpp/src/listener_thread.cpp b/rmw_fastrtps_shared_cpp/src/listener_thread.cpp index 8df2bb70b9..ac33c3f6d5 100644 --- a/rmw_fastrtps_shared_cpp/src/listener_thread.cpp +++ b/rmw_fastrtps_shared_cpp/src/listener_thread.cpp @@ -125,6 +125,8 @@ node_listener(rmw_context_t * context) rmw_guard_conditions_t guard_conditions; subscriptions.subscriber_count = 1; subscriptions.subscribers = subscriptions_buffer; + rcutils_time_point_value_t sub_timestamps[1] = { 0 }; + subscriptions.timestamps = sub_timestamps; guard_conditions.guard_condition_count = 1; guard_conditions.guard_conditions = guard_conditions_buffer; // number of conditions of a subscription is 2 diff --git a/rmw_fastrtps_shared_cpp/src/rmw_wait.cpp b/rmw_fastrtps_shared_cpp/src/rmw_wait.cpp index 86ce664e8b..421f526ab1 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_wait.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_wait.cpp @@ -1,4 +1,5 @@ // Copyright 2016-2018 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// Coypyright 2020 Robert Bosch GmbH // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -13,6 +14,7 @@ // limitations under the License. #include "fastrtps/subscriber/Subscriber.h" +#include "fastrtps/subscriber/SampleInfo.h" #include "rmw/error_handling.h" #include "rmw/rmw.h" @@ -191,6 +193,7 @@ __rmw_wait( // after we check, it will be caught on the next call to this function). lock.unlock(); + eprosima::fastrtps::SampleInfo_t si; if (subscriptions) { for (size_t i = 0; i < subscriptions->subscriber_count; ++i) { void * data = subscriptions->subscribers[i]; @@ -198,6 +201,10 @@ __rmw_wait( custom_subscriber_info->listener_->detachCondition(); if (!custom_subscriber_info->listener_->hasData()) { subscriptions->subscribers[i] = 0; + subscriptions->timestamps[i] = 0; + } else { + subscriptions->timestamps[i] = custom_subscriber_info->subscriber_->\ + get_first_untaken_info(&si) ? si.receptionTimestamp.to_ns() : 0; } } } @@ -209,6 +216,10 @@ __rmw_wait( custom_client_info->listener_->detachCondition(); if (!custom_client_info->listener_->hasData()) { clients->clients[i] = 0; + clients->timestamps[i] = 0; + } else { + clients->timestamps[i] = custom_client_info->listener_->\ + peekSampleInfo().receptionTimestamp.to_ns(); } } } @@ -220,6 +231,10 @@ __rmw_wait( custom_service_info->listener_->detachCondition(); if (!custom_service_info->listener_->hasData()) { services->services[i] = 0; + services->timestamps[i] = 0; + } else { + services->timestamps[i] = custom_service_info->listener_->\ + peekSampleInfo().receptionTimestamp.to_ns(); } } }