Skip to content
Closed
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions common/asyncdbupdater.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,6 @@
#include "dbconnector.h"
#include "table.h"

#define MQ_SIZE 100
#define MQ_MAX_RETRY 10
#define MQ_POLL_TIMEOUT (1000)

namespace swss {

class AsyncDBUpdater
Expand Down
4 changes: 4 additions & 0 deletions common/zmqclient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ void ZmqClient::connect()
int linger = 0;
zmq_setsockopt(m_socket, ZMQ_LINGER, &linger, sizeof(linger));

// only queue on complete connection, so ZMQ will not lost data during reconnect: http://api.zeromq.org/master:zmq-setsockopt
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lost

lose

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, current hamgrd relies on ZMQ_IMMEDIATE=0 because it doesn't actively retry. With this change, does zmq_connect fails if the peer is not connected? And what the client needs to do if connection is lost?

Copy link
Copy Markdown
Contributor Author

@liuh-80 liuh-80 Aug 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ZMQ client will retry when connection dropped. and if retry failed, ZMQ client will throw an exception: "zmqerrno: 11:Resource temporarily unavailable"

Using ZMQ_IMMEDIATE=0 is unstable, hamgrd need handle exception and retry to make sure data not lost.

But because ZMQ client already retry before throw exception, so hamgrd may need log the exception and ignore it or crash with a error.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lost

lose

Fixed

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you elaborate on ZMQ_IMMEDIATE=0 is unstable? Do you mean message could lose when queue is full?

Copy link
Copy Markdown
Contributor Author

@liuh-80 liuh-80 Aug 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When ZMQ server side down, client side can't queue.
Then when ZMQ server side up, client side can queue data and send to server.
If ZMQ server side down again, then client side can't queue new data to server. client will retry:
If server start before client retry finish, the send will succeed.
If server does not start before client retry finish, the send will failed and throw exception.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, the last 2 "if" are same condition but different result. Did you miss a "not" somewhere?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, the last 2 "if" are same condition but different result. Did you miss a "not" somewhere?

Fixed

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. Basically, if server is not connected, client calling zmq.send() will fail but the messages already queued are safe and zmq will retry to deliver them to the server. Then I don't understand the root cause of this issue. I thought zmq dropped the first message during server is reconnected. But apparently it is not because it is able to resend queued messages without issue. Is the issue in gnmi resend logic?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ZMQ document doesn't give detail about how data dropped. if connection is setup, the pull/push is safe, but when connection broken, the queued data may drop.

int immediate = 1;
zmq_setsockopt(m_socket, ZMQ_IMMEDIATE, &immediate, sizeof(immediate));

// Increase send buffer for use all bandwidth: http://api.zeromq.org/4-2:zmq-setsockopt
int high_watermark = MQ_WATERMARK;
zmq_setsockopt(m_socket, ZMQ_SNDHWM, &high_watermark, sizeof(high_watermark));
Expand Down
2 changes: 1 addition & 1 deletion common/zmqserver.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

#define MQ_RESPONSE_MAX_COUNT (16*1024*1024)
#define MQ_SIZE 100
#define MQ_MAX_RETRY 10
#define MQ_MAX_RETRY 3
#define MQ_POLL_TIMEOUT (1000)
#define MQ_WATERMARK 10000

Expand Down
2 changes: 1 addition & 1 deletion crates/swss-common-testing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl Redis {
let mut child = Command::new("timeout")
.args([
"--signal=KILL",
"15s",
"60s",
"redis-server",
"--appendonly", "no",
"--save", "",
Expand Down
55 changes: 33 additions & 22 deletions crates/swss-common/tests/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,10 +190,10 @@ fn zmq_consumer_producer_state_tables_sync_api_basic_test() -> Result<(), Except
Ok(())
}

// Below test covers 2 scenarios:
// 1. late connect when zmq server is started after client sending messages.
// 2. reconnect when zmq server is stopped and restarted. messages from client during
// the time should be queued by client and resent when server is restarted.
// This test covers two scenarios:
// 1. Late connection: the ZMQ server starts after the client has already begun sending messages.
// Messages sent during this period should not be queued by the client, as queued data may be lost.
// 2. Reconnection: once the ZMQ server is up, messages sent afterward should be successfully received.
#[test]
fn zmq_consumer_producer_state_tables_sync_api_connect_late_reconnect() -> Result<(), Exception> {
use SelectResult::*;
Expand All @@ -207,26 +207,37 @@ fn zmq_consumer_producer_state_tables_sync_api_connect_late_reconnect() -> Resul
let redis = Redis::start();
let zpst = ZmqProducerStateTable::new(redis.db_connector(), "table_a", zmqc, false)?;

for _ in [TestPhase::LateConnect, TestPhase::Reconnect] {
let kfvs = random_kfvs();
for kfv in &kfvs {
match kfv.operation {
KeyOperation::Set => zpst.set(&kfv.key, kfv.field_values.clone())?,
KeyOperation::Del => zpst.del(&kfv.key)?,
}
}
// send will failed because not connect to server
let result = zpst.set(&random_string(), random_fvs().clone());
let result_message = match result {
Ok(val) => format!("Success"),
Err(e) => format!("Error: {}", e),
};
assert!(result_message.contains("zmqerrno: 11:Resource temporarily unavailable"));

let mut zmqs = ZmqServer::new(&endpoint)?;
let zcst = ZmqConsumerStateTable::new(redis.db_connector(), "table_a", &mut zmqs, None, None)?;
let mut kfvs_seen = Vec::new();
while kfvs_seen.len() != kfvs.len() {
assert_eq!(zcst.read_data(Duration::from_millis(2000), true)?, Data);
kfvs_seen.extend(zcst.pops()?);
}
assert_eq!(kfvs, kfvs_seen);
drop(zcst);
drop(zmqs);
// create ZMQ server and try receive data
let mut zmqs = ZmqServer::new(&endpoint)?;
let zcst = ZmqConsumerStateTable::new(redis.db_connector(), "table_a", &mut zmqs, None, None)?;

// should not receive any data
let mut kfvs_seen = Vec::new();
for i in 0..10 {
zcst.read_data(Duration::from_millis(2000), true);
kfvs_seen.extend(zcst.pops()?);
}
assert_eq!(kfvs_seen.len(), 0);

// send again, should receive data because client reconnect
zpst.set(&random_string(), random_fvs().clone());

while kfvs_seen.len() != 1 {
zcst.read_data(Duration::from_millis(2000), true);
kfvs_seen.extend(zcst.pops()?);
}
assert_eq!(kfvs_seen.len(), 1);

drop(zcst);
drop(zmqs);

Ok(())
}
Expand Down
13 changes: 11 additions & 2 deletions tests/zmq_state_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,16 @@ TEST(ZmqWithResponseClientError, test)
kcos.push_back(KeyOpFieldsValuesTuple{"k", SET_COMMAND, std::vector<FieldValueTuple>{}});
std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>> kcosPtr;
std::string dbName, tableName;
p.send(kcos);

std::string exception_message = "";
try
{
p.send(kcos);
}
catch (const std::system_error& e) {
exception_message = e.what();
}

// Wait will timeout without server reply.
EXPECT_FALSE(p.wait(dbName, tableName, kcosPtr));
EXPECT_TRUE(exception_message.find("zmqerrno: 11:Resource temporarily unavailable") != std::string::npos);
}
Loading