Skip to content

Commit 46b74a1

Browse files
committed
fix(pubsub): cool single-peer recovery sends
1 parent 2510d04 commit 46b74a1

3 files changed

Lines changed: 99 additions & 6 deletions

File tree

CHANGELOG.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,15 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

8+
## [0.5.26] - 2026-05-03
9+
10+
### Fixed
11+
12+
- PubSub slow-peer cooling now also applies to single-peer recovery/control sends
13+
(IWANT, anti-entropy, and EAGER replies to IWANT). These paths previously used
14+
the bounded send timeout but did not update the per-topic cooling state, so a
15+
cooled peer could keep consuming 750 ms send slots outside normal EAGER fanout.
16+
817
## [0.5.25] - 2026-05-03
918

1019
### Added

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ members = [
1818
resolver = "2"
1919

2020
[workspace.package]
21-
version = "0.5.25"
21+
version = "0.5.26"
2222
edition = "2021"
2323
authors = ["David Irvine <david@saorsalabs.com>"]
2424
license = "MIT OR Apache-2.0"

crates/pubsub/src/lib.rs

Lines changed: 89 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1149,11 +1149,20 @@ impl<T: GossipTransport + 'static> PlumtreePubSub<T> {
11491149

11501150
async fn send_to_peer_bounded(
11511151
&self,
1152+
topic: TopicId,
11521153
peer: PeerId,
11531154
stream_type: GossipStreamType,
11541155
bytes: Bytes,
11551156
op: &'static str,
11561157
) -> Result<()> {
1158+
if self
1159+
.filter_suppressed_topic_peers(topic, vec![peer], op)
1160+
.await
1161+
.is_empty()
1162+
{
1163+
return Ok(());
1164+
}
1165+
11571166
match Self::send_to_peer_with_timeout(
11581167
Arc::clone(&self.transport),
11591168
Arc::clone(&self.stage_stats),
@@ -1164,7 +1173,16 @@ impl<T: GossipTransport + 'static> PlumtreePubSub<T> {
11641173
)
11651174
.await?
11661175
{
1167-
PeerSendOutcome::Sent | PeerSendOutcome::TimedOut => Ok(()),
1176+
PeerSendOutcome::Sent => {
1177+
self.record_topic_send_results(topic, vec![peer], Vec::new())
1178+
.await;
1179+
Ok(())
1180+
}
1181+
PeerSendOutcome::TimedOut => {
1182+
self.record_topic_send_results(topic, Vec::new(), vec![peer])
1183+
.await;
1184+
Ok(())
1185+
}
11681186
}
11691187
}
11701188

@@ -1692,7 +1710,7 @@ impl<T: GossipTransport + 'static> PlumtreePubSub<T> {
16921710
}
16931711
};
16941712
let send_result = self
1695-
.send_to_peer_bounded(from, GossipStreamType::PubSub, bytes.into(), "IWANT")
1713+
.send_to_peer_bounded(topic, from, GossipStreamType::PubSub, bytes.into(), "IWANT")
16961714
.await;
16971715
self.record_stage(PubSubStage::Republish, republish_started);
16981716
send_result?;
@@ -1752,7 +1770,7 @@ impl<T: GossipTransport + 'static> PlumtreePubSub<T> {
17521770
}
17531771
};
17541772
let send_result = self
1755-
.send_to_peer_bounded(from, GossipStreamType::PubSub, bytes.into(), "EAGER")
1773+
.send_to_peer_bounded(topic, from, GossipStreamType::PubSub, bytes.into(), "EAGER")
17561774
.await;
17571775
if let Err(e) = send_result {
17581776
self.record_stage(PubSubStage::Republish, republish_started);
@@ -1849,6 +1867,7 @@ impl<T: GossipTransport + 'static> PlumtreePubSub<T> {
18491867
if let Ok(bytes) = postcard::to_stdvec(&eager_msg) {
18501868
let _ = self
18511869
.send_to_peer_bounded(
1870+
topic,
18521871
from,
18531872
GossipStreamType::PubSub,
18541873
bytes.into(),
@@ -1887,6 +1906,7 @@ impl<T: GossipTransport + 'static> PlumtreePubSub<T> {
18871906
if let Ok(bytes) = postcard::to_stdvec(&iwant_msg) {
18881907
let _ = self
18891908
.send_to_peer_bounded(
1909+
topic,
18901910
from,
18911911
GossipStreamType::PubSub,
18921912
bytes.into(),
@@ -1960,6 +1980,7 @@ impl<T: GossipTransport + 'static> PlumtreePubSub<T> {
19601980
if let Ok(bytes) = postcard::to_stdvec(&iwant_msg) {
19611981
let _ = self
19621982
.send_to_peer_bounded(
1983+
topic,
19631984
from,
19641985
GossipStreamType::PubSub,
19651986
bytes.into(),
@@ -2015,8 +2036,14 @@ impl<T: GossipTransport + 'static> PlumtreePubSub<T> {
20152036

20162037
let bytes =
20172038
postcard::to_stdvec(&message).map_err(|e| anyhow!("Serialization failed: {}", e))?;
2018-
self.send_to_peer_bounded(peer, GossipStreamType::PubSub, bytes.into(), "ANTI_ENTROPY")
2019-
.await?;
2039+
self.send_to_peer_bounded(
2040+
topic,
2041+
peer,
2042+
GossipStreamType::PubSub,
2043+
bytes.into(),
2044+
"ANTI_ENTROPY",
2045+
)
2046+
.await?;
20202047

20212048
debug!(
20222049
peer_id = %peer,
@@ -3207,6 +3234,63 @@ mod tests {
32073234
);
32083235
}
32093236

3237+
#[tokio::test]
3238+
async fn test_single_peer_bounded_sends_record_cooling_and_skip_suppressed_peer() {
3239+
let peer_id = test_peer_id(1);
3240+
let (transport, mut started_rx) = BlockingTransport::new(peer_id);
3241+
let pubsub = PlumtreePubSub::new_with_task_control(
3242+
peer_id,
3243+
Arc::clone(&transport),
3244+
test_signing_key(),
3245+
false,
3246+
);
3247+
let topic = TopicId::new([46u8; 32]);
3248+
let slow_peer = test_peer_id(2);
3249+
pubsub.initialize_topic_peers(topic, vec![slow_peer]).await;
3250+
3251+
for _ in 0..PEER_TIMEOUT_THRESHOLD {
3252+
tokio::time::timeout(
3253+
Duration::from_secs(2),
3254+
pubsub.send_to_peer_bounded(
3255+
topic,
3256+
slow_peer,
3257+
GossipStreamType::PubSub,
3258+
Bytes::from_static(b"bounded-single-peer"),
3259+
"EAGER",
3260+
),
3261+
)
3262+
.await
3263+
.expect("bounded send should return after per-peer timeout")
3264+
.expect("timeout outcome is recorded, not returned as an error");
3265+
3266+
tokio::time::timeout(Duration::from_millis(100), started_rx.recv())
3267+
.await
3268+
.expect("send should have started")
3269+
.expect("started channel should stay open");
3270+
}
3271+
3272+
let suppressed = pubsub.stage_stats().suppressed_peers;
3273+
assert_eq!(suppressed.len(), 1, "single-peer sends should cool peers");
3274+
assert_eq!(suppressed[0].peer_id, slow_peer.to_string());
3275+
3276+
let attempts_after_cooling = transport.send_count();
3277+
pubsub
3278+
.send_to_peer_bounded(
3279+
topic,
3280+
slow_peer,
3281+
GossipStreamType::PubSub,
3282+
Bytes::from_static(b"should-be-skipped"),
3283+
"EAGER",
3284+
)
3285+
.await
3286+
.expect("suppressed peer should be skipped cleanly");
3287+
assert_eq!(
3288+
transport.send_count(),
3289+
attempts_after_cooling,
3290+
"suppressed peer should not consume another send slot"
3291+
);
3292+
}
3293+
32103294
#[test]
32113295
fn test_peer_cooling_recovers_after_cooldown_without_restart() {
32123296
let mut state = TopicState::new();

0 commit comments

Comments
 (0)