From 7c085b3e427172ea001b8de1cda24492d92d9c35 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Wed, 25 Mar 2020 12:06:57 +0100 Subject: [PATCH 1/4] Refs #7918. Uncrustify --- include/fastrtps/subscriber/Subscriber.h | 24 +- .../fastrtps/subscriber/SubscriberHistory.h | 308 +++++++++--------- .../subscriber/Subscriber.cpp | 27 +- .../subscriber/SubscriberHistory.cpp | 41 ++- .../subscriber/SubscriberImpl.cpp | 127 ++++---- .../subscriber/SubscriberImpl.h | 58 +++- 6 files changed, 320 insertions(+), 265 deletions(-) diff --git a/include/fastrtps/subscriber/Subscriber.h b/include/fastrtps/subscriber/Subscriber.h index 50830f1785a..cd5f041c4ef 100644 --- a/include/fastrtps/subscriber/Subscriber.h +++ b/include/fastrtps/subscriber/Subscriber.h @@ -43,14 +43,21 @@ class RTPS_DllAPI Subscriber { friend class SubscriberImpl; - virtual ~Subscriber() {} + virtual ~Subscriber() + { + } public: + /** * Constructor from a SubscriberImpl pointer * @param pimpl Actual implementation of the subscriber */ - Subscriber(SubscriberImpl* pimpl) : mp_impl(pimpl) {} + Subscriber( + SubscriberImpl* pimpl) + : mp_impl(pimpl) + { + } /** * Get the associated GUID @@ -75,7 +82,8 @@ class RTPS_DllAPI Subscriber * @return true in case unread samples are available. * In other case, false. */ - bool wait_for_unread_samples(const Duration_t& timeout); + bool wait_for_unread_samples( + const Duration_t& timeout); /** * @brief Reads next unread sample from the Subscriber. @@ -106,7 +114,8 @@ class RTPS_DllAPI Subscriber * @param att Reference to a SubscriberAttributes object to update the parameters; * @return True if correctly updated, false if ANY of the updated parameters cannot be updated */ - bool updateAttributes(const SubscriberAttributes& att); + bool updateAttributes( + const SubscriberAttributes& att); /** * Get the Attributes of the Subscriber. @@ -141,15 +150,18 @@ class RTPS_DllAPI Subscriber * @brief Get the requested deadline missed status * @return The deadline missed status */ - void get_requested_deadline_missed_status(RequestedDeadlineMissedStatus& status); + void get_requested_deadline_missed_status( + RequestedDeadlineMissedStatus& status); /** * @brief Returns the liveliness changed status * @param status Liveliness changed status */ - void get_liveliness_changed_status(LivelinessChangedStatus& status); + void get_liveliness_changed_status( + LivelinessChangedStatus& status); private: + SubscriberImpl* mp_impl; }; diff --git a/include/fastrtps/subscriber/SubscriberHistory.h b/include/fastrtps/subscriber/SubscriberHistory.h index a85b97c8044..d8a6d8ce879 100644 --- a/include/fastrtps/subscriber/SubscriberHistory.h +++ b/include/fastrtps/subscriber/SubscriberHistory.h @@ -40,167 +40,167 @@ namespace fastrtps { * Class SubscriberHistory, container of the different CacheChanges of a subscriber * @ingroup FASTRTPS_MODULE */ -class SubscriberHistory: public rtps::ReaderHistory +class SubscriberHistory : public rtps::ReaderHistory { - public: - - /** - * Constructor. Requires information about the subscriber. - * @param topic_att TopicAttributes. - * @param type TopicDataType. - * @param qos ReaderQoS policy. - * @param payloadMax Maximum payload size per change. - * @param mempolicy Set wether the payloads ccan dynamically resized or not. - */ - SubscriberHistory( +public: + + /** + * Constructor. Requires information about the subscriber. + * @param topic_att TopicAttributes. + * @param type TopicDataType. + * @param qos ReaderQoS policy. + * @param payloadMax Maximum payload size per change. + * @param mempolicy Set wether the payloads ccan dynamically resized or not. + */ + SubscriberHistory( const TopicAttributes& topic_att, fastdds::dds::TopicDataType* type, const fastrtps::ReaderQos& qos, uint32_t payloadMax, rtps::MemoryManagementPolicy_t mempolicy); - virtual ~SubscriberHistory(); - - /** - * Called when a change is received by the Subscriber. Will add the change to the history. - * @pre Change should not be already present in the history. - * @param[in] change The received change - * @param unknown_missing_changes_up_to Number of missing changes before this one - * @return - */ - bool received_change( - rtps::CacheChange_t* change, - size_t unknown_missing_changes_up_to); - - /** @name Read or take data methods. - * Methods to read or take data from the History. - * @param data Pointer to the object where you want to read or take the information. - * @param info Pointer to a SampleInfo_t object where you want - * @param max_blocking_time Maximum time the function can be blocked. - * to store the information about the retrieved data - */ - ///@{ - bool readNextData( - void* data, - SampleInfo_t* info, - std::chrono::steady_clock::time_point& max_blocking_time); - - bool takeNextData( - void* data, - SampleInfo_t* info, - std::chrono::steady_clock::time_point& max_blocking_time); - ///@} - - /** - * This method is called to remove a change from the SubscriberHistory. - * @param change Pointer to the CacheChange_t. - * @return True if removed. - */ - bool remove_change_sub( - rtps::CacheChange_t* change); - - /** - * @brief A method to set the next deadline for the given instance - * @param handle The handle to the instance - * @param next_deadline_us The time point when the deadline will occur - * @return True if the deadline was set correctly - */ - bool set_next_deadline( - const rtps::InstanceHandle_t& handle, - const std::chrono::steady_clock::time_point& next_deadline_us); - - /** - * @brief A method to get the next instance handle that will miss the deadline and the time when the deadline will occur - * @param handle The handle to the instance - * @param next_deadline_us The time point when the instance will miss the deadline - * @return True if the deadline was retrieved successfully - */ - bool get_next_deadline( - rtps::InstanceHandle_t& handle, - std::chrono::steady_clock::time_point& next_deadline_us); - - private: - - using t_m_Inst_Caches = std::map; - - //!Map where keys are instance handles and values vectors of cache changes - t_m_Inst_Caches keyed_changes_; - //!Time point when the next deadline will occur (only used for topics with no key) - std::chrono::steady_clock::time_point next_deadline_us_; - //!HistoryQosPolicy values. - HistoryQosPolicy history_qos_; - //!ResourceLimitsQosPolicy values. - ResourceLimitsQosPolicy resource_limited_qos_; - //!Topic Attributes - TopicAttributes topic_att_; - //!TopicDataType - fastdds::dds::TopicDataType* type_; - //!ReaderQos - fastrtps::ReaderQos qos_; - - //!Type object to deserialize Key - void* get_key_object_; - - /// Function processing a received change - std::function receive_fn_; - - /** - * @brief Method that finds a key in m_keyedChanges or tries to add it if not found - * @param a_change The change to get the key from - * @param map_it A map iterator to the given key - * @return True if it was found or could be added to the map - */ - bool find_key( - rtps::CacheChange_t* a_change, - t_m_Inst_Caches::iterator* map_it); - - /** - * @brief Method that finds a key in m_keyedChanges or tries to add it if not found - * @param a_change The change to get the key from - * @param map_it A map iterator to the given key - * @return True if it was found or could be added to the map - */ - bool find_key_for_change( - rtps::CacheChange_t* a_change, - t_m_Inst_Caches::iterator& map_it); - - /** - * @name Variants of incoming change processing. - * Will be called with the history mutex taken. - * @param[in] change The received change - * @param unknown_missing_changes_up_to Number of missing changes before this one - * @return - */ - ///@{ - bool received_change_keep_all_no_key( - rtps::CacheChange_t* change, - size_t unknown_missing_changes_up_to); - - bool received_change_keep_last_no_key( - rtps::CacheChange_t* change, - size_t unknown_missing_changes_up_to); - - bool received_change_keep_all_with_key( - rtps::CacheChange_t* change, - size_t unknown_missing_changes_up_to); - - bool received_change_keep_last_with_key( - rtps::CacheChange_t* change, - size_t unknown_missing_changes_up_to); - ///@} - - bool add_received_change( - rtps::CacheChange_t* a_change); - - bool add_received_change_with_key( - rtps::CacheChange_t* a_change, - std::vector& instance_changes); - - bool deserialize_change( - rtps::CacheChange_t* change, - uint32_t ownership_strength, - void* data, - SampleInfo_t* info); + virtual ~SubscriberHistory(); + + /** + * Called when a change is received by the Subscriber. Will add the change to the history. + * @pre Change should not be already present in the history. + * @param[in] change The received change + * @param unknown_missing_changes_up_to Number of missing changes before this one + * @return + */ + bool received_change( + rtps::CacheChange_t* change, + size_t unknown_missing_changes_up_to); + + /** @name Read or take data methods. + * Methods to read or take data from the History. + * @param data Pointer to the object where you want to read or take the information. + * @param info Pointer to a SampleInfo_t object where you want + * @param max_blocking_time Maximum time the function can be blocked. + * to store the information about the retrieved data + */ + ///@{ + bool readNextData( + void* data, + SampleInfo_t* info, + std::chrono::steady_clock::time_point& max_blocking_time); + + bool takeNextData( + void* data, + SampleInfo_t* info, + std::chrono::steady_clock::time_point& max_blocking_time); + ///@} + + /** + * This method is called to remove a change from the SubscriberHistory. + * @param change Pointer to the CacheChange_t. + * @return True if removed. + */ + bool remove_change_sub( + rtps::CacheChange_t* change); + + /** + * @brief A method to set the next deadline for the given instance + * @param handle The handle to the instance + * @param next_deadline_us The time point when the deadline will occur + * @return True if the deadline was set correctly + */ + bool set_next_deadline( + const rtps::InstanceHandle_t& handle, + const std::chrono::steady_clock::time_point& next_deadline_us); + + /** + * @brief A method to get the next instance handle that will miss the deadline and the time when the deadline will occur + * @param handle The handle to the instance + * @param next_deadline_us The time point when the instance will miss the deadline + * @return True if the deadline was retrieved successfully + */ + bool get_next_deadline( + rtps::InstanceHandle_t& handle, + std::chrono::steady_clock::time_point& next_deadline_us); + +private: + + using t_m_Inst_Caches = std::map; + + //!Map where keys are instance handles and values vectors of cache changes + t_m_Inst_Caches keyed_changes_; + //!Time point when the next deadline will occur (only used for topics with no key) + std::chrono::steady_clock::time_point next_deadline_us_; + //!HistoryQosPolicy values. + HistoryQosPolicy history_qos_; + //!ResourceLimitsQosPolicy values. + ResourceLimitsQosPolicy resource_limited_qos_; + //!Topic Attributes + TopicAttributes topic_att_; + //!TopicDataType + fastdds::dds::TopicDataType* type_; + //!ReaderQos + fastrtps::ReaderQos qos_; + + //!Type object to deserialize Key + void* get_key_object_; + + /// Function processing a received change + std::function receive_fn_; + + /** + * @brief Method that finds a key in m_keyedChanges or tries to add it if not found + * @param a_change The change to get the key from + * @param map_it A map iterator to the given key + * @return True if it was found or could be added to the map + */ + bool find_key( + rtps::CacheChange_t* a_change, + t_m_Inst_Caches::iterator* map_it); + + /** + * @brief Method that finds a key in m_keyedChanges or tries to add it if not found + * @param a_change The change to get the key from + * @param map_it A map iterator to the given key + * @return True if it was found or could be added to the map + */ + bool find_key_for_change( + rtps::CacheChange_t* a_change, + t_m_Inst_Caches::iterator& map_it); + + /** + * @name Variants of incoming change processing. + * Will be called with the history mutex taken. + * @param[in] change The received change + * @param unknown_missing_changes_up_to Number of missing changes before this one + * @return + */ + ///@{ + bool received_change_keep_all_no_key( + rtps::CacheChange_t* change, + size_t unknown_missing_changes_up_to); + + bool received_change_keep_last_no_key( + rtps::CacheChange_t* change, + size_t unknown_missing_changes_up_to); + + bool received_change_keep_all_with_key( + rtps::CacheChange_t* change, + size_t unknown_missing_changes_up_to); + + bool received_change_keep_last_with_key( + rtps::CacheChange_t* change, + size_t unknown_missing_changes_up_to); + ///@} + + bool add_received_change( + rtps::CacheChange_t* a_change); + + bool add_received_change_with_key( + rtps::CacheChange_t* a_change, + std::vector& instance_changes); + + bool deserialize_change( + rtps::CacheChange_t* change, + uint32_t ownership_strength, + void* data, + SampleInfo_t* info); }; } // namespace fastrtps diff --git a/src/cpp/fastrtps_deprecated/subscriber/Subscriber.cpp b/src/cpp/fastrtps_deprecated/subscriber/Subscriber.cpp index a25eac18c7f..f2c258d64c4 100644 --- a/src/cpp/fastrtps_deprecated/subscriber/Subscriber.cpp +++ b/src/cpp/fastrtps_deprecated/subscriber/Subscriber.cpp @@ -30,21 +30,28 @@ const GUID_t& Subscriber::getGuid() return mp_impl->getGuid(); } -bool Subscriber::wait_for_unread_samples(const Duration_t& timeout) +bool Subscriber::wait_for_unread_samples( + const Duration_t& timeout) { return mp_impl->wait_for_unread_samples(timeout); } -bool Subscriber::readNextData(void* data,SampleInfo_t* info) +bool Subscriber::readNextData( + void* data, + SampleInfo_t* info) { - return mp_impl->readNextData(data,info); + return mp_impl->readNextData(data, info); } -bool Subscriber::takeNextData(void* data,SampleInfo_t* info) + +bool Subscriber::takeNextData( + void* data, + SampleInfo_t* info) { - return mp_impl->takeNextData(data,info); + return mp_impl->takeNextData(data, info); } -bool Subscriber::updateAttributes(const SubscriberAttributes& att) +bool Subscriber::updateAttributes( + const SubscriberAttributes& att) { return mp_impl->updateAttributes(att); } @@ -61,15 +68,17 @@ bool Subscriber::isInCleanState() const uint64_t Subscriber::get_unread_count() const { - return mp_impl->get_unread_count(); + return mp_impl->get_unread_count(); } -void Subscriber::get_requested_deadline_missed_status(RequestedDeadlineMissedStatus& status) +void Subscriber::get_requested_deadline_missed_status( + RequestedDeadlineMissedStatus& status) { mp_impl->get_requested_deadline_missed_status(status); } -void Subscriber::get_liveliness_changed_status(LivelinessChangedStatus &status) +void Subscriber::get_liveliness_changed_status( + LivelinessChangedStatus& status) { mp_impl->get_liveliness_changed_status(status); } diff --git a/src/cpp/fastrtps_deprecated/subscriber/SubscriberHistory.cpp b/src/cpp/fastrtps_deprecated/subscriber/SubscriberHistory.cpp index 5b740d0c075..2cd2d761e1f 100644 --- a/src/cpp/fastrtps_deprecated/subscriber/SubscriberHistory.cpp +++ b/src/cpp/fastrtps_deprecated/subscriber/SubscriberHistory.cpp @@ -71,14 +71,14 @@ SubscriberHistory::SubscriberHistory( if (topic_att.getTopicKind() == NO_KEY) { receive_fn_ = topic_att.historyQos.kind == KEEP_ALL_HISTORY_QOS ? - std::bind(&SubscriberHistory::received_change_keep_all_no_key, this, _1, _2) : - std::bind(&SubscriberHistory::received_change_keep_last_no_key, this, _1, _2); + std::bind(&SubscriberHistory::received_change_keep_all_no_key, this, _1, _2) : + std::bind(&SubscriberHistory::received_change_keep_last_no_key, this, _1, _2); } else { receive_fn_ = topic_att.historyQos.kind == KEEP_ALL_HISTORY_QOS ? - std::bind(&SubscriberHistory::received_change_keep_all_with_key, this, _1, _2) : - std::bind(&SubscriberHistory::received_change_keep_last_with_key, this, _1, _2); + std::bind(&SubscriberHistory::received_change_keep_all_with_key, this, _1, _2) : + std::bind(&SubscriberHistory::received_change_keep_last_with_key, this, _1, _2); } } @@ -157,7 +157,7 @@ bool SubscriberHistory::received_change_keep_all_with_key( return add_received_change_with_key(a_change, vit->second.cache_changes); } - logWarning(SUBSCRIBER, "Change not added due to maximum number of samples per instance";); + logWarning(SUBSCRIBER, "Change not added due to maximum number of samples per instance"); } return false; @@ -211,8 +211,8 @@ bool SubscriberHistory::add_received_change( } logInfo(SUBSCRIBER, topic_att_.getTopicDataType() - << ": Change " << a_change->sequenceNumber << " added from: " - << a_change->writerGUID;); + << ": Change " << a_change->sequenceNumber << " added from: " + << a_change->writerGUID; ); return true; } @@ -245,8 +245,8 @@ bool SubscriberHistory::add_received_change_with_key( instance_changes.push_back(a_change); logInfo(SUBSCRIBER, mp_reader->getGuid().entityId - << ": Change " << a_change->sequenceNumber << " added from: " - << a_change->writerGUID << " with KEY: " << a_change->instanceHandle;); + << ": Change " << a_change->sequenceNumber << " added from: " + << a_change->writerGUID << " with KEY: " << a_change->instanceHandle; ); return true; } @@ -274,7 +274,7 @@ bool SubscriberHistory::find_key_for_change( else if (!a_change->instanceHandle.isDefined()) { logWarning(SUBSCRIBER, "NO KEY in topic: " << topic_att_.topicName - << " and no method to obtain it";); + << " and no method to obtain it"; ); return false; } @@ -348,7 +348,6 @@ bool SubscriberHistory::readNextData( return false; } - bool SubscriberHistory::takeNextData( void* data, SampleInfo_t* info, @@ -400,7 +399,7 @@ bool SubscriberHistory::find_key( } else { - for (vit = keyed_changes_.begin(); vit!= keyed_changes_.end(); ++vit) + for (vit = keyed_changes_.begin(); vit != keyed_changes_.end(); ++vit) { if (vit->second.cache_changes.size() == 0) { @@ -486,8 +485,8 @@ bool SubscriberHistory::set_next_deadline( } bool SubscriberHistory::get_next_deadline( - InstanceHandle_t &handle, - std::chrono::steady_clock::time_point &next_deadline_us) + InstanceHandle_t& handle, + std::chrono::steady_clock::time_point& next_deadline_us) { if (mp_reader == nullptr || mp_mutex == nullptr) { @@ -504,13 +503,13 @@ bool SubscriberHistory::get_next_deadline( else if (topic_att_.getTopicKind() == WITH_KEY) { auto min = std::min_element(keyed_changes_.begin(), - keyed_changes_.end(), - []( - const std::pair &lhs, - const std::pair &rhs) - { - return lhs.second.next_deadline_us < rhs.second.next_deadline_us; - }); + keyed_changes_.end(), + []( + const std::pair& lhs, + const std::pair& rhs) + { + return lhs.second.next_deadline_us < rhs.second.next_deadline_us; + }); handle = min->first; next_deadline_us = min->second.next_deadline_us; return true; diff --git a/src/cpp/fastrtps_deprecated/subscriber/SubscriberImpl.cpp b/src/cpp/fastrtps_deprecated/subscriber/SubscriberImpl.cpp index b66fe5b11cc..558c070a241 100644 --- a/src/cpp/fastrtps_deprecated/subscriber/SubscriberImpl.cpp +++ b/src/cpp/fastrtps_deprecated/subscriber/SubscriberImpl.cpp @@ -51,10 +51,10 @@ SubscriberImpl::SubscriberImpl( , m_att(att) #pragma warning (disable : 4355 ) , m_history(att.topic, - ptype, - att.qos, - ptype->m_typeSize + 3/*Possible alignment*/, - att.historyMemoryPolicy) + ptype, + att.qos, + ptype->m_typeSize + 3 /*Possible alignment*/, + att.historyMemoryPolicy) , mp_listener(listen) , m_readerListener(this) , mp_userSubscriber(nullptr) @@ -64,18 +64,18 @@ SubscriberImpl::SubscriberImpl( , lifespan_duration_us_(m_att.qos.m_lifespan.duration.to_ns() * 1e-3) { deadline_timer_ = new TimedEvent(mp_participant->get_resource_event(), - [&]() -> bool + [&]() -> bool { return deadline_missed(); }, - att.qos.m_deadline.period.to_ns() * 1e-6); + att.qos.m_deadline.period.to_ns() * 1e-6); lifespan_timer_ = new TimedEvent(mp_participant->get_resource_event(), - [&]() -> bool + [&]() -> bool { return lifespan_expired(); }, - att.qos.m_lifespan.duration.to_ns() * 1e-6); + att.qos.m_lifespan.duration.to_ns() * 1e-6); } SubscriberImpl::~SubscriberImpl() @@ -83,38 +83,43 @@ SubscriberImpl::~SubscriberImpl() delete(lifespan_timer_); delete(deadline_timer_); - if(mp_reader != nullptr) + if (mp_reader != nullptr) { - logInfo(SUBSCRIBER,this->getGuid().entityId << " in topic: "<m_att.topic.topicName); + logInfo(SUBSCRIBER, this->getGuid().entityId << " in topic: " << this->m_att.topic.topicName); } RTPSDomain::removeRTPSReader(mp_reader); delete(this->mp_userSubscriber); } -bool SubscriberImpl::wait_for_unread_samples(const Duration_t& timeout) +bool SubscriberImpl::wait_for_unread_samples( + const Duration_t& timeout) { return mp_reader->wait_for_unread_cache(timeout); } -bool SubscriberImpl::readNextData(void* data,SampleInfo_t* info) +bool SubscriberImpl::readNextData( + void* data, + SampleInfo_t* info) { auto max_blocking_time = std::chrono::steady_clock::now() + #if HAVE_STRICT_REALTIME - std::chrono::microseconds(::TimeConv::Time_t2MicroSecondsInt64(m_att.qos.m_reliability.max_blocking_time)); + std::chrono::microseconds(::TimeConv::Time_t2MicroSecondsInt64(m_att.qos.m_reliability.max_blocking_time)); #else - std::chrono::hours(24); + std::chrono::hours(24); #endif return this->m_history.readNextData(data, info, max_blocking_time); } -bool SubscriberImpl::takeNextData(void* data,SampleInfo_t* info) +bool SubscriberImpl::takeNextData( + void* data, + SampleInfo_t* info) { auto max_blocking_time = std::chrono::steady_clock::now() + #if HAVE_STRICT_REALTIME - std::chrono::microseconds(::TimeConv::Time_t2MicroSecondsInt64(m_att.qos.m_reliability.max_blocking_time)); + std::chrono::microseconds(::TimeConv::Time_t2MicroSecondsInt64(m_att.qos.m_reliability.max_blocking_time)); #else - std::chrono::hours(24); + std::chrono::hours(24); #endif return this->m_history.takeNextData(data, info, max_blocking_time); } @@ -124,80 +129,81 @@ const GUID_t& SubscriberImpl::getGuid() return mp_reader->getGuid(); } -bool SubscriberImpl::updateAttributes(const SubscriberAttributes& att) +bool SubscriberImpl::updateAttributes( + const SubscriberAttributes& att) { bool updated = true; bool missing = false; - if(att.unicastLocatorList.size() != this->m_att.unicastLocatorList.size() || + if (att.unicastLocatorList.size() != this->m_att.unicastLocatorList.size() || att.multicastLocatorList.size() != this->m_att.multicastLocatorList.size()) { - logWarning(RTPS_READER,"Locator Lists cannot be changed or updated in this version"); + logWarning(RTPS_READER, "Locator Lists cannot be changed or updated in this version"); updated &= false; } else { - for(LocatorListConstIterator lit1 = this->m_att.unicastLocatorList.begin(); - lit1!=this->m_att.unicastLocatorList.end();++lit1) + for (LocatorListConstIterator lit1 = this->m_att.unicastLocatorList.begin(); + lit1 != this->m_att.unicastLocatorList.end(); ++lit1) { missing = true; - for(LocatorListConstIterator lit2 = att.unicastLocatorList.begin(); - lit2!= att.unicastLocatorList.end();++lit2) + for (LocatorListConstIterator lit2 = att.unicastLocatorList.begin(); + lit2 != att.unicastLocatorList.end(); ++lit2) { - if(*lit1 == *lit2) + if (*lit1 == *lit2) { missing = false; break; } } - if(missing) + if (missing) { - logWarning(RTPS_READER,"Locator: "<< *lit1 << " not present in new list"); - logWarning(RTPS_READER,"Locator Lists cannot be changed or updated in this version"); + logWarning(RTPS_READER, "Locator: " << *lit1 << " not present in new list"); + logWarning(RTPS_READER, "Locator Lists cannot be changed or updated in this version"); } } - for(LocatorListConstIterator lit1 = this->m_att.multicastLocatorList.begin(); - lit1!=this->m_att.multicastLocatorList.end();++lit1) + for (LocatorListConstIterator lit1 = this->m_att.multicastLocatorList.begin(); + lit1 != this->m_att.multicastLocatorList.end(); ++lit1) { missing = true; - for(LocatorListConstIterator lit2 = att.multicastLocatorList.begin(); - lit2!= att.multicastLocatorList.end();++lit2) + for (LocatorListConstIterator lit2 = att.multicastLocatorList.begin(); + lit2 != att.multicastLocatorList.end(); ++lit2) { - if(*lit1 == *lit2) + if (*lit1 == *lit2) { missing = false; break; } } - if(missing) + if (missing) { - logWarning(RTPS_READER,"Locator: "<< *lit1<< " not present in new list"); - logWarning(RTPS_READER,"Locator Lists cannot be changed or updated in this version"); + logWarning(RTPS_READER, "Locator: " << *lit1 << " not present in new list"); + logWarning(RTPS_READER, "Locator Lists cannot be changed or updated in this version"); } } } //TOPIC ATTRIBUTES - if(this->m_att.topic != att.topic) + if (this->m_att.topic != att.topic) { - logWarning(RTPS_READER,"Topic Attributes cannot be updated"); + logWarning(RTPS_READER, "Topic Attributes cannot be updated"); updated &= false; } //QOS: //CHECK IF THE QOS CAN BE SET - if(!this->m_att.qos.canQosBeUpdated(att.qos)) + if (!this->m_att.qos.canQosBeUpdated(att.qos)) { - updated &=false; + updated &= false; } - if(updated) + if (updated) { this->m_att.expectsInlineQos = att.expectsInlineQos; - if(this->m_att.qos.m_reliability.kind == RELIABLE_RELIABILITY_QOS) + if (this->m_att.qos.m_reliability.kind == RELIABLE_RELIABILITY_QOS) { //UPDATE TIMES: StatefulReader* sfr = (StatefulReader*)mp_reader; sfr->updateTimes(att.times); } - this->m_att.qos.setQos(att.qos,false); + this->m_att.qos.setQos(att.qos, false); //NOTIFY THE BUILTIN PROTOCOLS THAT THE READER HAS CHANGED mp_rtpsParticipant->updateReader(this->mp_reader, m_att.topic, m_att.qos); @@ -206,7 +212,7 @@ bool SubscriberImpl::updateAttributes(const SubscriberAttributes& att) if (m_att.qos.m_deadline.period != c_TimeInfinite) { deadline_duration_us_ = - duration>(m_att.qos.m_deadline.period.to_ns() * 1e-3); + duration >(m_att.qos.m_deadline.period.to_ns() * 1e-3); deadline_timer_->update_interval_millisec(m_att.qos.m_deadline.period.to_ns() * 1e-6); } else @@ -219,7 +225,8 @@ bool SubscriberImpl::updateAttributes(const SubscriberAttributes& att) if (m_att.qos.m_lifespan.duration != c_TimeInfinite) { lifespan_duration_us_ = - std::chrono::duration>(m_att.qos.m_lifespan.duration.to_ns() * 1e-3); + std::chrono::duration >(m_att.qos.m_lifespan.duration.to_ns() * 1e-3); lifespan_timer_->update_interval_millisec(m_att.qos.m_lifespan.duration.to_ns() * 1e-6); } else @@ -233,11 +240,11 @@ bool SubscriberImpl::updateAttributes(const SubscriberAttributes& att) void SubscriberImpl::SubscriberReaderListener::onNewCacheChangeAdded( RTPSReader* /*reader*/, - const CacheChange_t * const change_in) + const CacheChange_t* const change_in) { if (mp_subscriberImpl->onNewCacheChangeAdded(change_in)) { - if(mp_subscriberImpl->mp_listener != nullptr) + if (mp_subscriberImpl->mp_listener != nullptr) { //cout << "FIRST BYTE: "<< (int)change->serializedPayload.data[0] << endl; mp_subscriberImpl->mp_listener->onNewDataMessage(mp_subscriberImpl->mp_userSubscriber); @@ -245,29 +252,32 @@ void SubscriberImpl::SubscriberReaderListener::onNewCacheChangeAdded( } } -void SubscriberImpl::SubscriberReaderListener::onReaderMatched(RTPSReader* /*reader*/, MatchingInfo& info) +void SubscriberImpl::SubscriberReaderListener::onReaderMatched( + RTPSReader* /*reader*/, + MatchingInfo& info) { if (this->mp_subscriberImpl->mp_listener != nullptr) { - mp_subscriberImpl->mp_listener->onSubscriptionMatched(mp_subscriberImpl->mp_userSubscriber,info); + mp_subscriberImpl->mp_listener->onSubscriptionMatched(mp_subscriberImpl->mp_userSubscriber, info); } } void SubscriberImpl::SubscriberReaderListener::on_liveliness_changed( - RTPSReader *reader, - const LivelinessChangedStatus &status) + RTPSReader* reader, + const LivelinessChangedStatus& status) { (void)reader; if (mp_subscriberImpl->mp_listener != nullptr) { mp_subscriberImpl->mp_listener->on_liveliness_changed( - mp_subscriberImpl->mp_userSubscriber, - status); + mp_subscriberImpl->mp_userSubscriber, + status); } } -bool SubscriberImpl::onNewCacheChangeAdded(const CacheChange_t* const change_in) +bool SubscriberImpl::onNewCacheChangeAdded( + const CacheChange_t* const change_in) { if (m_att.qos.m_deadline.period != c_TimeInfinite) { @@ -281,7 +291,7 @@ bool SubscriberImpl::onNewCacheChangeAdded(const CacheChange_t* const change_in) } else if (timer_owner_ == change_in->instanceHandle || timer_owner_ == InstanceHandle_t()) { - if(deadline_timer_reschedule()) + if (deadline_timer_reschedule()) { deadline_timer_->cancel_timer(); deadline_timer_->restart_timer(); @@ -387,8 +397,8 @@ bool SubscriberImpl::deadline_missed() return deadline_timer_reschedule(); } - -void SubscriberImpl::get_requested_deadline_missed_status(RequestedDeadlineMissedStatus& status) +void SubscriberImpl::get_requested_deadline_missed_status( + RequestedDeadlineMissedStatus& status) { std::unique_lock lock(mp_reader->getMutex()); @@ -437,7 +447,8 @@ bool SubscriberImpl::lifespan_expired() return true; } -void SubscriberImpl::get_liveliness_changed_status(LivelinessChangedStatus &status) +void SubscriberImpl::get_liveliness_changed_status( + LivelinessChangedStatus& status) { std::unique_lock lock(mp_reader->getMutex()); diff --git a/src/cpp/fastrtps_deprecated/subscriber/SubscriberImpl.h b/src/cpp/fastrtps_deprecated/subscriber/SubscriberImpl.h index 9cc708dd437..29dbdeed609 100644 --- a/src/cpp/fastrtps_deprecated/subscriber/SubscriberImpl.h +++ b/src/cpp/fastrtps_deprecated/subscriber/SubscriberImpl.h @@ -33,8 +33,7 @@ namespace eprosima { namespace fastrtps { -namespace rtps -{ +namespace rtps { class RTPSReader; class RTPSParticipant; class TimedEvent; @@ -53,7 +52,8 @@ class Subscriber; class SubscriberImpl { friend class ParticipantImpl; - public: + +public: /** * @param p @@ -73,7 +73,8 @@ class SubscriberImpl * Method to block the current thread until an unread sasmple is available. * @param timeout Maximun time the function will be blocked if any sample is received. */ - bool wait_for_unread_samples(const eprosima::fastrtps::Duration_t& timeout); + bool wait_for_unread_samples( + const eprosima::fastrtps::Duration_t& timeout); /** @name Read or take data methods. @@ -82,8 +83,12 @@ class SubscriberImpl ///@{ - bool readNextData(void* data,SampleInfo_t* info); - bool takeNextData(void* data,SampleInfo_t* info); + bool readNextData( + void* data, + SampleInfo_t* info); + bool takeNextData( + void* data, + SampleInfo_t* info); ///@} @@ -92,7 +97,8 @@ class SubscriberImpl * @param att Reference to a SubscriberAttributes object to update the parameters; * @return True if correctly updated, false if ANY of the updated parameters cannot be updated */ - bool updateAttributes(const SubscriberAttributes& att); + bool updateAttributes( + const SubscriberAttributes& att); /** * Get associated GUID @@ -104,13 +110,19 @@ class SubscriberImpl * Get the Attributes of the Subscriber. * @return Attributes of the Subscriber. */ - const SubscriberAttributes& getAttributes() const {return m_att;} + const SubscriberAttributes& getAttributes() const + { + return m_att; + } /** * Get topic data type * @return Topic data type */ - fastdds::dds::TopicDataType* getType() {return mp_type;} + fastdds::dds::TopicDataType* getType() + { + return mp_type; + } /*! * @brief Returns there is a clean state with all Publishers. @@ -131,19 +143,22 @@ class SubscriberImpl * @param change The cache change that has been added * @return True if the change was added (due to some QoS it could have been 'rejected') */ - bool onNewCacheChangeAdded(const rtps::CacheChange_t* const change); + bool onNewCacheChangeAdded( + const rtps::CacheChange_t* const change); /** * @brief Get the requested deadline missed status * @return The deadline missed status */ - void get_requested_deadline_missed_status(RequestedDeadlineMissedStatus& status); + void get_requested_deadline_missed_status( + RequestedDeadlineMissedStatus& status); /** * @brief Returns the liveliness changed status * @param status Liveliness changed status */ - void get_liveliness_changed_status(LivelinessChangedStatus& status); + void get_liveliness_changed_status( + LivelinessChangedStatus& status); private: @@ -163,9 +178,18 @@ class SubscriberImpl class SubscriberReaderListener : public rtps::ReaderListener { - public: - SubscriberReaderListener(SubscriberImpl* s): mp_subscriberImpl(s) {} - virtual ~SubscriberReaderListener() {} +public: + + SubscriberReaderListener( + SubscriberImpl* s) + : mp_subscriberImpl(s) + { + } + + virtual ~SubscriberReaderListener() + { + } + void onReaderMatched( rtps::RTPSReader* reader, rtps::MatchingInfo& info) override; @@ -185,7 +209,7 @@ class SubscriberImpl //! A timer used to check for deadlines rtps::TimedEvent* deadline_timer_; //! Deadline duration in microseconds - std::chrono::duration> deadline_duration_us_; + std::chrono::duration > deadline_duration_us_; //! The current timer owner, i.e. the instance which started the deadline timer rtps::InstanceHandle_t timer_owner_; //! Requested deadline missed status @@ -194,7 +218,7 @@ class SubscriberImpl //! A timed callback to remove expired samples rtps::TimedEvent* lifespan_timer_; //! The lifespan duration - std::chrono::duration> lifespan_duration_us_; + std::chrono::duration > lifespan_duration_us_; /** * @brief Method called when an instance misses the deadline From 3bead21935c954e5ec21965e19c1d1bf95798ade Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Wed, 25 Mar 2020 12:55:38 +0100 Subject: [PATCH 2/4] Refs #7918. Adding new method get_first_untaken_info. --- include/fastrtps/subscriber/Subscriber.h | 8 ++++ .../fastrtps/subscriber/SubscriberHistory.h | 8 ++++ .../subscriber/Subscriber.cpp | 6 +++ .../subscriber/SubscriberHistory.cpp | 44 +++++++++++++++---- .../subscriber/SubscriberImpl.cpp | 6 +++ .../subscriber/SubscriberImpl.h | 8 ++++ 6 files changed, 71 insertions(+), 9 deletions(-) diff --git a/include/fastrtps/subscriber/Subscriber.h b/include/fastrtps/subscriber/Subscriber.h index cd5f041c4ef..619fa9b160d 100644 --- a/include/fastrtps/subscriber/Subscriber.h +++ b/include/fastrtps/subscriber/Subscriber.h @@ -109,6 +109,14 @@ class RTPS_DllAPI Subscriber void* sample, SampleInfo_t* info); + /** + * @brief Returns information about the first untaken sample. + * @param [out] info Pointer to a SampleInfo_t structure to store first untaken sample information. + * @return true if sample info was returned. false if there is no sample to take. + */ + bool get_first_untaken_info( + SampleInfo_t* info); + /** * Update the Attributes of the subscriber; * @param att Reference to a SubscriberAttributes object to update the parameters; diff --git a/include/fastrtps/subscriber/SubscriberHistory.h b/include/fastrtps/subscriber/SubscriberHistory.h index d8a6d8ce879..1905d555bf3 100644 --- a/include/fastrtps/subscriber/SubscriberHistory.h +++ b/include/fastrtps/subscriber/SubscriberHistory.h @@ -91,6 +91,14 @@ class SubscriberHistory : public rtps::ReaderHistory std::chrono::steady_clock::time_point& max_blocking_time); ///@} + /** + * @brief Returns information about the first untaken sample. + * @param [out] info Pointer to a SampleInfo_t structure to store first untaken sample information. + * @return true if sample info was returned. false if there is no sample to take. + */ + bool get_first_untaken_info( + SampleInfo_t* info); + /** * This method is called to remove a change from the SubscriberHistory. * @param change Pointer to the CacheChange_t. diff --git a/src/cpp/fastrtps_deprecated/subscriber/Subscriber.cpp b/src/cpp/fastrtps_deprecated/subscriber/Subscriber.cpp index f2c258d64c4..81ddb81c5c5 100644 --- a/src/cpp/fastrtps_deprecated/subscriber/Subscriber.cpp +++ b/src/cpp/fastrtps_deprecated/subscriber/Subscriber.cpp @@ -50,6 +50,12 @@ bool Subscriber::takeNextData( return mp_impl->takeNextData(data, info); } +bool Subscriber::get_first_untaken_info( + SampleInfo_t* info) +{ + return mp_impl->get_first_untaken_info(info); +} + bool Subscriber::updateAttributes( const SubscriberAttributes& att) { diff --git a/src/cpp/fastrtps_deprecated/subscriber/SubscriberHistory.cpp b/src/cpp/fastrtps_deprecated/subscriber/SubscriberHistory.cpp index 2cd2d761e1f..64491c04b53 100644 --- a/src/cpp/fastrtps_deprecated/subscriber/SubscriberHistory.cpp +++ b/src/cpp/fastrtps_deprecated/subscriber/SubscriberHistory.cpp @@ -35,6 +35,20 @@ using namespace rtps; using eprosima::fastdds::dds::TopicDataType; +static void get_sample_info( + SampleInfo_t* info, + CacheChange_t* change, + uint32_t ownership_strength) +{ + info->sampleKind = change->kind; + info->sample_identity.writer_guid(change->writerGUID); + info->sample_identity.sequence_number(change->sequenceNumber); + info->sourceTimestamp = change->sourceTimestamp; + info->ownershipStrength = ownership_strength; + info->iHandle = change->instanceHandle; + info->related_sample_identity = change->write_params.sample_identity(); +} + SubscriberHistory::SubscriberHistory( const TopicAttributes& topic_att, TopicDataType* type, @@ -298,14 +312,9 @@ bool SubscriberHistory::deserialize_change( if (info != nullptr) { - info->sampleKind = change->kind; - info->sample_identity.writer_guid(change->writerGUID); - info->sample_identity.sequence_number(change->sequenceNumber); - info->sourceTimestamp = change->sourceTimestamp; - info->ownershipStrength = ownership_strength; if (topic_att_.topicKind == WITH_KEY && - change->instanceHandle == c_InstanceHandle_Unknown && - change->kind == ALIVE) + change->instanceHandle == c_InstanceHandle_Unknown && + change->kind == ALIVE) { bool is_key_protected = false; #if HAVE_SECURITY @@ -313,8 +322,8 @@ bool SubscriberHistory::deserialize_change( #endif type_->getKey(data, &change->instanceHandle, is_key_protected); } - info->iHandle = change->instanceHandle; - info->related_sample_identity = change->write_params.sample_identity(); + + get_sample_info(info, change, ownership_strength); } return true; @@ -380,6 +389,23 @@ bool SubscriberHistory::takeNextData( return false; } +bool SubscriberHistory::get_first_untaken_info( + SampleInfo_t* info) +{ + std::lock_guard lock(*mp_mutex); + + CacheChange_t* change = nullptr; + WriterProxy* wp = nullptr; + if (mp_reader->nextUntakenCache(&change, &wp)) + { + uint32_t ownership = wp && qos_.m_ownership.kind == EXCLUSIVE_OWNERSHIP_QOS ? wp->ownership_strength() : 0; + get_sample_info(info, change, ownership); + return true; + } + + return false; +} + bool SubscriberHistory::find_key( CacheChange_t* a_change, t_m_Inst_Caches::iterator* vit_out) diff --git a/src/cpp/fastrtps_deprecated/subscriber/SubscriberImpl.cpp b/src/cpp/fastrtps_deprecated/subscriber/SubscriberImpl.cpp index 558c070a241..243c280bc36 100644 --- a/src/cpp/fastrtps_deprecated/subscriber/SubscriberImpl.cpp +++ b/src/cpp/fastrtps_deprecated/subscriber/SubscriberImpl.cpp @@ -124,6 +124,12 @@ bool SubscriberImpl::takeNextData( return this->m_history.takeNextData(data, info, max_blocking_time); } +bool SubscriberImpl::get_first_untaken_info( + SampleInfo_t* info) +{ + return m_history.get_first_untaken_info(info); +} + const GUID_t& SubscriberImpl::getGuid() { return mp_reader->getGuid(); diff --git a/src/cpp/fastrtps_deprecated/subscriber/SubscriberImpl.h b/src/cpp/fastrtps_deprecated/subscriber/SubscriberImpl.h index 29dbdeed609..989b93a6294 100644 --- a/src/cpp/fastrtps_deprecated/subscriber/SubscriberImpl.h +++ b/src/cpp/fastrtps_deprecated/subscriber/SubscriberImpl.h @@ -92,6 +92,14 @@ class SubscriberImpl ///@} + /** + * @brief Returns information about the first untaken sample. + * @param [out] info Pointer to a SampleInfo_t structure to store first untaken sample information. + * @return true if sample info was returned. false if there is no sample to take. + */ + bool get_first_untaken_info( + SampleInfo_t* info); + /** * Update the Attributes of the subscriber; * @param att Reference to a SubscriberAttributes object to update the parameters; From 0a5970c48d45b3c7e8a6a7cf79d6f390ea4e2fbe Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Wed, 25 Mar 2020 13:07:01 +0100 Subject: [PATCH 3/4] Refs #7918. receptionTimestamp added to CacheChange_t. --- include/fastdds/rtps/common/CacheChange.h | 2 ++ src/cpp/rtps/reader/StatefulReader.cpp | 2 ++ src/cpp/rtps/reader/StatelessReader.cpp | 1 + 3 files changed, 5 insertions(+) diff --git a/include/fastdds/rtps/common/CacheChange.h b/include/fastdds/rtps/common/CacheChange.h index 44e9cac627a..1399ac45cd5 100644 --- a/include/fastdds/rtps/common/CacheChange.h +++ b/include/fastdds/rtps/common/CacheChange.h @@ -64,6 +64,8 @@ struct RTPS_DllAPI CacheChange_t bool isRead = false; //!Source TimeStamp (only used in Readers) Time_t sourceTimestamp; + //!Reception TimeStamp (only used in Readers) + Time_t receptionTimestamp; WriteParams write_params; bool is_untyped_ = true; diff --git a/src/cpp/rtps/reader/StatefulReader.cpp b/src/cpp/rtps/reader/StatefulReader.cpp index cdf69ec2118..9baf91a6daf 100644 --- a/src/cpp/rtps/reader/StatefulReader.cpp +++ b/src/cpp/rtps/reader/StatefulReader.cpp @@ -715,6 +715,7 @@ bool StatefulReader::change_received( { if (mp_history->received_change(a_change, 0)) { + Time_t::now(a_change->receptionTimestamp); update_last_notified(a_change->writerGUID, a_change->sequenceNumber); if (getListener() != nullptr) { @@ -737,6 +738,7 @@ bool StatefulReader::change_received( // inside the call to mp_history->received_change if (mp_history->received_change(a_change, unknown_missing_changes_up_to)) { + Time_t::now(a_change->receptionTimestamp); GUID_t proxGUID = prox->guid(); // If KEEP_LAST and history full, make older changes as lost. diff --git a/src/cpp/rtps/reader/StatelessReader.cpp b/src/cpp/rtps/reader/StatelessReader.cpp index e86e810e020..7cb6745d8fc 100644 --- a/src/cpp/rtps/reader/StatelessReader.cpp +++ b/src/cpp/rtps/reader/StatelessReader.cpp @@ -160,6 +160,7 @@ bool StatelessReader::change_received( { if (mp_history->received_change(change, 0)) { + Time_t::now(change->receptionTimestamp); update_last_notified(change->writerGUID, change->sequenceNumber); ++total_unread_; From 6d590e7ff2719f6b9a54f091c0695994f7352475 Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Wed, 25 Mar 2020 13:11:53 +0100 Subject: [PATCH 4/4] Refs #7918. receptionTimestamp added to SampleInfo_t. --- include/fastrtps/subscriber/SampleInfo.h | 22 ++++++++++++++----- .../subscriber/SubscriberHistory.cpp | 1 + 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/include/fastrtps/subscriber/SampleInfo.h b/include/fastrtps/subscriber/SampleInfo.h index 6416a2c190b..2609f5e3b13 100644 --- a/include/fastrtps/subscriber/SampleInfo.h +++ b/include/fastrtps/subscriber/SampleInfo.h @@ -34,18 +34,30 @@ namespace fastrtps { * Class SampleInfo_t with information that is provided along a sample when reading data from a Subscriber. * @ingroup FASTRTPS_MODULE */ -class RTPS_DllAPI SampleInfo_t { +class RTPS_DllAPI SampleInfo_t +{ public: - SampleInfo_t():sampleKind(rtps::ALIVE), ownershipStrength(0), - sample_identity(rtps::SampleIdentity::unknown()), related_sample_identity(rtps::SampleIdentity::unknown()) {} - virtual ~SampleInfo_t(){}; + SampleInfo_t() + : sampleKind(rtps::ALIVE) + , ownershipStrength(0) + , sample_identity(rtps::SampleIdentity::unknown()) + , related_sample_identity(rtps::SampleIdentity::unknown()) + { + } + + virtual ~SampleInfo_t() + { + } + //!Sample kind. rtps::ChangeKind_t sampleKind; //!Ownership Strength of the writer of the sample (0 if the ownership kind is set to SHARED_OWNERSHIP_QOS). uint32_t ownershipStrength; //!Source timestamp of the sample. rtps::Time_t sourceTimestamp; + //!Reception timestamp of the sample. + rtps::Time_t receptionTimestamp; //!InstanceHandle of the data rtps::InstanceHandle_t iHandle; @@ -54,7 +66,7 @@ class RTPS_DllAPI SampleInfo_t { rtps::SampleIdentity related_sample_identity; }; -} /* namespace */ +} /* namespace fastrtps */ } /* namespace eprosima */ #endif /* SAMPLEINFO_H_ */ diff --git a/src/cpp/fastrtps_deprecated/subscriber/SubscriberHistory.cpp b/src/cpp/fastrtps_deprecated/subscriber/SubscriberHistory.cpp index 64491c04b53..a6559d24c7a 100644 --- a/src/cpp/fastrtps_deprecated/subscriber/SubscriberHistory.cpp +++ b/src/cpp/fastrtps_deprecated/subscriber/SubscriberHistory.cpp @@ -44,6 +44,7 @@ static void get_sample_info( info->sample_identity.writer_guid(change->writerGUID); info->sample_identity.sequence_number(change->sequenceNumber); info->sourceTimestamp = change->sourceTimestamp; + info->receptionTimestamp = change->receptionTimestamp; info->ownershipStrength = ownership_strength; info->iHandle = change->instanceHandle; info->related_sample_identity = change->write_params.sample_identity();