Skip to content

Commit 85aaa4e

Browse files
committed
Generalize topic filter pipeline utility to be reusable
Improve readability, and mildly improve performance by processing one topic at a time and only creating one new map container, rather than copying to new containers on every operation. Signed-off-by: Emerson Knapp <[email protected]>
1 parent 94b04b2 commit 85aaa4e

File tree

6 files changed

+250
-349
lines changed

6 files changed

+250
-349
lines changed

rosbag2_transport/CMakeLists.txt

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -179,11 +179,6 @@ function(create_tests_for_rmw_implementation)
179179
LINK_LIBS rosbag2_transport
180180
AMENT_DEPS test_msgs rosbag2_test_common)
181181

182-
rosbag2_transport_add_gmock(test_topic_filter
183-
test/rosbag2_transport/test_topic_filter.cpp
184-
INCLUDE_DIRS
185-
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/src/rosbag2_transport>
186-
LINK_LIBS rosbag2_transport)
187182
endfunction()
188183

189184
if(BUILD_TESTING)
@@ -201,6 +196,11 @@ if(BUILD_TESTING)
201196
target_include_directories(test_record_options PRIVATE include)
202197
target_link_libraries(test_record_options ${PROJECT_NAME})
203198

199+
ament_add_gmock(test_topic_filter
200+
test/rosbag2_transport/test_topic_filter.cpp)
201+
target_include_directories(test_topic_filter PRIVATE
202+
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/src/rosbag2_transport>)
203+
target_link_libraries(test_topic_filter rosbag2_transport)
204204
endif()
205205

206206
ament_package()

rosbag2_transport/src/rosbag2_transport/bag_rewrite.cpp

Lines changed: 17 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -14,20 +14,16 @@
1414

1515
#include "rosbag2_transport/bag_rewrite.hpp"
1616

17+
#include <map>
1718
#include <memory>
1819
#include <string>
1920
#include <unordered_map>
2021
#include <unordered_set>
2122
#include <utility>
2223
#include <vector>
2324

24-
#include "rosbag2_compression/compression_options.hpp"
25-
#include "rosbag2_compression/sequential_compression_reader.hpp"
26-
#include "rosbag2_compression/sequential_compression_writer.hpp"
2725
#include "rosbag2_cpp/reader.hpp"
28-
#include "rosbag2_cpp/readers/sequential_reader.hpp"
2926
#include "rosbag2_cpp/writer.hpp"
30-
#include "rosbag2_cpp/writers/sequential_writer.hpp"
3127
#include "rosbag2_transport/reader_writer_factory.hpp"
3228

