Skip to content

Commit 6e3f8dd

Browse files
committed
Add test using the ZMQ_CONFLATE option receiver-side
1 parent f4ad565 commit 6e3f8dd

File tree

1 file changed

+32
-0
lines changed

1 file changed

+32
-0
lines changed

tests/test.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,38 @@ test!(test_raw_roundtrip, {
117117
let _ = unsafe { Socket::from_raw(raw) };
118118
});
119119

120+
// The `conflate` option limits the buffer size to one; let's see if we can get
121+
// messages (unreliably) across the connection.
122+
test!(test_conflating_receiver, {
123+
use std::sync::{Arc, atomic::{Ordering, AtomicBool}};
124+
125+
let ctx = zmq::Context::new();
126+
let receiver = ctx.socket(zmq::PULL).unwrap();
127+
receiver.bind("tcp://127.0.0.1:*").unwrap();
128+
let receiver_endpoint = receiver.get_last_endpoint().unwrap().unwrap();
129+
130+
let stop = Arc::new(AtomicBool::new(false));
131+
let sender_thread = {
132+
let stop = Arc::clone(&stop);
133+
let ctx = ctx.clone();
134+
std::thread::spawn(move || {
135+
let sender = ctx.socket(zmq::PUSH).unwrap();
136+
sender.connect(&receiver_endpoint).unwrap();
137+
while !stop.load(Ordering::SeqCst) {
138+
sender.send("bar", 0).expect("send failed");
139+
}
140+
})
141+
};
142+
143+
receiver.set_conflate(true).expect("could not set conflate option");
144+
for _ in 0..100 {
145+
let msg = receiver.recv_bytes(0).unwrap();
146+
assert_eq!(&msg[..], b"bar");
147+
}
148+
stop.store(true, Ordering::SeqCst);
149+
sender_thread.join().expect("could not join sender thread");
150+
});
151+
120152
test!(test_version, {
121153
let (major, _, _) = version();
122154
assert!(major == 3 || major == 4);

0 commit comments

Comments
 (0)