Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions common/asyncdbupdater.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,6 @@
#include "dbconnector.h"
#include "table.h"

#define MQ_SIZE 100
#define MQ_MAX_RETRY 10
#define MQ_POLL_TIMEOUT (1000)

namespace swss {

class AsyncDBUpdater
Expand Down
4 changes: 4 additions & 0 deletions common/zmqclient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ void ZmqClient::connect()
int linger = 0;
zmq_setsockopt(m_socket, ZMQ_LINGER, &linger, sizeof(linger));

// only queue on complete connection, so ZMQ will not lose data during reconnect: http://api.zeromq.org/master:zmq-setsockopt
int immediate = 1;
zmq_setsockopt(m_socket, ZMQ_IMMEDIATE, &immediate, sizeof(immediate));

// Increase send buffer for use all bandwidth: http://api.zeromq.org/4-2:zmq-setsockopt
int high_watermark = MQ_WATERMARK;
zmq_setsockopt(m_socket, ZMQ_SNDHWM, &high_watermark, sizeof(high_watermark));
Expand Down
2 changes: 1 addition & 1 deletion common/zmqserver.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

#define MQ_RESPONSE_MAX_COUNT (16*1024*1024)
#define MQ_SIZE 100
#define MQ_MAX_RETRY 10
#define MQ_MAX_RETRY 3
#define MQ_POLL_TIMEOUT (1000)
#define MQ_WATERMARK 10000

Expand Down
2 changes: 1 addition & 1 deletion crates/swss-common-testing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl Redis {
let mut child = Command::new("timeout")
.args([
"--signal=KILL",
"15s",
"60s",
"redis-server",
"--appendonly", "no",
"--save", "",
Expand Down
55 changes: 33 additions & 22 deletions crates/swss-common/tests/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,10 +190,10 @@ fn zmq_consumer_producer_state_tables_sync_api_basic_test() -> Result<(), Except
Ok(())
}

// Below test covers 2 scenarios:
// 1. late connect when zmq server is started after client sending messages.
// 2. reconnect when zmq server is stopped and restarted. messages from client during
// the time should be queued by client and resent when server is restarted.
// This test covers two scenarios:
// 1. Late connection: the ZMQ server starts after the client has already begun sending messages.
// Messages sent during this period should not be queued by the client, as queued data may be lost.
// 2. Reconnection: once the ZMQ server is up, messages sent afterward should be successfully received.
#[test]
fn zmq_consumer_producer_state_tables_sync_api_connect_late_reconnect() -> Result<(), Exception> {
use SelectResult::*;
Expand All @@ -207,26 +207,37 @@ fn zmq_consumer_producer_state_tables_sync_api_connect_late_reconnect() -> Resul
let redis = Redis::start();
let zpst = ZmqProducerStateTable::new(redis.db_connector(), "table_a", zmqc, false)?;

for _ in [TestPhase::LateConnect, TestPhase::Reconnect] {
let kfvs = random_kfvs();
for kfv in &kfvs {
match kfv.operation {
KeyOperation::Set => zpst.set(&kfv.key, kfv.field_values.clone())?,
KeyOperation::Del => zpst.del(&kfv.key)?,
}
}
// send will failed because not connect to server
let result = zpst.set(&random_string(), random_fvs());
let result_message = match result {
Ok(val) => format!("Success"),
Err(e) => format!("Error: {}", e),
};
assert!(result_message.contains("zmqerrno: 11:Resource temporarily unavailable"));

let mut zmqs = ZmqServer::new(&endpoint)?;
let zcst = ZmqConsumerStateTable::new(redis.db_connector(), "table_a", &mut zmqs, None, None)?;
let mut kfvs_seen = Vec::new();
while kfvs_seen.len() != kfvs.len() {
assert_eq!(zcst.read_data(Duration::from_millis(2000), true)?, Data);
kfvs_seen.extend(zcst.pops()?);
}
assert_eq!(kfvs, kfvs_seen);
drop(zcst);
drop(zmqs);
// create ZMQ server and try receive data
let mut zmqs = ZmqServer::new(&endpoint)?;
let zcst = ZmqConsumerStateTable::new(redis.db_connector(), "table_a", &mut zmqs, None, None)?;

// should not receive any data
let mut kfvs_seen = Vec::new();
for i in 0..10 {
_ = zcst.read_data(Duration::from_millis(2000), true);
kfvs_seen.extend(zcst.pops()?);
}
assert_eq!(kfvs_seen.len(), 0);

// send again, should receive data because client reconnect
zpst.set(&random_string(), random_fvs());

while kfvs_seen.len() != 1 {
_ = zcst.read_data(Duration::from_millis(2000), true);
kfvs_seen.extend(zcst.pops()?);
}
assert_eq!(kfvs_seen.len(), 1);

drop(zcst);
drop(zmqs);

Ok(())
}
Expand Down
13 changes: 11 additions & 2 deletions tests/zmq_state_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,16 @@ TEST(ZmqWithResponseClientError, test)
kcos.push_back(KeyOpFieldsValuesTuple{"k", SET_COMMAND, std::vector<FieldValueTuple>{}});
std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>> kcosPtr;
std::string dbName, tableName;
p.send(kcos);

std::string exception_message = "";
try
{
p.send(kcos);
}
catch (const std::system_error& e) {
exception_message = e.what();
}

// Wait will timeout without server reply.
EXPECT_FALSE(p.wait(dbName, tableName, kcosPtr));
EXPECT_TRUE(exception_message.find("zmqerrno: 11:Resource temporarily unavailable") != std::string::npos);
}
Loading