Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 23 additions & 7 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,47 +20,63 @@ find_package(rclcpp REQUIRED)
find_package(rcutils REQUIRED)
# Leverage rosbag2's generic type support utilities
find_package(rosbag2_cpp REQUIRED)
find_package(rosidl_typesupport_cpp REQUIRED)
find_package(rosidl_default_generators REQUIRED)
find_package(yaml_cpp_vendor REQUIRED)
find_package(zstd_vendor REQUIRED)
find_package(zstd REQUIRED)

add_library(${PROJECT_NAME} SHARED
rosidl_generate_interfaces(${PROJECT_NAME}
msg/CompressedMsg.msg
)

add_library(${PROJECT_NAME}_lib SHARED
src/${PROJECT_NAME}/compress_messages.cpp
src/${PROJECT_NAME}/domain_bridge.cpp
src/${PROJECT_NAME}/domain_bridge_options.cpp
src/${PROJECT_NAME}/parse_domain_bridge_yaml_config.cpp
src/${PROJECT_NAME}/qos_options.cpp
src/${PROJECT_NAME}/topic_bridge_options.cpp
)

target_include_directories(${PROJECT_NAME} PUBLIC
target_include_directories(${PROJECT_NAME}_lib PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:include>
)

if(CMAKE_COMPILER_IS_GNUCXX)
target_link_libraries(${PROJECT_NAME} stdc++fs)
target_link_libraries(${PROJECT_NAME}_lib stdc++fs)
endif()

ament_target_dependencies(${PROJECT_NAME}
ament_target_dependencies(${PROJECT_NAME}_lib
rclcpp
rcutils
rosbag2_cpp
rosidl_typesupport_cpp
yaml_cpp_vendor
zstd
)

rosidl_target_interfaces(${PROJECT_NAME}_lib
${PROJECT_NAME} "rosidl_typesupport_cpp")

set_target_properties(${PROJECT_NAME}_lib PROPERTIES OUTPUT_NAME ${PROJECT_NAME})

add_executable(${PROJECT_NAME}_exec
src/domain_bridge.cpp
)

set_target_properties(${PROJECT_NAME}_exec PROPERTIES OUTPUT_NAME ${PROJECT_NAME} PREFIX "")

target_link_libraries(${PROJECT_NAME}_exec
${PROJECT_NAME}
${PROJECT_NAME}_lib
)

install(DIRECTORY include/
DESTINATION include
)

install(TARGETS ${PROJECT_NAME} ${PROJECT_NAME}_exec
install(TARGETS ${PROJECT_NAME}_lib ${PROJECT_NAME}_exec
ARCHIVE DESTINATION lib
LIBRARY DESTINATION lib
RUNTIME DESTINATION lib/${PROJECT_NAME}
Expand All @@ -71,7 +87,7 @@ install(DIRECTORY examples launch
)

ament_export_include_directories(include)
ament_export_libraries(${PROJECT_NAME})
ament_export_libraries(${PROJECT_NAME}_lib)
ament_export_dependencies(
rclcpp
rosbag2_cpp
Expand Down
39 changes: 39 additions & 0 deletions include/domain_bridge/compress_messages.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2021, Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef DOMAIN_BRIDGE__COMPRESS_MESSAGES_HPP_
#define DOMAIN_BRIDGE__COMPRESS_MESSAGES_HPP_

#include <zstd.h>

#include <vector>

#include "rclcpp/serialized_message.hpp"

#include "domain_bridge/visibility_control.hpp"

namespace domain_bridge
{

DOMAIN_BRIDGE_PUBLIC
std::vector<uint8_t>
compress_message(ZSTD_CCtx * ctx, rclcpp::SerializedMessage msg);

DOMAIN_BRIDGE_PUBLIC
rclcpp::SerializedMessage
decompress_message(ZSTD_DCtx * ctx, std::vector<uint8_t> compressed_msg);

} // namespace domain_bridge

#endif // DOMAIN_BRIDGE__COMPRESS_MESSAGES_HPP_
19 changes: 19 additions & 0 deletions include/domain_bridge/domain_bridge_options.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,19 @@ namespace domain_bridge
class DomainBridgeOptions
{
public:
enum class Mode
{
Normal,
Compress,
Decompress,
};

/// Constructor.
/**
* Default values:
*
* - name = "domain_bridge"
* - mode = Mode::Normal
*/
DOMAIN_BRIDGE_PUBLIC
DomainBridgeOptions() = default;
Expand Down Expand Up @@ -62,8 +70,19 @@ class DomainBridgeOptions
DomainBridgeOptions &
name(std::string name);

/// Get the domain bridge mode.
DOMAIN_BRIDGE_PUBLIC
Mode
mode() const;

/// Set the domain bridge mode.
DOMAIN_BRIDGE_PUBLIC
DomainBridgeOptions &
mode(Mode mode);

private:
std::string name_{"domain_bridge"};
Mode mode_{Mode::Normal};
}; // class DomainBridgeOptions

} // namespace domain_bridge
Expand Down
1 change: 1 addition & 0 deletions msg/CompressedMsg.msg
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
byte[] data
7 changes: 7 additions & 0 deletions package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,16 @@
<license>Apache 2.0</license>

<buildtool_depend>ament_cmake</buildtool_depend>
<buildtool_depend>rosidl_default_generators</buildtool_depend>

<depend>rclcpp</depend>
<depend>rcutils</depend>
<depend>rosbag2_cpp</depend>
<depend>rosidl_typesupport_cpp</depend>
<depend>yaml-cpp</depend>
<depend>zstd_vendor</depend>

<exec_depend>rosidl_default_runtime</exec_depend>

<test_depend>ament_cmake_gmock</test_depend>
<test_depend>ament_lint_auto</test_depend>
Expand All @@ -23,6 +28,8 @@
<test_depend>launch_testing_ament_cmake</test_depend>
<test_depend>test_msgs</test_depend>

<member_of_group>rosidl_interface_packages</member_of_group>

<export>
<build_type>ament_cmake</build_type>
</export>
Expand Down
106 changes: 106 additions & 0 deletions src/domain_bridge/compress_messages.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright 2021, Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "domain_bridge/compress_messages.hpp"

#include <zstd.h>

#include <stdexcept>
#include <sstream>
#include <vector>

namespace domain_bridge
{

// Increasing the compression level will:
// - Increase the time taken to compress
// - Decrease the size of the compressed data
// Setting to zero uses Zstd's default value of 3.
constexpr const int kDefaultZstdCompressionLevel = 1;

using ZstdDecompressReturnType = decltype(ZSTD_decompress(
nullptr, 0,
nullptr, 0));

void throw_on_zstd_error(const ZstdDecompressReturnType compression_result)
{
if (ZSTD_isError(compression_result)) {
std::stringstream error;
error << "ZSTD decompression error: " << ZSTD_getErrorName(compression_result);

throw std::runtime_error{error.str()};
}
}


using ZstdGetFrameContentSizeReturnType = decltype(ZSTD_getFrameContentSize(nullptr, 0));

void throw_on_invalid_frame_content(const ZstdGetFrameContentSizeReturnType frame_content)
{
if (frame_content == ZSTD_CONTENTSIZE_ERROR) {
throw std::runtime_error{"Unable to determine file size due to error."};
} else if (frame_content == ZSTD_CONTENTSIZE_UNKNOWN) {
throw std::runtime_error{"Unable to determine file size."};
}
}

std::vector<uint8_t>
compress_message(ZSTD_CCtx * ctx, rclcpp::SerializedMessage msg)
{
// Allocate based on compression bound and compress
const auto maximum_compressed_length =
ZSTD_compressBound(msg.size());
std::vector<uint8_t> compressed_buffer(maximum_compressed_length);

// Perform compression and check.
// compression_result is either the actual compressed size or an error code.
const auto compression_result = ZSTD_compressCCtx(
ctx,
compressed_buffer.data(), maximum_compressed_length,
msg.get_rcl_serialized_message().buffer, msg.get_rcl_serialized_message().buffer_length,
kDefaultZstdCompressionLevel);
throw_on_zstd_error(compression_result);

// Compression_buffer_length might be larger than the actual compression size
// Resize compressed_buffer so its size is the actual compression size.
compressed_buffer.resize(compression_result);
return compressed_buffer;
}

rclcpp::SerializedMessage
decompress_message(ZSTD_DCtx * ctx, std::vector<uint8_t> compressed_msg)
// void ZstdDecompressor::decompress_serialized_bag_message(
// rosbag2_storage::SerializedBagMessage * message)
{
const auto compressed_buffer_length = compressed_msg.size();

const auto decompressed_buffer_length =
ZSTD_getFrameContentSize(compressed_msg.data(), compressed_buffer_length);

throw_on_invalid_frame_content(decompressed_buffer_length);

rclcpp::SerializedMessage msg;
msg.reserve(decompressed_buffer_length);

const auto decompression_result = ZSTD_decompressDCtx(
ctx,
msg.get_rcl_serialized_message().buffer, decompressed_buffer_length,
compressed_msg.data(), compressed_buffer_length);
msg.get_rcl_serialized_message().buffer_length = decompressed_buffer_length;

throw_on_zstd_error(decompression_result);
return msg;
}

} // namespace domain_bridge
Loading