Skip to content

Commit 94b04b2

Browse files
committed
Generalize reader and writer creation into common factory
Signed-off-by: Emerson Knapp <[email protected]>
1 parent fbfedc5 commit 94b04b2

File tree

6 files changed

+130
-118
lines changed

6 files changed

+130
-118
lines changed

rosbag2_py/src/rosbag2_py/_transport.cpp

Lines changed: 3 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,10 @@
2020
#include <utility>
2121
#include <vector>
2222

23-
#include "rosbag2_compression/compression_options.hpp"
24-
#include "rosbag2_compression/sequential_compression_reader.hpp"
25-
#include "rosbag2_compression/sequential_compression_writer.hpp"
26-
#include "rosbag2_cpp/reader.hpp"
27-
#include "rosbag2_cpp/readers/sequential_reader.hpp"
28-
#include "rosbag2_cpp/writer.hpp"
29-
#include "rosbag2_cpp/writers/sequential_writer.hpp"
3023
#include "rosbag2_storage/storage_options.hpp"
3124
#include "rosbag2_transport/play_options.hpp"
3225
#include "rosbag2_transport/player.hpp"
26+
#include "rosbag2_transport/reader_writer_factory.hpp"
3327
#include "rosbag2_transport/record_options.hpp"
3428
#include "rosbag2_transport/recorder.hpp"
3529

@@ -116,24 +110,7 @@ class Player
116110
const rosbag2_storage::StorageOptions & storage_options,
117111
PlayOptions & play_options)
118112
{
119-
std::unique_ptr<rosbag2_cpp::Reader> reader = nullptr;
120-
// Determine whether to build compression or regular reader
121-
{
122-
rosbag2_storage::MetadataIo metadata_io{};
123-
rosbag2_storage::BagMetadata metadata{};
124-
if (metadata_io.metadata_file_exists(storage_options.uri)) {
125-
metadata = metadata_io.read_metadata(storage_options.uri);
126-
if (!metadata.compression_format.empty()) {
127-
reader = std::make_unique<rosbag2_cpp::Reader>(
128-
std::make_unique<rosbag2_compression::SequentialCompressionReader>());
129-
}
130-
}
131-
if (reader == nullptr) {
132-
reader = std::make_unique<rosbag2_cpp::Reader>(
133-
std::make_unique<rosbag2_cpp::readers::SequentialReader>());
134-
}
135-
}
136-
113+
auto reader = rosbag2_transport::ReaderWriterFactory::make_reader(storage_options);
137114
auto player = std::make_shared<rosbag2_transport::Player>(
138115
std::move(reader), storage_options, play_options);
139116

@@ -175,31 +152,11 @@ class Recorder
175152
const rosbag2_storage::StorageOptions & storage_options,
176153
RecordOptions & record_options)
177154
{
178-
rosbag2_compression::CompressionOptions compression_options {
179-
record_options.compression_format,
180-
rosbag2_compression::compression_mode_from_string(record_options.compression_mode),
181-
record_options.compression_queue_size,
182-
record_options.compression_threads
183-
};
184-
if (compression_options.compression_threads < 1) {
185-
compression_options.compression_threads = std::thread::hardware_concurrency();
186-
}
187-
188155
if (record_options.rmw_serialization_format.empty()) {
189156
record_options.rmw_serialization_format = std::string(rmw_get_serialization_format());
190157
}
191158

192-
193-
std::unique_ptr<rosbag2_cpp::Writer> writer = nullptr;
194-
// Change writer based on recording options
195-
if (!record_options.compression_format.empty()) {
196-
writer = std::make_unique<rosbag2_cpp::Writer>(
197-
std::make_unique<rosbag2_compression::SequentialCompressionWriter>(compression_options));
198-
} else {
199-
writer = std::make_unique<rosbag2_cpp::Writer>(
200-
std::make_unique<rosbag2_cpp::writers::SequentialWriter>());
201-
}
202-
159+
auto writer = rosbag2_transport::ReaderWriterFactory::make_writer(record_options);
203160
auto recorder = std::make_shared<rosbag2_transport::Recorder>(
204161
std::move(writer), storage_options, record_options);
205162
recorder->record();

rosbag2_transport/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ add_library(${PROJECT_NAME} SHARED
3939
src/rosbag2_transport/bag_rewrite.cpp
4040
src/rosbag2_transport/player.cpp
4141
src/rosbag2_transport/qos.cpp
42+
src/rosbag2_transport/reader_writer_factory.cpp
4243
src/rosbag2_transport/recorder.cpp
4344
src/rosbag2_transport/record_options.cpp
4445
src/rosbag2_transport/topic_filter.cpp)

rosbag2_transport/include/rosbag2_transport/bag_rewrite.hpp

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -32,28 +32,16 @@ namespace rosbag2_transport
3232
/// - compress
3333
/// - serialization format conversion
3434
///
35-
/// \param input_bags - a vector of Readers for bag inputs. Must already be "open"ed
36-
/// \param output_bags - full "recording" configuration of the bag(s) to output.
37-
/// Must already be "open"ed
38-
/// Each output bag will be passed every message from each input bag in timestamp order,
39-
/// and is responsible for being configured to flter/process as needed.
40-
void bag_rewrite(
41-
const std::vector<std::unique_ptr<rosbag2_cpp::Reader>> & input_bags,
42-
const std::vector<
43-
std::pair<std::unique_ptr<rosbag2_cpp::Writer>, rosbag2_transport::RecordOptions>
44-
> & output_bags
45-
);
46-
47-
/// See above bag_rewrite documentation
48-
/// Convenience signature to create and open necessary Readers and Writers, given
49-
/// the relevant Options structs.
35+
/// \param input_options - a vector of Readers for bags to read messages from
36+
/// \param output_bags - full "recording" configuration of the bag(s) to write messages to
37+
/// Each output bag will be passed messages from every input bag, on topics that pass its
38+
/// filtering options
5039
void bag_rewrite(
5140
const std::vector<rosbag2_storage::StorageOptions> & input_options,
5241
const std::vector<
5342
std::pair<rosbag2_storage::StorageOptions, rosbag2_transport::RecordOptions>
5443
> & output_options
5544
);
56-
5745
} // namespace rosbag2_transport
5846

