Skip to content

Commit 1030a4a

Browse files
authored
Add compressing and decompressing modes (#24)
* Add domain_bridge/msg/CompressedMsg interface. * Add compress and decompress mode to the bridge. * Refactor existing tests to better reuse code. * Add tests for compress and decompress modes. Signed-off-by: Ivan Santiago Paunovic <[email protected]>
1 parent f6ab617 commit 1030a4a

17 files changed

+654
-162
lines changed

CMakeLists.txt

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,47 +20,63 @@ find_package(rclcpp REQUIRED)
2020
find_package(rcutils REQUIRED)
2121
# Leverage rosbag2's generic type support utilities
2222
find_package(rosbag2_cpp REQUIRED)
23+
find_package(rosidl_typesupport_cpp REQUIRED)
24+
find_package(rosidl_default_generators REQUIRED)
2325
find_package(yaml_cpp_vendor REQUIRED)
26+
find_package(zstd_vendor REQUIRED)
27+
find_package(zstd REQUIRED)
2428

25-
add_library(${PROJECT_NAME} SHARED
29+
rosidl_generate_interfaces(${PROJECT_NAME}
30+
msg/CompressedMsg.msg
31+
)
32+
33+
add_library(${PROJECT_NAME}_lib SHARED
34+
src/${PROJECT_NAME}/compress_messages.cpp
2635
src/${PROJECT_NAME}/domain_bridge.cpp
2736
src/${PROJECT_NAME}/domain_bridge_options.cpp
2837
src/${PROJECT_NAME}/parse_domain_bridge_yaml_config.cpp
2938
src/${PROJECT_NAME}/qos_options.cpp
3039
src/${PROJECT_NAME}/topic_bridge_options.cpp
3140
)
3241

33-
target_include_directories(${PROJECT_NAME} PUBLIC
42+
target_include_directories(${PROJECT_NAME}_lib PUBLIC
3443
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
3544
$<INSTALL_INTERFACE:include>
3645
)
3746

3847
if(CMAKE_COMPILER_IS_GNUCXX)
39-
target_link_libraries(${PROJECT_NAME} stdc++fs)
48+
target_link_libraries(${PROJECT_NAME}_lib stdc++fs)
4049
endif()
4150

42-
ament_target_dependencies(${PROJECT_NAME}
51+
ament_target_dependencies(${PROJECT_NAME}_lib
4352
rclcpp
4453
rcutils
4554
rosbag2_cpp
55+
rosidl_typesupport_cpp
4656
yaml_cpp_vendor
57+
zstd
4758
)
4859

60+
rosidl_target_interfaces(${PROJECT_NAME}_lib
61+
${PROJECT_NAME} "rosidl_typesupport_cpp")
62+
63+
set_target_properties(${PROJECT_NAME}_lib PROPERTIES OUTPUT_NAME ${PROJECT_NAME})
64+
4965
add_executable(${PROJECT_NAME}_exec
5066
src/domain_bridge.cpp
5167
)
5268

5369
set_target_properties(${PROJECT_NAME}_exec PROPERTIES OUTPUT_NAME ${PROJECT_NAME} PREFIX "")
5470

5571
target_link_libraries(${PROJECT_NAME}_exec
56-
${PROJECT_NAME}
72+
${PROJECT_NAME}_lib
5773
)
5874

5975
install(DIRECTORY include/
6076
DESTINATION include
6177
)
6278

63-
install(TARGETS ${PROJECT_NAME} ${PROJECT_NAME}_exec
79+
install(TARGETS ${PROJECT_NAME}_lib ${PROJECT_NAME}_exec
6480
ARCHIVE DESTINATION lib
6581
LIBRARY DESTINATION lib
6682
RUNTIME DESTINATION lib/${PROJECT_NAME}
@@ -71,7 +87,7 @@ install(DIRECTORY examples launch
7187
)
7288

7389
ament_export_include_directories(include)
74-
ament_export_libraries(${PROJECT_NAME})
90+
ament_export_libraries(${PROJECT_NAME}_lib)
7591
ament_export_dependencies(
7692
rclcpp
7793
rosbag2_cpp
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
// Copyright 2021, Open Source Robotics Foundation, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#ifndef DOMAIN_BRIDGE__COMPRESS_MESSAGES_HPP_
16+
#define DOMAIN_BRIDGE__COMPRESS_MESSAGES_HPP_
17+
18+
#include <zstd.h>
19+
20+
#include <vector>
21+
22+
#include "rclcpp/serialized_message.hpp"
23+
24+
#include "domain_bridge/visibility_control.hpp"
25+
26+
namespace domain_bridge
27+
{
28+
29+
DOMAIN_BRIDGE_PUBLIC
30+
std::vector<uint8_t>
31+
compress_message(ZSTD_CCtx * ctx, rclcpp::SerializedMessage msg);
32+
33+
DOMAIN_BRIDGE_PUBLIC
34+
rclcpp::SerializedMessage
35+
decompress_message(ZSTD_DCtx * ctx, std::vector<uint8_t> compressed_msg);
36+
37+
} // namespace domain_bridge
38+
39+
#endif // DOMAIN_BRIDGE__COMPRESS_MESSAGES_HPP_

include/domain_bridge/domain_bridge_options.hpp

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,19 @@ namespace domain_bridge
2626
class DomainBridgeOptions
2727
{
2828
public:
29+
enum class Mode
30+
{
31+
Normal,
32+
Compress,
33+
Decompress,
34+
};
35+
2936
/// Constructor.
3037
/**
3138
* Default values:
3239
*
3340
* - name = "domain_bridge"
41+
* - mode = Mode::Normal
3442
*/
3543
DOMAIN_BRIDGE_PUBLIC
3644
DomainBridgeOptions() = default;
@@ -62,8 +70,19 @@ class DomainBridgeOptions
6270
DomainBridgeOptions &
6371
name(std::string name);
6472

73+
/// Get the domain bridge mode.
74+
DOMAIN_BRIDGE_PUBLIC
75+
Mode
76+
mode() const;
77+
78+
/// Set the domain bridge mode.
79+
DOMAIN_BRIDGE_PUBLIC
80+
DomainBridgeOptions &
81+
mode(Mode mode);
82+
6583
private:
6684
std::string name_{"domain_bridge"};
85+
Mode mode_{Mode::Normal};
6786
}; // class DomainBridgeOptions
6887

6988
} // namespace domain_bridge

msg/CompressedMsg.msg

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
byte[] data

package.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,16 @@
88
<license>Apache 2.0</license>
99

1010
<buildtool_depend>ament_cmake</buildtool_depend>
11+
<buildtool_depend>rosidl_default_generators</buildtool_depend>
1112

1213
<depend>rclcpp</depend>
1314
<depend>rcutils</depend>
1415
<depend>rosbag2_cpp</depend>
16+
<depend>rosidl_typesupport_cpp</depend>
1517
<depend>yaml-cpp</depend>
18+
<depend>zstd_vendor</depend>
19+
20+
<exec_depend>rosidl_default_runtime</exec_depend>
1621

1722
<test_depend>ament_cmake_gmock</test_depend>
1823
<test_depend>ament_lint_auto</test_depend>
@@ -23,6 +28,8 @@
2328
<test_depend>launch_testing_ament_cmake</test_depend>
2429
<test_depend>test_msgs</test_depend>
2530

31+
<member_of_group>rosidl_interface_packages</member_of_group>
32+
2633
<export>
2734
<build_type>ament_cmake</build_type>
2835
</export>
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
// Copyright 2021, Open Source Robotics Foundation, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include "domain_bridge/compress_messages.hpp"
16+
17+
#include <zstd.h>
18+
19+
#include <stdexcept>
20+
#include <sstream>
21+
#include <vector>
22+
23+
namespace domain_bridge
24+
{
25+
26+
// Increasing the compression level will:
27+
// - Increase the time taken to compress
28+
// - Decrease the size of the compressed data
29+
// Setting to zero uses Zstd's default value of 3.
30+
constexpr const int kDefaultZstdCompressionLevel = 1;
31+
32+
using ZstdDecompressReturnType = decltype(ZSTD_decompress(
33+
nullptr, 0,
34+
nullptr, 0));
35+
36+
void throw_on_zstd_error(const ZstdDecompressReturnType compression_result)
37+
{
38+
if (ZSTD_isError(compression_result)) {
39+
std::stringstream error;
40+
error << "ZSTD decompression error: " << ZSTD_getErrorName(compression_result);
41+
42+
throw std::runtime_error{error.str()};
43+
}
44+
}
45+
46+
47+
using ZstdGetFrameContentSizeReturnType = decltype(ZSTD_getFrameContentSize(nullptr, 0));
48+
49+
void throw_on_invalid_frame_content(const ZstdGetFrameContentSizeReturnType frame_content)
50+
{
51+
if (frame_content == ZSTD_CONTENTSIZE_ERROR) {
52+
throw std::runtime_error{"Unable to determine file size due to error."};
53+
} else if (frame_content == ZSTD_CONTENTSIZE_UNKNOWN) {
54+
throw std::runtime_error{"Unable to determine file size."};
55+
}
56+
}
57+
58+
std::vector<uint8_t>
59+
compress_message(ZSTD_CCtx * ctx, rclcpp::SerializedMessage msg)
60+
{
61+
// Allocate based on compression bound and compress
62+
const auto maximum_compressed_length =
63+
ZSTD_compressBound(msg.size());
64+
std::vector<uint8_t> compressed_buffer(maximum_compressed_length);
65+
66+
// Perform compression and check.
67+
// compression_result is either the actual compressed size or an error code.
68+
const auto compression_result = ZSTD_compressCCtx(
69+
ctx,
70+
compressed_buffer.data(), maximum_compressed_length,
71+
msg.get_rcl_serialized_message().buffer, msg.get_rcl_serialized_message().buffer_length,
72+
kDefaultZstdCompressionLevel);
73+
throw_on_zstd_error(compression_result);
74+
75+
// Compression_buffer_length might be larger than the actual compression size
76+
// Resize compressed_buffer so its size is the actual compression size.
77+
compressed_buffer.resize(compression_result);
78+
return compressed_buffer;
79+
}
80+
81+
rclcpp::SerializedMessage
82+
decompress_message(ZSTD_DCtx * ctx, std::vector<uint8_t> compressed_msg)
83+
// void ZstdDecompressor::decompress_serialized_bag_message(
84+
// rosbag2_storage::SerializedBagMessage * message)
85+
{
86+
const auto compressed_buffer_length = compressed_msg.size();
87+
88+
const auto decompressed_buffer_length =
89+
ZSTD_getFrameContentSize(compressed_msg.data(), compressed_buffer_length);
90+
91+
throw_on_invalid_frame_content(decompressed_buffer_length);
92+
93+
rclcpp::SerializedMessage msg;
94+
msg.reserve(decompressed_buffer_length);
95+
96+
const auto decompression_result = ZSTD_decompressDCtx(
97+
ctx,
98+
msg.get_rcl_serialized_message().buffer, decompressed_buffer_length,
99+
compressed_msg.data(), compressed_buffer_length);
100+
msg.get_rcl_serialized_message().buffer_length = decompressed_buffer_length;
101+
102+
throw_on_zstd_error(decompression_result);
103+
return msg;
104+
}
105+
106+
} // namespace domain_bridge

0 commit comments

Comments
 (0)