3329
#include "logging.hpp"
@@ -83,67 +79,29 @@ setup_topic_filtering(
8379
> & output_bags)
8480
{
8581
std::unordered_map<std::string, std::vector<rosbag2_cpp::Writer *>> filtered_outputs;
86-
std::unordered_map<std::string, std::string> input_topics;
82+
std::map<std::string, std::vector<std::string>> input_topics;
8783
std::unordered_map<std::string, YAML::Node> input_topics_qos_profiles;
8884

89-
// Filter inputs
90-
{
91-
std::unordered_map<std::string, std::unordered_set<std::string>> topic_names_and_types;
92-
std::unordered_set<std::string> unknown_types;
93-
94-
for (const auto & input_bag : input_bags) {
95-
auto bag_topics_and_types = input_bag->get_all_topics_and_types();
96-
for (const auto & topic_metadata : bag_topics_and_types) {
97-
const std::string & topic_name = topic_metadata.name;
98-
topic_names_and_types.try_emplace(topic_name);
99-
topic_names_and_types[topic_name].insert(topic_metadata.type);
100-
101-
// Gather all offered qos profiles from all inputs
102-
input_topics_qos_profiles.try_emplace(topic_name);
103-
YAML::Node & all_offered = input_topics_qos_profiles[topic_name];
104-
YAML::Node offered_qos_profiles = YAML::Load(topic_metadata.offered_qos_profiles);
105-
for (auto qos : offered_qos_profiles) {
106-
all_offered.push_back(qos);
107-
}
85+
for (const auto & input_bag : input_bags) {
86+
auto bag_topics_and_types = input_bag->get_all_topics_and_types();
87+
for (const auto & topic_metadata : bag_topics_and_types) {
88+
const std::string & topic_name = topic_metadata.name;
89+
input_topics.try_emplace(topic_name);
90+
input_topics[topic_name].push_back(topic_metadata.type);
91+
92+
// Gather all offered qos profiles from all inputs
93+
input_topics_qos_profiles.try_emplace(topic_name);
94+
YAML::Node & all_offered = input_topics_qos_profiles[topic_name];
95+
YAML::Node offered_qos_profiles = YAML::Load(topic_metadata.offered_qos_profiles);
96+
for (auto qos : offered_qos_profiles) {
97+
all_offered.push_back(qos);
10898
}
10999
}
110-
111-
// Filter topics with more than one type
112-
for (const auto & [topic_name, topic_types] : topic_names_and_types) {
113-
if (topic_types.size() > 1) {
114-
ROSBAG2_TRANSPORT_LOG_WARN_STREAM(
115-
"Topic '" << topic_name << "' has multiple types from inputs. " <<
116-
"Topics must have a single type, skipping topic.");
117-
} else {
118-
std::string topic_type = *topic_types.begin();
119-
input_topics[topic_name] = topic_type;
120-
}
121-
}
122-
123-
input_topics = rosbag2_transport::topic_filter::filter_topics_with_known_type(
124-
input_topics, unknown_types);
125100
}
126101

127-
// Filter to outputs
128102
for (const auto & [writer, record_options] : output_bags) {
129-
auto filtered_topics_and_types = input_topics;
130-
if (!record_options.topics.empty()) {
131-
std::vector<std::string> expanded_topics;
132-
for (const std::string & topic : record_options.topics) {
133-
auto expanded_topic = rclcpp::expand_topic_or_service_name(
134-
topic, "dummy_node_name", "/", false);
135-
expanded_topics.push_back(expanded_topic);
136-
}
137-
filtered_topics_and_types = rosbag2_transport::topic_filter::filter_topics(
138-
expanded_topics, input_topics);
139-
}
140-
if (!record_options.regex.empty() || !record_options.exclude.empty()) {
141-
filtered_topics_and_types = rosbag2_transport::topic_filter::filter_topics_using_regex(
142-
filtered_topics_and_types,
143-
record_options.regex,
144-
record_options.exclude,
145-
record_options.all);
146-
}
103+
rosbag2_transport::TopicFilter topic_filter{record_options};
104+
auto filtered_topics_and_types = topic_filter.filter_topics(input_topics);
147105

148106
// Done filtering - set up writer
149107
for (const auto & [topic_name, topic_type] : filtered_topics_and_types) {

rosbag2_transport/src/rosbag2_transport/recorder.cpp

Lines changed: 6 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,10 @@ Recorder::Recorder(
9090
RCLCPP_INFO_STREAM(
9191
get_logger(),
9292
"Press " << key_str << " for pausing/resuming");
93+
94+
for (auto & topic : record_options_.topics) {
95+
topic = rclcpp::expand_topic_or_service_name(topic, get_name(), get_namespace(), false);
96+
}
9397
}
9498

9599
Recorder::~Recorder()
@@ -193,35 +197,8 @@ std::unordered_map<std::string, std::string>
193197
Recorder::get_requested_or_available_topics()
194198
{
195199
auto all_topics_and_types = this->get_topic_names_and_types();
196-
auto filtered_topics_and_types = topic_filter::filter_topics_with_more_than_one_type(
197-
all_topics_and_types, record_options_.include_hidden_topics);
198-
199-
filtered_topics_and_types = topic_filter::filter_topics_with_known_type(
200-
filtered_topics_and_types, topic_unknown_types_);
201-
202-
if (!record_options_.topics.empty()) {
203-
// expand specified topics
204-
std::vector<std::string> expanded_topics;
205-
expanded_topics.reserve(record_options_.topics.size());
206-
for (const auto & topic : record_options_.topics) {
207-
expanded_topics.push_back(
208-
rclcpp::expand_topic_or_service_name(
209-
topic, this->get_name(), this->get_namespace(), false));
210-
}
211-
filtered_topics_and_types = topic_filter::filter_topics(
212-
expanded_topics, filtered_topics_and_types);
213-
}
214-
215-
if (record_options_.regex.empty() && record_options_.exclude.empty()) {
216-
return filtered_topics_and_types;
217-
}
218-
219-
return topic_filter::filter_topics_using_regex(
220-
filtered_topics_and_types,
221-
record_options_.regex,
222-
record_options_.exclude,
223-
record_options_.all
224-
);
200+
TopicFilter topic_filter{record_options_};
201+
return topic_filter.filter_topics(all_topics_and_types);
225202
}
226203

227204
std::unordered_map<std::string, std::string>

0 commit comments

Comments
 (0)