diff --git a/rosbag2_transport/src/rosbag2_transport/qos.cpp b/rosbag2_transport/src/rosbag2_transport/qos.cpp index 4587f49659..9cfe892cf1 100644 --- a/rosbag2_transport/src/rosbag2_transport/qos.cpp +++ b/rosbag2_transport/src/rosbag2_transport/qos.cpp @@ -12,6 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include +#include + +#include "rosbag2_transport/logging.hpp" + #include "qos.hpp" namespace YAML @@ -68,3 +73,128 @@ bool convert::decode( return true; } } // namespace YAML + +namespace rosbag2_transport +{ +Rosbag2QoS Rosbag2QoS::adapt_request_to_offers( + const std::string & topic_name, const std::vector & endpoints) +{ + if (endpoints.empty()) { + return Rosbag2QoS{}; + } + size_t num_endpoints = endpoints.size(); + size_t reliability_reliable_endpoints_count = 0; + size_t durability_transient_local_endpoints_count = 0; + for (const auto & endpoint : endpoints) { + const auto & profile = endpoint.qos_profile().get_rmw_qos_profile(); + if (profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) { + reliability_reliable_endpoints_count++; + } + if (profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { + durability_transient_local_endpoints_count++; + } + } + + // We set policies in order as defined in rmw_qos_profile_t + Rosbag2QoS request_qos{}; + // Policy: history, depth + // History does not affect compatibility + + // Policy: reliability + if (reliability_reliable_endpoints_count == num_endpoints) { + request_qos.reliable(); + } else { + if (reliability_reliable_endpoints_count > 0) { + ROSBAG2_TRANSPORT_LOG_WARN_STREAM( + "Some, but not all, publishers on topic \"" << topic_name << "\" " + "are offering RMW_QOS_POLICY_RELIABILITY_RELIABLE. " + "Falling back to RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT " + "as it will connect to all publishers. " + "Some messages from Reliable publishers could be dropped."); + } + request_qos.best_effort(); + } + + // Policy: durability + // If all publishers offer transient_local, we can request it and receive latched messages + if (durability_transient_local_endpoints_count == num_endpoints) { + request_qos.transient_local(); + } else { + if (durability_transient_local_endpoints_count > 0) { + ROSBAG2_TRANSPORT_LOG_WARN_STREAM( + "Some, but not all, publishers on topic \"" << topic_name << "\" " + "are offering RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL. " + "Falling back to RMW_QOS_POLICY_DURABILITY_VOLATILE " + "as it will connect to all publishers. " + "Previously-published latched messages will not be retrieved."); + } + request_qos.durability_volatile(); + } + // Policy: deadline + // Deadline does not affect delivery of messages, + // and we do not record Deadline"Missed events. + // We can always use unspecified deadline, which will be compatible with all publishers. + + // Policy: lifespan + // Lifespan does not affect compatibiliy + + // Policy: liveliness, liveliness_lease_duration + // Liveliness does not affect delivery of messages, + // and we do not record LivelinessChanged events. + // We can always use unspecified liveliness, which will be compatible with all publishers. + return request_qos; +} + +namespace +{ +bool operator==(const rmw_time_t & lhs, const rmw_time_t & rhs) +{ + return lhs.sec == rhs.sec && lhs.nsec == rhs.nsec; +} + +/** Check if all QoS profiles in the vector are identical when only looking at + * policies that affect compatibility. + * This means it excludes history and lifespan from the equality check. + */ +bool all_profiles_effectively_same(const std::vector & profiles) +{ + auto iterator = profiles.begin(); + const auto p_ref = iterator->get_rmw_qos_profile(); + iterator++; + for (; iterator != profiles.end(); iterator++) { + const auto p_next = iterator->get_rmw_qos_profile(); + bool compatibility_equals_previous = ( + // excluding history + p_ref.reliability == p_next.reliability && + p_ref.durability == p_next.durability && + p_ref.deadline == p_next.deadline && + // excluding lifespan + p_ref.liveliness == p_next.liveliness && + p_ref.liveliness_lease_duration == p_next.liveliness_lease_duration + ); + if (!compatibility_equals_previous) { + return false; + } + } + return true; +} +} // unnamed namespace + +Rosbag2QoS Rosbag2QoS::adapt_offer_to_recorded_offers( + const std::string & topic_name, const std::vector & profiles) +{ + if (profiles.empty()) { + return Rosbag2QoS{}; + } + if (all_profiles_effectively_same(profiles)) { + auto result = profiles[0]; + return result.default_history(); + } + + ROSBAG2_TRANSPORT_LOG_WARN_STREAM( + "Not all original publishers on topic " << topic_name << " offered the same QoS profiles. " + "Rosbag2 cannot yet choose an adapted profile to offer for this mixed case. " + "Falling back to the rosbag2_transport default publisher offer."); + return Rosbag2QoS{}; +} +} // namespace rosbag2_transport diff --git a/rosbag2_transport/src/rosbag2_transport/qos.hpp b/rosbag2_transport/src/rosbag2_transport/qos.hpp index f158fcfb55..7425976d60 100644 --- a/rosbag2_transport/src/rosbag2_transport/qos.hpp +++ b/rosbag2_transport/src/rosbag2_transport/qos.hpp @@ -15,6 +15,10 @@ #ifndef ROSBAG2_TRANSPORT__QOS_HPP_ #define ROSBAG2_TRANSPORT__QOS_HPP_ +#include +#include + +#include "rclcpp/node_interfaces/node_graph_interface.hpp" #include "rclcpp/qos.hpp" #ifdef _WIN32 @@ -46,6 +50,30 @@ class Rosbag2QoS : public rclcpp::QoS keep_last(rmw_qos_profile_default.depth); return *this; } + + // Create an adaptive QoS profile to use for subscribing to a set of offers from publishers. + /** + * - Uses rosbag2_transport defaults for History since they do not affect compatibility. + * - Adapts Durability and Reliability, falling back to the least strict publisher when there + * is a mixed offer. This behavior errs on the side of forming connections with all publishers. + * - Does not specify Lifespan, Deadline, or Liveliness to be maximally compatible, because + * these policies do not affect message delivery. + */ + static Rosbag2QoS adapt_request_to_offers( + const std::string & topic_name, + const std::vector & endpoints); + + // Create a QoS profile to offer for playback. + /** + * This logic exists because rosbag2 does not record on a per-publisher basis, so we try to + * get as close as possible to the original system's behavior, given a single publisher. + * If all profiles are the same (excepting History & Lifespan, which are purely local), + * that exact value is returned. + * Otherwise, fall back to the rosbag2 default and emit a warning. + */ + static Rosbag2QoS adapt_offer_to_recorded_offers( + const std::string & topic_name, + const std::vector & profiles); }; } // namespace rosbag2_transport diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.cpp b/rosbag2_transport/src/rosbag2_transport/recorder.cpp index 1e1cc2780d..3257f0d0b2 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.cpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.cpp @@ -201,77 +201,9 @@ rclcpp::QoS Recorder::subscription_qos_for_topic(const std::string & topic_name) if (topic_qos_profile_overrides_.count(topic_name)) { ROSBAG2_TRANSPORT_LOG_INFO_STREAM("Overriding subscription profile for " << topic_name); return topic_qos_profile_overrides_.at(topic_name); - } else { - return adapt_qos_to_publishers(topic_name); } - return adapt_qos_to_publishers(topic_name); -} - -rclcpp::QoS Recorder::adapt_qos_to_publishers(const std::string & topic_name) const -{ - auto endpoints = node_->get_publishers_info_by_topic(topic_name); - size_t num_endpoints = endpoints.size(); - size_t reliability_reliable_endpoints_count = 0; - size_t durability_transient_local_endpoints_count = 0; - for (const auto & endpoint : endpoints) { - const auto & profile = endpoint.qos_profile().get_rmw_qos_profile(); - if (profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) { - reliability_reliable_endpoints_count++; - } - if (profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { - durability_transient_local_endpoints_count++; - } - } - - // We set policies in order as defined in rmw_qos_profile_t - Rosbag2QoS request_qos; - // Policy: history, depth - // History does not affect compatibility - request_qos.default_history(); - - // Policy: reliability - if (reliability_reliable_endpoints_count == num_endpoints) { - request_qos.reliable(); - } else { - if (reliability_reliable_endpoints_count > 0) { - ROSBAG2_TRANSPORT_LOG_WARN_STREAM( - "Some, but not all, publishers on topic \"" << topic_name << "\" " - "are offering RMW_QOS_POLICY_RELIABILITY_RELIABLE. " - "Falling back to RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT " - "as it will connect to all publishers. " - "Some messages from Reliable publishers could be dropped."); - } - request_qos.best_effort(); - } - - // Policy: durability - // If all publishers offer transient_local, we can request it and receive latched messages - if (durability_transient_local_endpoints_count == num_endpoints) { - request_qos.transient_local(); - } else { - if (durability_transient_local_endpoints_count > 0) { - ROSBAG2_TRANSPORT_LOG_WARN_STREAM( - "Some, but not all, publishers on topic \"" << topic_name << "\" " - "are offering RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL. " - "Falling back to RMW_QOS_POLICY_DURABILITY_VOLATILE " - "as it will connect to all publishers. " - "Previously-published latched messages will not be retrieved."); - } - request_qos.durability_volatile(); - } - // Policy: deadline - // Deadline does not affect delivery of messages, - // and we do not record Deadline"Missed events. - // We can always use unspecified deadline, which will be compatible with all publishers. - - // Policy: lifespan - // Lifespan does not affect compatibiliy - - // Policy: liveliness, liveliness_lease_duration - // Liveliness does not affect delivery of messages, - // and we do not record LivelinessChanged events. - // We can always use unspecified liveliness, which will be compatible with all publishers. - return request_qos; + return Rosbag2QoS::adapt_request_to_offers( + topic_name, node_->get_publishers_info_by_topic(topic_name)); } void Recorder::warn_if_new_qos_for_subscribed_topic(const std::string & topic_name) diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.hpp b/rosbag2_transport/src/rosbag2_transport/recorder.hpp index 59f4032c29..6b1976adae 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.hpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.hpp @@ -88,21 +88,13 @@ class Recorder /** * Find the QoS profile that should be used for subscribing. * - * Profiles are prioritized by: - * 1. The override specified in the record_options, if one exists for the topic. - * 2. Adapted subscription via `adapt_qos_to_publishers` + * Uses the override from record_options, if it is specified for this topic. + * Otherwise, falls back to Rosbag2QoS::adapt_request_to_offers * * \param topic_name The full name of the topic, with namespace (ex. /arm/joint_status). * \return The QoS profile to be used for subscribing. */ rclcpp::QoS subscription_qos_for_topic(const std::string & topic_name) const; - /** - * Try to subscribe using publishers' offered QoS policies. - * - * Fall back to sensible defaults when we can't adapt robustly, - * erring in favor of creating compatible connections. - */ - rclcpp::QoS adapt_qos_to_publishers(const std::string & topic_name) const; // Serialize all currently offered QoS profiles for a topic into a YAML list. std::string serialized_offered_qos_profiles_for_topic(const std::string & topic_name); diff --git a/rosbag2_transport/test/rosbag2_transport/test_qos.cpp b/rosbag2_transport/test/rosbag2_transport/test_qos.cpp index 248abeb096..50e0774103 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_qos.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_qos.cpp @@ -95,3 +95,140 @@ TEST(TestQoS, detect_new_qos_fields) }; EXPECT_EQ(profile.history, RMW_QOS_POLICY_HISTORY_SYSTEM_DEFAULT); // fix "unused variable" } + + +using rosbag2_transport::Rosbag2QoS; // NOLINT +class AdaptiveQoSTest : public ::testing::Test +{ +public: + AdaptiveQoSTest() = default; + + rclcpp::TopicEndpointInfo make_endpoint(const rclcpp::QoS & qos) + { + rcl_topic_endpoint_info_t endpoint_info { + "some_node_name", + "some_node_namespace", + "some_topic_type", + RMW_ENDPOINT_PUBLISHER, + {0}, + qos.get_rmw_qos_profile(), + }; + + return rclcpp::TopicEndpointInfo(endpoint_info); + } + + void add_endpoint(const rclcpp::QoS & qos) + { + endpoints_.push_back(make_endpoint(qos)); + } + + const std::string topic_name_{"/topic"}; + std::vector endpoints_{}; + const rosbag2_transport::Rosbag2QoS default_offer_{}; + const rosbag2_transport::Rosbag2QoS default_request_{}; +}; + +TEST_F(AdaptiveQoSTest, adapt_request_empty_returns_default) +{ + auto adapted_request = Rosbag2QoS::adapt_request_to_offers(topic_name_, endpoints_); + EXPECT_EQ(default_request_, adapted_request); +} + +TEST_F(AdaptiveQoSTest, adapt_request_single_offer_returns_same_values) +{ + // Set up this offer to use nondefault reliability and durability, + // expect to see those values in the output + auto nondefault_offer = Rosbag2QoS{}.best_effort().transient_local(); + add_endpoint(nondefault_offer); + + auto adapted_request = Rosbag2QoS::adapt_request_to_offers(topic_name_, endpoints_); + + auto expected = nondefault_offer.get_rmw_qos_profile(); + auto actual = adapted_request.get_rmw_qos_profile(); + EXPECT_EQ(expected.reliability, actual.reliability); + EXPECT_EQ(expected.durability, actual.durability); +} + +TEST_F(AdaptiveQoSTest, adapt_request_multiple_similar_offers_returns_same_values) +{ + auto nondefault_offer = Rosbag2QoS{}.best_effort().transient_local(); + const size_t num_endpoints{3}; + for (size_t i = 0; i < num_endpoints; i++) { + add_endpoint(nondefault_offer); + } + + auto adapted_request = Rosbag2QoS::adapt_request_to_offers(topic_name_, endpoints_); + + auto expected = nondefault_offer.get_rmw_qos_profile(); + auto actual = adapted_request.get_rmw_qos_profile(); + EXPECT_EQ(expected.reliability, actual.reliability); + EXPECT_EQ(expected.durability, actual.durability); +} + +TEST_F(AdaptiveQoSTest, adapt_request_mixed_reliability_offers_return_best_effort) +{ + add_endpoint(Rosbag2QoS{}.best_effort()); + add_endpoint(Rosbag2QoS{}.reliable()); + auto adapted_request = Rosbag2QoS::adapt_request_to_offers(topic_name_, endpoints_); + EXPECT_EQ( + adapted_request.get_rmw_qos_profile().reliability, RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT); +} + +TEST_F(AdaptiveQoSTest, adapt_request_mixed_durability_offers_return_volatile) +{ + add_endpoint(Rosbag2QoS{}.transient_local()); + add_endpoint(Rosbag2QoS{}.durability_volatile()); + auto adapted_request = Rosbag2QoS::adapt_request_to_offers(topic_name_, endpoints_); + EXPECT_EQ(adapted_request.get_rmw_qos_profile().durability, RMW_QOS_POLICY_DURABILITY_VOLATILE); +} + +TEST_F(AdaptiveQoSTest, adapt_offer_empty_returns_default) +{ + auto adapted_offer = Rosbag2QoS::adapt_offer_to_recorded_offers(topic_name_, {}); + EXPECT_EQ(adapted_offer, default_offer_); +} + +TEST_F(AdaptiveQoSTest, adapt_offer_single_offer_returns_same_values) +{ + // Set up this offer to use nondefault reliability and durability, + // expect to see those values in the output + auto nondefault_offer = Rosbag2QoS{Rosbag2QoS{}.best_effort().transient_local()}; + std::vector offers = {nondefault_offer}; + + auto adapted_offer = Rosbag2QoS::adapt_offer_to_recorded_offers(topic_name_, offers); + EXPECT_EQ(nondefault_offer, adapted_offer); +} + +TEST_F(AdaptiveQoSTest, adapt_offer_multiple_offers_with_same_settings_return_identical) +{ + auto nondefault_offer = Rosbag2QoS{Rosbag2QoS{}.best_effort().transient_local()}; + auto adapted_offer = Rosbag2QoS::adapt_offer_to_recorded_offers( + topic_name_, {nondefault_offer, nondefault_offer, nondefault_offer}); + EXPECT_EQ(nondefault_offer, adapted_offer); +} + +TEST_F(AdaptiveQoSTest, adapt_offer_mixed_compatibility_returns_default) +{ + // When the offers have mixed values for policies that affect compatibility, + // it should fall back to the default. + std::vector offers = { + Rosbag2QoS{Rosbag2QoS{}.best_effort()}, + Rosbag2QoS{Rosbag2QoS{}.reliable()}, + }; + auto adapted_offer = Rosbag2QoS::adapt_offer_to_recorded_offers(topic_name_, offers); + EXPECT_EQ(adapted_offer, default_offer_); +} + +TEST_F(AdaptiveQoSTest, adapt_offer_mixed_non_compatibility_returns_first) +{ + // Some QoS policies don't affect compatibility, so even if their values are mixed we should + // receive the first value. + rclcpp::Duration nonstandard_duration(12, 34); + size_t nonstandard_history{20}; + std::vector offers = { + Rosbag2QoS{Rosbag2QoS{}.lifespan(nonstandard_duration)}, + Rosbag2QoS{Rosbag2QoS{}.keep_last(nonstandard_history)}, + }; + auto adapted_offer = Rosbag2QoS::adapt_offer_to_recorded_offers(topic_name_, offers); + EXPECT_EQ(adapted_offer, offers[0]); +}