Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,16 @@ class PublisherManager

template<class T>
void add_publisher(
const std::string & topic_name, std::shared_ptr<T> message, size_t expected_messages = 0)
const std::string & topic_name,
std::shared_ptr<T> message,
size_t expected_messages = 0,
const rclcpp::QoS & qos = rclcpp::QoS{rclcpp::KeepAll()})
{
auto node_name = std::string("publisher") + std::to_string(counter_++);
auto publisher_node = std::make_shared<rclcpp::Node>(
node_name,
rclcpp::NodeOptions().start_parameter_event_publisher(false).enable_rosout(false));
auto publisher = publisher_node->create_publisher<T>(topic_name, 10);
auto publisher = publisher_node->create_publisher<T>(topic_name, qos);

publisher_nodes_.push_back(publisher_node);
publishers_.push_back(
Expand Down
8 changes: 6 additions & 2 deletions rosbag2_transport/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic -Werror)
endif()

# TODO(piraka9011) Remove once testing is finished
add_definitions(-DROSBAG2_ENABLE_ADAPTIVE_QOS_SUBSCRIPTION)

# Windows supplies macros for min and max by default. We should only use min and max from stl
if(WIN32)
add_definitions(-DNOMINMAX)
Expand Down Expand Up @@ -150,9 +153,10 @@ function(create_tests_for_rmw_implementation)

rosbag2_transport_add_gmock(test_record
test/rosbag2_transport/test_record.cpp
${SKIP_TEST}
AMENT_DEPS test_msgs rosbag2_test_common
INCLUDE_DIRS src
LINK_LIBS rosbag2_transport
AMENT_DEPS test_msgs rosbag2_test_common)
${SKIP_TEST})

rosbag2_transport_add_gmock(test_play
test/rosbag2_transport/test_play.cpp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@

#include <chrono>
#include <string>
#include <unordered_map>
#include <vector>

#include "rclcpp/rclcpp.hpp"

namespace rosbag2_transport
{
struct RecordOptions
Expand All @@ -32,6 +35,7 @@ struct RecordOptions
std::string node_prefix = "";
std::string compression_mode = "";
std::string compression_format = "";
std::unordered_map<std::string, rclcpp::QoS> topic_qos_profile_overrides{};
bool include_hidden_topics = false;
};

Expand Down
33 changes: 12 additions & 21 deletions rosbag2_transport/src/rosbag2_transport/recorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,6 @@
#include "qos.hpp"
#include "rosbag2_node.hpp"

#ifdef _WIN32
// This is necessary because of a bug in yaml-cpp's cmake
#define YAML_CPP_DLL
// This is necessary because yaml-cpp does not always use dllimport/dllexport consistently
# pragma warning(push)
# pragma warning(disable:4251)
# pragma warning(disable:4275)
#endif
#include "yaml-cpp/yaml.h"
#ifdef _WIN32
# pragma warning(pop)
#endif

namespace
{
bool all_qos_same(const std::vector<rclcpp::TopicEndpointInfo> & values)
Expand All @@ -67,6 +54,7 @@ Recorder::Recorder(std::shared_ptr<rosbag2_cpp::Writer> writer, std::shared_ptr<

void Recorder::record(const RecordOptions & record_options)
{
topic_qos_profile_overrides_ = record_options.topic_qos_profile_overrides;
if (record_options.rmw_serialization_format.empty()) {
throw std::runtime_error("No serialization format specified!");
}
Expand Down Expand Up @@ -162,12 +150,7 @@ void Recorder::subscribe_topic(const rosbag2_storage::TopicMetadata & topic)
// that callback called before we reached out the line: writer_->create_topic(topic)
writer_->create_topic(topic);

// TODO(emersonknapp) re-enable common_qos_or_fallback once the cyclone situation is resolved
#ifdef ROSBAG2_ENABLE_ADAPTIVE_QOS_SUBSCRIPTION
Rosbag2QoS subscription_qos{common_qos_or_fallback(topic.name)};
#else
Rosbag2QoS subscription_qos{};
#endif // ROSBAG2_ENABLE_ADAPTIVE_QOS_SUBSCRIPTION
Rosbag2QoS subscription_qos{subscription_qos_for_topic(topic.name)};
auto subscription = create_subscription(topic.name, topic.type, subscription_qos);
if (subscription) {
subscriptions_.insert({topic.name, subscription});
Expand Down Expand Up @@ -218,8 +201,15 @@ std::string Recorder::serialized_offered_qos_profiles_for_topic(const std::strin
return YAML::Dump(offered_qos_profiles);
}

rclcpp::QoS Recorder::common_qos_or_fallback(const std::string & topic_name)
rclcpp::QoS Recorder::subscription_qos_for_topic(const std::string & topic_name)
{
rclcpp::QoS subscription_qos{rclcpp::KeepAll()};
if (topic_qos_profile_overrides_.count(topic_name)) {
subscription_qos = topic_qos_profile_overrides_.at(topic_name);
ROSBAG2_TRANSPORT_LOG_INFO_STREAM("Overriding subscription profile for " << topic_name);
}
// TODO(emersonknapp) re-enable subscription_qos_for_topic once the cyclone situation is resolved
#ifdef ROSBAG2_ENABLE_ADAPTIVE_QOS_SUBSCRIPTION
auto endpoints = node_->get_publishers_info_by_topic(topic_name);
if (!endpoints.empty() && all_qos_same(endpoints)) {
return Rosbag2QoS(endpoints[0].qos_profile()).default_history();
Expand All @@ -229,7 +219,8 @@ rclcpp::QoS Recorder::common_qos_or_fallback(const std::string & topic_name)
"Cannot determine what QoS to request, falling back to default QoS profile."
);
topics_warned_about_incompatibility_.insert(topic_name);
return Rosbag2QoS{};
#endif // ROSBAG2_ENABLE_ADAPTIVE_QOS_SUBSCRIPTION
return subscription_qos;
}

void Recorder::warn_if_new_qos_for_subscribed_topic(const std::string & topic_name)
Expand Down
19 changes: 14 additions & 5 deletions rosbag2_transport/src/rosbag2_transport/recorder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,19 @@ class Recorder

void record_messages() const;

/** Find the QoS profile that should be used for subscribing.
* If all currently offered QoS Profiles for a topic are the same, return that profile.
* Otherwise, print a warning and return a fallback value.
*/
rclcpp::QoS common_qos_or_fallback(const std::string & topic_name);
/**
* Find the QoS profile that should be used for subscribing.
*
* Profiles are prioritized by:
* 1. The override specified in the record_options, if one exists for the topic.
* 2. The publisher's offered QoS profile.
* If all current publishers are offering the exact same compatibility profile.
* 3. The default Rosbag2QoS profile, if the above conditions are not met.
*
* \param topic_name The full name of the topic, with namespace (ex. /arm/joint_status).
* \return The QoS profile to be used for subscribing.
*/
rclcpp::QoS subscription_qos_for_topic(const std::string & topic_name);

// Serialize all currently offered QoS profiles for a topic into a YAML list.
std::string serialized_offered_qos_profiles_for_topic(const std::string & topic_name);
Expand All @@ -101,6 +109,7 @@ class Recorder
std::unordered_map<std::string, std::shared_ptr<GenericSubscription>> subscriptions_;
std::unordered_set<std::string> topics_warned_about_incompatibility_;
std::string serialization_format_;
std::unordered_map<std::string, rclcpp::QoS> topic_qos_profile_overrides_;
};

} // namespace rosbag2_transport
Expand Down
42 changes: 39 additions & 3 deletions rosbag2_transport/test/rosbag2_transport/test_record.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@

#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

#include "rclcpp/rclcpp.hpp"

#include "rosbag2_transport/qos.hpp"
#include "rosbag2_transport/rosbag2_transport.hpp"

#include "test_msgs/msg/arrays.hpp"
#include "test_msgs/msg/basic_types.hpp"
#include "test_msgs/message_fixtures.hpp"

#include "record_integration_fixture.hpp"
Expand Down Expand Up @@ -100,8 +101,7 @@ TEST_F(RecordIntegrationTestFixture, qos_is_stored_in_metadata)
));
}