5947
#endif // ROSBAG2_TRANSPORT__BAG_REWRITE_HPP_
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
// Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#ifndef ROSBAG2_TRANSPORT__READER_WRITER_FACTORY_HPP_
16+
#define ROSBAG2_TRANSPORT__READER_WRITER_FACTORY_HPP_
17+
18+
#include <memory>
19+
20+
#include "rosbag2_cpp/reader.hpp"
21+
#include "rosbag2_cpp/writer.hpp"
22+
#include "rosbag2_storage/storage_options.hpp"
23+
#include "rosbag2_transport/record_options.hpp"
24+
#include "rosbag2_transport/visibility_control.hpp"
25+
26+
namespace rosbag2_transport
27+
{
28+
class ROSBAG2_TRANSPORT_PUBLIC ReaderWriterFactory
29+
{
30+
public:
31+
/// Create a Reader with the appropriate underlying implementation.
32+
static std::unique_ptr<rosbag2_cpp::Reader> make_reader(
33+
const rosbag2_storage::StorageOptions & storage_options);
34+
35+
/// Create a Writer with the appropriate underlying implementation.
36+
static std::unique_ptr<rosbag2_cpp::Writer> make_writer(
37+
const rosbag2_transport::RecordOptions & record_options);
38+
};
39+
} // namespace rosbag2_transport
40+
41+
#endif // ROSBAG2_TRANSPORT__READER_WRITER_FACTORY_HPP_

rosbag2_transport/src/rosbag2_transport/bag_rewrite.cpp

Lines changed: 11 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
#include "rosbag2_transport/bag_rewrite.hpp"
16+
1517
#include <memory>
1618
#include <string>
1719
#include <unordered_map>
@@ -26,59 +28,14 @@
2628
#include "rosbag2_cpp/readers/sequential_reader.hpp"
2729
#include "rosbag2_cpp/writer.hpp"
2830
#include "rosbag2_cpp/writers/sequential_writer.hpp"
29-
#include "rosbag2_transport/bag_rewrite.hpp"
31+
#include "rosbag2_transport/reader_writer_factory.hpp"
3032

3133
#include "logging.hpp"
3234
#include "topic_filter.hpp"
3335

