Skip to content

Commit 2e89320

Browse files
MiguelCompanyrichiware
authored andcommitted
Fixing data race on UDPTransportInterface [5458] (#540)
* Refs #5458. Fixing data race on UDPTransportInterface. * Refs #5458. Fixing timeouts on Mac. * Refs #5458 Fixing data race on ReceiverResource dtor. * Refs #5458 Clang-tidy warnings * Refs #5458 Fixing Mac timeout. * Refs #5458 Fixing valgrind invalid read. * Refs #5458 Requested changes. * Refs #5458 Fixing situation when socket doesn't close nicely. * Refs #5458 Trying to fix Mac issue. * Refs #5458 Trying to fix Mac issue. * Refs #5458 Trying to fix Mac issue. * Refs #5458 Fixed Mac issue. * Refs #5458 Avoid loop when multicast. Much lesser wait time until continue. * Refs #5458 Requested changes.
1 parent 87d38b6 commit 2e89320

File tree

11 files changed

+239
-150
lines changed

11 files changed

+239
-150
lines changed

include/fastrtps/rtps/network/ReceiverResource.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,12 +68,18 @@ class ReceiverResource : public TransportReceiverInterface
6868
*/
6969
void UnregisterReceiver(MessageReceiver* receiver);
7070

71+
/**
72+
* Closes related ChannelResources.
73+
*/
74+
void disable();
75+
7176
/**
7277
* Resources can only be transfered through move semantics. Copy, assignment, and
7378
* construction outside of the factory are forbidden.
7479
*/
7580
ReceiverResource(ReceiverResource&&);
76-
~ReceiverResource();
81+
82+
~ReceiverResource() override;
7783

7884
private:
7985
ReceiverResource() = delete;

include/fastrtps/transport/UDPChannelResource.h

Lines changed: 46 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,15 @@
1616
#define UDP_CHANNEL_RESOURCE_INFO_
1717

1818
#include <fastrtps/transport/ChannelResource.h>
19+
#include <fastrtps/rtps/common/Locator.h>
1920
#include <asio.hpp>
2021

2122
namespace eprosima{
2223
namespace fastrtps{
2324
namespace rtps{
2425

2526
class TransportReceiverInterface;
27+
class UDPTransportInterface;
2628

2729
#if defined(ASIO_HAS_MOVE)
2830
// Typedefs
@@ -73,14 +75,16 @@ class TransportReceiverInterface;
7375
class UDPChannelResource : public ChannelResource
7476
{
7577
public:
76-
UDPChannelResource(eProsimaUDPSocket& socket);
7778

7879
UDPChannelResource(
80+
UDPTransportInterface* transport,
7981
eProsimaUDPSocket& socket,
80-
uint32_t maxMsgSize);
82+
uint32_t maxMsgSize,
83+
const Locator_t& locator,
84+
const std::string& sInterface,
85+
TransportReceiverInterface* receiver);
8186

82-
UDPChannelResource(UDPChannelResource&& channelResource);
83-
virtual ~UDPChannelResource();
87+
virtual ~UDPChannelResource() override;
8488

8589
UDPChannelResource& operator=(UDPChannelResource&& channelResource)
8690
{
@@ -91,7 +95,7 @@ class UDPChannelResource : public ChannelResource
9195
void only_multicast_purpose(const bool value)
9296
{
9397
only_multicast_purpose_ = value;
94-
};
98+
}
9599

96100
bool& only_multicast_purpose()
97101
{
@@ -132,12 +136,49 @@ class UDPChannelResource : public ChannelResource
132136
return message_receiver_;
133137
}
134138

139+
inline virtual void disable() override
140+
{
141+
ChannelResource::disable();
142+
}
143+
144+
void release(
145+
const Locator_t& locator,
146+
const asio::ip::address& address);
147+
148+
protected:
149+
/**
150+
* Function to be called from a new thread, which takes cares of performing a blocking receive
151+
* operation on the ReceiveResource
152+
* @param input_locator - Locator that triggered the creation of the resource
153+
*/
154+
void perform_listen_operation(
155+
Locator_t input_locator);
156+
157+
/**
158+
* Blocking Receive from the specified channel.
159+
* @param receive_buffer vector with enough capacity (not size) to accomodate a full receive buffer. That
160+
* capacity must not be less than the receive_buffer_size supplied to this class during construction.
161+
* @param receive_buffer_capacity Maximum size of the receive_buffer.
162+
* @param[out] receive_buffer_size Size of the received buffer.
163+
* @param[out] remote_locator Locator describing the remote restination we received a packet from.
164+
*/
165+
bool Receive(
166+
octet* receive_buffer,
167+
uint32_t receive_buffer_capacity,
168+
uint32_t& receive_buffer_size,
169+
Locator_t& remote_locator);
170+
135171
private:
136172

137173
TransportReceiverInterface* message_receiver_; //Associated Readers/Writers inside of MessageReceiver
138174
eProsimaUDPSocket socket_;
139175
bool only_multicast_purpose_;
140176
std::string interface_;
177+
UDPTransportInterface* transport_;
178+
bool closing_;
179+
std::mutex mtx_closing_;
180+
std::condition_variable cv_closing_;
181+
141182
UDPChannelResource(const UDPChannelResource&) = delete;
142183
UDPChannelResource& operator=(const UDPChannelResource&) = delete;
143184
};

include/fastrtps/transport/UDPTransportInterface.h

Lines changed: 2 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -64,18 +64,6 @@ class UDPTransportInterface : public TransportInterface
6464
SendResourceList& sender_resource_list,
6565
const Locator_t&) override;
6666

67-
/**
68-
* Blocking Receive from the specified channel.
69-
* @param p_channel_resource Pointer to the channer resource that stores the socket.
70-
* @param receive_buffer vector with enough capacity (not size) to accomodate a full receive buffer. That
71-
* capacity must not be less than the receive_buffer_size supplied to this class during construction.
72-
* @param receive_buffer_capacity Maximum size of the receive_buffer.
73-
* @param[out] receive_buffer_size Size of the received buffer.
74-
* @param[out] remote_locator Locator describing the remote restination we received a packet from.
75-
*/
76-
bool Receive(UDPChannelResource* p_channel_resource, octet* receive_buffer,
77-
uint32_t receive_buffer_capacity, uint32_t& receive_buffer_size, Locator_t& remote_locator);
78-
7967
//! Release the listening socket for the specified port.
8068
bool ReleaseInputChannel(const Locator_t& locator, const asio::ip::address& interface_address);
8169

@@ -117,6 +105,8 @@ class UDPTransportInterface : public TransportInterface
117105

118106
protected:
119107

108+
friend class UDPChannelResource;
109+
120110
// For UDPv6, the notion of channel corresponds to a port + direction tuple.
121111
asio::io_service io_service_;
122112
std::vector<IPFinder::info_IP> currentInterfaces;
@@ -162,14 +152,6 @@ class UDPTransportInterface : public TransportInterface
162152
virtual eProsimaUDPSocket OpenAndBindInputSocket(const std::string& sIp, uint16_t port, bool is_multicast) = 0;
163153
eProsimaUDPSocket OpenAndBindUnicastOutputSocket(const asio::ip::udp::endpoint& endpoint, uint16_t& port);
164154

165-
/**
166-
* Function to be called from a new thread, which takes cares of performing a blocking receive
167-
* operation on the ReceiveResource
168-
* @param p_channel_resource - Associated ChannelResource
169-
* @param input_locator - Locator that triggered the creation of the resource
170-
*/
171-
void perform_listen_operation(UDPChannelResource* p_channel_resource, Locator_t input_locator);
172-
173155
virtual void set_receive_buffer_size(uint32_t size) = 0;
174156
virtual void set_send_buffer_size(uint32_t size) = 0;
175157
virtual void SetSocketOutboundInterface(eProsimaUDPSocket&, const std::string&) = 0;

src/cpp/rtps/network/ReceiverResource.cpp

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,10 @@ namespace fastrtps{
2626
namespace rtps{
2727

2828
ReceiverResource::ReceiverResource(TransportInterface& transport, const Locator_t& locator, uint32_t max_size)
29-
: mValid(false)
29+
: Cleanup(nullptr)
30+
, LocatorMapsToManagedChannel(nullptr)
31+
, mValid(false)
32+
, mtx()
3033
, receiver(nullptr)
3134
, msg(0)
3235
{
@@ -98,14 +101,18 @@ void ReceiverResource::OnDataReceived(const octet * data, const uint32_t size,
98101

99102
}
100103

101-
ReceiverResource::~ReceiverResource()
104+
void ReceiverResource::disable()
102105
{
103106
if (Cleanup)
104107
{
105108
Cleanup();
106109
}
107110
}
108111

112+
ReceiverResource::~ReceiverResource()
113+
{
114+
}
115+
109116
} // namespace rtps
110117
} // namespace fastrtps
111118
} // namespace eprosima

src/cpp/rtps/participant/RTPSParticipantImpl.cpp

Lines changed: 28 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,8 @@ RTPSParticipantImpl::RTPSParticipantImpl(const RTPSParticipantAttributes& PParam
122122

123123
/// Creation of metatraffic locator and receiver resources
124124
uint32_t metatraffic_multicast_port = m_att.port.getMulticastPort(m_att.builtin.domainId);
125-
uint32_t metatraffic_unicast_port = m_att.port.getUnicastPort(m_att.builtin.domainId, m_att.participantID);
125+
uint32_t metatraffic_unicast_port = m_att.port.getUnicastPort(m_att.builtin.domainId,
126+
static_cast<uint32_t>(m_att.participantID));
126127

127128
/* If metatrafficMulticastLocatorList is empty, add mandatory default Locators
128129
Else -> Take them */
@@ -249,16 +250,17 @@ RTPSParticipantImpl::~RTPSParticipantImpl()
249250
for(auto& block : m_receiverResourcelist)
250251
{
251252
block.Receiver->UnregisterReceiver(block.mp_receiver);
253+
block.disable();
252254
}
253255

254256
while(m_userReaderList.size() > 0)
255257
{
256-
deleteUserEndpoint((Endpoint*)*m_userReaderList.begin());
258+
deleteUserEndpoint(static_cast<Endpoint*>(*m_userReaderList.begin()));
257259
}
258260

259261
while(m_userWriterList.size() > 0)
260262
{
261-
deleteUserEndpoint((Endpoint*)*m_userWriterList.begin());
263+
deleteUserEndpoint(static_cast<Endpoint*>(*m_userWriterList.begin()));
262264
}
263265

264266
delete(this->mp_builtinProtocols);
@@ -298,7 +300,7 @@ bool RTPSParticipantImpl::createWriter(
298300
std::string type = (param.endpoint.reliabilityKind == RELIABLE) ? "RELIABLE" :"BEST_EFFORT";
299301
logInfo(RTPS_PARTICIPANT," of type " << type);
300302
EntityId_t entId;
301-
if(entityId== c_EntityId_Unknown)
303+
if(entityId == c_EntityId_Unknown)
302304
{
303305
if(param.endpoint.topicKind == NO_KEY)
304306
{
@@ -309,21 +311,21 @@ bool RTPSParticipantImpl::createWriter(
309311
entId.value[3] = 0x02;
310312
}
311313
uint32_t idnum;
312-
if(param.endpoint.getEntityID()>0)
314+
if(param.endpoint.getEntityID() > 0)
313315
{
314-
idnum = param.endpoint.getEntityID();
316+
idnum = static_cast<uint32_t>(param.endpoint.getEntityID());
315317
}
316318
else
317319
{
318320
IdCounter++;
319321
idnum = IdCounter;
320322
}
321323

322-
octet* c = (octet*)&idnum;
324+
octet* c = reinterpret_cast<octet*>(&idnum);
323325
entId.value[2] = c[0];
324326
entId.value[1] = c[1];
325327
entId.value[0] = c[2];
326-
if(this->existsEntityId(entId,WRITER))
328+
if(this->existsEntityId(entId, WRITER))
327329
{
328330
logError(RTPS_PARTICIPANT,"A writer with the same entityId already exists in this RTPSParticipant");
329331
return false;
@@ -375,14 +377,14 @@ bool RTPSParticipantImpl::createWriter(
375377
if (param.endpoint.reliabilityKind == BEST_EFFORT)
376378
{
377379
SWriter = (persistence == nullptr) ?
378-
(RTPSWriter*) new StatelessWriter(this, guid, param, hist, listen) :
379-
(RTPSWriter*) new StatelessPersistentWriter(this, guid, param, hist, listen, persistence);
380+
new StatelessWriter(this, guid, param, hist, listen) :
381+
new StatelessPersistentWriter(this, guid, param, hist, listen, persistence);
380382
}
381383
else if (param.endpoint.reliabilityKind == RELIABLE)
382384
{
383385
SWriter = (persistence == nullptr) ?
384-
(RTPSWriter*) new StatefulWriter(this, guid, param, hist, listen) :
385-
(RTPSWriter*) new StatefulPersistentWriter(this, guid, param, hist, listen, persistence);
386+
new StatefulWriter(this, guid, param, hist, listen) :
387+
new StatefulPersistentWriter(this, guid, param, hist, listen, persistence);
386388
}
387389

388390
if (SWriter == nullptr)
@@ -411,10 +413,10 @@ bool RTPSParticipantImpl::createWriter(
411413
}
412414
#endif
413415

414-
createSendResources((Endpoint *)SWriter);
416+
createSendResources(SWriter);
415417
if (param.endpoint.reliabilityKind == RELIABLE)
416418
{
417-
if (!createAndAssociateReceiverswithEndpoint((Endpoint *)SWriter))
419+
if (!createAndAssociateReceiverswithEndpoint(SWriter))
418420
{
419421
delete(SWriter);
420422
return false;
@@ -469,15 +471,15 @@ bool RTPSParticipantImpl::createReader(
469471
uint32_t idnum;
470472
if (param.endpoint.getEntityID() > 0)
471473
{
472-
idnum = param.endpoint.getEntityID();
474+
idnum = static_cast<uint32_t>(param.endpoint.getEntityID());
473475
}
474476
else
475477
{
476478
IdCounter++;
477479
idnum = IdCounter;
478480
}
479481

480-
octet* c = (octet*)&idnum;
482+
octet* c = reinterpret_cast<octet*>(&idnum);
481483
entId.value[2] = c[0];
482484
entId.value[1] = c[1];
483485
entId.value[0] = c[2];
@@ -526,14 +528,14 @@ bool RTPSParticipantImpl::createReader(
526528
if (param.endpoint.reliabilityKind == BEST_EFFORT)
527529
{
528530
SReader = (persistence == nullptr) ?
529-
(RTPSReader*) new StatelessReader(this, guid, param, hist, listen) :
530-
(RTPSReader*) new StatelessPersistentReader(this, guid, param, hist, listen, persistence);
531+
new StatelessReader(this, guid, param, hist, listen) :
532+
new StatelessPersistentReader(this, guid, param, hist, listen, persistence);
531533
}
532534
else if (param.endpoint.reliabilityKind == RELIABLE)
533535
{
534536
SReader = (persistence == nullptr) ?
535-
(RTPSReader*) new StatefulReader(this, guid, param, hist, listen) :
536-
(RTPSReader*) new StatefulPersistentReader(this, guid, param, hist, listen, persistence);
537+
new StatefulReader(this, guid, param, hist, listen) :
538+
new StatefulPersistentReader(this, guid, param, hist, listen, persistence);
537539
}
538540

539541
if (SReader == nullptr)
@@ -565,7 +567,7 @@ bool RTPSParticipantImpl::createReader(
565567

566568
if (param.endpoint.reliabilityKind == RELIABLE)
567569
{
568-
createSendResources((Endpoint *)SReader);
570+
createSendResources(SReader);
569571
}
570572

571573
if (isBuiltin)
@@ -575,7 +577,7 @@ bool RTPSParticipantImpl::createReader(
575577

576578
if (enable)
577579
{
578-
if (!createAndAssociateReceiverswithEndpoint((Endpoint *)SReader))
580+
if (!createAndAssociateReceiverswithEndpoint(SReader))
579581
{
580582
delete(SReader);
581583
return false;
@@ -594,7 +596,7 @@ bool RTPSParticipantImpl::createReader(
594596

595597
bool RTPSParticipantImpl::enableReader(RTPSReader *reader)
596598
{
597-
if (!assignEndpointListenResources((Endpoint*)reader))
599+
if (!assignEndpointListenResources(reader))
598600
{
599601
return false;
600602
}
@@ -794,7 +796,7 @@ void RTPSParticipantImpl::createReceiverResources(LocatorList_t& Locator_list, b
794796
{
795797
std::lock_guard<std::mutex> lock(m_receiverResourcelistMutex);
796798
//Push the new items into the ReceiverResource buffer
797-
m_receiverResourcelist.push_back(ReceiverControlBlock(std::move(*it_buffer)));
799+
m_receiverResourcelist.emplace_back(*it_buffer);
798800
//Create and init the MessageReceiver
799801
auto mr = new MessageReceiver(this, size);
800802
m_receiverResourcelist.back().mp_receiver = mr;
@@ -892,7 +894,7 @@ bool RTPSParticipantImpl::deleteUserEndpoint(Endpoint* p_endpoint)
892894
{
893895
if (found_in_users)
894896
{
895-
mp_builtinProtocols->removeLocalWriter((RTPSWriter*)p_endpoint);
897+
mp_builtinProtocols->removeLocalWriter(static_cast<RTPSWriter*>(p_endpoint));
896898
}
897899

898900
#if HAVE_SECURITY
@@ -907,7 +909,7 @@ bool RTPSParticipantImpl::deleteUserEndpoint(Endpoint* p_endpoint)
907909
{
908910
if (found_in_users)
909911
{
910-
mp_builtinProtocols->removeLocalReader((RTPSReader*)p_endpoint);
912+
mp_builtinProtocols->removeLocalReader(static_cast<RTPSReader*>(p_endpoint));
911913
}
912914

913915
#if HAVE_SECURITY

0 commit comments

Comments
 (0)