Skip to content

Commit ade2a06

Browse files
committed
Correctly connect parties with socket. (#218)
Summary: Pull Request resolved: #218 Our existing arrangement of party info (ip address and port number) is buggy as there will be a conflict with more than 2 parties. This diff is to address this issue. Our current code works fine with 2 party setting. But it will break with more than 2 parties. Example scenario: when party 0 connects with party 1, with current code, he will pull out the partyinfo of party 0, and add one to the port number. Party 1 will add 1 to a port number on his side. When party 1 connects with party 2, with current code, he will pull out the same partyinfo of party 0, and add on to the port number. Party 2 will add 1 to a port number on his side. Since multiple connections will be established between each pair of parties, this will definitely create a conflict. For example, party 0 will add 1 to the same port number twice while others only do that once. Our solution: 1. Each party keep a map of partyinfo, served as "info needed to connected to party i". 2. during each connection attemption, each party will use the peer's id to look up the partyinfo map. If this party is going to play the role of server, the ip address is useless. The corresponding ip address and port number can be used to establish connection. Differential Revision: D36908724 fbshipit-source-id: 7821ade91cfb8c61be76cf79547ef8a4fdc8c476
1 parent 40b0a5c commit ade2a06

3 files changed

Lines changed: 72 additions & 27 deletions

File tree

fbpcf/engine/communication/SocketPartyCommunicationAgentFactory.h

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,16 @@ class SocketPartyCommunicationAgentFactory final
2424
int portNo;
2525
};
2626

