Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 2 additions & 15 deletions crates/hamgrd/src/actors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ use swbus_edge::swbus_proto::result::*;
use swbus_edge::swbus_proto::swbus::{swbus_message::Body, DataRequest, ServicePath, SwbusErrorCode, SwbusMessage};
use swbus_edge::SwbusEdgeRuntime;
use swss_common::{
KeyOpFieldValues, KeyOperation, ProducerStateTable, SonicDbTable, SubscriberStateTable, ZmqClient,
ZmqProducerStateTable,
KeyOpFieldValues, KeyOperation, SonicDbTable, SubscriberStateTable, ZmqClient, ZmqProducerStateTable,
};
use swss_common_bridge::{consumer::ConsumerBridge, producer::spawn_producer_bridge};
use tokio::sync::mpsc::{channel, Receiver};
Expand Down Expand Up @@ -276,18 +275,6 @@ where
);
Ok(spawn_producer_bridge(edge_runtime.clone(), sp, zpst))
} else {
if std::env::var("SIM").is_err() {
anyhow::bail!("Failed to connect to ZMQ server at {}", zmq_endpoint);
}
let dpu_appl_db = crate::db_for_table::<T>().await?;
let pst = ProducerStateTable::new(dpu_appl_db, T::table_name()).unwrap();

let sp = crate::common_bridge_sp::<T>(&edge_runtime);
info!(
"spawned producer bridge for {} at {}",
T::table_name(),
sp.to_longest_path()
);
Ok(spawn_producer_bridge(edge_runtime.clone(), sp, pst))
anyhow::bail!("Failed to connect to ZMQ server at {}", zmq_endpoint);
}
}
41 changes: 41 additions & 0 deletions crates/swss-common/tests/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,47 @@ fn zmq_consumer_producer_state_tables_sync_api_basic_test() -> Result<(), Except
Ok(())
}

// Below test covers 2 scenarios:
// 1. late connect when zmq server is started after client sending messages.
// 2. reconnect when zmq server is stopped and restarted. messages from client during
// the time should be queued by client and resent when server is restarted.
#[test]
fn zmq_consumer_producer_state_tables_sync_api_connect_late_reconnect() -> Result<(), Exception> {
use SelectResult::*;
enum TestPhase {
LateConnect,
Reconnect,
}
let (endpoint, _delete) = random_zmq_endpoint();

let zmqc = ZmqClient::new(&endpoint)?;
let redis = Redis::start();
let zpst = ZmqProducerStateTable::new(redis.db_connector(), "table_a", zmqc, false)?;

for _ in [TestPhase::LateConnect, TestPhase::Reconnect] {
let kfvs = random_kfvs();
for kfv in &kfvs {
match kfv.operation {
KeyOperation::Set => zpst.set(&kfv.key, kfv.field_values.clone())?,
KeyOperation::Del => zpst.del(&kfv.key)?,
}
}

let mut zmqs = ZmqServer::new(&endpoint)?;
let zcst = ZmqConsumerStateTable::new(redis.db_connector(), "table_a", &mut zmqs, None, None)?;
let mut kfvs_seen = Vec::new();
while kfvs_seen.len() != kfvs.len() {
assert_eq!(zcst.read_data(Duration::from_millis(2000), true)?, Data);
kfvs_seen.extend(zcst.pops()?);
}
assert_eq!(kfvs, kfvs_seen);
drop(zcst);
drop(zmqs);
}

Ok(())
}

#[test]
fn table_sync_api_basic_test() -> Result<(), Exception> {
let redis = Redis::start();
Expand Down
5 changes: 3 additions & 2 deletions test_utils/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ This folder contains some utilities, configurations, instructions on running man

cd test_utils

./run_redis.sh hamgrd/database_config.json hamgrd/redis_data_set.cmd
./run_redis.sh hamgrd/database_config.json hamgrd/database_global.json hamgrd/redis_data_set.cmd

- Start swbusd

Expand All @@ -17,4 +17,5 @@ This folder contains some utilities, configurations, instructions on running man

- Run swbus-cli

DEV=dpu0 target/debug/swbus-cli show hamgrd actor /hamgrd/0/dpu/switch1_dpu0
DEV=dpu0 target/debug/swbus-cli show hamgrd actor /hamgrd/0/dpu/switch1_dpu0