Skip to content

Commit 2abc509

Browse files
authored
Bag rewriter (C++) (#920)
* Add C++ bag rewriter Signed-off-by: Emerson Knapp <[email protected]>
1 parent a060c1d commit 2abc509

File tree

12 files changed

+526
-5
lines changed

12 files changed

+526
-5
lines changed

rosbag2_transport/CMakeLists.txt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ find_package(shared_queues_vendor REQUIRED)
3636
find_package(yaml_cpp_vendor REQUIRED)
3737

3838
add_library(${PROJECT_NAME} SHARED
39+
src/rosbag2_transport/bag_rewrite.cpp
3940
src/rosbag2_transport/player.cpp
4041
src/rosbag2_transport/qos.cpp
4142
src/rosbag2_transport/reader_writer_factory.cpp
@@ -186,6 +187,8 @@ function(create_tests_for_rmw_implementation)
186187
endfunction()
187188

188189
if(BUILD_TESTING)
190+
add_definitions(-D_SRC_RESOURCES_DIR_PATH="${CMAKE_CURRENT_SOURCE_DIR}/test/resources")
191+
189192
find_package(ament_cmake_gmock REQUIRED)
190193
find_package(ament_index_cpp REQUIRED)
191194
find_package(ament_lint_auto REQUIRED)
@@ -205,6 +208,12 @@ if(BUILD_TESTING)
205208
target_include_directories(test_topic_filter PRIVATE
206209
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/src/rosbag2_transport>)
207210
target_link_libraries(test_topic_filter rosbag2_transport)
211+
212+
ament_add_gmock(test_rewrite
213+
test/rosbag2_transport/test_rewrite.cpp)
214+
target_include_directories(test_rewrite PRIVATE include)
215+
target_link_libraries(test_rewrite ${PROJECT_NAME})
216+
ament_target_dependencies(test_rewrite keyboard_handler rcpputils rosbag2_cpp test_msgs)
208217
endif()
209218

210219
ament_package()
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
// Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#ifndef ROSBAG2_TRANSPORT__BAG_REWRITE_HPP_
16+
#define ROSBAG2_TRANSPORT__BAG_REWRITE_HPP_
17+
18+
#include <memory>
19+
#include <utility>
20+
#include <vector>
21+
22+
#include "rosbag2_storage/storage_options.hpp"
23+
#include "rosbag2_transport/record_options.hpp"
24+
#include "rosbag2_transport/visibility_control.hpp"
25+
26+
namespace rosbag2_transport
27+
{
28+
/// Given one or more existing bags, write out one or more new bags with new settings.
29+
/// This generic feature enables (but is not limited to) the following features:
30+
/// - merge (multiple input bags, one output bag)
31+
/// - split (one input bag, one output bag with some size or duration splitting values)
32+
/// - filter (input bag(s) - output bag(s) accept different topics)
33+
/// - compress
34+
/// - serialization format conversion
35+
///
36+
/// Note: If a serialization format is not specified for an output bag's RecordOptions,
37+
/// any topic going into it will use the serialization format of the last input with that topic.
38+
///
39+
/// \param input_options vector of settings to create Readers for bags to read messages from
40+
/// \param output_bags - full "recording" configuration of the bag(s) to write messages to
41+
/// Each output bag will be passed messages from every input bag,
42+
/// on topics that pass its filtering settings
43+
ROSBAG2_TRANSPORT_PUBLIC
44+
void bag_rewrite(
45+
const std::vector<rosbag2_storage::StorageOptions> & input_options,
46+
const std::vector<
47+
std::pair<rosbag2_storage::StorageOptions, rosbag2_transport::RecordOptions>
48+
> & output_options
49+
);
50+
} // namespace rosbag2_transport
51+
52+
#endif // ROSBAG2_TRANSPORT__BAG_REWRITE_HPP_

rosbag2_transport/package.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
<test_depend>ament_lint_auto</test_depend>
2828
<test_depend>ament_lint_common</test_depend>
2929
<test_depend>rmw_implementation_cmake</test_depend>
30+
<test_depend>rosbag2_compression_zstd</test_depend>
3031
<test_depend>rosbag2_test_common</test_depend>
3132
<test_depend>test_msgs</test_depend>
3233

Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
// Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include "rosbag2_transport/bag_rewrite.hpp"
16+
17+
#include <map>
18+
#include <memory>
19+
#include <string>
20+
#include <unordered_map>
21+
#include <unordered_set>
22+
#include <utility>
23+
#include <vector>
24+
25+
#include "rosbag2_cpp/reader.hpp"
26+
#include "rosbag2_cpp/writer.hpp"
27+
#include "rosbag2_transport/reader_writer_factory.hpp"
28+
29+
#include "logging.hpp"
30+
#include "topic_filter.hpp"
31+
32+
namespace
33+
{
34+
35+
/// Find the next chronological message from all opened input bags.
36+
/// Updates the next_messages queue as necessary.
37+
/// next_messages is needed because Reader has no "peek" interface, we cannot put a message back.
38+
/// Returns nullptr when all input bags have been fully read.
39+
std::shared_ptr<rosbag2_storage::SerializedBagMessage> get_next(
40+
const std::vector<std::unique_ptr<rosbag2_cpp::Reader>> & input_bags,
41+
std::vector<std::shared_ptr<rosbag2_storage::SerializedBagMessage>> & next_messages)
42+
{
43+
// find message with lowest timestamp
44+
std::shared_ptr<rosbag2_storage::SerializedBagMessage> earliest_msg = nullptr;
45+
size_t earliest_msg_index = -1;
46+
for (size_t i = 0; i < next_messages.size(); i++) {
47+
// refill queue if bag not empty
48+
if (next_messages[i] == nullptr && input_bags[i]->has_next()) {
49+
next_messages[i] = input_bags[i]->read_next();
50+
}
51+
52+
auto & msg = next_messages[i];
53+
if (msg == nullptr) {
54+
continue;
55+
}
56+
if (earliest_msg == nullptr || msg->time_stamp < earliest_msg->time_stamp) {
57+
earliest_msg = msg;
58+
earliest_msg_index = i;
59+
}
60+
}
61+
62+
// clear returned message from queue before returning it, so it can be refilled next time
63+
if (earliest_msg != nullptr) {
64+
next_messages[earliest_msg_index].reset();
65+
}
66+
return earliest_msg;
67+
}
68+
69+
/// Discover what topics are in the inputs, filter out topics that can't be processed,
70+
/// create_topic on Writers that will receive topics.
71+
/// Return a map f topic -> vector of which Writers want to receive that topic,
72+
/// based on the RecordOptions.
73+
/// The output vector has bare pointers to the uniquely owned Writers,
74+
/// so this may not outlive the output_bags Writers.
75+
std::unordered_map<std::string, std::vector<rosbag2_cpp::Writer *>>
76+
setup_topic_filtering(
77+
const std::vector<std::unique_ptr<rosbag2_cpp::Reader>> & input_bags,
78+
const std::vector<
79+
std::pair<std::unique_ptr<rosbag2_cpp::Writer>, rosbag2_transport::RecordOptions>
80+
> & output_bags)
81+
{
82+
std::unordered_map<std::string, std::vector<rosbag2_cpp::Writer *>> filtered_outputs;
83+
std::map<std::string, std::vector<std::string>> input_topics;
84+
std::unordered_map<std::string, YAML::Node> input_topics_qos_profiles;
85+
std::unordered_map<std::string, std::string> input_topics_serialization_format;
86+
87+
for (const auto & input_bag : input_bags) {
88+
auto bag_topics_and_types = input_bag->get_all_topics_and_types();
89+
for (const auto & topic_metadata : bag_topics_and_types) {
90+
const std::string & topic_name = topic_metadata.name;
91+
input_topics.try_emplace(topic_name);
92+
input_topics[topic_name].push_back(topic_metadata.type);
93+
input_topics_serialization_format[topic_name] = topic_metadata.serialization_format;
94+
95+
// Gather all offered qos profiles from all inputs
96+
input_topics_qos_profiles.try_emplace(topic_name);
97+
YAML::Node & all_offered = input_topics_qos_profiles[topic_name];
98+
YAML::Node offered_qos_profiles = YAML::Load(topic_metadata.offered_qos_profiles);
99+
for (auto qos : offered_qos_profiles) {
100+
all_offered.push_back(qos);
101+
}
102+
}
103+
}
104+
105+
for (const auto & [writer, record_options] : output_bags) {
106+
rosbag2_transport::TopicFilter topic_filter{record_options};
107+
auto filtered_topics_and_types = topic_filter.filter_topics(input_topics);
108+
109+
// Done filtering - set up writer
110+
for (const auto & [topic_name, topic_type] : filtered_topics_and_types) {
111+
rosbag2_storage::TopicMetadata topic_metadata;
112+
topic_metadata.name = topic_name;
113+
topic_metadata.type = topic_type;
114+
115+
// Take source serialization format for the topic if output format is unspecified
116+
if (record_options.rmw_serialization_format.empty()) {
117+
topic_metadata.serialization_format = input_topics_serialization_format[topic_name];
118+
} else {
119+
topic_metadata.serialization_format = record_options.rmw_serialization_format;
120+
}
121+
122+
std::stringstream qos_profiles;
123+
qos_profiles << input_topics_qos_profiles[topic_name];
124+
topic_metadata.offered_qos_profiles = qos_profiles.str();
125+
writer->create_topic(topic_metadata);
126+
127+
filtered_outputs.try_emplace(topic_name);
128+
filtered_outputs[topic_name].push_back(writer.get());
129+
}
130+
}
131+
132+
return filtered_outputs;
133+
}
134+
135+
void perform_rewrite(
136+
const std::vector<std::unique_ptr<rosbag2_cpp::Reader>> & input_bags,
137+
const std::vector<
138+
std::pair<std::unique_ptr<rosbag2_cpp::Writer>, rosbag2_transport::RecordOptions>
139+
> & output_bags
140+
)
141+
{
142+
if (input_bags.empty() || output_bags.empty()) {
143+
throw std::runtime_error("Must provide at least one input and one output bag to rewrite.");
144+
}
145+
146+
auto topic_outputs = setup_topic_filtering(input_bags, output_bags);
147+
148+
std::vector<std::shared_ptr<rosbag2_storage::SerializedBagMessage>> next_messages;
149+
next_messages.resize(input_bags.size(), nullptr);
150+
151+
std::shared_ptr<rosbag2_storage::SerializedBagMessage> next_msg;
152+
while (next_msg = get_next(input_bags, next_messages)) {
153+
auto topic_writers = topic_outputs.find(next_msg->topic_name);
154+
if (topic_writers != topic_outputs.end()) {
155+
for (auto writer : topic_writers->second) {
156+
writer->write(next_msg);
157+
}
158+
}
159+
}
160+
}
161+
162+
} // namespace
163+
164+
namespace rosbag2_transport
165+
{
166+
void bag_rewrite(
167+
const std::vector<rosbag2_storage::StorageOptions> & input_options,
168+
const std::vector<
169+
std::pair<rosbag2_storage::StorageOptions, rosbag2_transport::RecordOptions>
170+
> & output_options
171+
)
172+
{
173+
std::vector<std::unique_ptr<rosbag2_cpp::Reader>> input_bags;
174+
std::vector<
175+
std::pair<std::unique_ptr<rosbag2_cpp::Writer>, rosbag2_transport::RecordOptions>
176+
> output_bags;
177+
178+
for (const auto & storage_options : input_options) {
179+
auto reader = ReaderWriterFactory::make_reader(storage_options);
180+
reader->open(storage_options);
181+
input_bags.push_back(std::move(reader));
182+
}
183+
184+
for (auto & [storage_options, record_options] : output_options) {
185+
// TODO(emersonknapp) - utilize cache to get better performance.
186+
// For now, zero cache allows for synchronous writes which are guaranteed to go through.
187+
// With cache enabled, the buffer could overflow and drop messages in this fast-write loop.
188+
// To enable the cache we will need to implement a mechanism for the writer to take messages
189+
// only when it is able to, which will likely require some new APIs.
190+
auto zero_cache_storage_options = storage_options;
191+
zero_cache_storage_options.max_cache_size = 0u;
192+
auto writer = ReaderWriterFactory::make_writer(record_options);
193+
writer->open(zero_cache_storage_options);
194+
output_bags.push_back(std::make_pair(std::move(writer), record_options));
195+
}
196+
197+
perform_rewrite(input_bags, output_bags);
198+
}
199+
} // namespace rosbag2_transport

rosbag2_transport/src/rosbag2_transport/topic_filter.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,9 @@ bool TopicFilter::take_topic(
124124
return false;
125125
}
126126

127-
if (record_options_.ignore_leaf_topics && is_leaf_topic(topic_name, *node_graph_)) {
127+
if (record_options_.ignore_leaf_topics && node_graph_ &&
128+
is_leaf_topic(topic_name, *node_graph_))
129+
{
128130
return false;
129131
}
130132

rosbag2_transport/src/rosbag2_transport/topic_filter.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ class ROSBAG2_TRANSPORT_PUBLIC TopicFilter
4040
public:
4141
explicit TopicFilter(
4242
RecordOptions record_options,
43-
rclcpp::node_interfaces::NodeGraphInterface::SharedPtr node_graph,
43+
rclcpp::node_interfaces::NodeGraphInterface::SharedPtr node_graph = nullptr,
4444
bool allow_unknown_types = false);
4545
virtual ~TopicFilter();
4646

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
rosbag2_bagfile_information:
2+
version: 5
3+
storage_identifier: sqlite3
4+
duration:
5+
nanoseconds: 100000000
6+
starting_time:
7+
nanoseconds_since_epoch: 0
8+
message_count: 150
9+
topics_with_message_count:
10+
- topic_metadata:
11+
name: a_empty
12+
type: test_msgs/msg/Empty
13+
serialization_format: cdr
14+
offered_qos_profiles: "- history: 1\n depth: 1\n reliability: 1\n durability: 1\n deadline:\n sec: 9223372036\n nsec: 854775807\n lifespan:\n sec: 9223372036\n nsec: 854775807\n liveliness: 1\n liveliness_lease_duration:\n sec: 9223372036\n nsec: 854775807\n avoid_ros_namespace_conventions: false\n- history: 1\n depth: 2\n reliability: 2\n durability: 1\n deadline:\n sec: 9223372036\n nsec: 854775807\n lifespan:\n sec: 9223372036\n nsec: 854775807\n liveliness: 1\n liveliness_lease_duration:\n sec: 9223372036\n nsec: 854775807\n avoid_ros_namespace_conventions: false"
15+
message_count: 100
16+
- topic_metadata:
17+
name: b_basictypes
18+
type: test_msgs/msg/BasicTypes
19+
serialization_format: cdr
20+
offered_qos_profiles: "- history: 1\n depth: 1\n reliability: 1\n durability: 1\n deadline:\n sec: 9223372036\n nsec: 854775807\n lifespan:\n sec: 9223372036\n nsec: 854775807\n liveliness: 1\n liveliness_lease_duration:\n sec: 9223372036\n nsec: 854775807\n avoid_ros_namespace_conventions: false"
21+
message_count: 50
22+
compression_format: ""
23+
compression_mode: ""
24+
relative_file_paths:
25+
- rewriter_a_0.db3
26+
files:
27+
- path: rewriter_a_0.db3
28+
starting_time:
29+
nanoseconds_since_epoch: 0
30+
duration:
31+
nanoseconds: 100000000
32+
message_count: 150
24 KB
Binary file not shown.
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
rosbag2_bagfile_information:
2+
version: 5
3+
storage_identifier: sqlite3
4+
duration:
5+
nanoseconds: 222000000
6+
starting_time:
7+
nanoseconds_since_epoch: 0
8+
message_count: 75
9+
topics_with_message_count:
10+
- topic_metadata:
11+
name: c_strings
12+
type: test_msgs/msg/Strings
13+
serialization_format: cdr
14+
offered_qos_profiles: "- history: 1\n depth: 1\n reliability: 1\n durability: 1\n deadline:\n sec: 9223372036\n nsec: 854775807\n lifespan:\n sec: 9223372036\n nsec: 854775807\n liveliness: 1\n liveliness_lease_duration:\n sec: 9223372036\n nsec: 854775807\n avoid_ros_namespace_conventions: false"
15+
message_count: 50
16+
- topic_metadata:
17+
name: a_empty
18+
type: test_msgs/msg/Empty
19+
serialization_format: cdr
20+
offered_qos_profiles: "- history: 1\n depth: 1\n reliability: 1\n durability: 1\n deadline:\n sec: 9223372036\n nsec: 854775807\n lifespan:\n sec: 9223372036\n nsec: 854775807\n liveliness: 1\n liveliness_lease_duration:\n sec: 9223372036\n nsec: 854775807\n avoid_ros_namespace_conventions: false"
21+
message_count: 25
22+
compression_format: ""
23+
compression_mode: ""
24+
relative_file_paths:
25+
- rewriter_b_0.db3
26+
files:
27+
- path: rewriter_b_0.db3
28+
starting_time:
29+
nanoseconds_since_epoch: 0
30+
duration:
31+
nanoseconds: 222000000
32+
message_count: 75
32 KB
Binary file not shown.

0 commit comments

Comments
 (0)