Skip to content

Commit efed5be

Browse files
Fix unique network flows with TCP transports (#5461) (#5486)
* Refs #22055: Add regression tests Signed-off-by: cferreiragonz <[email protected]> * Refs #22055: Fix unique flows for TCP Signed-off-by: cferreiragonz <[email protected]> * Refs #22055: Fix tests Signed-off-by: cferreiragonz <[email protected]> --------- Signed-off-by: cferreiragonz <[email protected]> (cherry picked from commit 81cdb10) Co-authored-by: Carlos Ferreira González <[email protected]>
1 parent cbe6216 commit efed5be

File tree

3 files changed

+147
-1
lines changed

3 files changed

+147
-1
lines changed

src/cpp/rtps/participant/RTPSParticipantImpl.cpp

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1737,7 +1737,15 @@ bool RTPSParticipantImpl::createAndAssociateReceiverswithEndpoint(
17371737
// Set port on unicast locators
17381738
for (Locator_t& loc : attributes.unicastLocatorList)
17391739
{
1740-
loc.port = port;
1740+
// Set logical port only TCP locators
1741+
if (LOCATOR_KIND_TCPv4 == loc.kind || LOCATOR_KIND_TCPv6 == loc.kind)
1742+
{
1743+
IPLocator::setLogicalPort(loc, port);
1744+
}
1745+
else
1746+
{
1747+
loc.port = port;
1748+
}
17411749
}
17421750

17431751
// Try creating receiver resources

test/blackbox/api/dds-pim/PubSubParticipant.hpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -689,6 +689,13 @@ class PubSubParticipant
689689
return false;
690690
}
691691

692+
PubSubParticipant& initial_peers(
693+
const eprosima::fastdds::rtps::LocatorList& initial_peers)
694+
{
695+
participant_qos_.wire_protocol().builtin.initialPeersList = initial_peers;
696+
return *this;
697+
}
698+
692699
PubSubParticipant& pub_property_policy(
693700
const eprosima::fastdds::rtps::PropertyPolicy property_policy)
694701
{

test/blackbox/common/BlackboxTestsTransportTCP.cpp

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
#include "../api/dds-pim/TCPReqRepHelloWorldRequester.hpp"
2727
#include "../api/dds-pim/TCPReqRepHelloWorldReplier.hpp"
28+
#include "PubSubParticipant.hpp"
2829
#include "PubSubReader.hpp"
2930
#include "PubSubWriter.hpp"
3031
#include "DatagramInjectionTransport.hpp"
@@ -1385,6 +1386,136 @@ TEST_P(TransportTCP, TCP_initial_peers_connection)
13851386
p3.block_for_all();
13861387
}
13871388

1389+
TEST_P(TransportTCP, tcp_unique_network_flows_init)
1390+
{
1391+
// TCP Writer creation should fail as feature is not implemented for writers
1392+
{
1393+
PubSubWriter<HelloWorldPubSubType> writer(TEST_TOPIC_NAME);
1394+
PropertyPolicy properties;
1395+
properties.properties().emplace_back("fastdds.unique_network_flows", "");
1396+
1397+
test_transport_->add_listener_port(global_port);
1398+
writer.disable_builtin_transport().add_user_transport_to_pparams(test_transport_);
1399+
1400+
writer.entity_property_policy(properties).init();
1401+
1402+
EXPECT_FALSE(writer.isInitialized());
1403+
}
1404+
1405+
// Two readers on the same participant not requesting unique flows should give the same logical port and same physical port
1406+
{
1407+
PubSubParticipant<HelloWorldPubSubType> participant(0, 2, 0, 0);
1408+
1409+
participant.sub_topic_name(TEST_TOPIC_NAME);
1410+
1411+
participant.disable_builtin_transport().add_user_transport_to_pparams(test_transport_);
1412+
1413+
ASSERT_TRUE(participant.init_participant());
1414+
ASSERT_TRUE(participant.init_subscriber(0));
1415+
ASSERT_TRUE(participant.init_subscriber(1));
1416+
1417+
LocatorList_t locators;
1418+
LocatorList_t locators2;
1419+
1420+
participant.get_native_reader(0).get_listening_locators(locators);
1421+
participant.get_native_reader(1).get_listening_locators(locators2);
1422+
1423+
EXPECT_TRUE(locators == locators2);
1424+
// LocatorList size depends on the number of interfaces. Different address but same port.
1425+
ASSERT_GT(locators.size(), 0);
1426+
ASSERT_GT(locators2.size(), 0);
1427+
auto locator1 = locators.begin();
1428+
auto locator2 = locators2.begin();
1429+
EXPECT_EQ(IPLocator::getPhysicalPort(*locator1), IPLocator::getPhysicalPort(*locator2));
1430+
EXPECT_EQ(IPLocator::getLogicalPort(*locator1), IPLocator::getLogicalPort(*locator2));
1431+
}
1432+
1433+
// Two TCP readers on the same participant requesting unique flows should give different logical ports but same physical port
1434+
{
1435+
PubSubParticipant<HelloWorldPubSubType> participant(0, 2, 0, 0);
1436+
1437+
PropertyPolicy properties;
1438+
properties.properties().emplace_back("fastdds.unique_network_flows", "");
1439+
participant.sub_topic_name(TEST_TOPIC_NAME).sub_property_policy(properties);
1440+
1441+
participant.disable_builtin_transport().add_user_transport_to_pparams(test_transport_);
1442+
1443+
ASSERT_TRUE(participant.init_participant());
1444+
ASSERT_TRUE(participant.init_subscriber(0));
1445+
ASSERT_TRUE(participant.init_subscriber(1));
1446+
1447+
LocatorList_t locators;
1448+
LocatorList_t locators2;
1449+
1450+
participant.get_native_reader(0).get_listening_locators(locators);
1451+
participant.get_native_reader(1).get_listening_locators(locators2);
1452+
1453+
EXPECT_FALSE(locators == locators2);
1454+
// LocatorList size depends on the number of interfaces. Different address but same port.
1455+
ASSERT_GT(locators.size(), 0);
1456+
ASSERT_GT(locators2.size(), 0);
1457+
auto locator1 = locators.begin();
1458+
auto locator2 = locators2.begin();
1459+
EXPECT_EQ(IPLocator::getPhysicalPort(*locator1), IPLocator::getPhysicalPort(*locator2));
1460+
EXPECT_NE(IPLocator::getLogicalPort(*locator1), IPLocator::getLogicalPort(*locator2));
1461+
}
1462+
}
1463+
1464+
TEST_P(TransportTCP, tcp_unique_network_flows_communication)
1465+
{
1466+
PubSubParticipant<HelloWorldPubSubType> readers(0, 2, 0, 2);
1467+
PubSubWriter<HelloWorldPubSubType> writer(TEST_TOPIC_NAME);
1468+
1469+
PropertyPolicy properties;
1470+
properties.properties().emplace_back("fastdds.unique_network_flows", "");
1471+
readers.disable_builtin_transport().add_user_transport_to_pparams(test_transport_);
1472+
1473+
eprosima::fastdds::rtps::Locator_t initial_peer_locator;
1474+
if (use_ipv6)
1475+
{
1476+
initial_peer_locator.kind = LOCATOR_KIND_TCPv6;
1477+
eprosima::fastdds::rtps::IPLocator::setIPv6(initial_peer_locator, "::1");
1478+
}
1479+
else
1480+
{
1481+
initial_peer_locator.kind = LOCATOR_KIND_TCPv4;
1482+
eprosima::fastdds::rtps::IPLocator::setIPv4(initial_peer_locator, "127.0.0.1");
1483+
}
1484+
eprosima::fastdds::rtps::IPLocator::setPhysicalPort(initial_peer_locator, global_port);
1485+
eprosima::fastdds::rtps::LocatorList_t initial_peer_list;
1486+
initial_peer_list.push_back(initial_peer_locator);
1487+
1488+
readers.sub_topic_name(TEST_TOPIC_NAME)
1489+
.sub_property_policy(properties)
1490+
.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS)
1491+
.initial_peers(initial_peer_list);
1492+
1493+
ASSERT_TRUE(readers.init_participant());
1494+
ASSERT_TRUE(readers.init_subscriber(0));
1495+
ASSERT_TRUE(readers.init_subscriber(1));
1496+
1497+
test_transport_->add_listener_port(global_port);
1498+
writer.disable_builtin_transport()
1499+
.add_user_transport_to_pparams(test_transport_)
1500+
.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS)
1501+
.history_depth(100);
1502+
1503+
writer.init();
1504+
ASSERT_TRUE(writer.isInitialized());
1505+
1506+
// Wait for discovery.
1507+
writer.wait_discovery();
1508+
readers.sub_wait_discovery();
1509+
1510+
// Send data
1511+
auto data = default_helloworld_data_generator();
1512+
writer.send(data);
1513+
// In this test all data should be sent.
1514+
ASSERT_TRUE(data.empty());
1515+
// Block until readers have acknowledged all samples.
1516+
EXPECT_TRUE(writer.waitForAllAcked(std::chrono::seconds(30)));
1517+
}
1518+
13881519
#ifdef INSTANTIATE_TEST_SUITE_P
13891520
#define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w)
13901521
#else

0 commit comments

Comments
 (0)