TEST_F(RecordIntegrationTestFixture, records_sensor_data)
{
TEST_F(RecordIntegrationTestFixture, records_sensor_data) {
using clock = std::chrono::system_clock;
using namespace std::chrono_literals;

Expand Down Expand Up @@ -141,3 +141,39 @@ TEST_F(RecordIntegrationTestFixture, records_sensor_data)
EXPECT_FALSE(recorded_messages.empty());
}
#endif // ROSBAG2_ENABLE_ADAPTIVE_QOS_SUBSCRIPTION

TEST_F(RecordIntegrationTestFixture, topic_qos_overrides)
{
const auto num_expected_msgs = 3;
auto strict_msg = std::make_shared<test_msgs::msg::Strings>();
strict_msg->string_value = "strict";
const auto strict_topic = "/strict_topic";

rosbag2_transport::RecordOptions record_options =
{false, false, {strict_topic}, "rmw_format", 100ms};
const auto profile_override = rclcpp::QoS{rclcpp::KeepAll()}
.best_effort().durability_volatile().avoid_ros_namespace_conventions(false);
std::unordered_map<std::string, rclcpp::QoS> topic_qos_profile_overrides = {
{strict_topic, profile_override}
};
record_options.topic_qos_profile_overrides = topic_qos_profile_overrides;

// Create two publishers on the same topic with different QoS profiles.
// If no override is specified, then the recorder cannot see any published messages.
auto profile1 = rosbag2_transport::Rosbag2QoS{}.best_effort().durability_volatile();
auto profile2 = rosbag2_transport::Rosbag2QoS{}.best_effort().transient_local();
pub_man_.add_publisher<test_msgs::msg::Strings>(
strict_topic, strict_msg, num_expected_msgs, profile1);
pub_man_.add_publisher<test_msgs::msg::Strings>(
strict_topic, strict_msg, num_expected_msgs, profile2);

start_recording(record_options);
run_publishers();
stop_recording();

auto & writer =
static_cast<MockSequentialWriter &>(writer_->get_implementation_handle());
auto recorded_messages = writer.get_messages();

ASSERT_GE(recorded_messages.size(), 0u);
}