From 459caec51f57304d37af88adf611ec6d4fc0d085 Mon Sep 17 00:00:00 2001 From: Emerson Knapp Date: Fri, 3 Apr 2020 13:43:55 -0700 Subject: [PATCH 01/16] Remove assumptions in tests about inconsistently-reported qos values, just check that they are reported at all Signed-off-by: Emerson Knapp --- .../test/rosbag2_transport/test_qos.cpp | 10 +++--- .../test/rosbag2_transport/test_record.cpp | 36 +++++++++---------- 2 files changed, 22 insertions(+), 24 deletions(-) diff --git a/rosbag2_transport/test/rosbag2_transport/test_qos.cpp b/rosbag2_transport/test/rosbag2_transport/test_qos.cpp index 37f2e135eb..4284828138 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_qos.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_qos.cpp @@ -64,18 +64,16 @@ TEST(TestQoS, supports_version_4) rmw_time_t zerotime{0, 0}; // Explicitly set up the same QoS profile in case defaults change - rclcpp::QoS expected_qos(10); - expected_qos - .keep_last(10) + auto expected_qos = rosbag2_transport::Rosbag2QoS{} + .default_history() .reliable() .durability_volatile() .deadline(zerotime) .lifespan(zerotime) .liveliness(RMW_QOS_POLICY_LIVELINESS_SYSTEM_DEFAULT) - .liveliness_lease_duration(zerotime) - .avoid_ros_namespace_conventions(false); + .liveliness_lease_duration(zerotime); // Any values not present in the YAML should take the default value in both profiles - EXPECT_EQ(actual_qos, expected_qos); + EXPECT_TRUE(actual_qos.compatibility_policies_equal(expected_qos)); } TEST(TestQoS, detect_new_qos_fields) diff --git a/rosbag2_transport/test/rosbag2_transport/test_record.cpp b/rosbag2_transport/test/rosbag2_transport/test_record.cpp index ced11aec86..827179668b 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record.cpp @@ -65,7 +65,6 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_topics_are EXPECT_THAT(array_messages[0]->float32_values, Eq(array_message->float32_values)); } -#ifdef ROSBAG2_ENABLE_ADAPTIVE_QOS_SUBSCRIPTION TEST_F(RecordIntegrationTestFixture, qos_is_stored_in_metadata) { auto string_message = get_messages_strings()[1]; @@ -80,26 +79,27 @@ TEST_F(RecordIntegrationTestFixture, qos_is_stored_in_metadata) auto recorded_topics = writer.get_topics(); std::string serialized_profiles = recorded_topics.at(topic).offered_qos_profiles; // Basic smoke test that the profile was serialized into the metadata as a string. - EXPECT_THAT( - serialized_profiles, ContainsRegex( - "- history: 3\n" - " depth: 0\n" - " reliability: 1\n" - " durability: 2\n" - " deadline:\n" - " sec: 2147483647\n" - " nsec: 4294967295\n" - " lifespan:\n" - " sec: 2147483647\n" - " nsec: 4294967295\n" - " liveliness: 1\n" - " liveliness_lease_duration:\n" - " sec: 2147483647\n" - " nsec: 4294967295\n" - " avoid_ros_namespace_conventions: false" + EXPECT_THAT(serialized_profiles, ContainsRegex("reliability: 1\n")); + EXPECT_THAT(serialized_profiles, ContainsRegex("durability: 2\n")); + EXPECT_THAT(serialized_profiles, ContainsRegex( + "deadline:\n" + " sec: .+\n" + " nsec: .+\n" + )); + EXPECT_THAT(serialized_profiles, ContainsRegex( + "lifespan:\n" + " sec: .+\n" + " nsec: .+\n" + )); + EXPECT_THAT(serialized_profiles, ContainsRegex("liveliness: 1\n")); + EXPECT_THAT(serialized_profiles, ContainsRegex( + "liveliness_lease_duration:\n" + " sec: .+\n" + " nsec: .+\n" )); } +#ifdef ROSBAG2_ENABLE_ADAPTIVE_QOS_SUBSCRIPTION TEST_F(RecordIntegrationTestFixture, records_sensor_data) { using clock = std::chrono::system_clock; From c97f1a14408b87a24d76f0c43d1ded4d107906a9 Mon Sep 17 00:00:00 2001 From: Emerson Knapp Date: Fri, 3 Apr 2020 14:59:00 -0700 Subject: [PATCH 02/16] Fix uncrustify Signed-off-by: Emerson Knapp --- .../test/rosbag2_transport/test_record.cpp | 27 ++++++++++--------- 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/rosbag2_transport/test/rosbag2_transport/test_record.cpp b/rosbag2_transport/test/rosbag2_transport/test_record.cpp index 827179668b..3187d53391 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record.cpp @@ -81,21 +81,24 @@ TEST_F(RecordIntegrationTestFixture, qos_is_stored_in_metadata) // Basic smoke test that the profile was serialized into the metadata as a string. EXPECT_THAT(serialized_profiles, ContainsRegex("reliability: 1\n")); EXPECT_THAT(serialized_profiles, ContainsRegex("durability: 2\n")); - EXPECT_THAT(serialized_profiles, ContainsRegex( - "deadline:\n" - " sec: .+\n" - " nsec: .+\n" + EXPECT_THAT( + serialized_profiles, ContainsRegex( + "deadline:\n" + " sec: .+\n" + " nsec: .+\n" )); - EXPECT_THAT(serialized_profiles, ContainsRegex( - "lifespan:\n" - " sec: .+\n" - " nsec: .+\n" + EXPECT_THAT( + serialized_profiles, ContainsRegex( + "lifespan:\n" + " sec: .+\n" + " nsec: .+\n" )); EXPECT_THAT(serialized_profiles, ContainsRegex("liveliness: 1\n")); - EXPECT_THAT(serialized_profiles, ContainsRegex( - "liveliness_lease_duration:\n" - " sec: .+\n" - " nsec: .+\n" + EXPECT_THAT( + serialized_profiles, ContainsRegex( + "liveliness_lease_duration:\n" + " sec: .+\n" + " nsec: .+\n" )); } From 1b87bda08861e68295a717dcb4f7087b0be4c36f Mon Sep 17 00:00:00 2001 From: Emerson Knapp Date: Fri, 3 Apr 2020 17:11:35 -0700 Subject: [PATCH 03/16] Add the compatibility equality function here for now Signed-off-by: Emerson Knapp --- .../src/rosbag2_transport/qos.cpp | 32 +++++++++++++++++++ .../src/rosbag2_transport/qos.hpp | 7 ++++ 2 files changed, 39 insertions(+) diff --git a/rosbag2_transport/src/rosbag2_transport/qos.cpp b/rosbag2_transport/src/rosbag2_transport/qos.cpp index 4587f49659..c66bde4b09 100644 --- a/rosbag2_transport/src/rosbag2_transport/qos.cpp +++ b/rosbag2_transport/src/rosbag2_transport/qos.cpp @@ -68,3 +68,35 @@ bool convert::decode( return true; } } // namespace YAML + + +namespace +{ +/// Check if two rmw_time_t have the same values. +bool operator==(const rmw_time_t & left, const rmw_time_t & right) +{ + return left.sec == right.sec && left.nsec == right.nsec; +} +} // unnamed namespace + + +namespace rosbag2_transport +{ + // TODO(emersonknapp) it is the long term goal to have similar logic exist within rmw/rcl/rclcpp, + // but it's not yet determined where that logic will go. We should remove this function before + // G-Turtle + bool Rosbag2QoS::compatibility_policies_exactly_equal(const rclcpp::QoS & other) const + { + // Note that these policies do not affect QoS compatibility + // * History + Depth (relevant only for local behavior) + // * Lifespan (only relevant locally on Publishers) + // * avoid ros namespace conventions (this affects choice of topic names, not quality of service) + const auto & pl = get_rmw_qos_profile(); + const auto & pr = other.get_rmw_qos_profile(); + return pl.reliability == pr.reliability && + pl.durability == pr.durability && + pl.deadline == pr.deadline && + pl.liveliness == pr.liveliness && + pl.liveliness_lease_duration == pr.liveliness_lease_duration; + } +} // namespace rosbag2_transport diff --git a/rosbag2_transport/src/rosbag2_transport/qos.hpp b/rosbag2_transport/src/rosbag2_transport/qos.hpp index f158fcfb55..04214da076 100644 --- a/rosbag2_transport/src/rosbag2_transport/qos.hpp +++ b/rosbag2_transport/src/rosbag2_transport/qos.hpp @@ -46,6 +46,13 @@ class Rosbag2QoS : public rclcpp::QoS keep_last(rmw_qos_profile_default.depth); return *this; } + + /// Determine if all policies that affect QoS compatibility are equal. + /* + Note that this method does not check if the QoS policies are compatible. + This is a simple check for exact equality in all policies that affect compatibility. + */ + bool compatibility_policies_exactly_equal(const rclcpp::QoS & other) const; }; } // namespace rosbag2_transport From 7bdab8a14b72a075f5fba719b00dc6db6ec958a4 Mon Sep 17 00:00:00 2001 From: Emerson Knapp Date: Fri, 3 Apr 2020 17:15:32 -0700 Subject: [PATCH 04/16] Fix cpplint Signed-off-by: Emerson Knapp --- .../src/rosbag2_transport/qos.cpp | 34 +++++++++---------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/rosbag2_transport/src/rosbag2_transport/qos.cpp b/rosbag2_transport/src/rosbag2_transport/qos.cpp index c66bde4b09..903bbe9f44 100644 --- a/rosbag2_transport/src/rosbag2_transport/qos.cpp +++ b/rosbag2_transport/src/rosbag2_transport/qos.cpp @@ -82,21 +82,21 @@ bool operator==(const rmw_time_t & left, const rmw_time_t & right) namespace rosbag2_transport { - // TODO(emersonknapp) it is the long term goal to have similar logic exist within rmw/rcl/rclcpp, - // but it's not yet determined where that logic will go. We should remove this function before - // G-Turtle - bool Rosbag2QoS::compatibility_policies_exactly_equal(const rclcpp::QoS & other) const - { - // Note that these policies do not affect QoS compatibility - // * History + Depth (relevant only for local behavior) - // * Lifespan (only relevant locally on Publishers) - // * avoid ros namespace conventions (this affects choice of topic names, not quality of service) - const auto & pl = get_rmw_qos_profile(); - const auto & pr = other.get_rmw_qos_profile(); - return pl.reliability == pr.reliability && - pl.durability == pr.durability && - pl.deadline == pr.deadline && - pl.liveliness == pr.liveliness && - pl.liveliness_lease_duration == pr.liveliness_lease_duration; - } +// TODO(emersonknapp) it is the long term goal to have similar logic exist within rmw/rcl/rclcpp, +// but it's not yet determined where that logic will go. We should remove this function before +// G-Turtle +bool Rosbag2QoS::compatibility_policies_exactly_equal(const rclcpp::QoS & other) const +{ + // Note that these policies do not affect QoS compatibility + // * History + Depth (relevant only for local behavior) + // * Lifespan (only relevant locally on Publishers) + // * avoid ros namespace conventions (this affects choice of topic names, not quality of service) + const auto & pl = get_rmw_qos_profile(); + const auto & pr = other.get_rmw_qos_profile(); + return pl.reliability == pr.reliability && + pl.durability == pr.durability && + pl.deadline == pr.deadline && + pl.liveliness == pr.liveliness && + pl.liveliness_lease_duration == pr.liveliness_lease_duration; +} } // namespace rosbag2_transport From 51dc7b91789380de632efa2e99100f88493fbe71 Mon Sep 17 00:00:00 2001 From: Emerson Knapp Date: Mon, 6 Apr 2020 16:24:58 -0700 Subject: [PATCH 05/16] Fix build issue Signed-off-by: Emerson Knapp --- rosbag2_transport/test/rosbag2_transport/test_qos.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rosbag2_transport/test/rosbag2_transport/test_qos.cpp b/rosbag2_transport/test/rosbag2_transport/test_qos.cpp index 4284828138..1102c49045 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_qos.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_qos.cpp @@ -73,7 +73,7 @@ TEST(TestQoS, supports_version_4) .liveliness(RMW_QOS_POLICY_LIVELINESS_SYSTEM_DEFAULT) .liveliness_lease_duration(zerotime); // Any values not present in the YAML should take the default value in both profiles - EXPECT_TRUE(actual_qos.compatibility_policies_equal(expected_qos)); + EXPECT_TRUE(actual_qos.compatibility_policies_exactly_equal(expected_qos)); } TEST(TestQoS, detect_new_qos_fields) From 7cdf66ffcd978b08329e680534d776ead746f147 Mon Sep 17 00:00:00 2001 From: Emerson Knapp Date: Mon, 6 Apr 2020 16:29:42 -0700 Subject: [PATCH 06/16] Fix comment formatting Signed-off-by: Emerson Knapp --- rosbag2_transport/src/rosbag2_transport/qos.hpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/rosbag2_transport/src/rosbag2_transport/qos.hpp b/rosbag2_transport/src/rosbag2_transport/qos.hpp index 04214da076..cf72980d29 100644 --- a/rosbag2_transport/src/rosbag2_transport/qos.hpp +++ b/rosbag2_transport/src/rosbag2_transport/qos.hpp @@ -48,10 +48,10 @@ class Rosbag2QoS : public rclcpp::QoS } /// Determine if all policies that affect QoS compatibility are equal. - /* - Note that this method does not check if the QoS policies are compatible. - This is a simple check for exact equality in all policies that affect compatibility. - */ + /** + * Note that this method does not check if the QoS policies are compatible. + * This is a simple check for exact equality in all policies that affect compatibility. + */ bool compatibility_policies_exactly_equal(const rclcpp::QoS & other) const; }; } // namespace rosbag2_transport From 1890394172ad3ee3def0db220057d53913889cc1 Mon Sep 17 00:00:00 2001 From: Emerson Knapp Date: Mon, 6 Apr 2020 18:40:03 -0700 Subject: [PATCH 07/16] WIP lax adaptive subscription Signed-off-by: Emerson Knapp --- .../src/rosbag2_transport/qos.cpp | 32 ------ .../src/rosbag2_transport/qos.hpp | 7 -- .../src/rosbag2_transport/recorder.cpp | 99 ++++++++++++------- .../src/rosbag2_transport/recorder.hpp | 7 +- .../test/rosbag2_transport/test_qos.cpp | 9 +- .../test/rosbag2_transport/test_record.cpp | 2 - .../rosbag2_transport/test_rosbag2_node.cpp | 49 --------- 7 files changed, 73 insertions(+), 132 deletions(-) diff --git a/rosbag2_transport/src/rosbag2_transport/qos.cpp b/rosbag2_transport/src/rosbag2_transport/qos.cpp index 903bbe9f44..4587f49659 100644 --- a/rosbag2_transport/src/rosbag2_transport/qos.cpp +++ b/rosbag2_transport/src/rosbag2_transport/qos.cpp @@ -68,35 +68,3 @@ bool convert::decode( return true; } } // namespace YAML - - -namespace -{ -/// Check if two rmw_time_t have the same values. -bool operator==(const rmw_time_t & left, const rmw_time_t & right) -{ - return left.sec == right.sec && left.nsec == right.nsec; -} -} // unnamed namespace - - -namespace rosbag2_transport -{ -// TODO(emersonknapp) it is the long term goal to have similar logic exist within rmw/rcl/rclcpp, -// but it's not yet determined where that logic will go. We should remove this function before -// G-Turtle -bool Rosbag2QoS::compatibility_policies_exactly_equal(const rclcpp::QoS & other) const -{ - // Note that these policies do not affect QoS compatibility - // * History + Depth (relevant only for local behavior) - // * Lifespan (only relevant locally on Publishers) - // * avoid ros namespace conventions (this affects choice of topic names, not quality of service) - const auto & pl = get_rmw_qos_profile(); - const auto & pr = other.get_rmw_qos_profile(); - return pl.reliability == pr.reliability && - pl.durability == pr.durability && - pl.deadline == pr.deadline && - pl.liveliness == pr.liveliness && - pl.liveliness_lease_duration == pr.liveliness_lease_duration; -} -} // namespace rosbag2_transport diff --git a/rosbag2_transport/src/rosbag2_transport/qos.hpp b/rosbag2_transport/src/rosbag2_transport/qos.hpp index cf72980d29..f158fcfb55 100644 --- a/rosbag2_transport/src/rosbag2_transport/qos.hpp +++ b/rosbag2_transport/src/rosbag2_transport/qos.hpp @@ -46,13 +46,6 @@ class Rosbag2QoS : public rclcpp::QoS keep_last(rmw_qos_profile_default.depth); return *this; } - - /// Determine if all policies that affect QoS compatibility are equal. - /** - * Note that this method does not check if the QoS policies are compatible. - * This is a simple check for exact equality in all policies that affect compatibility. - */ - bool compatibility_policies_exactly_equal(const rclcpp::QoS & other) const; }; } // namespace rosbag2_transport diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.cpp b/rosbag2_transport/src/rosbag2_transport/recorder.cpp index f0cf9ce0c1..e55175e120 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.cpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.cpp @@ -44,22 +44,6 @@ # pragma warning(pop) #endif -namespace -{ -bool all_qos_same(const std::vector & values) -{ - auto adjacent_different_elements_it = std::adjacent_find( - values.begin(), - values.end(), - [](const rclcpp::TopicEndpointInfo & left, const rclcpp::TopicEndpointInfo & right) -> bool { - return left.qos_profile() != right.qos_profile(); - } - ); - // No adjacent elements were different, so all are the same. - return adjacent_different_elements_it == values.end(); -} -} // unnamed namespace - namespace rosbag2_transport { Recorder::Recorder(std::shared_ptr writer, std::shared_ptr node) @@ -102,11 +86,9 @@ void Recorder::topics_discovery( while (rclcpp::ok()) { auto topics_to_subscribe = get_requested_or_available_topics(requested_topics, include_hidden_topics); - #ifdef ROSBAG2_ENABLE_ADAPTIVE_QOS_SUBSCRIPTION for (const auto & topic_and_type : topics_to_subscribe) { warn_if_new_qos_for_subscribed_topic(topic_and_type.first); } - #endif // ROSBAG2_ENABLE_ADAPTIVE_QOS_SUBSCRIPTION auto missing_topics = get_missing_topics(topics_to_subscribe); subscribe_topics(missing_topics); @@ -162,12 +144,7 @@ void Recorder::subscribe_topic(const rosbag2_storage::TopicMetadata & topic) // that callback called before we reached out the line: writer_->create_topic(topic) writer_->create_topic(topic); - // TODO(emersonknapp) re-enable common_qos_or_fallback once the cyclone situation is resolved - #ifdef ROSBAG2_ENABLE_ADAPTIVE_QOS_SUBSCRIPTION - Rosbag2QoS subscription_qos{common_qos_or_fallback(topic.name)}; - #else - Rosbag2QoS subscription_qos{}; - #endif // ROSBAG2_ENABLE_ADAPTIVE_QOS_SUBSCRIPTION + Rosbag2QoS subscription_qos{qos_for_topic(topic.name)}; auto subscription = create_subscription(topic.name, topic.type, subscription_qos); if (subscription) { subscriptions_.insert({topic.name, subscription}); @@ -218,18 +195,74 @@ std::string Recorder::serialized_offered_qos_profiles_for_topic(const std::strin return YAML::Dump(offered_qos_profiles); } -rclcpp::QoS Recorder::common_qos_or_fallback(const std::string & topic_name) +rclcpp::QoS Recorder::adapt_qos_to_publishers(const std::string & topic_name) const { auto endpoints = node_->get_publishers_info_by_topic(topic_name); - if (!endpoints.empty() && all_qos_same(endpoints)) { - return Rosbag2QoS(endpoints[0].qos_profile()).default_history(); + size_t num_endpoints = endpoints.size(); + size_t reliability_reliable_endpoints_count = 0; + size_t durability_transient_local_endpoints_count = 0; + for (const auto & endpoint : endpoints) { + const auto & profile = endpoint.qos_profile().get_rmw_qos_profile(); + if (profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) { + reliability_reliable_endpoints_count++; + } + if (profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { + durability_transient_local_endpoints_count++; + } + } + + // We set policies in order as defined in rmw_qos_profile_t + Rosbag2QoS request_qos; + // Policy: history, depth + // History does not affect compatibility + request_qos.default_history(); + + // Policy: reliability + if (reliability_reliable_endpoints_count == num_endpoints) { + request_qos.reliable(); + } else { + request_qos.best_effort(); } - ROSBAG2_TRANSPORT_LOG_WARN_STREAM( - "Topic " << topic_name << " has publishers offering different QoS settings. " - "Cannot determine what QoS to request, falling back to default QoS profile." - ); - topics_warned_about_incompatibility_.insert(topic_name); - return Rosbag2QoS{}; + if (reliability_reliable_endpoints_count > 0) { + ROSBAG2_TRANSPORT_LOG_WARN_STREAM( + "Some, but not all, publishers on topic \"" << topic_name << "\" are offering " + "Reliable reliability. Falling back to Best Effort as it will connect to all publishers. " + "Some messages from Reliable publishers could be dropped."); + } + + // Policy: durability + // If all publishers offer transient_local, we can request it and receive latched messages + if (durability_transient_local_endpoints_count == num_endpoints) { + request_qos.transient_local(); + } else { + request_qos.durability_volatile(); + } + if (durability_transient_local_endpoints_count > 0) { + ROSBAG2_TRANSPORT_LOG_WARN_STREAM( + "Some, but not all, publishers on topic \"" << topic_name << "\" are offering " + "Transient Local durability. Falling back to Volatile as it will connect to all publishers. " + "Previously-published latched messages will not be retrieved for this topic." + ); + } + + // Policy: deadline + // Deadline does not affect delivery of messages, + // and we do not record DeadlineMissed events. + // We can always use unspecified deadline, which will be compatible with all publishers. + + // Policy: lifespan + // Lifespan does not affect compatibiliy + + // Policy: liveliness, liveliness_lease_duration + // Liveliness does not affect delivery of messages, + // and we do not record LivelinessChanged events. + // We can always use unspecified liveliness, which will be compatible with all publishers. + return request_qos; +} + +rclcpp::QoS Recorder::qos_for_topic(const std::string & topic_name) const +{ + return adapt_qos_to_publishers(topic_name); } void Recorder::warn_if_new_qos_for_subscribed_topic(const std::string & topic_name) diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.hpp b/rosbag2_transport/src/rosbag2_transport/recorder.hpp index 25c842978e..d1c0419805 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.hpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.hpp @@ -85,11 +85,8 @@ class Recorder void record_messages() const; - /** Find the QoS profile that should be used for subscribing. - * If all currently offered QoS Profiles for a topic are the same, return that profile. - * Otherwise, print a warning and return a fallback value. - */ - rclcpp::QoS common_qos_or_fallback(const std::string & topic_name); + rclcpp::QoS qos_for_topic(const std::string & topic_name) const; + rclcpp::QoS adapt_qos_to_publishers(const std::string & topic_name) const; // Serialize all currently offered QoS profiles for a topic into a YAML list. std::string serialized_offered_qos_profiles_for_topic(const std::string & topic_name); diff --git a/rosbag2_transport/test/rosbag2_transport/test_qos.cpp b/rosbag2_transport/test/rosbag2_transport/test_qos.cpp index 1102c49045..248abeb096 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_qos.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_qos.cpp @@ -60,7 +60,7 @@ TEST(TestQoS, supports_version_4) YAML::Node loaded_node = YAML::Load(serialized_profiles); auto deserialized_profiles = loaded_node.as>(); ASSERT_EQ(deserialized_profiles.size(), 1u); - rosbag2_transport::Rosbag2QoS actual_qos = deserialized_profiles[0]; + auto actual_qos = deserialized_profiles[0].get_rmw_qos_profile(); rmw_time_t zerotime{0, 0}; // Explicitly set up the same QoS profile in case defaults change @@ -71,9 +71,10 @@ TEST(TestQoS, supports_version_4) .deadline(zerotime) .lifespan(zerotime) .liveliness(RMW_QOS_POLICY_LIVELINESS_SYSTEM_DEFAULT) - .liveliness_lease_duration(zerotime); - // Any values not present in the YAML should take the default value in both profiles - EXPECT_TRUE(actual_qos.compatibility_policies_exactly_equal(expected_qos)); + .liveliness_lease_duration(zerotime).get_rmw_qos_profile(); + + EXPECT_EQ(actual_qos.reliability, expected_qos.reliability); + EXPECT_EQ(actual_qos.durability, expected_qos.durability); } TEST(TestQoS, detect_new_qos_fields) diff --git a/rosbag2_transport/test/rosbag2_transport/test_record.cpp b/rosbag2_transport/test/rosbag2_transport/test_record.cpp index 3187d53391..c140cd95d9 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record.cpp @@ -102,7 +102,6 @@ TEST_F(RecordIntegrationTestFixture, qos_is_stored_in_metadata) )); } -#ifdef ROSBAG2_ENABLE_ADAPTIVE_QOS_SUBSCRIPTION TEST_F(RecordIntegrationTestFixture, records_sensor_data) { using clock = std::chrono::system_clock; @@ -143,4 +142,3 @@ TEST_F(RecordIntegrationTestFixture, records_sensor_data) EXPECT_EQ(recorded_topics.size(), 1u); EXPECT_FALSE(recorded_messages.empty()); } -#endif // ROSBAG2_ENABLE_ADAPTIVE_QOS_SUBSCRIPTION diff --git a/rosbag2_transport/test/rosbag2_transport/test_rosbag2_node.cpp b/rosbag2_transport/test/rosbag2_transport/test_rosbag2_node.cpp index 40b28b75d8..16daf94b2a 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_rosbag2_node.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_rosbag2_node.cpp @@ -259,52 +259,3 @@ TEST_F(RosBag2NodeFixture, get_all_topics_with_types_returns_all_topics) EXPECT_THAT(topics_and_types.find(second_topic)->second, StrEq("test_msgs/msg/Strings")); EXPECT_THAT(topics_and_types.find(third_topic)->second, StrEq("test_msgs/msg/Strings")); } - -#ifdef ROSBAG2_ENABLE_ADAPTIVE_QOS_SUBSCRIPTION -TEST_F(RosBag2NodeFixture, mixed_qos_falls_back_to_default) -{ - using clock = std::chrono::system_clock; - using namespace std::chrono_literals; - - std::string topic = "/string_topic"; - test_msgs::msg::Strings msg; - msg.string_value = "Hello"; - - // two QoS profiles that are not compatible with the default request, but are not equal - auto profile1 = rosbag2_transport::Rosbag2QoS().best_effort().durability_volatile(); - auto publisher1 = publisher_node_->create_publisher(topic, profile1); - - auto profile2 = rosbag2_transport::Rosbag2QoS().best_effort().transient_local(); - auto publisher2 = publisher_node_->create_publisher(topic, profile2); - - rosbag2_transport::StorageOptions storage_options {"uri", "storage_id", 0}; - auto writer = std::make_shared(std::make_unique()); - writer->open(storage_options, {rmw_get_serialization_format(), "rmw_format"}); - auto recorder = std::make_shared(writer, node_); - auto recording_future = std::async( - std::launch::async, - [recorder, topic]() { - recorder->record({false, false, {topic}, "rmw_format", 100ms}); - }); - - auto start = clock::now(); - // Takes < 20ms normally, timeout chosen as "a very long time" - auto timeout = 5s; - bool timed_out = false; - auto success_condition = [recorder, topic]() -> bool { - return recorder->topics_using_fallback_qos().count(topic) != 0; - }; - while (!success_condition()) { - if ((clock::now() - start) > timeout) { - timed_out = true; - break; - } - // rate limit the busy-wait and let other threads work - std::this_thread::sleep_for(10ms); - } - ASSERT_FALSE(timed_out); - // We must shut down rclcpp before returning from this test case or recording will block forever - rclcpp::shutdown(); - recording_future.get(); -} -#endif // ROSBAG2_ENABLE_ADAPTIVE_QOS_SUBSCRIPTION From e6f29a9fd1a8e6de4df852bec7f05a8452aae806 Mon Sep 17 00:00:00 2001 From: Emerson Knapp Date: Mon, 6 Apr 2020 18:58:14 -0700 Subject: [PATCH 08/16] Update warning behavior Signed-off-by: Emerson Knapp --- .../src/rosbag2_transport/recorder.cpp | 42 ++++++++++++------- 1 file changed, 27 insertions(+), 15 deletions(-) diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.cpp b/rosbag2_transport/src/rosbag2_transport/recorder.cpp index e55175e120..adc3234d19 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.cpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.cpp @@ -225,9 +225,10 @@ rclcpp::QoS Recorder::adapt_qos_to_publishers(const std::string & topic_name) co } if (reliability_reliable_endpoints_count > 0) { ROSBAG2_TRANSPORT_LOG_WARN_STREAM( - "Some, but not all, publishers on topic \"" << topic_name << "\" are offering " - "Reliable reliability. Falling back to Best Effort as it will connect to all publishers. " - "Some messages from Reliable publishers could be dropped."); + "Some, but not all, publishers on topic \"" << topic_name << "\" " + "are offering Reliable reliability. " + "Falling back to Best Effort as it will connect to all publishers. " + "Some messages from Reliable publishers could be dropped."); } // Policy: durability @@ -239,15 +240,15 @@ rclcpp::QoS Recorder::adapt_qos_to_publishers(const std::string & topic_name) co } if (durability_transient_local_endpoints_count > 0) { ROSBAG2_TRANSPORT_LOG_WARN_STREAM( - "Some, but not all, publishers on topic \"" << topic_name << "\" are offering " - "Transient Local durability. Falling back to Volatile as it will connect to all publishers. " - "Previously-published latched messages will not be retrieved for this topic." - ); + "Some, but not all, publishers on topic \"" << topic_name << "\" " + "are offering Transient Local durability. " + "Falling back to Volatile as it will connect to all publishers. " + "Previously-published latched messages will not be retrieved."); } // Policy: deadline // Deadline does not affect delivery of messages, - // and we do not record DeadlineMissed events. + // and we do not record Deadline"Missed events. // We can always use unspecified deadline, which will be compatible with all publishers. // Policy: lifespan @@ -276,17 +277,28 @@ void Recorder::warn_if_new_qos_for_subscribed_topic(const std::string & topic_na // Already warned about this topic return; } - const auto & used_qos = existing_subscription->second->qos_profile(); + const auto & used_profile = existing_subscription->second->qos_profile().get_rmw_qos_profile(); auto publishers_info = node_->get_publishers_info_by_topic(topic_name); for (const auto & info : publishers_info) { - if (info.qos_profile() != used_qos) { + auto new_profile = info.qos_profile().get_rmw_qos_profile(); + if ( + new_profile.reliability == RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT && + used_profile.reliability != RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT) + { ROSBAG2_TRANSPORT_LOG_WARN_STREAM( - "A new publisher for subscribed topic " << topic_name << " was found that is offering " - "a (possibly) incompatible QoS profile. Not changing subscription QoS. " - "Messages from this publisher may not be recorded." - ); + "A new publisher for subscribed topic " << topic_name << " was found offering " + "Best Effort reliability, but rosbag already subscribed requesting Reliable. " + "Messages will not be recorded from this new publisher."); + topics_warned_about_incompatibility_.insert(topic_name); + } else if ( + new_profile.durability == RMW_QOS_POLICY_DURABILITY_VOLATILE && + used_profile.durability != RMW_QOS_POLICY_DURABILITY_VOLATILE) + { + ROSBAG2_TRANSPORT_LOG_WARN_STREAM( + "A new publisher for susbcribed topic " << topic_name << " was found offering " + "Volatile durability, but rosbag2 already subscribed requesting Transient Local. " + "Messages from this new publisher will not be recorded."); topics_warned_about_incompatibility_.insert(topic_name); - return; } } } From 171063be67d3dff47f9d8d979bfd7531ece698bf Mon Sep 17 00:00:00 2001 From: Emerson Knapp Date: Mon, 6 Apr 2020 21:13:56 -0700 Subject: [PATCH 09/16] Add a bunch of tests Signed-off-by: Emerson Knapp --- .../src/rosbag2_transport/recorder.cpp | 28 ++-- .../test/rosbag2_transport/test_record.cpp | 150 ++++++++++++++++-- 2 files changed, 148 insertions(+), 30 deletions(-) diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.cpp b/rosbag2_transport/src/rosbag2_transport/recorder.cpp index adc3234d19..d414752fec 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.cpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.cpp @@ -221,30 +221,30 @@ rclcpp::QoS Recorder::adapt_qos_to_publishers(const std::string & topic_name) co if (reliability_reliable_endpoints_count == num_endpoints) { request_qos.reliable(); } else { + if (reliability_reliable_endpoints_count > 0) { + ROSBAG2_TRANSPORT_LOG_WARN_STREAM( + "Some, but not all, publishers on topic \"" << topic_name << "\" " + "are offering Reliable reliability. " + "Falling back to Best Effort as it will connect to all publishers. " + "Some messages from Reliable publishers could be dropped."); + } request_qos.best_effort(); } - if (reliability_reliable_endpoints_count > 0) { - ROSBAG2_TRANSPORT_LOG_WARN_STREAM( - "Some, but not all, publishers on topic \"" << topic_name << "\" " - "are offering Reliable reliability. " - "Falling back to Best Effort as it will connect to all publishers. " - "Some messages from Reliable publishers could be dropped."); - } // Policy: durability // If all publishers offer transient_local, we can request it and receive latched messages if (durability_transient_local_endpoints_count == num_endpoints) { request_qos.transient_local(); } else { + if (durability_transient_local_endpoints_count > 0) { + ROSBAG2_TRANSPORT_LOG_WARN_STREAM( + "Some, but not all, publishers on topic \"" << topic_name << "\" " + "are offering Transient Local durability. " + "Falling back to Volatile as it will connect to all publishers. " + "Previously-published latched messages will not be retrieved."); + } request_qos.durability_volatile(); } - if (durability_transient_local_endpoints_count > 0) { - ROSBAG2_TRANSPORT_LOG_WARN_STREAM( - "Some, but not all, publishers on topic \"" << topic_name << "\" " - "are offering Transient Local durability. " - "Falling back to Volatile as it will connect to all publishers. " - "Previously-published latched messages will not be retrieved."); - } // Policy: deadline // Deadline does not affect delivery of messages, diff --git a/rosbag2_transport/test/rosbag2_transport/test_record.cpp b/rosbag2_transport/test/rosbag2_transport/test_record.cpp index c140cd95d9..b303846c32 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record.cpp @@ -28,6 +28,19 @@ #include "record_integration_fixture.hpp" +template +bool wait_for(Timeout timeout, const Node & node, Condition condition) { + using clock = std::chrono::system_clock; + auto start = clock::now(); + while (!condition()) { + if ((clock::now() - start) > timeout) { + return false; + } + rclcpp::spin_some(node); + } + return true; +} + TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_topics_are_recorded) { auto array_message = get_messages_arrays()[0]; @@ -104,9 +117,6 @@ TEST_F(RecordIntegrationTestFixture, qos_is_stored_in_metadata) TEST_F(RecordIntegrationTestFixture, records_sensor_data) { - using clock = std::chrono::system_clock; - using namespace std::chrono_literals; - std::string topic = "/string_topic"; start_recording({false, false, {topic}, "rmw_format", 100ms}); @@ -120,25 +130,133 @@ TEST_F(RecordIntegrationTestFixture, records_sensor_data) publisher->publish(msg); } ); - MockSequentialWriter & writer = - static_cast(writer_->get_implementation_handle()); + auto & writer = static_cast(writer_->get_implementation_handle()); - auto start = clock::now(); // Takes ~200ms normally, 5s chosen as "a very long time" - auto timeout = 5s; - bool timed_out = false; - while (writer.get_messages().empty()) { - if ((clock::now() - start) > timeout) { - timed_out = true; - break; - } - rclcpp::spin_some(publisher_node); - } + bool succeeded = wait_for( + std::chrono::seconds(5), publisher_node, + [&writer]() { + return writer.get_messages().size() > 0; + }); + ASSERT_TRUE(succeeded); stop_recording(); - ASSERT_FALSE(timed_out); auto recorded_messages = writer.get_messages(); auto recorded_topics = writer.get_topics(); EXPECT_EQ(recorded_topics.size(), 1u); EXPECT_FALSE(recorded_messages.empty()); } + +TEST_F(RecordIntegrationTestFixture, receives_latched_messages) +{ + // Ensure rosbag2 can receive Transient Local Durability "latched messages" + const std::string topic = "/latched_topic"; + const size_t num_latched_messages = 3; + + auto publisher_node = std::make_shared("publisher_for_latched_test"); + auto profile_transient_local = rclcpp::QoS(num_latched_messages).transient_local(); + auto publisher_transient_local = publisher_node->create_publisher( + topic, profile_transient_local); + + // Publish messages before starting recording + test_msgs::msg::Strings msg; + msg.string_value = "Hello"; + for (size_t i = 0; i < num_latched_messages; i++) { + publisher_transient_local->publish(msg); + rclcpp::spin_some(publisher_node); + } + start_recording({false, false, {topic}, "rmw_format", 100ms}); + auto & writer = static_cast(writer_->get_implementation_handle()); + + // Takes ~200ms normally, 5s chosen as "a very long time" + bool succeeded = wait_for( + std::chrono::seconds(5), publisher_node, + [&writer, num_latched_messages]() { + return writer.get_messages().size() == num_latched_messages; + }); + ASSERT_TRUE(succeeded); +} + +TEST_F(RecordIntegrationTestFixture, mixed_qos_subscribes) { + // Ensure that rosbag2 subscribes to publishers that offer different durability policies + const size_t arbitrary_history = 5; + + std::string topic = "/string_topic"; + test_msgs::msg::Strings msg; + msg.string_value = "Hello"; + + auto profile_volatile = rclcpp::QoS(arbitrary_history).reliable().durability_volatile(); + auto profile_transient_local = rclcpp::QoS(arbitrary_history).reliable().transient_local(); + + auto publisher_node = std::make_shared("publisher_for_qos_test"); + auto publisher_volatile = publisher_node->create_publisher( + topic, profile_volatile); + auto publisher_transient_local = publisher_node->create_publisher( + topic, profile_transient_local); + + // auto publish_timer_volatile = publisher_node->create_wall_timer( + // 50ms, [msg, publisher_volatile]() -> void { + // publisher_volatile->publish(msg); + // } + // ); + // auto publish_timer_transient_local = publisher_node->create_wall_timer( + // 50ms, [msg, publisher_transient_local]() -> void { + // publisher_transient_local->publish(msg); + // } + // ); + + start_recording({false, false, {topic}, "rmw_format", 100ms}); + + // Takes ~200ms normally, 5s chosen as "a very long time" + bool succeeded = wait_for( + std::chrono::seconds(5), publisher_node, + [publisher_volatile, publisher_transient_local]() { + // This test is a success if rosbag2 has connected to both publishers + return publisher_volatile->get_subscription_count() && + publisher_transient_local->get_subscription_count(); + }); + stop_recording(); + ASSERT_TRUE(succeeded); +} + +TEST_F(RecordIntegrationTestFixture, duration_and_noncompatibility_policies_mixed) { + // Ensure that the duration-based and non-compatibility QoS policies don't affect subscription + // These values are arbitrary, the significance is that they are non-default + const std::string topic = "/mixed_nondelivery_policies"; + const size_t same_history = 5; + const size_t different_history = 12; + const rmw_time_t deadline{0, 1000}; + const rmw_time_t lifespan{3, 12}; + const auto liveliness = RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_NODE; + const rmw_time_t liveliness_lease_duration{0, 5000000}; + + auto publisher_node = std::make_shared("publisher_for"); + auto create_pub = [publisher_node, topic](auto qos) { + return publisher_node->create_publisher(topic, qos); + }; + + auto profile_history = rclcpp::QoS(different_history); + auto publisher_history = create_pub(profile_history); + + auto profile_lifespan = rclcpp::QoS(same_history).lifespan(lifespan); + auto publisher_lifespan = create_pub(profile_lifespan); + + auto profile_deadline = rclcpp::QoS(same_history).deadline(deadline); + auto publisher_deadline = create_pub(profile_deadline); + + auto profile_liveliness = rclcpp::QoS(same_history) + .liveliness(liveliness).liveliness_lease_duration(liveliness_lease_duration); + auto publisher_liveliness = create_pub(profile_liveliness); + + start_recording({false, false, {topic}, "rmw_format", 100ms}); + bool succeeded = wait_for( + std::chrono::seconds(5), publisher_node, + [publisher_history, publisher_lifespan, publisher_deadline, publisher_liveliness]() { + return publisher_history->get_subscription_count() && + publisher_lifespan->get_subscription_count() && + publisher_deadline->get_subscription_count() && + publisher_liveliness->get_subscription_count(); + }); + stop_recording(); + ASSERT_TRUE(succeeded); +} From 498c57cc6386b42b2a2f01d321eb22cb13d3b447 Mon Sep 17 00:00:00 2001 From: Emerson Knapp Date: Mon, 6 Apr 2020 21:22:15 -0700 Subject: [PATCH 10/16] Fix linters Signed-off-by: Emerson Knapp --- .../src/rosbag2_transport/recorder.cpp | 15 ++++++------ .../test/rosbag2_transport/test_record.cpp | 23 +++++++++++-------- 2 files changed, 21 insertions(+), 17 deletions(-) diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.cpp b/rosbag2_transport/src/rosbag2_transport/recorder.cpp index d414752fec..9dcb615c99 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.cpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.cpp @@ -281,19 +281,20 @@ void Recorder::warn_if_new_qos_for_subscribed_topic(const std::string & topic_na auto publishers_info = node_->get_publishers_info_by_topic(topic_name); for (const auto & info : publishers_info) { auto new_profile = info.qos_profile().get_rmw_qos_profile(); - if ( + bool incompatible_reliability = new_profile.reliability == RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT && - used_profile.reliability != RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT) - { + used_profile.reliability != RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT; + bool incompatible_durability = + new_profile.durability == RMW_QOS_POLICY_DURABILITY_VOLATILE && + used_profile.durability != RMW_QOS_POLICY_DURABILITY_VOLATILE; + + if (incompatible_reliability) { ROSBAG2_TRANSPORT_LOG_WARN_STREAM( "A new publisher for subscribed topic " << topic_name << " was found offering " "Best Effort reliability, but rosbag already subscribed requesting Reliable. " "Messages will not be recorded from this new publisher."); topics_warned_about_incompatibility_.insert(topic_name); - } else if ( - new_profile.durability == RMW_QOS_POLICY_DURABILITY_VOLATILE && - used_profile.durability != RMW_QOS_POLICY_DURABILITY_VOLATILE) - { + } else if (incompatible_durability) { ROSBAG2_TRANSPORT_LOG_WARN_STREAM( "A new publisher for susbcribed topic " << topic_name << " was found offering " "Volatile durability, but rosbag2 already subscribed requesting Transient Local. " diff --git a/rosbag2_transport/test/rosbag2_transport/test_record.cpp b/rosbag2_transport/test/rosbag2_transport/test_record.cpp index b303846c32..4078c01df2 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record.cpp @@ -28,8 +28,9 @@ #include "record_integration_fixture.hpp" -template -bool wait_for(Timeout timeout, const Node & node, Condition condition) { +template +bool wait_for(Timeout timeout, const Node & node, Condition condition) +{ using clock = std::chrono::system_clock; auto start = clock::now(); while (!condition()) { @@ -212,8 +213,9 @@ TEST_F(RecordIntegrationTestFixture, mixed_qos_subscribes) { std::chrono::seconds(5), publisher_node, [publisher_volatile, publisher_transient_local]() { // This test is a success if rosbag2 has connected to both publishers - return publisher_volatile->get_subscription_count() && - publisher_transient_local->get_subscription_count(); + return + publisher_volatile->get_subscription_count() && + publisher_transient_local->get_subscription_count(); }); stop_recording(); ASSERT_TRUE(succeeded); @@ -232,8 +234,8 @@ TEST_F(RecordIntegrationTestFixture, duration_and_noncompatibility_policies_mixe auto publisher_node = std::make_shared("publisher_for"); auto create_pub = [publisher_node, topic](auto qos) { - return publisher_node->create_publisher(topic, qos); - }; + return publisher_node->create_publisher(topic, qos); + }; auto profile_history = rclcpp::QoS(different_history); auto publisher_history = create_pub(profile_history); @@ -252,10 +254,11 @@ TEST_F(RecordIntegrationTestFixture, duration_and_noncompatibility_policies_mixe bool succeeded = wait_for( std::chrono::seconds(5), publisher_node, [publisher_history, publisher_lifespan, publisher_deadline, publisher_liveliness]() { - return publisher_history->get_subscription_count() && - publisher_lifespan->get_subscription_count() && - publisher_deadline->get_subscription_count() && - publisher_liveliness->get_subscription_count(); + return + publisher_history->get_subscription_count() && + publisher_lifespan->get_subscription_count() && + publisher_deadline->get_subscription_count() && + publisher_liveliness->get_subscription_count(); }); stop_recording(); ASSERT_TRUE(succeeded); From 786c230d1702b6c427b54638345407558b0f82eb Mon Sep 17 00:00:00 2001 From: Emerson Knapp Date: Mon, 6 Apr 2020 21:25:10 -0700 Subject: [PATCH 11/16] Need to stop recording for the test to exit Signed-off-by: Emerson Knapp --- rosbag2_transport/test/rosbag2_transport/test_record.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/rosbag2_transport/test/rosbag2_transport/test_record.cpp b/rosbag2_transport/test/rosbag2_transport/test_record.cpp index 4078c01df2..ad8dd391e1 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record.cpp @@ -175,6 +175,7 @@ TEST_F(RecordIntegrationTestFixture, receives_latched_messages) [&writer, num_latched_messages]() { return writer.get_messages().size() == num_latched_messages; }); + stop_recording(); ASSERT_TRUE(succeeded); } From e87a6ffa161a4fd0a82212489fb13edcb99235bc Mon Sep 17 00:00:00 2001 From: Emerson Knapp Date: Mon, 6 Apr 2020 21:47:14 -0700 Subject: [PATCH 12/16] Update comments with real timing Signed-off-by: Emerson Knapp --- .../test/rosbag2_transport/test_record.cpp | 19 ++++--------------- 1 file changed, 4 insertions(+), 15 deletions(-) diff --git a/rosbag2_transport/test/rosbag2_transport/test_record.cpp b/rosbag2_transport/test/rosbag2_transport/test_record.cpp index ad8dd391e1..cca9e5920b 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record.cpp @@ -133,7 +133,7 @@ TEST_F(RecordIntegrationTestFixture, records_sensor_data) ); auto & writer = static_cast(writer_->get_implementation_handle()); - // Takes ~200ms normally, 5s chosen as "a very long time" + // Takes ~200ms in local testing, 5s chosen as a very long timeout bool succeeded = wait_for( std::chrono::seconds(5), publisher_node, [&writer]() { @@ -169,7 +169,7 @@ TEST_F(RecordIntegrationTestFixture, receives_latched_messages) start_recording({false, false, {topic}, "rmw_format", 100ms}); auto & writer = static_cast(writer_->get_implementation_handle()); - // Takes ~200ms normally, 5s chosen as "a very long time" + // Takes ~100ms in local testing, 5s chosen as a very long timeout bool succeeded = wait_for( std::chrono::seconds(5), publisher_node, [&writer, num_latched_messages]() { @@ -196,20 +196,8 @@ TEST_F(RecordIntegrationTestFixture, mixed_qos_subscribes) { auto publisher_transient_local = publisher_node->create_publisher( topic, profile_transient_local); - // auto publish_timer_volatile = publisher_node->create_wall_timer( - // 50ms, [msg, publisher_volatile]() -> void { - // publisher_volatile->publish(msg); - // } - // ); - // auto publish_timer_transient_local = publisher_node->create_wall_timer( - // 50ms, [msg, publisher_transient_local]() -> void { - // publisher_transient_local->publish(msg); - // } - // ); - start_recording({false, false, {topic}, "rmw_format", 100ms}); - - // Takes ~200ms normally, 5s chosen as "a very long time" + // Takes ~100ms in local testing, 5s chosen as a very long timeout bool succeeded = wait_for( std::chrono::seconds(5), publisher_node, [publisher_volatile, publisher_transient_local]() { @@ -252,6 +240,7 @@ TEST_F(RecordIntegrationTestFixture, duration_and_noncompatibility_policies_mixe auto publisher_liveliness = create_pub(profile_liveliness); start_recording({false, false, {topic}, "rmw_format", 100ms}); + // Takes ~200ms in local testing, 5s chosen as a very long timeout bool succeeded = wait_for( std::chrono::seconds(5), publisher_node, [publisher_history, publisher_lifespan, publisher_deadline, publisher_liveliness]() { From aea1b2b0420abfd36bc072b6458448bde84ee180 Mon Sep 17 00:00:00 2001 From: Emerson Knapp Date: Tue, 7 Apr 2020 00:34:31 -0700 Subject: [PATCH 13/16] Fix OSX build Signed-off-by: Emerson Knapp --- rosbag2_transport/test/rosbag2_transport/test_record.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rosbag2_transport/test/rosbag2_transport/test_record.cpp b/rosbag2_transport/test/rosbag2_transport/test_record.cpp index cca9e5920b..9126e93e28 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record.cpp @@ -172,7 +172,7 @@ TEST_F(RecordIntegrationTestFixture, receives_latched_messages) // Takes ~100ms in local testing, 5s chosen as a very long timeout bool succeeded = wait_for( std::chrono::seconds(5), publisher_node, - [&writer, num_latched_messages]() { + [&writer]() { return writer.get_messages().size() == num_latched_messages; }); stop_recording(); From b5e9131366a8a7ed002b6df073cb9e85daa9f023 Mon Sep 17 00:00:00 2001 From: Emerson Knapp Date: Tue, 7 Apr 2020 12:35:39 -0700 Subject: [PATCH 14/16] Move wait_for utility out to common place and update warning wording Signed-off-by: Emerson Knapp --- .../include/rosbag2_test_common/wait_for.hpp | 39 +++++++++++++++++++ .../src/rosbag2_transport/recorder.cpp | 10 +++-- .../test/rosbag2_transport/test_record.cpp | 23 +++-------- 3 files changed, 51 insertions(+), 21 deletions(-) create mode 100644 rosbag2_test_common/include/rosbag2_test_common/wait_for.hpp diff --git a/rosbag2_test_common/include/rosbag2_test_common/wait_for.hpp b/rosbag2_test_common/include/rosbag2_test_common/wait_for.hpp new file mode 100644 index 0000000000..8a12feb2b8 --- /dev/null +++ b/rosbag2_test_common/include/rosbag2_test_common/wait_for.hpp @@ -0,0 +1,39 @@ +// Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef ROSBAG2_TEST_COMMON__WAIT_FOR_HPP_ +#define ROSBAG2_TEST_COMMON__WAIT_FOR_HPP_ + +#include + +#include "rclcpp/rclcpp.hpp" + +namespace rosbag2_test_common +{ +template +bool spin_and_wait_for(Timeout timeout, const Node & node, Condition condition) +{ + using clock = std::chrono::system_clock; + auto start = clock::now(); + while (!condition()) { + if ((clock::now() - start) > timeout) { + return false; + } + rclcpp::spin_some(node); + } + return true; +} +} // namespace rosbag2_test_common + +#endif // ROSBAG2_TEST_COMMON__WAIT_FOR_HPP_ diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.cpp b/rosbag2_transport/src/rosbag2_transport/recorder.cpp index 9dcb615c99..c49581d6f6 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.cpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.cpp @@ -224,8 +224,9 @@ rclcpp::QoS Recorder::adapt_qos_to_publishers(const std::string & topic_name) co if (reliability_reliable_endpoints_count > 0) { ROSBAG2_TRANSPORT_LOG_WARN_STREAM( "Some, but not all, publishers on topic \"" << topic_name << "\" " - "are offering Reliable reliability. " - "Falling back to Best Effort as it will connect to all publishers. " + "are offering RMW_QOS_POLICY_RELIABILITY_RELIABLE. " + "Falling back to RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT " + "as it will connect to all publishers. " "Some messages from Reliable publishers could be dropped."); } request_qos.best_effort(); @@ -239,8 +240,9 @@ rclcpp::QoS Recorder::adapt_qos_to_publishers(const std::string & topic_name) co if (durability_transient_local_endpoints_count > 0) { ROSBAG2_TRANSPORT_LOG_WARN_STREAM( "Some, but not all, publishers on topic \"" << topic_name << "\" " - "are offering Transient Local durability. " - "Falling back to Volatile as it will connect to all publishers. " + "are offering RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL. " + "Falling back to RMW_QOS_POLICY_DURABILITY_VOLATILE " + "as it will connect to all publishers. " "Previously-published latched messages will not be retrieved."); } request_qos.durability_volatile(); diff --git a/rosbag2_transport/test/rosbag2_transport/test_record.cpp b/rosbag2_transport/test/rosbag2_transport/test_record.cpp index 9126e93e28..6adc742488 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record.cpp @@ -20,6 +20,8 @@ #include "rclcpp/rclcpp.hpp" +#include "rosbag2_test_common/wait_for.hpp" + #include "rosbag2_transport/rosbag2_transport.hpp" #include "test_msgs/msg/arrays.hpp" @@ -28,19 +30,6 @@ #include "record_integration_fixture.hpp" -template -bool wait_for(Timeout timeout, const Node & node, Condition condition) -{ - using clock = std::chrono::system_clock; - auto start = clock::now(); - while (!condition()) { - if ((clock::now() - start) > timeout) { - return false; - } - rclcpp::spin_some(node); - } - return true; -} TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_topics_are_recorded) { @@ -134,7 +123,7 @@ TEST_F(RecordIntegrationTestFixture, records_sensor_data) auto & writer = static_cast(writer_->get_implementation_handle()); // Takes ~200ms in local testing, 5s chosen as a very long timeout - bool succeeded = wait_for( + bool succeeded = rosbag2_test_common::spin_and_wait_for( std::chrono::seconds(5), publisher_node, [&writer]() { return writer.get_messages().size() > 0; @@ -170,7 +159,7 @@ TEST_F(RecordIntegrationTestFixture, receives_latched_messages) auto & writer = static_cast(writer_->get_implementation_handle()); // Takes ~100ms in local testing, 5s chosen as a very long timeout - bool succeeded = wait_for( + bool succeeded = rosbag2_test_common::spin_and_wait_for( std::chrono::seconds(5), publisher_node, [&writer]() { return writer.get_messages().size() == num_latched_messages; @@ -198,7 +187,7 @@ TEST_F(RecordIntegrationTestFixture, mixed_qos_subscribes) { start_recording({false, false, {topic}, "rmw_format", 100ms}); // Takes ~100ms in local testing, 5s chosen as a very long timeout - bool succeeded = wait_for( + bool succeeded = rosbag2_test_common::spin_and_wait_for( std::chrono::seconds(5), publisher_node, [publisher_volatile, publisher_transient_local]() { // This test is a success if rosbag2 has connected to both publishers @@ -241,7 +230,7 @@ TEST_F(RecordIntegrationTestFixture, duration_and_noncompatibility_policies_mixe start_recording({false, false, {topic}, "rmw_format", 100ms}); // Takes ~200ms in local testing, 5s chosen as a very long timeout - bool succeeded = wait_for( + bool succeeded = rosbag2_test_common::spin_and_wait_for( std::chrono::seconds(5), publisher_node, [publisher_history, publisher_lifespan, publisher_deadline, publisher_liveliness]() { return From 1ff52ae7740a183bd35f196e9240a0e289520095 Mon Sep 17 00:00:00 2001 From: Emerson Knapp Date: Tue, 7 Apr 2020 13:04:13 -0700 Subject: [PATCH 15/16] Fix up merge Signed-off-by: Emerson Knapp --- .../src/rosbag2_transport/recorder.cpp | 37 +++++++++---------- .../src/rosbag2_transport/recorder.hpp | 10 ++++- 2 files changed, 26 insertions(+), 21 deletions(-) diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.cpp b/rosbag2_transport/src/rosbag2_transport/recorder.cpp index e713091b7d..1e1cc2780d 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.cpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.cpp @@ -196,10 +196,19 @@ std::string Recorder::serialized_offered_qos_profiles_for_topic(const std::strin return YAML::Dump(offered_qos_profiles); } +rclcpp::QoS Recorder::subscription_qos_for_topic(const std::string & topic_name) const +{ + if (topic_qos_profile_overrides_.count(topic_name)) { + ROSBAG2_TRANSPORT_LOG_INFO_STREAM("Overriding subscription profile for " << topic_name); + return topic_qos_profile_overrides_.at(topic_name); + } else { + return adapt_qos_to_publishers(topic_name); + } + return adapt_qos_to_publishers(topic_name); +} + rclcpp::QoS Recorder::adapt_qos_to_publishers(const std::string & topic_name) const { - // TODO(emersonknapp) re-enable subscription_qos_for_topic once the cyclone situation is resolved - #ifdef ROSBAG2_ENABLE_ADAPTIVE_QOS_SUBSCRIPTION auto endpoints = node_->get_publishers_info_by_topic(topic_name); size_t num_endpoints = endpoints.size(); size_t reliability_reliable_endpoints_count = 0; @@ -265,18 +274,6 @@ rclcpp::QoS Recorder::adapt_qos_to_publishers(const std::string & topic_name) co return request_qos; } -rclcpp::QoS Recorder::qos_for_topic(const std::string & topic_name) const -{ - rosbag2_transport::Rosbag2QoS subscription_qos{}; - if (topic_qos_profile_overrides_.count(topic_name)) { - ROSBAG2_TRANSPORT_LOG_INFO_STREAM("Overriding subscription profile for " << topic_name); - return topic_qos_profile_overrides_.at(topic_name); - } else { - return adapt_qos_to_publishers(topic_name); - } - return adapt_qos_to_publishers(topic_name); -} - void Recorder::warn_if_new_qos_for_subscribed_topic(const std::string & topic_name) { auto existing_subscription = subscriptions_.find(topic_name); @@ -301,14 +298,16 @@ void Recorder::warn_if_new_qos_for_subscribed_topic(const std::string & topic_na if (incompatible_reliability) { ROSBAG2_TRANSPORT_LOG_WARN_STREAM( - "A new publisher for subscribed topic " << topic_name << " was found offering " - "Best Effort reliability, but rosbag already subscribed requesting Reliable. " - "Messages will not be recorded from this new publisher."); + "A new publisher for subscribed topic " << topic_name << " " + "was found offering RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT, " + "but rosbag already subscribed requesting RMW_QOS_POLICY_RELIABILITY_RELIABLE. " + "Messages from this new publisher will not be recorded."); topics_warned_about_incompatibility_.insert(topic_name); } else if (incompatible_durability) { ROSBAG2_TRANSPORT_LOG_WARN_STREAM( - "A new publisher for susbcribed topic " << topic_name << " was found offering " - "Volatile durability, but rosbag2 already subscribed requesting Transient Local. " + "A new publisher for susbcribed topic " << topic_name << " " + "was found offering RMW_QOS_POLICY_DURABILITY_VOLATILE, " + "but rosbag2 already subscribed requesting RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL. " "Messages from this new publisher will not be recorded."); topics_warned_about_incompatibility_.insert(topic_name); } diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.hpp b/rosbag2_transport/src/rosbag2_transport/recorder.hpp index b2bdca326e..59f4032c29 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.hpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.hpp @@ -85,7 +85,6 @@ class Recorder void record_messages() const; - rclcpp::QoS adapt_qos_to_publishers(const std::string & topic_name) const; /** * Find the QoS profile that should be used for subscribing. * @@ -96,7 +95,14 @@ class Recorder * \param topic_name The full name of the topic, with namespace (ex. /arm/joint_status). * \return The QoS profile to be used for subscribing. */ - rclcpp::QoS subscription_qos_for_topic(const std::string & topic_name); + rclcpp::QoS subscription_qos_for_topic(const std::string & topic_name) const; + /** + * Try to subscribe using publishers' offered QoS policies. + * + * Fall back to sensible defaults when we can't adapt robustly, + * erring in favor of creating compatible connections. + */ + rclcpp::QoS adapt_qos_to_publishers(const std::string & topic_name) const; // Serialize all currently offered QoS profiles for a topic into a YAML list. std::string serialized_offered_qos_profiles_for_topic(const std::string & topic_name); From c8f99673e7870813f089f301319c5cf7d61e353c Mon Sep 17 00:00:00 2001 From: Emerson Knapp Date: Tue, 7 Apr 2020 15:32:09 -0700 Subject: [PATCH 16/16] Fix lambda capture for both osx and windows Signed-off-by: Emerson Knapp --- rosbag2_transport/test/rosbag2_transport/test_record.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/rosbag2_transport/test/rosbag2_transport/test_record.cpp b/rosbag2_transport/test/rosbag2_transport/test_record.cpp index e1eb4e79ee..e490930440 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record.cpp @@ -160,7 +160,9 @@ TEST_F(RecordIntegrationTestFixture, receives_latched_messages) // Takes ~100ms in local testing, 5s chosen as a very long timeout bool succeeded = rosbag2_test_common::spin_and_wait_for( std::chrono::seconds(5), publisher_node, - [&writer]() { + // The = is necessary due to an implementation difference between clang and windows + // Clang claims capture is unecessary (and we use -Werror), but windows needs it + [&writer, num_latched_messages = num_latched_messages]() { return writer.get_messages().size() == num_latched_messages; }); stop_recording();