Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,8 @@ output_bags:
services: []
all_actions: false
actions: []
rmw_serialization_format: "" # defaults to using the format of the input topic
input_serialization_format: "" # defaults to using the format of the input topic
output_serialization_format: "" # defaults to using the format of the input topic
regex: ""
exclude_regex: ""
exclude_topics: []
Expand Down
4 changes: 2 additions & 2 deletions ros2bag/ros2bag/verb/record.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ def add_recorder_arguments(parser: ArgumentParser) -> None:
# Core config
parser.add_argument(
'-f', '--serialization-format', default='', choices=serialization_choices,
help='The rmw serialization format in which the messages are saved, defaults to the '
help='The serialization format in which the messages are saved, defaults to the '
'rmw currently in use.')
parser.add_argument(
'-b', '--max-bag-size', type=int, default=0,
Expand Down Expand Up @@ -381,7 +381,7 @@ def main(self, *, args): # noqa: D102
record_options.actions = args.actions if args.actions else []

record_options.exclude_topic_types = args.exclude_topic_types
record_options.rmw_serialization_format = args.serialization_format
record_options.output_serialization_format = args.serialization_format
record_options.topic_polling_interval = datetime.timedelta(
milliseconds=args.polling_interval)
record_options.regex = args.regex
Expand Down
2 changes: 2 additions & 0 deletions rosbag2_py/rosbag2_py/_transport.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,10 @@ class RecordOptions:
ignore_leaf_topics: bool
include_hidden_topics: bool
include_unpublished_topics: bool
input_serialization_format: str
is_discovery_disabled: bool
node_prefix: str
output_serialization_format: str
regex: str
rmw_serialization_format: str
services: List[str]
Expand Down
30 changes: 26 additions & 4 deletions rosbag2_py/src/rosbag2_py/_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -578,8 +578,18 @@ class Recorder
rclcpp::init(arguments.argc(), arguments.argv(),
rclcpp::InitOptions(), rclcpp::SignalHandlerOptions::None);

if (record_options.rmw_serialization_format.empty()) {
record_options.rmw_serialization_format = std::string(rmw_get_serialization_format());
if (!record_options.rmw_serialization_format.empty() &&
record_options.output_serialization_format.empty())
{
record_options.output_serialization_format = record_options.rmw_serialization_format;
PyErr_WarnEx(PyExc_DeprecationWarning,
"The rmw_serialization_format option is deprecated and will be removed in a "
"future release.\nPlease use output_serialization_format instead.",
1
);
}
if (record_options.output_serialization_format.empty()) {
record_options.output_serialization_format = std::string(rmw_get_serialization_format());
}
auto writer = rosbag2_transport::ReaderWriterFactory::make_writer(record_options);

Expand Down Expand Up @@ -680,8 +690,18 @@ class Recorder
RecordOptions & record_options,
const std::string & node_name)
{
if (record_options.rmw_serialization_format.empty()) {
record_options.rmw_serialization_format = std::string(rmw_get_serialization_format());
if (!record_options.rmw_serialization_format.empty() &&
record_options.output_serialization_format.empty())
{
record_options.output_serialization_format = record_options.rmw_serialization_format;
PyErr_WarnEx(PyExc_DeprecationWarning,
"The rmw_serialization_format option is deprecated and will be removed in a "
"future release.\nPlease use output_serialization_format instead.",
1
);
}
if (record_options.output_serialization_format.empty()) {
record_options.output_serialization_format = std::string(rmw_get_serialization_format());
}
auto writer = rosbag2_transport::ReaderWriterFactory::make_writer(record_options);

Expand Down Expand Up @@ -911,6 +931,8 @@ PYBIND11_MODULE(_transport, m) {
.def_readwrite("topic_types", &RecordOptions::topic_types)
.def_readwrite("exclude_topic_types", &RecordOptions::exclude_topic_types)
.def_readwrite("rmw_serialization_format", &RecordOptions::rmw_serialization_format)
.def_readwrite("input_serialization_format", &RecordOptions::input_serialization_format)
.def_readwrite("output_serialization_format", &RecordOptions::output_serialization_format)
.def_readwrite("topic_polling_interval", &RecordOptions::topic_polling_interval)
.def_readwrite("regex", &RecordOptions::regex)
.def_readwrite("exclude_regex", &RecordOptions::exclude_regex)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ TEST_P(Rosbag2CPPGetServiceInfoTest, get_service_info_for_bag_with_services_only
storage_options.storage_id = storage_id;
storage_options.uri = bag_path_str;
rosbag2_transport::RecordOptions record_options =
{false, true, false, false, {}, {}, {}, {}, {"/rosout"}, {}, {}, {}, "cdr", 100ms};
{false, true, false, false, {}, {}, {}, {}, {"/rosout"}, {}, {}, {}, {}, {}, "cdr", 100ms};
auto recorder = std::make_shared<rosbag2_transport::Recorder>(
std::move(writer), storage_options, record_options);
recorder->record();
Expand Down Expand Up @@ -292,7 +292,7 @@ TEST_P(Rosbag2CPPGetServiceInfoTest, get_service_info_for_bag_with_topics_and_se
storage_options.storage_id = storage_id;
storage_options.uri = bag_path_str;
rosbag2_transport::RecordOptions record_options =
{true, true, false, false, {}, {}, {}, {}, {"/rosout"}, {}, {}, {}, "cdr", 100ms};
{true, true, false, false, {}, {}, {}, {}, {"/rosout"}, {}, {}, {}, {}, {}, "cdr", 100ms};
auto recorder = std::make_shared<rosbag2_transport::Recorder>(
std::move(writer), storage_options, record_options);
recorder->record();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ struct RecordOptions
std::vector<std::string> exclude_topic_types;
std::vector<std::string> exclude_service_events; // service event topics list
std::vector<std::string> exclude_actions; // actions name list
// rmw_serialization_format deprecated. Use output_serialization_format instead
std::string rmw_serialization_format;
std::string input_serialization_format;
std::string output_serialization_format;
std::chrono::milliseconds topic_polling_interval{100};
std::string regex = "";
std::string exclude_regex = "";
Expand Down
10 changes: 8 additions & 2 deletions rosbag2_transport/src/rosbag2_transport/bag_rewrite.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,17 +114,23 @@ setup_topic_filtering(
rosbag2_transport::TopicFilter topic_filter{record_options, nullptr, true};
auto filtered_topics_and_types = topic_filter.filter_topics(input_topics);

std::string output_serialization_format = record_options.output_serialization_format;
// Fall back to the deprecated rmw_serialization_format if output format is unspecified
if (!record_options.rmw_serialization_format.empty() && output_serialization_format.empty()) {
output_serialization_format = record_options.rmw_serialization_format;
}

// Done filtering - set up writer
for (const auto & [topic_name, topic_type] : filtered_topics_and_types) {
rosbag2_storage::TopicMetadata topic_metadata;
topic_metadata.name = topic_name;
topic_metadata.type = topic_type;

// Take source serialization format for the topic if output format is unspecified
if (record_options.rmw_serialization_format.empty()) {
if (output_serialization_format.empty()) {
topic_metadata.serialization_format = input_topics_serialization_format[topic_name];
} else {
topic_metadata.serialization_format = record_options.rmw_serialization_format;
topic_metadata.serialization_format = output_serialization_format;
}

topic_metadata.offered_qos_profiles = input_topics_qos_profiles[topic_name];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,11 @@ RecordOptions get_record_options_from_node_params(rclcpp::Node & node)
record_options.rmw_serialization_format =
node.declare_parameter<std::string>("record.rmw_serialization_format", "cdr");

record_options.input_serialization_format =
node.declare_parameter<std::string>("record.input_serialization_format", "cdr");
record_options.output_serialization_format =
node.declare_parameter<std::string>("record.output_serialization_format", "cdr");

record_options.topic_polling_interval = param_utils::get_duration_from_node_param(
node, "record.topic_polling_interval",
0, 1000000).to_chrono<std::chrono::milliseconds>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class ReadersManagerImpl
earliest_timestamp_ = std::numeric_limits<rcutils_time_point_value_t>::max();
latest_timestamp_ = std::numeric_limits<rcutils_time_point_value_t>::min();
for (auto & [reader, options] : readers_with_options_) {
reader->open(options, {"", rmw_get_serialization_format()});
reader->open(options, {rmw_get_serialization_format(), rmw_get_serialization_format()});
if (reader->has_next()) {
next_messages_cache_.emplace_back(reader->read_next());
}
Expand Down
6 changes: 6 additions & 0 deletions rosbag2_transport/src/rosbag2_transport/record_options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ Node convert<rosbag2_transport::RecordOptions>::encode(
node["services"] = record_options.services;
node["actions"] = record_options.actions;
node["rmw_serialization_format"] = record_options.rmw_serialization_format;
node["input_serialization_format"] = record_options.input_serialization_format;
node["output_serialization_format"] = record_options.output_serialization_format;
node["topic_polling_interval"] = record_options.topic_polling_interval;
node["regex"] = record_options.regex;
node["exclude_regex"] = record_options.exclude_regex;
Expand Down Expand Up @@ -79,6 +81,10 @@ bool convert<rosbag2_transport::RecordOptions>::decode(
optional_assign<std::vector<std::string>>(node, "actions", record_options.actions);
optional_assign<std::string>(
node, "rmw_serialization_format", record_options.rmw_serialization_format);
optional_assign<std::string>(
node, "input_serialization_format", record_options.input_serialization_format);
optional_assign<std::string>(
node, "output_serialization_format", record_options.output_serialization_format);

optional_assign<std::chrono::milliseconds>(
node, "topic_polling_interval", record_options.topic_polling_interval);
Expand Down
28 changes: 22 additions & 6 deletions rosbag2_transport/src/rosbag2_transport/recorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ class RecorderImpl
std::unique_ptr<TopicFilter> topic_filter_;
rclcpp::Event::SharedPtr discovery_graph_event_;
std::future<void> discovery_future_;
std::string serialization_format_;
std::unordered_map<std::string, rclcpp::QoS> topic_qos_profile_overrides_;
std::unordered_set<std::string> topic_unknown_types_;
rclcpp::Service<rosbag2_interfaces::srv::IsPaused>::SharedPtr srv_is_paused_;
Expand Down Expand Up @@ -269,15 +268,33 @@ void RecorderImpl::record()
}
paused_ = record_options_.start_paused;
topic_qos_profile_overrides_ = record_options_.topic_qos_profile_overrides;
if (record_options_.rmw_serialization_format.empty()) {
throw std::runtime_error("No serialization format specified!");
// Check serialization format options
if (!record_options_.rmw_serialization_format.empty() &&
record_options_.output_serialization_format.empty())
{
RCLCPP_WARN(node->get_logger(),
"The rmw_serialization_format option is deprecated and will be removed in a future release.\n"
"Please use output_serialization_format instead.");
record_options_.output_serialization_format = record_options_.rmw_serialization_format;
}
if (record_options_.input_serialization_format.empty()) {
record_options_.input_serialization_format = rmw_get_serialization_format();
RCLCPP_WARN(node->get_logger(),
"No input serialization format specified, using default rmw serialization format: '%s'.",
record_options_.input_serialization_format.c_str());
}
if (record_options_.output_serialization_format.empty()) {
record_options_.output_serialization_format = rmw_get_serialization_format();
RCLCPP_WARN(node->get_logger(),
"No output serialization format specified, using rmw serialization format. '%s'.",
record_options_.output_serialization_format.c_str());
}
subscriptions_.clear();
event_notifier_->reset_total_num_messages_lost_in_transport();
event_notifier_->reset_total_num_messages_lost_in_recorder();
writer_->open(
storage_options_,
{rmw_get_serialization_format(), record_options_.rmw_serialization_format});
{record_options_.input_serialization_format, record_options_.output_serialization_format});

// Only expose snapshot service when mode is enabled
if (storage_options_.snapshot_mode) {
Expand Down Expand Up @@ -343,7 +360,6 @@ void RecorderImpl::record()
};
writer_->add_event_callbacks(callbacks);

serialization_format_ = record_options_.rmw_serialization_format;
RCLCPP_INFO(node->get_logger(), "Listening for topics...");
if (!record_options_.use_sim_time) {
subscribe_topics(get_requested_or_available_topics());
Expand Down Expand Up @@ -522,7 +538,7 @@ void RecorderImpl::subscribe_topics(
0u,
topic_with_type.first,
topic_with_type.second,
serialization_format_,
record_options_.input_serialization_format,
offered_qos_profiles_for_topic(endpoint_infos),
type_description_hash_for_topic(endpoint_infos),
});
Expand Down
3 changes: 2 additions & 1 deletion rosbag2_transport/test/resources/recorder_node_params.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ recorder_params_node:
exclude_topic_types: ["sensor_msgs/msg/Image"]
services: ["service", "other_service"]
actions: ["action", "other_action"]
rmw_serialization_format: "cdr"
input_serialization_format: "cdr"
output_serialization_format: "cdr"
topic_polling_interval:
sec: 0
nsec: 10000000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,8 @@ TEST_P(ComposableRecorderTests, recorder_can_parse_parameters_from_file) {
std::vector<std::string> services {"/service/_service_event", "/other_service/_service_event"};
EXPECT_EQ(record_options.services, services);
EXPECT_EQ(record_options.rmw_serialization_format, "cdr");
EXPECT_EQ(record_options.input_serialization_format, "cdr");
EXPECT_EQ(record_options.output_serialization_format, "cdr");
EXPECT_TRUE(record_options.topic_polling_interval == 0.01s);
EXPECT_EQ(record_options.regex, "[xyz]/topic");
EXPECT_EQ(record_options.exclude_regex, "(.*)");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ TEST_F(RecordIntegrationTestFixture, test_keyboard_controls)
auto keyboard_handler = std::make_shared<MockKeyboardHandler>();

rosbag2_transport::RecordOptions record_options =
{true, false, false, false, {}, {}, {}, {}, {}, {}, {}, {}, "rmw_format", 100ms};
{true, false, false, false, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, "rmw_format", 100ms};
record_options.start_paused = true;

auto recorder = std::make_shared<Recorder>(
Expand Down
22 changes: 14 additions & 8 deletions rosbag2_transport/test/rosbag2_transport/test_record.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_topics_are

rosbag2_transport::RecordOptions record_options =
{false, false, false, false, {string_topic, array_topic},
{}, {}, {}, {}, {}, {}, {}, "rmw_format", 50ms};
{}, {}, {}, {}, {}, {}, {}, {}, "rmw_format", "rmw_format", 50ms};
auto recorder = std::make_shared<rosbag2_transport::Recorder>(
std::move(writer_), storage_options_, record_options);
recorder->record();
Expand Down Expand Up @@ -140,7 +140,10 @@ TEST_F(RecordIntegrationTestFixture, can_record_again_after_stop)
test_topic, basic_type_message, num_messages_to_publish, rclcpp::QoS{rclcpp::KeepAll()}, 50ms);

rosbag2_transport::RecordOptions record_options =
{false, false, false, false, {test_topic}, {}, {}, {}, {}, {}, {}, {}, "rmw_format", 50ms};
{
false, false, false, false, {test_topic}, {}, {}, {}, {}, {}, {}, {}, {}, "rmw_format",
"rmw_format", 50ms
};
auto recorder = std::make_shared<rosbag2_transport::Recorder>(
std::move(writer_), storage_options_, record_options);
recorder->record();
Expand Down Expand Up @@ -224,7 +227,7 @@ TEST_F(RecordIntegrationTestFixture, qos_is_stored_in_metadata)
pub_manager.setup_publisher(topic, string_message, 2);

rosbag2_transport::RecordOptions record_options =
{false, false, false, false, {topic}, {}, {}, {}, {}, {}, {}, {}, "rmw_format", 100ms};
{false, false, false, false, {topic}, {}, {}, {}, {}, {}, {}, {}, {}, {}, "rmw_format", 100ms};
auto recorder = std::make_shared<rosbag2_transport::Recorder>(
std::move(writer_), storage_options_, record_options);
recorder->record();
Expand Down Expand Up @@ -289,7 +292,7 @@ TEST_F(RecordIntegrationTestFixture, records_sensor_data)
pub_manager.setup_publisher(topic, string_message, 2, rclcpp::SensorDataQoS());

rosbag2_transport::RecordOptions record_options =
{false, false, false, false, {topic}, {}, {}, {}, {}, {}, {}, {}, "rmw_format", 100ms};
{false, false, false, false, {topic}, {}, {}, {}, {}, {}, {}, {}, {}, {}, "rmw_format", 100ms};
auto recorder = std::make_shared<rosbag2_transport::Recorder>(
std::move(writer_), storage_options_, record_options);
recorder->record();
Expand Down Expand Up @@ -333,7 +336,7 @@ TEST_F(RecordIntegrationTestFixture, receives_latched_messages)
pub_manager.run_publishers();

rosbag2_transport::RecordOptions record_options =
{false, false, false, false, {topic}, {}, {}, {}, {}, {}, {}, {}, "rmw_format", 100ms};
{false, false, false, false, {topic}, {}, {}, {}, {}, {}, {}, {}, {}, {}, "rmw_format", 100ms};
auto recorder = std::make_shared<rosbag2_transport::Recorder>(
std::move(writer_), storage_options_, record_options);
recorder->record();
Expand Down Expand Up @@ -379,7 +382,7 @@ TEST_F(RecordIntegrationTestFixture, mixed_qos_subscribes) {
topic, profile_transient_local);

rosbag2_transport::RecordOptions record_options =
{false, false, false, false, {topic}, {}, {}, {}, {}, {}, {}, {}, "rmw_format", 100ms};
{false, false, false, false, {topic}, {}, {}, {}, {}, {}, {}, {}, {}, {}, "rmw_format", 100ms};
auto recorder = std::make_shared<rosbag2_transport::Recorder>(
std::move(writer_), storage_options_, record_options);
recorder->record();
Expand Down Expand Up @@ -428,7 +431,7 @@ TEST_F(RecordIntegrationTestFixture, duration_and_noncompatibility_policies_mixe
auto publisher_liveliness = create_pub(profile_liveliness);

rosbag2_transport::RecordOptions record_options =
{false, false, false, false, {topic}, {}, {}, {}, {}, {}, {}, {}, "rmw_format", 100ms};
{false, false, false, false, {topic}, {}, {}, {}, {}, {}, {}, {}, {}, {}, "rmw_format", 100ms};
auto recorder = std::make_shared<rosbag2_transport::Recorder>(
std::move(writer_), storage_options_, record_options);
recorder->record();
Expand Down Expand Up @@ -470,7 +473,10 @@ TEST_F(RecordIntegrationTestFixture, write_split_callback_is_called)
}

rosbag2_transport::RecordOptions record_options =
{false, false, false, false, {string_topic}, {}, {}, {}, {}, {}, {}, {}, "rmw_format", 10ms};
{
false, false, false, false, {string_topic}, {}, {}, {}, {}, {}, {}, {}, {}, {}, "rmw_format",
10ms
};
auto recorder = std::make_shared<rosbag2_transport::Recorder>(
std::move(writer_), storage_options_, record_options);

Expand Down
Loading
Loading