|
21 | 21 | #include "rclcpp/context.hpp" |
22 | 22 | #include "rclcpp/executors.hpp" |
23 | 23 | #include "rclcpp/node.hpp" |
| 24 | +#include "rclcpp/serialization.hpp" |
24 | 25 | #include "test_msgs/msg/basic_types.hpp" |
25 | 26 |
|
| 27 | +#include "domain_bridge/compress_messages.hpp" |
26 | 28 | #include "domain_bridge/domain_bridge.hpp" |
| 29 | +#include "domain_bridge/msg/compressed_msg.hpp" |
| 30 | + |
27 | 31 |
|
28 | 32 | static constexpr std::size_t kDomain1{1u}; |
29 | 33 | static constexpr std::size_t kDomain2{2u}; |
30 | 34 |
|
31 | 35 | using namespace std::chrono_literals; |
32 | 36 |
|
| 37 | +// TODO(ivanpauno): We could use parameterized testing here ... |
33 | 38 | class TestDomainBridgeEndToEnd : public ::testing::Test |
34 | 39 | { |
35 | 40 | protected: |
@@ -178,3 +183,72 @@ TEST_F(TestDomainBridgeEndToEnd, remap_topic_name_with_substitution) |
178 | 183 | bridge.add_to_executor(spinner.get_executor()); |
179 | 184 | EXPECT_TRUE(poll_condition([&got_message]() {return got_message.load();}, 3s)); |
180 | 185 | } |
| 186 | + |
| 187 | +TEST_F(TestDomainBridgeEndToEnd, compress_mode) |
| 188 | +{ |
| 189 | + const std::string topic_name("test_compress"); |
| 190 | + const std::string domain_bridge_name("my_test_name"); |
| 191 | + |
| 192 | + std::atomic<bool> got_message = false; |
| 193 | + |
| 194 | + // Create a publisher on domain 1 |
| 195 | + auto pub = node_1_->create_publisher<test_msgs::msg::BasicTypes>(topic_name, 1); |
| 196 | + auto sub = node_2_->create_subscription<domain_bridge::msg::CompressedMsg>( |
| 197 | + topic_name, |
| 198 | + 1, |
| 199 | + [&got_message](const domain_bridge::msg::CompressedMsg &) {got_message = true;}); |
| 200 | + |
| 201 | + // Bridge the publisher topic to domain 2 with a remap option |
| 202 | + domain_bridge::DomainBridgeOptions domain_bridge_options; |
| 203 | + domain_bridge_options.name(domain_bridge_name); |
| 204 | + domain_bridge_options.mode(domain_bridge::DomainBridgeOptions::Mode::Compress); |
| 205 | + domain_bridge::DomainBridge bridge{domain_bridge_options}; |
| 206 | + bridge.bridge_topic( |
| 207 | + topic_name, "test_msgs/msg/BasicTypes", kDomain1, kDomain2); |
| 208 | + |
| 209 | + EXPECT_TRUE(poll_condition([sub]() {return sub->get_publisher_count() == 1u;}, 3s)); |
| 210 | + pub->publish(test_msgs::msg::BasicTypes{}); |
| 211 | + ScopedAsyncSpinner spinner{context_1_}; |
| 212 | + spinner.get_executor().add_node(node_1_); |
| 213 | + spinner.get_executor().add_node(node_2_); |
| 214 | + bridge.add_to_executor(spinner.get_executor()); |
| 215 | + EXPECT_TRUE(poll_condition([&got_message]() {return got_message.load();}, 3s)); |
| 216 | +} |
| 217 | + |
| 218 | +TEST_F(TestDomainBridgeEndToEnd, decompress_mode) |
| 219 | +{ |
| 220 | + const std::string topic_name("test_decompress"); |
| 221 | + const std::string domain_bridge_name("my_test_name"); |
| 222 | + |
| 223 | + std::atomic<bool> got_message = false; |
| 224 | + |
| 225 | + // Create a publisher on domain 1 |
| 226 | + auto pub = node_1_->create_publisher<domain_bridge::msg::CompressedMsg>(topic_name, 1); |
| 227 | + auto sub = node_2_->create_subscription<test_msgs::msg::BasicTypes>( |
| 228 | + topic_name, |
| 229 | + 1, |
| 230 | + [&got_message](const test_msgs::msg::BasicTypes &) {got_message = true;}); |
| 231 | + |
| 232 | + // Bridge the publisher topic to domain 2 with a remap option |
| 233 | + domain_bridge::DomainBridgeOptions domain_bridge_options; |
| 234 | + domain_bridge_options.name(domain_bridge_name); |
| 235 | + domain_bridge_options.mode(domain_bridge::DomainBridgeOptions::Mode::Decompress); |
| 236 | + domain_bridge::DomainBridge bridge{domain_bridge_options}; |
| 237 | + bridge.bridge_topic( |
| 238 | + topic_name, "test_msgs/msg/BasicTypes", kDomain1, kDomain2); |
| 239 | + |
| 240 | + EXPECT_TRUE(poll_condition([sub]() {return sub->get_publisher_count() == 1u;}, 3s)); |
| 241 | + rclcpp::Serialization<test_msgs::msg::BasicTypes> serializer; |
| 242 | + test_msgs::msg::BasicTypes msg; |
| 243 | + rclcpp::SerializedMessage serialized_msg; |
| 244 | + serializer.serialize_message(&msg, &serialized_msg); |
| 245 | + domain_bridge::msg::CompressedMsg compressed_msg; |
| 246 | + std::unique_ptr<ZSTD_CCtx, size_t (*)(ZSTD_CCtx *)> cctx{ZSTD_createCCtx(), &ZSTD_freeCCtx}; |
| 247 | + compressed_msg.data = domain_bridge::compress_message(cctx.get(), std::move(serialized_msg)); |
| 248 | + pub->publish(compressed_msg); |
| 249 | + ScopedAsyncSpinner spinner{context_1_}; |
| 250 | + spinner.get_executor().add_node(node_1_); |
| 251 | + spinner.get_executor().add_node(node_2_); |
| 252 | + bridge.add_to_executor(spinner.get_executor()); |
| 253 | + EXPECT_TRUE(poll_condition([&got_message]() {return got_message.load();}, 3s)); |
| 254 | +} |
0 commit comments