3436
namespace
3537
{
3638

37-
/// Create a Reader with the appropriate underlying implementation.
38-
std::unique_ptr<rosbag2_cpp::Reader> make_reader(
39-
const rosbag2_storage::StorageOptions & storage_options)
40-
{
41-
rosbag2_storage::MetadataIo metadata_io;
42-
std::unique_ptr<rosbag2_cpp::reader_interfaces::BaseReaderInterface> reader_impl;
43-
44-
if (metadata_io.metadata_file_exists(storage_options.uri)) {
45-
auto metadata = metadata_io.read_metadata(storage_options.uri);
46-
if (!metadata.compression_format.empty()) {
47-
reader_impl = std::make_unique<rosbag2_compression::SequentialCompressionReader>();
48-
}
49-
}
50-
if (!reader_impl) {
51-
reader_impl = std::make_unique<rosbag2_cpp::readers::SequentialReader>();
52-
}
53-
54-
return std::make_unique<rosbag2_cpp::Reader>(std::move(reader_impl));
55-
}
56-
57-
58-
/// Create a Writer with the appropriate underlying implementation.
59-
std::unique_ptr<rosbag2_cpp::Writer> make_writer(
60-
const rosbag2_transport::RecordOptions & record_options)
61-
{
62-
std::unique_ptr<rosbag2_cpp::writer_interfaces::BaseWriterInterface> writer_impl;
63-
if (!record_options.compression_format.empty()) {
64-
rosbag2_compression::CompressionOptions compression_options {
65-
record_options.compression_format,
66-
rosbag2_compression::compression_mode_from_string(record_options.compression_mode),
67-
record_options.compression_queue_size,
68-
record_options.compression_threads
69-
};
70-
if (compression_options.compression_threads < 1) {
71-
compression_options.compression_threads = std::thread::hardware_concurrency();
72-
}
73-
writer_impl = std::make_unique<rosbag2_compression::SequentialCompressionWriter>(
74-
compression_options);
75-
} else {
76-
writer_impl = std::make_unique<rosbag2_cpp::writers::SequentialWriter>();
77-
}
78-
79-
return std::make_unique<rosbag2_cpp::Writer>(std::move(writer_impl));
80-
}
81-
8239
/// Find the next chronological message from all opened input bags.
8340
/// Updates the next_messages queue as necessary.
8441
/// next_messages is needed because Reader has no "peek" interface, we cannot put a message back.
@@ -207,12 +164,7 @@ setup_topic_filtering(
207164
return filtered_outputs;
208165
}
209166

210-
} // namespace
211-
212-
namespace rosbag2_transport
213-
{
214-
215-
void bag_rewrite(
167+
void perform_rewrite(
216168
const std::vector<std::unique_ptr<rosbag2_cpp::Reader>> & input_bags,
217169
const std::vector<
218170
std::pair<std::unique_ptr<rosbag2_cpp::Writer>, rosbag2_transport::RecordOptions>
@@ -239,6 +191,10 @@ void bag_rewrite(
239191
}
240192
}
241193

194+
} // namespace
195+
196+
namespace rosbag2_transport
197+
{
242198
void bag_rewrite(
243199
const std::vector<rosbag2_storage::StorageOptions> & input_options,
244200
const std::vector<
@@ -252,7 +208,7 @@ void bag_rewrite(
252208
> output_bags;
253209

254210
for (const auto & storage_options : input_options) {
255-
auto reader = make_reader(storage_options);
211+
auto reader = ReaderWriterFactory::make_reader(storage_options);
256212
reader->open(storage_options);
257213
input_bags.push_back(std::move(reader));
258214
}
@@ -265,12 +221,11 @@ void bag_rewrite(
265221
// room in the cache, which may require some new APIs
266222
auto modified_storage_options = storage_options;
267223
modified_storage_options.max_cache_size = 0u;
268-
auto writer = make_writer(record_options);
224+
auto writer = ReaderWriterFactory::make_writer(record_options);
269225
writer->open(modified_storage_options);
270226
output_bags.push_back(std::make_pair(std::move(writer), record_options));
271227
}
272228

273-
bag_rewrite(input_bags, output_bags);
229+
perform_rewrite(input_bags, output_bags);
274230
}
275-
276231
} // namespace rosbag2_transport
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
// Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include "rosbag2_transport/reader_writer_factory.hpp"
16+
17+
#include <memory>
18+
#include <utility>
19+
20+
#include "rosbag2_compression/compression_options.hpp"
21+
#include "rosbag2_compression/sequential_compression_reader.hpp"
22+
#include "rosbag2_compression/sequential_compression_writer.hpp"
23+
#include "rosbag2_storage/metadata_io.hpp"
24+
25+
namespace rosbag2_transport
26+
{
27+
28+
std::unique_ptr<rosbag2_cpp::Reader> ReaderWriterFactory::make_reader(
29+
const rosbag2_storage::StorageOptions & storage_options)
30+
{
31+
rosbag2_storage::MetadataIo metadata_io;
32+
std::unique_ptr<rosbag2_cpp::reader_interfaces::BaseReaderInterface> reader_impl;
33+
34+
if (metadata_io.metadata_file_exists(storage_options.uri)) {
35+
auto metadata = metadata_io.read_metadata(storage_options.uri);
36+
if (!metadata.compression_format.empty()) {
37+
reader_impl = std::make_unique<rosbag2_compression::SequentialCompressionReader>();
38+
}
39+
}
40+
if (!reader_impl) {
41+
reader_impl = std::make_unique<rosbag2_cpp::readers::SequentialReader>();
42+
}
43+
44+
return std::make_unique<rosbag2_cpp::Reader>(std::move(reader_impl));
45+
}
46+
47+
std::unique_ptr<rosbag2_cpp::Writer> ReaderWriterFactory::make_writer(
48+
const rosbag2_transport::RecordOptions & record_options)
49+
{
50+
std::unique_ptr<rosbag2_cpp::writer_interfaces::BaseWriterInterface> writer_impl;
51+
if (!record_options.compression_format.empty()) {
52+
rosbag2_compression::CompressionOptions compression_options {
53+
record_options.compression_format,
54+
rosbag2_compression::compression_mode_from_string(record_options.compression_mode),
55+
record_options.compression_queue_size,
56+
record_options.compression_threads
57+
};
58+
if (compression_options.compression_threads < 1) {
59+
compression_options.compression_threads = std::thread::hardware_concurrency();
60+
}
61+
writer_impl = std::make_unique<rosbag2_compression::SequentialCompressionWriter>(
62+
compression_options);
63+
} else {
64+
writer_impl = std::make_unique<rosbag2_cpp::writers::SequentialWriter>();
65+
}
66+
67+
return std::make_unique<rosbag2_cpp::Writer>(std::move(writer_impl));
68+
}
69+
70+
} // namespace rosbag2_transport

0 commit comments

Comments
 (0)