diff --git a/CMakeLists.txt b/CMakeLists.txt index 04ee82f..ca952d6 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -20,9 +20,18 @@ find_package(rclcpp REQUIRED) find_package(rcutils REQUIRED) # Leverage rosbag2's generic type support utilities find_package(rosbag2_cpp REQUIRED) +find_package(rosidl_typesupport_cpp REQUIRED) +find_package(rosidl_default_generators REQUIRED) find_package(yaml_cpp_vendor REQUIRED) +find_package(zstd_vendor REQUIRED) +find_package(zstd REQUIRED) -add_library(${PROJECT_NAME} SHARED +rosidl_generate_interfaces(${PROJECT_NAME} + msg/CompressedMsg.msg +) + +add_library(${PROJECT_NAME}_lib SHARED + src/${PROJECT_NAME}/compress_messages.cpp src/${PROJECT_NAME}/domain_bridge.cpp src/${PROJECT_NAME}/domain_bridge_options.cpp src/${PROJECT_NAME}/parse_domain_bridge_yaml_config.cpp @@ -30,22 +39,29 @@ add_library(${PROJECT_NAME} SHARED src/${PROJECT_NAME}/topic_bridge_options.cpp ) -target_include_directories(${PROJECT_NAME} PUBLIC +target_include_directories(${PROJECT_NAME}_lib PUBLIC $ $ ) if(CMAKE_COMPILER_IS_GNUCXX) - target_link_libraries(${PROJECT_NAME} stdc++fs) + target_link_libraries(${PROJECT_NAME}_lib stdc++fs) endif() -ament_target_dependencies(${PROJECT_NAME} +ament_target_dependencies(${PROJECT_NAME}_lib rclcpp rcutils rosbag2_cpp + rosidl_typesupport_cpp yaml_cpp_vendor + zstd ) +rosidl_target_interfaces(${PROJECT_NAME}_lib + ${PROJECT_NAME} "rosidl_typesupport_cpp") + +set_target_properties(${PROJECT_NAME}_lib PROPERTIES OUTPUT_NAME ${PROJECT_NAME}) + add_executable(${PROJECT_NAME}_exec src/domain_bridge.cpp ) @@ -53,14 +69,14 @@ add_executable(${PROJECT_NAME}_exec set_target_properties(${PROJECT_NAME}_exec PROPERTIES OUTPUT_NAME ${PROJECT_NAME} PREFIX "") target_link_libraries(${PROJECT_NAME}_exec - ${PROJECT_NAME} + ${PROJECT_NAME}_lib ) install(DIRECTORY include/ DESTINATION include ) -install(TARGETS ${PROJECT_NAME} ${PROJECT_NAME}_exec +install(TARGETS ${PROJECT_NAME}_lib ${PROJECT_NAME}_exec ARCHIVE DESTINATION lib LIBRARY DESTINATION lib RUNTIME DESTINATION lib/${PROJECT_NAME} @@ -71,7 +87,7 @@ install(DIRECTORY examples launch ) ament_export_include_directories(include) -ament_export_libraries(${PROJECT_NAME}) +ament_export_libraries(${PROJECT_NAME}_lib) ament_export_dependencies( rclcpp rosbag2_cpp diff --git a/include/domain_bridge/compress_messages.hpp b/include/domain_bridge/compress_messages.hpp new file mode 100644 index 0000000..86f4685 --- /dev/null +++ b/include/domain_bridge/compress_messages.hpp @@ -0,0 +1,39 @@ +// Copyright 2021, Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef DOMAIN_BRIDGE__COMPRESS_MESSAGES_HPP_ +#define DOMAIN_BRIDGE__COMPRESS_MESSAGES_HPP_ + +#include + +#include + +#include "rclcpp/serialized_message.hpp" + +#include "domain_bridge/visibility_control.hpp" + +namespace domain_bridge +{ + +DOMAIN_BRIDGE_PUBLIC +std::vector +compress_message(ZSTD_CCtx * ctx, rclcpp::SerializedMessage msg); + +DOMAIN_BRIDGE_PUBLIC +rclcpp::SerializedMessage +decompress_message(ZSTD_DCtx * ctx, std::vector compressed_msg); + +} // namespace domain_bridge + +#endif // DOMAIN_BRIDGE__COMPRESS_MESSAGES_HPP_ diff --git a/include/domain_bridge/domain_bridge_options.hpp b/include/domain_bridge/domain_bridge_options.hpp index fb1169b..6943872 100644 --- a/include/domain_bridge/domain_bridge_options.hpp +++ b/include/domain_bridge/domain_bridge_options.hpp @@ -26,11 +26,19 @@ namespace domain_bridge class DomainBridgeOptions { public: + enum class Mode + { + Normal, + Compress, + Decompress, + }; + /// Constructor. /** * Default values: * * - name = "domain_bridge" + * - mode = Mode::Normal */ DOMAIN_BRIDGE_PUBLIC DomainBridgeOptions() = default; @@ -62,8 +70,19 @@ class DomainBridgeOptions DomainBridgeOptions & name(std::string name); + /// Get the domain bridge mode. + DOMAIN_BRIDGE_PUBLIC + Mode + mode() const; + + /// Set the domain bridge mode. + DOMAIN_BRIDGE_PUBLIC + DomainBridgeOptions & + mode(Mode mode); + private: std::string name_{"domain_bridge"}; + Mode mode_{Mode::Normal}; }; // class DomainBridgeOptions } // namespace domain_bridge diff --git a/msg/CompressedMsg.msg b/msg/CompressedMsg.msg new file mode 100644 index 0000000..9e371fd --- /dev/null +++ b/msg/CompressedMsg.msg @@ -0,0 +1 @@ +byte[] data \ No newline at end of file diff --git a/package.xml b/package.xml index 57712b2..2257f53 100644 --- a/package.xml +++ b/package.xml @@ -8,11 +8,16 @@ Apache 2.0 ament_cmake + rosidl_default_generators rclcpp rcutils rosbag2_cpp + rosidl_typesupport_cpp yaml-cpp + zstd_vendor + + rosidl_default_runtime ament_cmake_gmock ament_lint_auto @@ -23,6 +28,8 @@ launch_testing_ament_cmake test_msgs + rosidl_interface_packages + ament_cmake diff --git a/src/domain_bridge/compress_messages.cpp b/src/domain_bridge/compress_messages.cpp new file mode 100644 index 0000000..77c55b4 --- /dev/null +++ b/src/domain_bridge/compress_messages.cpp @@ -0,0 +1,106 @@ +// Copyright 2021, Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "domain_bridge/compress_messages.hpp" + +#include + +#include +#include +#include + +namespace domain_bridge +{ + +// Increasing the compression level will: +// - Increase the time taken to compress +// - Decrease the size of the compressed data +// Setting to zero uses Zstd's default value of 3. +constexpr const int kDefaultZstdCompressionLevel = 1; + +using ZstdDecompressReturnType = decltype(ZSTD_decompress( + nullptr, 0, + nullptr, 0)); + +void throw_on_zstd_error(const ZstdDecompressReturnType compression_result) +{ + if (ZSTD_isError(compression_result)) { + std::stringstream error; + error << "ZSTD decompression error: " << ZSTD_getErrorName(compression_result); + + throw std::runtime_error{error.str()}; + } +} + + +using ZstdGetFrameContentSizeReturnType = decltype(ZSTD_getFrameContentSize(nullptr, 0)); + +void throw_on_invalid_frame_content(const ZstdGetFrameContentSizeReturnType frame_content) +{ + if (frame_content == ZSTD_CONTENTSIZE_ERROR) { + throw std::runtime_error{"Unable to determine file size due to error."}; + } else if (frame_content == ZSTD_CONTENTSIZE_UNKNOWN) { + throw std::runtime_error{"Unable to determine file size."}; + } +} + +std::vector +compress_message(ZSTD_CCtx * ctx, rclcpp::SerializedMessage msg) +{ + // Allocate based on compression bound and compress + const auto maximum_compressed_length = + ZSTD_compressBound(msg.size()); + std::vector compressed_buffer(maximum_compressed_length); + + // Perform compression and check. + // compression_result is either the actual compressed size or an error code. + const auto compression_result = ZSTD_compressCCtx( + ctx, + compressed_buffer.data(), maximum_compressed_length, + msg.get_rcl_serialized_message().buffer, msg.get_rcl_serialized_message().buffer_length, + kDefaultZstdCompressionLevel); + throw_on_zstd_error(compression_result); + + // Compression_buffer_length might be larger than the actual compression size + // Resize compressed_buffer so its size is the actual compression size. + compressed_buffer.resize(compression_result); + return compressed_buffer; +} + +rclcpp::SerializedMessage +decompress_message(ZSTD_DCtx * ctx, std::vector compressed_msg) +// void ZstdDecompressor::decompress_serialized_bag_message( +// rosbag2_storage::SerializedBagMessage * message) +{ + const auto compressed_buffer_length = compressed_msg.size(); + + const auto decompressed_buffer_length = + ZSTD_getFrameContentSize(compressed_msg.data(), compressed_buffer_length); + + throw_on_invalid_frame_content(decompressed_buffer_length); + + rclcpp::SerializedMessage msg; + msg.reserve(decompressed_buffer_length); + + const auto decompression_result = ZSTD_decompressDCtx( + ctx, + msg.get_rcl_serialized_message().buffer, decompressed_buffer_length, + compressed_msg.data(), compressed_buffer_length); + msg.get_rcl_serialized_message().buffer_length = decompressed_buffer_length; + + throw_on_zstd_error(decompression_result); + return msg; +} + +} // namespace domain_bridge diff --git a/src/domain_bridge/domain_bridge.cpp b/src/domain_bridge/domain_bridge.cpp index 24ecca1..eafed09 100644 --- a/src/domain_bridge/domain_bridge.cpp +++ b/src/domain_bridge/domain_bridge.cpp @@ -14,6 +14,8 @@ #include "domain_bridge/domain_bridge.hpp" +#include + #include #include #include @@ -26,23 +28,60 @@ #include #include -#include "domain_bridge/domain_bridge_options.hpp" -#include "domain_bridge/topic_bridge.hpp" -#include "domain_bridge/topic_bridge_options.hpp" - #include "rclcpp/executor.hpp" #include "rclcpp/expand_topic_or_service_name.hpp" #include "rclcpp/generic_publisher.hpp" #include "rclcpp/generic_subscription.hpp" #include "rclcpp/rclcpp.hpp" +#include "rclcpp/serialization.hpp" #include "rosbag2_cpp/typesupport_helpers.hpp" +#include "rosidl_typesupport_cpp/message_type_support.hpp" #include "rmw/types.h" +#include "domain_bridge/compress_messages.hpp" +#include "domain_bridge/domain_bridge_options.hpp" +#include "domain_bridge/topic_bridge.hpp" +#include "domain_bridge/topic_bridge_options.hpp" +#include "domain_bridge/msg/compressed_msg.hpp" + #include "wait_for_qos_handler.hpp" namespace domain_bridge { +/// \internal +/** + * A hack, because PublisherBase doesn't support publishing serialized messages and because + * GenericPublisher cannot be created with a typesupport handle :/ + */ +class SerializedPublisher +{ +public: + template + explicit SerializedPublisher(std::shared_ptr> impl) + : impl_(std::move(impl)), + publish_method_pointer_(static_cast( + &rclcpp::Publisher::publish)) + {} + + explicit SerializedPublisher(std::shared_ptr impl) + : impl_(std::move(impl)), + publish_method_pointer_(static_cast( + &rclcpp::GenericPublisher::publish)) + {} + + void publish(const rclcpp::SerializedMessage & message) + { + ((*impl_).*publish_method_pointer_)(message); // this is a bit horrible, but it works ... + } + +private: + std::shared_ptr impl_; + using PointerToMemberMethod = + void (rclcpp::PublisherBase::*)(const rclcpp::SerializedMessage & message); + PointerToMemberMethod publish_method_pointer_; +}; + /// Implementation of \ref DomainBridge. class DomainBridgeImpl { @@ -51,14 +90,28 @@ class DomainBridgeImpl using TopicBridgeMap = std::map< TopicBridge, std::pair< - std::shared_ptr, - std::shared_ptr>>; + std::shared_ptr, + std::shared_ptr>>; using TypesupportMap = std::unordered_map< std::string, std::shared_ptr>; explicit DomainBridgeImpl(const DomainBridgeOptions & options) : options_(options) - {} + { + switch (options.mode()) { + case DomainBridgeOptions::Mode::Decompress: + dctx_ = std::unique_ptr( + ZSTD_createDCtx(), + ZSTD_freeDCtx); + break; + case DomainBridgeOptions::Mode::Compress: + cctx_ = std::unique_ptr( + ZSTD_createCCtx(), + ZSTD_freeCCtx); + default: + break; + } + } ~DomainBridgeImpl() = default; @@ -109,35 +162,87 @@ class DomainBridgeImpl type, "rosidl_typesupport_cpp"); } - std::shared_ptr create_publisher( + std::shared_ptr + create_publisher( rclcpp::Node::SharedPtr node, const std::string & topic_name, const std::string & type, const rclcpp::QoS & qos, rclcpp::PublisherOptionsWithAllocator> & options) { - auto publisher = node->create_generic_publisher(topic_name, type, qos, options); + std::shared_ptr publisher; + if (options_.mode() != DomainBridgeOptions::Mode::Compress) { + publisher = std::make_shared( + node->create_generic_publisher(topic_name, type, qos, options)); + } else { + publisher = std::make_shared( + node->create_publisher(topic_name, qos, options)); + } return publisher; } - std::shared_ptr create_subscription( + std::shared_ptr create_subscription( rclcpp::Node::SharedPtr node, - std::shared_ptr publisher, + std::shared_ptr publisher, const std::string & topic_name, const std::string & type, const rclcpp::QoS & qos, rclcpp::SubscriptionOptionsWithAllocator> & options) { - auto subscription = node->create_generic_subscription( + std::function)> callback; + switch (options_.mode()) { + case DomainBridgeOptions::Mode::Compress: + callback = [ + serializer = rclcpp::Serialization{}, + publisher, + cctx = cctx_.get() + ](std::shared_ptr msg) + { + // Publish message into the other domain + domain_bridge::msg::CompressedMsg compressed_msg; + compressed_msg.data = domain_bridge::compress_message(cctx, std::move(*msg)); + rclcpp::SerializedMessage serialized_compressed_msg; + serializer.serialize_message(&compressed_msg, &serialized_compressed_msg); + publisher->publish(serialized_compressed_msg); + }; + break; + case DomainBridgeOptions::Mode::Decompress: + callback = [ + serializer = rclcpp::Serialization{}, + publisher, + dctx = dctx_.get() + ](std::shared_ptr serialized_compressed_msg) + { + // Publish message into the other domain + domain_bridge::msg::CompressedMsg compressed_msg; + serializer.deserialize_message(serialized_compressed_msg.get(), &compressed_msg); + rclcpp::SerializedMessage msg = domain_bridge::decompress_message( + dctx, std::move(compressed_msg.data)); + publisher->publish(msg); + }; + break; + default: // fallthrough + case DomainBridgeOptions::Mode::Normal: + callback = [publisher](std::shared_ptr msg) { + // Publish message into the other domain + publisher->publish(*msg); + }; + break; + } + if (options_.mode() != DomainBridgeOptions::Mode::Decompress) { + // Create subscription + return node->create_generic_subscription( + topic_name, + type, + qos, + callback, + options); + } + return node->create_subscription( topic_name, - type, qos, - [publisher](std::shared_ptr msg) { - // Publish message into the other domain - publisher->publish(*msg); - }, + callback, options); - return subscription; } void bridge_topic( @@ -291,6 +396,11 @@ class DomainBridgeImpl /// QoS event handler WaitForQosHandler wait_for_qos_handler_; + + // Warn: The destruction order does matter here! + // The subscription are capturing the raw pointer! + std::unique_ptr dctx_{nullptr, &ZSTD_freeDCtx}; + std::unique_ptr cctx_{nullptr, &ZSTD_freeCCtx}; }; // class DomainBridgeImpl DomainBridge::DomainBridge(const DomainBridgeOptions & options) diff --git a/src/domain_bridge/domain_bridge_options.cpp b/src/domain_bridge/domain_bridge_options.cpp index 6eb0c00..a2b7a94 100644 --- a/src/domain_bridge/domain_bridge_options.cpp +++ b/src/domain_bridge/domain_bridge_options.cpp @@ -32,4 +32,17 @@ DomainBridgeOptions::name(std::string name) return *this; } +DomainBridgeOptions::Mode +DomainBridgeOptions::mode() const +{ + return mode_; +} + +DomainBridgeOptions & +DomainBridgeOptions::mode(DomainBridgeOptions::Mode mode) +{ + mode_ = mode; + return *this; +} + } // namespace domain_bridge diff --git a/src/domain_bridge/parse_domain_bridge_yaml_config.cpp b/src/domain_bridge/parse_domain_bridge_yaml_config.cpp index bf8163d..bc80803 100644 --- a/src/domain_bridge/parse_domain_bridge_yaml_config.cpp +++ b/src/domain_bridge/parse_domain_bridge_yaml_config.cpp @@ -171,6 +171,20 @@ DomainBridgeConfig parse_domain_bridge_yaml_config(std::filesystem::path file_pa default_to_domain = config["to_domain"].as(); is_default_to_domain = true; } + if (config["mode"]) { + try { + auto mode_str = config["mode"].as(); + if ("compress" == mode_str) { + domain_bridge_config.options.mode(DomainBridgeOptions::Mode::Compress); + } else if ("decompress" == mode_str) { + domain_bridge_config.options.mode(DomainBridgeOptions::Mode::Decompress); + } else if ("normal" != mode_str) { + throw YamlParsingError(file_path, "unsupported mode value '" + mode_str + "'"); + } + } catch (const YAML::BadConversion &) { + throw YamlParsingError(file_path, "mode must be an string"); + } + } if (config["topics"]) { if (config["topics"].Type() != YAML::NodeType::Map) { diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index b082dab..f47fb3f 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -10,7 +10,7 @@ if(TARGET test_domain_bridge) ament_target_dependencies(test_domain_bridge "rclcpp" ) - target_link_libraries(test_domain_bridge ${PROJECT_NAME}) + target_link_libraries(test_domain_bridge ${PROJECT_NAME}_lib) endif() ament_add_gmock(test_parse_domain_bridge_yaml_config @@ -18,7 +18,7 @@ ament_add_gmock(test_parse_domain_bridge_yaml_config WORKING_DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}/domain_bridge" ) if(TARGET test_parse_domain_bridge_yaml_config) - target_link_libraries(test_parse_domain_bridge_yaml_config ${PROJECT_NAME}) + target_link_libraries(test_parse_domain_bridge_yaml_config ${PROJECT_NAME}_lib) endif() ament_add_gmock(test_qos_matching @@ -29,18 +29,18 @@ if(TARGET test_qos_matching) "rclcpp" "test_msgs" ) - target_link_libraries(test_qos_matching ${PROJECT_NAME}) + target_link_libraries(test_qos_matching ${PROJECT_NAME}_lib) endif() -ament_add_gmock(test_remap_name - domain_bridge/test_remap_name.cpp +ament_add_gmock(test_domain_bridge_end_to_end + domain_bridge/test_domain_bridge_end_to_end.cpp ) -if(TARGET test_remap_name) - ament_target_dependencies(test_remap_name +if(TARGET test_domain_bridge_end_to_end) + ament_target_dependencies(test_domain_bridge_end_to_end "rclcpp" "test_msgs" ) - target_link_libraries(test_remap_name ${PROJECT_NAME}) + target_link_libraries(test_domain_bridge_end_to_end ${PROJECT_NAME}_lib) endif() diff --git a/test/domain_bridge/config/compress_mode.yaml b/test/domain_bridge/config/compress_mode.yaml new file mode 100644 index 0000000..c07a05d --- /dev/null +++ b/test/domain_bridge/config/compress_mode.yaml @@ -0,0 +1 @@ +mode: compress \ No newline at end of file diff --git a/test/domain_bridge/config/decompress_mode.yaml b/test/domain_bridge/config/decompress_mode.yaml new file mode 100644 index 0000000..c100cfd --- /dev/null +++ b/test/domain_bridge/config/decompress_mode.yaml @@ -0,0 +1 @@ +mode: decompress \ No newline at end of file diff --git a/test/domain_bridge/config/invalid_mode.yaml b/test/domain_bridge/config/invalid_mode.yaml new file mode 100644 index 0000000..40e7b18 --- /dev/null +++ b/test/domain_bridge/config/invalid_mode.yaml @@ -0,0 +1 @@ +mode: asdbsd \ No newline at end of file diff --git a/test/domain_bridge/test_domain_bridge.cpp b/test/domain_bridge/test_domain_bridge.cpp index f61984c..1a6454b 100644 --- a/test/domain_bridge/test_domain_bridge.cpp +++ b/test/domain_bridge/test_domain_bridge.cpp @@ -253,3 +253,17 @@ TEST_F(TestDomainBridge, get_bridged_topics) } } } + +TEST_F(TestDomainBridge, remap_topic_name_invalid) +{ + const std::string topic_name("test_remap_topic_invalid_before"); + const std::string remap_name("test_remap_topic_1nv@lid_after"); + + domain_bridge::DomainBridge bridge; + domain_bridge::TopicBridgeOptions topic_bridge_options; + topic_bridge_options.remap_name(remap_name); + EXPECT_THROW( + bridge.bridge_topic( + topic_name, "test_msgs/msg/BasicTypes", 0, 2, topic_bridge_options), + rclcpp::exceptions::InvalidTopicNameError); +} diff --git a/test/domain_bridge/test_domain_bridge_end_to_end.cpp b/test/domain_bridge/test_domain_bridge_end_to_end.cpp new file mode 100644 index 0000000..2741cbf --- /dev/null +++ b/test/domain_bridge/test_domain_bridge_end_to_end.cpp @@ -0,0 +1,254 @@ +// Copyright 2021, Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include + +#include "rclcpp/context.hpp" +#include "rclcpp/executors.hpp" +#include "rclcpp/node.hpp" +#include "rclcpp/serialization.hpp" +#include "test_msgs/msg/basic_types.hpp" + +#include "domain_bridge/compress_messages.hpp" +#include "domain_bridge/domain_bridge.hpp" +#include "domain_bridge/msg/compressed_msg.hpp" + + +static constexpr std::size_t kDomain1{1u}; +static constexpr std::size_t kDomain2{2u}; + +using namespace std::chrono_literals; + +// TODO(ivanpauno): We could use parameterized testing here ... +class TestDomainBridgeEndToEnd : public ::testing::Test +{ +protected: + void SetUp() override + { + // Initialize contexts in different domains + rclcpp::InitOptions context_options; + + context_1_ = std::make_shared(); + context_options.auto_initialize_logging(true).set_domain_id(kDomain1); + context_1_->init(0, nullptr, context_options); + + context_2_ = std::make_shared(); + context_options.auto_initialize_logging(false).set_domain_id(kDomain2); + context_2_->init(0, nullptr, context_options); + + // Initialize one node in each domain + rclcpp::NodeOptions node_options; + + node_options.context(context_1_); + node_1_ = std::make_shared("node_1", node_options); + + node_options.context(context_2_); + node_2_ = std::make_shared("node_2", node_options); + } + std::shared_ptr context_1_; + std::shared_ptr context_2_; + std::shared_ptr node_1_; + std::shared_ptr node_2_; +}; + +static +bool +poll_condition(std::function condition, std::chrono::seconds timeout) +{ + auto start = std::chrono::steady_clock::now(); + while ( + !condition() && + (start + timeout > std::chrono::steady_clock::now())) + { + std::this_thread::sleep_for(50ms); + } + return condition(); +} + +class ScopedAsyncSpinner +{ +public: + explicit ScopedAsyncSpinner(std::shared_ptr context) + : executor_{get_executor_options_with_context(std::move(context))}, + thread_{[this, stop_token = promise_.get_future()] { + executor_.spin_until_future_complete(stop_token); + }} + {} + + ~ScopedAsyncSpinner() + { + promise_.set_value(); + // TODO(ivanpauno): Report bug in rclcpp. + // This shouldn't be needed if spin_until_future_complete() worked + // correctly. + executor_.cancel(); + thread_.join(); + } + + rclcpp::Executor & + get_executor() + { + return executor_; + } + +private: + static + rclcpp::ExecutorOptions + get_executor_options_with_context(std::shared_ptr context) + { + rclcpp::ExecutorOptions ret; + ret.context = std::move(context); + return ret; + } + + rclcpp::executors::SingleThreadedExecutor executor_; + std::promise promise_; + std::thread thread_; +}; + +TEST_F(TestDomainBridgeEndToEnd, remap_topic_name) +{ + const std::string topic_name("test_remap_topic_before"); + const std::string remap_name("test_remap_topic_after"); + + std::atomic got_message = false; + + auto pub = node_1_->create_publisher(topic_name, 1); + auto sub = node_2_->create_subscription( + remap_name, + 1, + [&got_message](const test_msgs::msg::BasicTypes &) {got_message = true;}); + + // Bridge the publisher topic to domain 2 with a remap option + domain_bridge::DomainBridge bridge; + domain_bridge::TopicBridgeOptions topic_bridge_options; + topic_bridge_options.remap_name(remap_name); + bridge.bridge_topic( + topic_name, "test_msgs/msg/BasicTypes", kDomain1, kDomain2, topic_bridge_options); + + EXPECT_TRUE(poll_condition([sub]() {return sub->get_publisher_count() == 1u;}, 3s)); + pub->publish(test_msgs::msg::BasicTypes{}); + ScopedAsyncSpinner spinner{context_1_}; + spinner.get_executor().add_node(node_1_); + spinner.get_executor().add_node(node_2_); + bridge.add_to_executor(spinner.get_executor()); + EXPECT_TRUE(poll_condition([&got_message]() {return got_message.load();}, 3s)); +} + +TEST_F(TestDomainBridgeEndToEnd, remap_topic_name_with_substitution) +{ + const std::string topic_name("test_remap_topic_with_sub"); + const std::string remap_name("~/test_remap_topic_with_sub"); + const std::string domain_bridge_name("my_test_name"); + const std::string expected_name("/my_test_name/test_remap_topic_with_sub"); + + std::atomic got_message = false; + + // Create a publisher on domain 1 + auto pub = node_1_->create_publisher(topic_name, 1); + auto sub = node_2_->create_subscription( + expected_name, + 1, + [&got_message](const test_msgs::msg::BasicTypes &) {got_message = true;}); + + // Bridge the publisher topic to domain 2 with a remap option + domain_bridge::DomainBridgeOptions domain_bridge_options; + domain_bridge_options.name(domain_bridge_name); + domain_bridge::DomainBridge bridge{domain_bridge_options}; + domain_bridge::TopicBridgeOptions topic_bridge_options; + topic_bridge_options.remap_name(remap_name); + bridge.bridge_topic( + topic_name, "test_msgs/msg/BasicTypes", kDomain1, kDomain2, topic_bridge_options); + + EXPECT_TRUE(poll_condition([sub]() {return sub->get_publisher_count() == 1u;}, 3s)); + pub->publish(test_msgs::msg::BasicTypes{}); + ScopedAsyncSpinner spinner{context_1_}; + spinner.get_executor().add_node(node_1_); + spinner.get_executor().add_node(node_2_); + bridge.add_to_executor(spinner.get_executor()); + EXPECT_TRUE(poll_condition([&got_message]() {return got_message.load();}, 3s)); +} + +TEST_F(TestDomainBridgeEndToEnd, compress_mode) +{ + const std::string topic_name("test_compress"); + const std::string domain_bridge_name("my_test_name"); + + std::atomic got_message = false; + + // Create a publisher on domain 1 + auto pub = node_1_->create_publisher(topic_name, 1); + auto sub = node_2_->create_subscription( + topic_name, + 1, + [&got_message](const domain_bridge::msg::CompressedMsg &) {got_message = true;}); + + // Bridge the publisher topic to domain 2 with a remap option + domain_bridge::DomainBridgeOptions domain_bridge_options; + domain_bridge_options.name(domain_bridge_name); + domain_bridge_options.mode(domain_bridge::DomainBridgeOptions::Mode::Compress); + domain_bridge::DomainBridge bridge{domain_bridge_options}; + bridge.bridge_topic( + topic_name, "test_msgs/msg/BasicTypes", kDomain1, kDomain2); + + EXPECT_TRUE(poll_condition([sub]() {return sub->get_publisher_count() == 1u;}, 3s)); + pub->publish(test_msgs::msg::BasicTypes{}); + ScopedAsyncSpinner spinner{context_1_}; + spinner.get_executor().add_node(node_1_); + spinner.get_executor().add_node(node_2_); + bridge.add_to_executor(spinner.get_executor()); + EXPECT_TRUE(poll_condition([&got_message]() {return got_message.load();}, 3s)); +} + +TEST_F(TestDomainBridgeEndToEnd, decompress_mode) +{ + const std::string topic_name("test_decompress"); + const std::string domain_bridge_name("my_test_name"); + + std::atomic got_message = false; + + // Create a publisher on domain 1 + auto pub = node_1_->create_publisher(topic_name, 1); + auto sub = node_2_->create_subscription( + topic_name, + 1, + [&got_message](const test_msgs::msg::BasicTypes &) {got_message = true;}); + + // Bridge the publisher topic to domain 2 with a remap option + domain_bridge::DomainBridgeOptions domain_bridge_options; + domain_bridge_options.name(domain_bridge_name); + domain_bridge_options.mode(domain_bridge::DomainBridgeOptions::Mode::Decompress); + domain_bridge::DomainBridge bridge{domain_bridge_options}; + bridge.bridge_topic( + topic_name, "test_msgs/msg/BasicTypes", kDomain1, kDomain2); + + EXPECT_TRUE(poll_condition([sub]() {return sub->get_publisher_count() == 1u;}, 3s)); + rclcpp::Serialization serializer; + test_msgs::msg::BasicTypes msg; + rclcpp::SerializedMessage serialized_msg; + serializer.serialize_message(&msg, &serialized_msg); + domain_bridge::msg::CompressedMsg compressed_msg; + std::unique_ptr cctx{ZSTD_createCCtx(), &ZSTD_freeCCtx}; + compressed_msg.data = domain_bridge::compress_message(cctx.get(), std::move(serialized_msg)); + pub->publish(compressed_msg); + ScopedAsyncSpinner spinner{context_1_}; + spinner.get_executor().add_node(node_1_); + spinner.get_executor().add_node(node_2_); + bridge.add_to_executor(spinner.get_executor()); + EXPECT_TRUE(poll_condition([&got_message]() {return got_message.load();}, 3s)); +} diff --git a/test/domain_bridge/test_parse_domain_bridge_yaml_config.cpp b/test/domain_bridge/test_parse_domain_bridge_yaml_config.cpp index 252256c..ad14196 100644 --- a/test/domain_bridge/test_parse_domain_bridge_yaml_config.cpp +++ b/test/domain_bridge/test_parse_domain_bridge_yaml_config.cpp @@ -53,6 +53,24 @@ TEST_F(TestParseDomainBridgeYamlConfig, name) EXPECT_EQ(config.topics.size(), 0u); } +TEST_F(TestParseDomainBridgeYamlConfig, mode) +{ + { + const std::string yaml_path = ( + test_yaml_dir_ / std::filesystem::path{"compress_mode.yaml"}).string(); + auto config = domain_bridge::parse_domain_bridge_yaml_config(yaml_path); + EXPECT_EQ(config.options.mode(), domain_bridge::DomainBridgeOptions::Mode::Compress); + EXPECT_EQ(config.topics.size(), 0u); + } + { + const std::string yaml_path = ( + test_yaml_dir_ / std::filesystem::path{"decompress_mode.yaml"}).string(); + auto config = domain_bridge::parse_domain_bridge_yaml_config(yaml_path); + EXPECT_EQ(config.options.mode(), domain_bridge::DomainBridgeOptions::Mode::Decompress); + EXPECT_EQ(config.topics.size(), 0u); + } +} + TEST_F(TestParseDomainBridgeYamlConfig, topics) { std::vector expected = { @@ -203,4 +221,11 @@ TEST_F(TestParseDomainBridgeYamlConfig, invalid) EXPECT_THROW( domain_bridge::parse_domain_bridge_yaml_config(yaml_path), domain_bridge::YamlParsingError); } + // Invalid mode + { + const std::string yaml_path = + (test_yaml_dir_ / std::filesystem::path{"invalid_mode.yaml"}).string(); + EXPECT_THROW( + domain_bridge::parse_domain_bridge_yaml_config(yaml_path), domain_bridge::YamlParsingError); + } } diff --git a/test/domain_bridge/test_remap_name.cpp b/test/domain_bridge/test_remap_name.cpp deleted file mode 100644 index fb1c678..0000000 --- a/test/domain_bridge/test_remap_name.cpp +++ /dev/null @@ -1,129 +0,0 @@ -// Copyright 2021, Open Source Robotics Foundation, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include - -#include -#include - -#include "rclcpp/context.hpp" -#include "rclcpp/node.hpp" -#include "test_msgs/msg/basic_types.hpp" - -#include "domain_bridge/domain_bridge.hpp" - -#include "wait_for_publisher.hpp" - -class TestDomainBridgeRemapName : public ::testing::Test -{ -protected: - static void SetUpTestCase() - { - // Initialize contexts in different domains - context_1_ = std::make_shared(); - rclcpp::InitOptions context_options_1; - context_options_1.auto_initialize_logging(false).set_domain_id(domain_1_); - context_1_->init(0, nullptr, context_options_1); - - context_2_ = std::make_shared(); - rclcpp::InitOptions context_options_2; - context_options_2.auto_initialize_logging(false).set_domain_id(domain_2_); - context_2_->init(0, nullptr, context_options_2); - - node_options_1_.context(context_1_); - node_options_2_.context(context_2_); - } - - static const std::size_t domain_1_{1u}; - static const std::size_t domain_2_{2u}; - static std::shared_ptr context_1_; - static std::shared_ptr context_2_; - static rclcpp::NodeOptions node_options_1_; - static rclcpp::NodeOptions node_options_2_; -}; - -const std::size_t TestDomainBridgeRemapName::domain_1_; -const std::size_t TestDomainBridgeRemapName::domain_2_; -std::shared_ptr TestDomainBridgeRemapName::context_1_; -std::shared_ptr TestDomainBridgeRemapName::context_2_; -rclcpp::NodeOptions TestDomainBridgeRemapName::node_options_1_; -rclcpp::NodeOptions TestDomainBridgeRemapName::node_options_2_; - -TEST_F(TestDomainBridgeRemapName, remap_topic_name) -{ - const std::string topic_name("test_remap_topic_before"); - const std::string remap_name("test_remap_topic_after"); - - // Create a publisher on domain 1 - auto node_1 = std::make_shared("test_remap_topic_node_1", node_options_1_); - auto pub = node_1->create_publisher(topic_name, 1); - - // Bridge the publisher topic to domain 2 with a remap option - domain_bridge::DomainBridge bridge; - domain_bridge::TopicBridgeOptions topic_bridge_options; - topic_bridge_options.remap_name(remap_name); - bridge.bridge_topic( - topic_name, "test_msgs/msg/BasicTypes", domain_1_, domain_2_, topic_bridge_options); - - // Wait for bridge publisher to appear on domain 2 - auto node_2 = std::make_shared("test_remap_topic_node_2", node_options_2_); - // Expect the remapped topic to appear - ASSERT_TRUE(wait_for_publisher(node_2, remap_name)); - // Do not expect the original topic name - ASSERT_FALSE(wait_for_publisher(node_2, topic_name, std::chrono::seconds(1))); -} - -TEST_F(TestDomainBridgeRemapName, remap_topic_name_with_substitution) -{ - const std::string topic_name("test_remap_topic_with_sub"); - const std::string remap_name("~/test_remap_topic_with_sub"); - const std::string domain_bridge_name("my_test_name"); - const std::string expected_name("my_test_name/test_remap_topic_with_sub"); - - // Create a publisher on domain 1 - auto node_1 = std::make_shared( - "test_remap_topic_with_sub_node_1", node_options_1_); - auto pub = node_1->create_publisher(topic_name, 1); - - // Bridge the publisher topic to domain 2 with a remap option - domain_bridge::DomainBridgeOptions domain_bridge_options; - domain_bridge_options.name(domain_bridge_name); - domain_bridge::DomainBridge bridge(domain_bridge_options); - domain_bridge::TopicBridgeOptions topic_bridge_options; - topic_bridge_options.remap_name(remap_name); - bridge.bridge_topic( - topic_name, "test_msgs/msg/BasicTypes", domain_1_, domain_2_, topic_bridge_options); - - // Wait for bridge publisher to appear on domain 2 - auto node_2 = std::make_shared( - "test_remap_topic_with_sub_node_2", node_options_2_); - // Expect the remapped topic to appear - ASSERT_TRUE(wait_for_publisher(node_2, expected_name)); - // Do not expect the original topic name - ASSERT_FALSE(wait_for_publisher(node_2, topic_name, std::chrono::seconds(1))); -} - -TEST_F(TestDomainBridgeRemapName, remap_topic_name_invalid) -{ - const std::string topic_name("test_remap_topic_invalid_before"); - const std::string remap_name("test_remap_topic_1nv@lid_after"); - - domain_bridge::DomainBridge bridge; - domain_bridge::TopicBridgeOptions topic_bridge_options; - topic_bridge_options.remap_name(remap_name); - EXPECT_THROW( - bridge.bridge_topic( - topic_name, "test_msgs/msg/BasicTypes", domain_1_, domain_2_, topic_bridge_options), - rclcpp::exceptions::InvalidTopicNameError); -}