diff --git a/common/asyncdbupdater.h b/common/asyncdbupdater.h index 25c6ef705..fa3ca7119 100644 --- a/common/asyncdbupdater.h +++ b/common/asyncdbupdater.h @@ -6,10 +6,6 @@ #include "dbconnector.h" #include "table.h" -#define MQ_SIZE 100 -#define MQ_MAX_RETRY 10 -#define MQ_POLL_TIMEOUT (1000) - namespace swss { class AsyncDBUpdater diff --git a/common/zmqclient.cpp b/common/zmqclient.cpp index 45f118990..3bafa4e18 100644 --- a/common/zmqclient.cpp +++ b/common/zmqclient.cpp @@ -98,6 +98,10 @@ void ZmqClient::connect() int linger = 0; zmq_setsockopt(m_socket, ZMQ_LINGER, &linger, sizeof(linger)); + // only queue on complete connection, so ZMQ will not lose data during reconnect: http://api.zeromq.org/master:zmq-setsockopt + int immediate = 1; + zmq_setsockopt(m_socket, ZMQ_IMMEDIATE, &immediate, sizeof(immediate)); + // Increase send buffer for use all bandwidth: http://api.zeromq.org/4-2:zmq-setsockopt int high_watermark = MQ_WATERMARK; zmq_setsockopt(m_socket, ZMQ_SNDHWM, &high_watermark, sizeof(high_watermark)); diff --git a/common/zmqserver.h b/common/zmqserver.h index 2b8d1bac0..3701d7181 100644 --- a/common/zmqserver.h +++ b/common/zmqserver.h @@ -8,7 +8,7 @@ #define MQ_RESPONSE_MAX_COUNT (16*1024*1024) #define MQ_SIZE 100 -#define MQ_MAX_RETRY 10 +#define MQ_MAX_RETRY 3 #define MQ_POLL_TIMEOUT (1000) #define MQ_WATERMARK 10000 diff --git a/crates/swss-common-testing/src/lib.rs b/crates/swss-common-testing/src/lib.rs index 7d299265a..26811be2e 100644 --- a/crates/swss-common-testing/src/lib.rs +++ b/crates/swss-common-testing/src/lib.rs @@ -51,7 +51,7 @@ impl Redis { let mut child = Command::new("timeout") .args([ "--signal=KILL", - "15s", + "60s", "redis-server", "--appendonly", "no", "--save", "", diff --git a/crates/swss-common/tests/sync.rs b/crates/swss-common/tests/sync.rs index a4f408124..9c2c147d0 100644 --- a/crates/swss-common/tests/sync.rs +++ b/crates/swss-common/tests/sync.rs @@ -190,10 +190,10 @@ fn 
zmq_consumer_producer_state_tables_sync_api_basic_test() -> Result<(), Except Ok(()) } -// Below test covers 2 scenarios: -// 1. late connect when zmq server is started after client sending messages. -// 2. reconnect when zmq server is stopped and restarted. messages from client during -// the time should be queued by client and resent when server is restarted. +// This test covers two scenarios: +// 1. Late connection: the ZMQ server starts after the client has already begun sending messages. +// Messages sent during this period should not be queued by the client, as queued data may be lost. +// 2. Reconnection: once the ZMQ server is up, messages sent afterward should be successfully received. #[test] fn zmq_consumer_producer_state_tables_sync_api_connect_late_reconnect() -> Result<(), Exception> { use SelectResult::*; @@ -207,26 +207,37 @@ fn zmq_consumer_producer_state_tables_sync_api_connect_late_reconnect() -> Resul let redis = Redis::start(); let zpst = ZmqProducerStateTable::new(redis.db_connector(), "table_a", zmqc, false)?; - for _ in [TestPhase::LateConnect, TestPhase::Reconnect] { - let kfvs = random_kfvs(); - for kfv in &kfvs { - match kfv.operation { - KeyOperation::Set => zpst.set(&kfv.key, kfv.field_values.clone())?, - KeyOperation::Del => zpst.del(&kfv.key)?, - } - } + // the send is expected to fail because the client is not yet connected to a server + let result = zpst.set(&random_string(), random_fvs()); + let result_message = match result { + Ok(val) => format!("Success"), + Err(e) => format!("Error: {}", e), + }; + assert!(result_message.contains("zmqerrno: 11:Resource temporarily unavailable")); - let mut zmqs = ZmqServer::new(&endpoint)?; - let zcst = ZmqConsumerStateTable::new(redis.db_connector(), "table_a", &mut zmqs, None, None)?; - let mut kfvs_seen = Vec::new(); - while kfvs_seen.len() != kfvs.len() { - assert_eq!(zcst.read_data(Duration::from_millis(2000), true)?, Data); - kfvs_seen.extend(zcst.pops()?); - } - assert_eq!(kfvs, kfvs_seen); - drop(zcst); - drop(zmqs); + 
// create the ZMQ server and try to receive data + let mut zmqs = ZmqServer::new(&endpoint)?; + let zcst = ZmqConsumerStateTable::new(redis.db_connector(), "table_a", &mut zmqs, None, None)?; + + // should not receive any data + let mut kfvs_seen = Vec::new(); + for i in 0..10 { + _ = zcst.read_data(Duration::from_millis(2000), true); + kfvs_seen.extend(zcst.pops()?); + } + assert_eq!(kfvs_seen.len(), 0); + + // send again; the data should be received because the client reconnects + zpst.set(&random_string(), random_fvs()); + + while kfvs_seen.len() != 1 { + _ = zcst.read_data(Duration::from_millis(2000), true); + kfvs_seen.extend(zcst.pops()?); } + assert_eq!(kfvs_seen.len(), 1); + + drop(zcst); + drop(zmqs); Ok(()) } diff --git a/tests/zmq_state_ut.cpp b/tests/zmq_state_ut.cpp index 2b0b60d73..bac1962e3 100644 --- a/tests/zmq_state_ut.cpp +++ b/tests/zmq_state_ut.cpp @@ -565,7 +565,16 @@ TEST(ZmqWithResponseClientError, test) kcos.push_back(KeyOpFieldsValuesTuple{"k", SET_COMMAND, std::vector{}}); std::vector> kcosPtr; std::string dbName, tableName; - p.send(kcos); + + std::string exception_message = ""; + try + { + p.send(kcos); + } + catch (const std::system_error& e) { + exception_message = e.what(); + } + // Wait will timeout without server reply. - EXPECT_FALSE(p.wait(dbName, tableName, kcosPtr)); + EXPECT_TRUE(exception_message.find("zmqerrno: 11:Resource temporarily unavailable") != std::string::npos); }