27-
// it's OK if a party with a smaller id doesn't know a party with larger id's
28-
// ip address, since the party with smaller id will always be the server.
27+
/** it's OK if a party with a smaller id doesn't know a party with larger id's
28+
* ip address, since the party with smaller id will always be the server.
29+
*@param partyInfos This is a map that contains connection information for all
30+
other parties, 0...n where myId is some m, 0 <= m <= n. The map contains entries
31+
for all parties != m. For all parties where id < m, it will contain an address
32+
and a port and will behave as a TCP client. For all parties where id > m, it
33+
will contain a port (and an address that will be ignored) and will behave as a
34+
TCP server. We expect the gap between port numbers are large enough to allow
35+
establishing multiple connections (>3) between each party pair.
36+
*/
2937
SocketPartyCommunicationAgentFactory(
3038
int myId,
3139
std::map<int, PartyInfo> partyInfos)
@@ -51,8 +59,7 @@ class SocketPartyCommunicationAgentFactory final
5159
if (id == myId_) {
5260
throw std::runtime_error("No need to talk to myself!");
5361
} else {
54-
auto serverId = id < myId_ ? id : myId_;
55-
auto iter = partyInfos_.find(serverId);
62+
auto iter = partyInfos_.find(id);
5663
if (iter == partyInfos_.end()) {
5764
throw std::runtime_error("Don't know how to connect to this party!");
5865
}

fbpcf/engine/communication/test/PartyCommunicationAgentTest.cpp

Lines changed: 52 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -44,18 +44,30 @@ void sendAndReceive(std::unique_ptr<IPartyCommunicationAgent> agent, int size) {
4444

4545
void testAgentFactory(
4646
int myId,
47+
int totalParty,
4748
int size,
4849
std::unique_ptr<IPartyCommunicationAgentFactory> factory) {
49-
auto agent = factory->create(1 - myId);
50-
sendAndReceive(std::move(agent), size);
50+
for (int i = 0; i < totalParty; i++) {
51+
std::vector<std::thread> testThreads;
52+
if (i != myId) {
53+
auto agent = factory->create(i);
54+
testThreads.push_back(
55+
std::thread(sendAndReceive, std::move(agent), size));
56+
}
57+
for (auto& t : testThreads) {
58+
t.join();
59+
}
60+
}
5161
}
5262

5363
TEST(InMemoryPartyCommunicationAgentTest, testSendAndReceive) {
5464
auto factorys = getInMemoryAgentFactory(2);
5565

5666
int size = 1024;
57-
auto thread0 = std::thread(testAgentFactory, 0, size, std::move(factorys[0]));
58-
auto thread1 = std::thread(testAgentFactory, 1, size, std::move(factorys[1]));
67+
auto thread0 =
68+
std::thread(testAgentFactory, 0, 2, size, std::move(factorys[0]));
69+
auto thread1 =
70+
std::thread(testAgentFactory, 1, 2, size, std::move(factorys[1]));
5971

6072
thread1.join();
6173
thread0.join();
@@ -73,19 +85,30 @@ TEST(SocketPartyCommunicationAgentTest, testSendAndReceiveWithTls) {
7385
* stress runs, we get errors when trying to bind to the
7486
* same port multiple times.
7587
*/
76-
std::map<int, SocketPartyCommunicationAgentFactory::PartyInfo> partyInfo = {
77-
{0, {"127.0.0.1", intDistro(defEngine)}},
78-
{1, {"127.0.0.1", intDistro(defEngine)}}};
88+
auto port01 = intDistro(defEngine);
89+
auto port02 = port01 + 4;
90+
auto port12 = port01 + 8;
91+
92+
std::map<int, SocketPartyCommunicationAgentFactory::PartyInfo> partyInfo0 = {
93+
{1, {"127.0.0.1", port01}}, {2, {"127.0.0.1", port02}}};
94+
std::map<int, SocketPartyCommunicationAgentFactory::PartyInfo> partyInfo1 = {
95+
{0, {"127.0.0.1", port01}}, {2, {"127.0.0.1", port12}}};
96+
std::map<int, SocketPartyCommunicationAgentFactory::PartyInfo> partyInfo2 = {
97+
{0, {"127.0.0.1", port02}}, {1, {"127.0.0.1", port12}}};
7998

8099
auto factory0 = std::make_unique<SocketPartyCommunicationAgentFactory>(
81-
0, partyInfo, true, createdDir);
100+
0, partyInfo0, true, createdDir);
82101
auto factory1 = std::make_unique<SocketPartyCommunicationAgentFactory>(
83-
1, partyInfo, true, createdDir);
102+
1, partyInfo1, true, createdDir);
103+
auto factory2 = std::make_unique<SocketPartyCommunicationAgentFactory>(
104+
2, partyInfo2, true, createdDir);
84105

85106
int size = 1048576; // 1024 ^ 2
86-
auto thread0 = std::thread(testAgentFactory, 0, size, std::move(factory0));
87-
auto thread1 = std::thread(testAgentFactory, 1, size, std::move(factory1));
107+
auto thread0 = std::thread(testAgentFactory, 0, 3, size, std::move(factory0));
108+
auto thread1 = std::thread(testAgentFactory, 1, 3, size, std::move(factory1));
109+
auto thread2 = std::thread(testAgentFactory, 2, 3, size, std::move(factory2));
88110

111+
thread2.join();
89112
thread1.join();
90113
thread0.join();
91114

@@ -97,19 +120,30 @@ TEST(SocketPartyCommunicationAgentTest, testSendAndReceiveWithoutTls) {
97120
std::default_random_engine defEngine(rd());
98121
std::uniform_int_distribution<int> intDistro(10000, 25000);
99122

100-
std::map<int, SocketPartyCommunicationAgentFactory::PartyInfo> partyInfo = {
101-
{0, {"127.0.0.1", intDistro(defEngine)}},
102-
{1, {"127.0.0.1", intDistro(defEngine)}}};
123+
auto port01 = intDistro(defEngine);
124+
auto port02 = intDistro(defEngine);
125+
auto port12 = intDistro(defEngine);
126+
127+
std::map<int, SocketPartyCommunicationAgentFactory::PartyInfo> partyInfo0 = {
128+
{1, {"127.0.0.1", port01}}, {2, {"127.0.0.1", port02}}};
129+
std::map<int, SocketPartyCommunicationAgentFactory::PartyInfo> partyInfo1 = {
130+
{0, {"127.0.0.1", port01}}, {2, {"127.0.0.1", port12}}};
131+
std::map<int, SocketPartyCommunicationAgentFactory::PartyInfo> partyInfo2 = {
132+
{0, {"127.0.0.1", port02}}, {1, {"127.0.0.1", port12}}};
103133

104134
auto factory0 =
105-
std::make_unique<SocketPartyCommunicationAgentFactory>(0, partyInfo);
135+
std::make_unique<SocketPartyCommunicationAgentFactory>(0, partyInfo0);
106136
auto factory1 =
107-
std::make_unique<SocketPartyCommunicationAgentFactory>(1, partyInfo);
137+
std::make_unique<SocketPartyCommunicationAgentFactory>(1, partyInfo1);
138+
auto factory2 =
139+
std::make_unique<SocketPartyCommunicationAgentFactory>(2, partyInfo2);
108140

109141
int size = 1048576; // 1024 ^ 2
110-
auto thread0 = std::thread(testAgentFactory, 0, size, std::move(factory0));
111-
auto thread1 = std::thread(testAgentFactory, 1, size, std::move(factory1));
142+
auto thread0 = std::thread(testAgentFactory, 0, 3, size, std::move(factory0));
143+
auto thread1 = std::thread(testAgentFactory, 1, 3, size, std::move(factory1));
144+
auto thread2 = std::thread(testAgentFactory, 2, 3, size, std::move(factory2));
112145

146+
thread2.join();
113147
thread1.join();
114148
thread0.join();
115149
}

fbpcf/engine/util/test/benchmarks/BenchmarkHelper.h

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,18 +44,22 @@ getSocketAgents() {
4444
auto retries = 5;
4545
while (retries--) {
4646
try {
47+
auto port = intDistro(e);
4748
std::map<
4849
int,
4950
communication::SocketPartyCommunicationAgentFactory::PartyInfo>
50-
partyInfo = {
51-
{0, {"127.0.0.1", intDistro(e)}},
52-
{1, {"127.0.0.1", intDistro(e)}}};
51+
partyInfo0 = {{1, {"127.0.0.1", port}}};
52+
std::map<
53+
int,
54+
communication::SocketPartyCommunicationAgentFactory::PartyInfo>
55+
partyInfo1 = {{0, {"127.0.0.1", port}}};
56+
5357
auto factory0 =
5458
std::make_unique<communication::SocketPartyCommunicationAgentFactory>(
55-
0, partyInfo);
59+
0, partyInfo0);
5660
auto factory1 =
5761
std::make_unique<communication::SocketPartyCommunicationAgentFactory>(
58-
1, partyInfo);
62+
1, partyInfo1);
5963

6064
auto task =
6165
[](std::unique_ptr<communication::IPartyCommunicationAgentFactory>

0 commit comments

Comments
 (0)