Skip to content

Commit 52b9102

Browse files
croos12lolyu
andauthored
[lib] Make ZMQ buffer size adjustable and set it to 4MB by default (#1697)
* Update zmq buffer to be adjustable Update zmq response buffers to be adjustable, also decrease the size of response buffers outside of syncd Signed-off-by: Connor Roos <[email protected]> * Add buffer size constants Adds 64MB buffer size constants to RedisRemoteSaiInterface and syncd Signed-off-by: Connor Roos <[email protected]> * Remove trailing whitespace Signed-off-by: Connor Roos <[email protected]> * Make buffer adjustable in RedisRemoteSaiInterface Signed-off-by: Connor Roos <[email protected]> --------- Signed-off-by: Connor Roos <[email protected]> Co-authored-by: Longxiang Lyu <[email protected]>
1 parent 88222b4 commit 52b9102

7 files changed

Lines changed: 60 additions & 24 deletions

lib/RedisRemoteSaiInterface.cpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020

2121
#include <inttypes.h>
2222

23+
#define SAI_ZMQ_DEFAULT_RESPONSE_BUFFER_SIZE (64*1024*1024)
24+
2325
using namespace sairedis;
2426
using namespace saimeta;
2527
using namespace sairediscommon;
@@ -77,13 +79,15 @@ sai_status_t RedisRemoteSaiInterface::apiInitialize(
7779
m_useTempView = false;
7880
m_syncMode = false;
7981
m_redisCommunicationMode = SAI_REDIS_COMMUNICATION_MODE_REDIS_ASYNC;
82+
m_zmqResponseBufferSize = SAI_ZMQ_DEFAULT_RESPONSE_BUFFER_SIZE;
8083

8184
if (m_contextConfig->m_zmqEnable)
8285
{
8386
m_communicationChannel = std::make_shared<ZeroMQChannel>(
8487
m_contextConfig->m_zmqEndpoint,
8588
m_contextConfig->m_zmqNtfEndpoint,
86-
std::bind(&RedisRemoteSaiInterface::handleNotification, this, _1, _2, _3));
89+
std::bind(&RedisRemoteSaiInterface::handleNotification, this, _1, _2, _3),
90+
m_zmqResponseBufferSize);
8791

8892
SWSS_LOG_NOTICE("zmq enabled, forcing sync mode");
8993

@@ -420,7 +424,8 @@ sai_status_t RedisRemoteSaiInterface::setRedisExtensionAttribute(
420424
m_communicationChannel = std::make_shared<ZeroMQChannel>(
421425
m_contextConfig->m_zmqEndpoint,
422426
m_contextConfig->m_zmqNtfEndpoint,
423-
std::bind(&RedisRemoteSaiInterface::handleNotification, this, _1, _2, _3));
427+
std::bind(&RedisRemoteSaiInterface::handleNotification, this, _1, _2, _3),
428+
m_zmqResponseBufferSize);
424429

425430
m_communicationChannel->setResponseTimeout(m_responseTimeoutMs);
426431

lib/RedisRemoteSaiInterface.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -460,6 +460,8 @@ namespace sairedis
460460

461461
uint64_t m_responseTimeoutMs;
462462

463+
size_t m_zmqResponseBufferSize;
464+
463465
std::function<sai_switch_notifications_t(std::shared_ptr<Notification>)> m_notificationCallback;
464466

465467
std::map<sai_object_id_t, swss::TableDump> m_tableDump;

lib/ZeroMQChannel.cpp

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,24 +12,33 @@
1212

1313
using namespace sairedis;
1414

15-
#define ZMQ_RESPONSE_BUFFER_SIZE (64*1024*1024)
1615
#define ZMQ_MAX_RETRY 10
1716

1817
ZeroMQChannel::ZeroMQChannel(
1918
_In_ const std::string& endpoint,
2019
_In_ const std::string& ntfEndpoint,
21-
_In_ Channel::Callback callback):
20+
_In_ Channel::Callback callback,
21+
_In_ long zmqResponseBufferSize):
2222
Channel(callback),
2323
m_endpoint(endpoint),
2424
m_ntfEndpoint(ntfEndpoint),
2525
m_context(nullptr),
2626
m_socket(nullptr),
2727
m_ntfContext(nullptr),
28-
m_ntfSocket(nullptr)
28+
m_ntfSocket(nullptr),
29+
m_zmqResponseBufferSize(zmqResponseBufferSize)
2930
{
3031
SWSS_LOG_ENTER();
32+
if (m_zmqResponseBufferSize != ZMQ_RESPONSE_DEFAULT_BUFFER_SIZE)
33+
{
34+
SWSS_LOG_NOTICE("setting zmq response buffer size to %ld bytes", m_zmqResponseBufferSize);
35+
}
36+
else
37+
{
38+
SWSS_LOG_NOTICE("using default zmq response buffer size of %ld bytes", ZMQ_RESPONSE_DEFAULT_BUFFER_SIZE);
39+
}
3140

32-
m_buffer.resize(ZMQ_RESPONSE_BUFFER_SIZE);
41+
m_buffer.resize(m_zmqResponseBufferSize);
3342

3443
// configure ZMQ for main communication
3544

@@ -129,14 +138,14 @@ void ZeroMQChannel::notificationThreadFunction()
129138

130139
std::vector<uint8_t> buffer;
131140

132-
buffer.resize(ZMQ_RESPONSE_BUFFER_SIZE);
141+
buffer.resize(m_zmqResponseBufferSize);
133142

134143
while (m_runNotificationThread)
135144
{
136145
// NOTE: this entire loop internal could be encapsulated into separate class
137146
// which will inherit from Selectable class, and name this as ntf receiver
138147

139-
int rc = zmq_recv(m_ntfSocket, buffer.data(), ZMQ_RESPONSE_BUFFER_SIZE, 0);
148+
int rc = zmq_recv(m_ntfSocket, buffer.data(), m_zmqResponseBufferSize, 0);
140149

141150
if (!m_runNotificationThread)
142151
break;
@@ -156,10 +165,10 @@ void ZeroMQChannel::notificationThreadFunction()
156165
continue;
157166
}
158167

159-
if (rc >= ZMQ_RESPONSE_BUFFER_SIZE)
168+
if (rc >= m_zmqResponseBufferSize)
160169
{
161170
SWSS_LOG_WARN("zmq_recv message was truncated (over %d bytes, received %d), increase buffer size, message DROPPED",
162-
ZMQ_RESPONSE_BUFFER_SIZE,
171+
m_zmqResponseBufferSize,
163172
rc);
164173

165174
continue;
@@ -291,7 +300,7 @@ sai_status_t ZeroMQChannel::wait(
291300

292301
for (int i = 0; true ; ++i)
293302
{
294-
rc = zmq_recv(m_socket, m_buffer.data(), ZMQ_RESPONSE_BUFFER_SIZE, 0);
303+
rc = zmq_recv(m_socket, m_buffer.data(), m_zmqResponseBufferSize, 0);
295304

296305
if (rc < 0 && zmq_errno() == EINTR && i < ZMQ_MAX_RETRY)
297306
{
@@ -301,10 +310,10 @@ sai_status_t ZeroMQChannel::wait(
301310
{
302311
SWSS_LOG_THROW("zmq_recv failed, zmqerrno: %d", zmq_errno());
303312
}
304-
if (rc >= ZMQ_RESPONSE_BUFFER_SIZE)
313+
if (rc >= m_zmqResponseBufferSize)
305314
{
306315
SWSS_LOG_THROW("zmq_recv message was truncated (over %d bytes, received %d), increase buffer size, message DROPPED",
307-
ZMQ_RESPONSE_BUFFER_SIZE,
316+
m_zmqResponseBufferSize,
308317
rc);
309318
}
310319
break;

lib/ZeroMQChannel.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
#include <memory>
1111
#include <functional>
1212

13+
#define ZMQ_RESPONSE_DEFAULT_BUFFER_SIZE (4*1024*1024)
14+
1315
namespace sairedis
1416
{
1517
class ZeroMQChannel:
@@ -20,7 +22,8 @@ namespace sairedis
2022
ZeroMQChannel(
2123
_In_ const std::string& endpoint,
2224
_In_ const std::string& ntfEndpoint,
23-
_In_ Channel::Callback callback);
25+
_In_ Channel::Callback callback,
26+
_In_ long zmqResponseBufferSize = ZMQ_RESPONSE_DEFAULT_BUFFER_SIZE);
2427

2528
virtual ~ZeroMQChannel();
2629

@@ -63,5 +66,7 @@ namespace sairedis
6366
void* m_ntfContext;
6467

6568
void* m_ntfSocket;
69+
70+
long m_zmqResponseBufferSize;
6671
};
6772
}

meta/ZeroMQSelectableChannel.cpp

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,27 +6,36 @@
66
#include <zmq.h>
77
#include <unistd.h>
88

9-
#define ZMQ_RESPONSE_BUFFER_SIZE (64*1024*1024)
10-
119
//#define ZMQ_POLL_TIMEOUT (2*60*1000)
1210
#define ZMQ_POLL_TIMEOUT (1000)
1311

1412
using namespace sairedis;
1513

1614
ZeroMQSelectableChannel::ZeroMQSelectableChannel(
17-
_In_ const std::string& endpoint):
15+
_In_ const std::string& endpoint,
16+
_In_ long zmqResponseBufferSize):
1817
m_endpoint(endpoint),
1918
m_context(nullptr),
2019
m_socket(nullptr),
2120
m_fd(0),
2221
m_allowZmqPoll(false),
23-
m_runThread(true)
22+
m_runThread(true),
23+
m_zmqResponseBufferSize(zmqResponseBufferSize)
2424
{
2525
SWSS_LOG_ENTER();
2626

2727
SWSS_LOG_NOTICE("binding on %s", endpoint.c_str());
2828

29-
m_buffer.resize(ZMQ_RESPONSE_BUFFER_SIZE);
29+
if (m_zmqResponseBufferSize != ZMQ_RESPONSE_DEFAULT_BUFFER_SIZE)
30+
{
31+
SWSS_LOG_NOTICE("setting zmq response buffer size to %ld bytes", m_zmqResponseBufferSize);
32+
}
33+
else
34+
{
35+
SWSS_LOG_NOTICE("using default zmq response buffer size of %ld bytes", ZMQ_RESPONSE_DEFAULT_BUFFER_SIZE);
36+
}
37+
38+
m_buffer.resize(m_zmqResponseBufferSize);
3039

3140
m_context = zmq_ctx_new();;
3241

@@ -230,17 +239,17 @@ uint64_t ZeroMQSelectableChannel::readData()
230239
// clear selectable event so it could be triggered in next select()
231240
m_selectableEvent.readData();
232241

233-
int rc = zmq_recv(m_socket, m_buffer.data(), ZMQ_RESPONSE_BUFFER_SIZE, 0);
242+
int rc = zmq_recv(m_socket, m_buffer.data(), m_zmqResponseBufferSize, 0);
234243

235244
if (rc < 0)
236245
{
237246
SWSS_LOG_THROW("zmq_recv failed, zmqerrno: %d", zmq_errno());
238247
}
239248

240-
if (rc >= ZMQ_RESPONSE_BUFFER_SIZE)
249+
if (rc >= m_zmqResponseBufferSize)
241250
{
242251
SWSS_LOG_THROW("zmq_recv message was truncated (over %d bytes, received %d), increase buffer size, message DROPPED",
243-
ZMQ_RESPONSE_BUFFER_SIZE,
252+
m_zmqResponseBufferSize,
244253
rc);
245254
}
246255

meta/ZeroMQSelectableChannel.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
#include <thread>
1010
#include <memory>
1111

12+
#define ZMQ_RESPONSE_DEFAULT_BUFFER_SIZE (4*1024*1024)
13+
1214
namespace sairedis
1315
{
1416
class ZeroMQSelectableChannel:
@@ -17,7 +19,8 @@ namespace sairedis
1719
public:
1820

1921
ZeroMQSelectableChannel(
20-
_In_ const std::string& endpoint);
22+
_In_ const std::string& endpoint,
23+
_In_ long zmqResponseBufferSize = ZMQ_RESPONSE_DEFAULT_BUFFER_SIZE);
2124

2225
virtual ~ZeroMQSelectableChannel();
2326

@@ -75,5 +78,7 @@ namespace sairedis
7578
std::shared_ptr<std::thread> m_zmlPollThread;
7679

7780
swss::SelectableEvent m_selectableEvent;
81+
82+
long m_zmqResponseBufferSize;
7883
};
7984
}

syncd/Syncd.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040

4141
#define DEF_SAI_WARM_BOOT_DATA_FILE "/var/warmboot/sai-warmboot.bin"
4242
#define SAI_FAILURE_DUMP_SCRIPT "/usr/bin/sai_failure_dump.sh"
43+
#define SYNCD_ZMQ_RESPONSE_BUFFER_SIZE (64*1024*1024)
4344

4445
using namespace syncd;
4546
using namespace saimeta;
@@ -134,7 +135,7 @@ Syncd::Syncd(
134135

135136
m_enableSyncMode = true;
136137

137-
m_selectableChannel = std::make_shared<sairedis::ZeroMQSelectableChannel>(m_contextConfig->m_zmqEndpoint);
138+
m_selectableChannel = std::make_shared<sairedis::ZeroMQSelectableChannel>(m_contextConfig->m_zmqEndpoint, SYNCD_ZMQ_RESPONSE_BUFFER_SIZE);
138139
}
139140
else
140141
{

0 commit comments

Comments
 (0)