diff --git a/rosbag2_test_common/include/rosbag2_test_common/publisher_manager.hpp b/rosbag2_test_common/include/rosbag2_test_common/publisher_manager.hpp index 80d63fbdbf..f5c376b03b 100644 --- a/rosbag2_test_common/include/rosbag2_test_common/publisher_manager.hpp +++ b/rosbag2_test_common/include/rosbag2_test_common/publisher_manager.hpp @@ -75,13 +75,16 @@ class PublisherManager template void add_publisher( - const std::string & topic_name, std::shared_ptr message, size_t expected_messages = 0) + const std::string & topic_name, + std::shared_ptr message, + size_t expected_messages = 0, + const rclcpp::QoS & qos = rclcpp::QoS{rclcpp::KeepAll()}) { auto node_name = std::string("publisher") + std::to_string(counter_++); auto publisher_node = std::make_shared( node_name, rclcpp::NodeOptions().start_parameter_event_publisher(false).enable_rosout(false)); - auto publisher = publisher_node->create_publisher(topic_name, 10); + auto publisher = publisher_node->create_publisher(topic_name, qos); publisher_nodes_.push_back(publisher_node); publishers_.push_back( diff --git a/rosbag2_transport/CMakeLists.txt b/rosbag2_transport/CMakeLists.txt index f5fafca5f7..4cd64a0386 100644 --- a/rosbag2_transport/CMakeLists.txt +++ b/rosbag2_transport/CMakeLists.txt @@ -150,9 +150,10 @@ function(create_tests_for_rmw_implementation) rosbag2_transport_add_gmock(test_record test/rosbag2_transport/test_record.cpp - ${SKIP_TEST} + AMENT_DEPS test_msgs rosbag2_test_common + INCLUDE_DIRS $ LINK_LIBS rosbag2_transport - AMENT_DEPS test_msgs rosbag2_test_common) + ${SKIP_TEST}) rosbag2_transport_add_gmock(test_play test/rosbag2_transport/test_play.cpp diff --git a/rosbag2_transport/include/rosbag2_transport/record_options.hpp b/rosbag2_transport/include/rosbag2_transport/record_options.hpp index e53d9a1d31..adb0573c2b 100644 --- a/rosbag2_transport/include/rosbag2_transport/record_options.hpp +++ b/rosbag2_transport/include/rosbag2_transport/record_options.hpp @@ -17,8 +17,11 @@ #include #include +#include #include +#include "rclcpp/rclcpp.hpp" + namespace rosbag2_transport { struct RecordOptions @@ -32,6 +35,7 @@ struct RecordOptions std::string node_prefix = ""; std::string compression_mode = ""; std::string compression_format = ""; + std::unordered_map topic_qos_profile_overrides{}; bool include_hidden_topics = false; }; diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.cpp b/rosbag2_transport/src/rosbag2_transport/recorder.cpp index f0cf9ce0c1..09f69d0b7e 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.cpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.cpp @@ -46,6 +46,8 @@ namespace { +// TODO(emersonknapp) re-enable subscription_qos_for_topic once the cyclone situation is resolved +#ifdef ROSBAG2_ENABLE_ADAPTIVE_QOS_SUBSCRIPTION bool all_qos_same(const std::vector & values) { auto adjacent_different_elements_it = std::adjacent_find( @@ -58,6 +60,7 @@ bool all_qos_same(const std::vector & values) // No adjacent elements were different, so all are the same. return adjacent_different_elements_it == values.end(); } +#endif // ROSBAG2_ENABLE_ADAPTIVE_QOS_SUBSCRIPTION } // unnamed namespace namespace rosbag2_transport @@ -67,6 +70,7 @@ Recorder::Recorder(std::shared_ptr writer, std::shared_ptr< void Recorder::record(const RecordOptions & record_options) { + topic_qos_profile_overrides_ = record_options.topic_qos_profile_overrides; if (record_options.rmw_serialization_format.empty()) { throw std::runtime_error("No serialization format specified!"); } @@ -162,12 +166,7 @@ void Recorder::subscribe_topic(const rosbag2_storage::TopicMetadata & topic) // that callback called before we reached out the line: writer_->create_topic(topic) writer_->create_topic(topic); - // TODO(emersonknapp) re-enable common_qos_or_fallback once the cyclone situation is resolved - #ifdef ROSBAG2_ENABLE_ADAPTIVE_QOS_SUBSCRIPTION - Rosbag2QoS subscription_qos{common_qos_or_fallback(topic.name)}; - #else - Rosbag2QoS subscription_qos{}; - #endif // ROSBAG2_ENABLE_ADAPTIVE_QOS_SUBSCRIPTION + Rosbag2QoS subscription_qos{subscription_qos_for_topic(topic.name)}; auto subscription = create_subscription(topic.name, topic.type, subscription_qos); if (subscription) { subscriptions_.insert({topic.name, subscription}); @@ -218,8 +217,15 @@ std::string Recorder::serialized_offered_qos_profiles_for_topic(const std::strin return YAML::Dump(offered_qos_profiles); } -rclcpp::QoS Recorder::common_qos_or_fallback(const std::string & topic_name) +rclcpp::QoS Recorder::subscription_qos_for_topic(const std::string & topic_name) { + rclcpp::QoS subscription_qos{rclcpp::KeepAll()}; + if (topic_qos_profile_overrides_.count(topic_name)) { + subscription_qos = topic_qos_profile_overrides_.at(topic_name); + ROSBAG2_TRANSPORT_LOG_INFO_STREAM("Overriding subscription profile for " << topic_name); + } + // TODO(emersonknapp) re-enable subscription_qos_for_topic once the cyclone situation is resolved + #ifdef ROSBAG2_ENABLE_ADAPTIVE_QOS_SUBSCRIPTION auto endpoints = node_->get_publishers_info_by_topic(topic_name); if (!endpoints.empty() && all_qos_same(endpoints)) { return Rosbag2QoS(endpoints[0].qos_profile()).default_history(); @@ -229,7 +235,8 @@ rclcpp::QoS Recorder::common_qos_or_fallback(const std::string & topic_name) "Cannot determine what QoS to request, falling back to default QoS profile." ); topics_warned_about_incompatibility_.insert(topic_name); - return Rosbag2QoS{}; + #endif // ROSBAG2_ENABLE_ADAPTIVE_QOS_SUBSCRIPTION + return subscription_qos; } void Recorder::warn_if_new_qos_for_subscribed_topic(const std::string & topic_name) diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.hpp b/rosbag2_transport/src/rosbag2_transport/recorder.hpp index 25c842978e..2311b5839f 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.hpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.hpp @@ -85,11 +85,19 @@ class Recorder void record_messages() const; - /** Find the QoS profile that should be used for subscribing. - * If all currently offered QoS Profiles for a topic are the same, return that profile. - * Otherwise, print a warning and return a fallback value. - */ - rclcpp::QoS common_qos_or_fallback(const std::string & topic_name); + /** + * Find the QoS profile that should be used for subscribing. + * + * Profiles are prioritized by: + * 1. The override specified in the record_options, if one exists for the topic. + * 2. The publisher's offered QoS profile if all current publishers are offering the exact same + * compatibility profile. + * 3. The default Rosbag2QoS profile, if the above conditions are not met. + * + * \param topic_name The full name of the topic, with namespace (ex. /arm/joint_status). + * \return The QoS profile to be used for subscribing. + */ + rclcpp::QoS subscription_qos_for_topic(const std::string & topic_name); // Serialize all currently offered QoS profiles for a topic into a YAML list. std::string serialized_offered_qos_profiles_for_topic(const std::string & topic_name); @@ -101,6 +109,7 @@ class Recorder std::unordered_map> subscriptions_; std::unordered_set topics_warned_about_incompatibility_; std::string serialization_format_; + std::unordered_map topic_qos_profile_overrides_; }; } // namespace rosbag2_transport diff --git a/rosbag2_transport/test/rosbag2_transport/test_record.cpp b/rosbag2_transport/test/rosbag2_transport/test_record.cpp index ced11aec86..efe1a75c54 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record.cpp @@ -16,14 +16,15 @@ #include #include +#include #include #include "rclcpp/rclcpp.hpp" +#include "qos.hpp" #include "rosbag2_transport/rosbag2_transport.hpp" #include "test_msgs/msg/arrays.hpp" -#include "test_msgs/msg/basic_types.hpp" #include "test_msgs/message_fixtures.hpp" #include "record_integration_fixture.hpp" @@ -100,8 +101,7 @@ TEST_F(RecordIntegrationTestFixture, qos_is_stored_in_metadata) )); } -TEST_F(RecordIntegrationTestFixture, records_sensor_data) -{ +TEST_F(RecordIntegrationTestFixture, records_sensor_data) { using clock = std::chrono::system_clock; using namespace std::chrono_literals; @@ -141,3 +141,39 @@ TEST_F(RecordIntegrationTestFixture, records_sensor_data) EXPECT_FALSE(recorded_messages.empty()); } #endif // ROSBAG2_ENABLE_ADAPTIVE_QOS_SUBSCRIPTION + +TEST_F(RecordIntegrationTestFixture, topic_qos_overrides) +{ + const auto num_msgs = 3; + auto strict_msg = std::make_shared(); + strict_msg->string_value = "strict"; + const auto strict_topic = "/strict_topic"; + + rosbag2_transport::RecordOptions record_options = + {false, false, {strict_topic}, "rmw_format", 100ms}; + const auto profile_override = rclcpp::QoS{rclcpp::KeepAll()} + .best_effort().durability_volatile().avoid_ros_namespace_conventions(false); + std::unordered_map topic_qos_profile_overrides = { + {strict_topic, profile_override} + }; + record_options.topic_qos_profile_overrides = topic_qos_profile_overrides; + + // Create two BEST_EFFORT publishers on the same topic with different Durability policies. + // If no override is specified, then the recorder cannot see any published messages. + auto profile1 = rosbag2_transport::Rosbag2QoS{}.best_effort().durability_volatile(); + auto profile2 = rosbag2_transport::Rosbag2QoS{}.best_effort().transient_local(); + pub_man_.add_publisher( + strict_topic, strict_msg, num_msgs, profile1); + pub_man_.add_publisher( + strict_topic, strict_msg, num_msgs, profile2); + + start_recording(record_options); + run_publishers(); + stop_recording(); + + auto & writer = + static_cast(writer_->get_implementation_handle()); + auto recorded_messages = writer.get_messages(); + + ASSERT_GE(recorded_messages.size(), 0u); +}