Skip to content
Open
18 changes: 13 additions & 5 deletions streamer/src/nonblocking/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -761,7 +761,7 @@ async fn setup_connection(
let params = get_connection_stake(&new_connection, &staked_nodes).map_or(
NewConnectionHandlerParams::new_unstaked(
packet_sender.clone(),
quic_server_params.max_connections_per_peer,
quic_server_params.max_unstaked_connections_per_ipaddr,
stats.clone(),
quic_server_params.wait_for_chunk_timeout,
quic_server_params.max_unstaked_connections,
Expand All @@ -774,18 +774,25 @@ async fn setup_connection(
* STREAM_THROTTLING_INTERVAL_MS)
as f64;
let stake_ratio = stake as f64 / total_stake as f64;
let peer_type = if stake_ratio < min_stake_ratio {
let (peer_type, max_connections_per_peer) = if stake_ratio < min_stake_ratio
{
// If it is a staked connection with ultra low stake ratio, treat it as unstaked.
ConnectionPeerType::Unstaked
(
ConnectionPeerType::Unstaked,
quic_server_params.max_unstaked_connections_per_ipaddr,
)
} else {
ConnectionPeerType::Staked(stake)
(
ConnectionPeerType::Staked(stake),
quic_server_params.max_connections_per_peer,
)
};
NewConnectionHandlerParams {
packet_sender,
remote_pubkey: Some(pubkey),
peer_type,
total_stake,
max_connections_per_peer: quic_server_params.max_connections_per_peer,
max_connections_per_peer,
stats: stats.clone(),
max_stake,
min_stake,
Expand Down Expand Up @@ -2091,6 +2098,7 @@ pub mod test {
staked_nodes,
QuicServerParams {
max_connections_per_peer: 2,
max_unstaked_connections_per_ipaddr: 2,
..QuicServerParams::default_for_tests()
},
cancel.clone(),
Expand Down
38 changes: 19 additions & 19 deletions streamer/src/nonblocking/stream_throttle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use {
},
};

const MAX_UNSTAKED_STREAMS_PERCENT: u64 = 20;
const MAX_UNSTAKED_STREAMS_PERCENT: u64 = 50;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We really should not be changing this here. This is completely unrelated change and will not have the impact you intend.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are using the connection count to distribute the total allowed streams/s across connections. With the maximum unstaked connection count quadrupled (500 → 2000), the per-connection TPS for unstaked peers would drop to 1/4; the increased percentage offsets that effect.

Our allocation of staked vs. unstaked does not reflect the real usage in MB:

Screenshot 2025-09-23 at 11 25 40 AM

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it does, but it also cuts massively into the staked TPU bandwidth allocations, reducing them. Not sure that is desirable.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Screenshot 2025-09-23 at 6 32 55 PM

The staked is not using the allocated bandwidth. See the max. Even 50% --> 250K/s should be more than enough for them to use realistically.

pub const STREAM_THROTTLING_INTERVAL_MS: u64 = 100;
pub const STREAM_THROTTLING_INTERVAL: Duration =
Duration::from_millis(STREAM_THROTTLING_INTERVAL_MS);
Expand Down Expand Up @@ -240,13 +240,13 @@ pub mod test {
DEFAULT_MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
));
// 50K packets per ms * 20% / 500 max unstaked connections
// 500 packets per ms * 50% * 100 (ms) / 2000 max unstaked connections
assert_eq!(
load_ema.available_load_capacity_in_throttling_duration(
ConnectionPeerType::Unstaked,
10000,
),
20
12
);
}

Expand All @@ -260,73 +260,73 @@ pub mod test {

// EMA load is used for staked connections to calculate max number of allowed streams.
// EMA window = 5ms interval * 10 intervals = 50ms
// max streams per window = 500K streams/sec * 80% = 400K/sec = 20K per 50ms
// max_streams in 50ms = ((20K * 20K) / ema_load) * stake / total_stake
// max streams per window = 500K streams/sec * 50% = 250K/sec = 12.5K per 50ms
// max_streams in 50ms = ((12.5K * 12.5K) / ema_load) * stake / total_stake
//
// Stream throttling window is 100ms. So it'll double the amount of max streams.
// max_streams in 100ms (throttling window) = 2 * ((20K * 20K) / ema_load) * stake / total_stake
// max_streams in 100ms (throttling window) = 2 * ((12.5K * 12.5K) / ema_load) * stake / total_stake

load_ema.current_load_ema.store(20000, Ordering::Relaxed);
// ema_load = 20K, stake = 15, total_stake = 10K
// max_streams in 100ms (throttling window) = 2 * ((20K * 20K) / 20K) * 15 / 10K = 60
// ema_load = 20K, stake = 15, total_stake = 10K
// max_streams in 100ms (throttling window) = 2 * ((12.5K * 12.5K) / 20K) * 15 / 10K = 22
assert_eq!(
load_ema.available_load_capacity_in_throttling_duration(
ConnectionPeerType::Staked(15),
10000,
),
60
22
);

// ema_load = 20K, stake = 1K, total_stake = 10K
// max_streams in 100ms (throttling window) = 2 * ((20K * 20K) / 20K) * 1K / 10K = 4K
// max_streams in 100ms (throttling window) = 2 * ((12.5K * 12.5K) / 20K) * 1K / 10K = 1562
assert_eq!(
load_ema.available_load_capacity_in_throttling_duration(
ConnectionPeerType::Staked(1000),
10000,
),
4000
1562
);

load_ema.current_load_ema.store(5000, Ordering::Relaxed);
// ema_load = 5K, stake = 15, total_stake = 10K
// max_streams in 100ms (throttling window) = 2 * ((20K * 20K) / 5K) * 15 / 10K = 240
// max_streams in 100ms (throttling window) = 2 * ((12.5K * 12.5K) / 5K) * 15 / 10K = 92
assert_eq!(
load_ema.available_load_capacity_in_throttling_duration(
ConnectionPeerType::Staked(15),
10000,
),
240
92
);

// ema_load = 5K, stake = 1K, total_stake = 10K
// max_streams in 100ms (throttling window) = 2 * ((20K * 20K) / 5K) * 1K / 10K = 16000
// max_streams in 100ms (throttling window) = 2 * ((12.5K * 12.5K) / 5K) * 1K / 10K = 6250
assert_eq!(
load_ema.available_load_capacity_in_throttling_duration(
ConnectionPeerType::Staked(1000),
10000,
),
16000
6250
);

// At 4000, the load is still above 25% of max_load (12.5K * 25% = 3125),
// so no capping occurs and the raw EMA load of 4000 is used.
load_ema.current_load_ema.store(4000, Ordering::Relaxed);
// function = ((20K * 20K) / 25% of 20K) * stake / total_stake
// function = 2 * ((12.5K * 12.5K) / 4K) * stake / total_stake
assert_eq!(
load_ema.available_load_capacity_in_throttling_duration(
ConnectionPeerType::Staked(15),
10000,
),
240
116
);

// function = ((20K * 20K) / 25% of 20K) * stake / total_stake
// function = 2 * ((12.5K * 12.5K) / 4K) * stake / total_stake
assert_eq!(
load_ema.available_load_capacity_in_throttling_duration(
ConnectionPeerType::Staked(1000),
10000,
),
16000
7812
);

// At 1/40000 stake weight, and minimum load, it should still allow
Expand Down
10 changes: 9 additions & 1 deletion streamer/src/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ pub const DEFAULT_MAX_QUIC_CONNECTIONS_PER_PEER: usize = 8;

pub const DEFAULT_MAX_STAKED_CONNECTIONS: usize = 2000;

pub const DEFAULT_MAX_UNSTAKED_CONNECTIONS: usize = 500;
/// The maximum number of connections that can be opened for unstaked peers.
pub const DEFAULT_MAX_UNSTAKED_CONNECTIONS: usize = 2000;

/// The maximum number of unstaked connections that can be opened from a single IP address.
pub const DEFAULT_MAX_UNSTAKED_CONNECTIONS_PER_IPADDR: usize = 4;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this change be in a separate PR?


/// Limit to 500K PPS
pub const DEFAULT_MAX_STREAMS_PER_MS: u64 = 500;
Expand Down Expand Up @@ -622,6 +626,7 @@ pub fn spawn_server_multi(
#[derive(Clone)]
pub struct QuicServerParams {
pub max_connections_per_peer: usize,
pub max_unstaked_connections_per_ipaddr: usize,
pub max_staked_connections: usize,
pub max_unstaked_connections: usize,
pub max_streams_per_ms: u64,
Expand All @@ -637,6 +642,7 @@ impl Default for QuicServerParams {
QuicServerParams {
max_connections_per_peer: 1,
max_staked_connections: DEFAULT_MAX_STAKED_CONNECTIONS,
max_unstaked_connections_per_ipaddr: DEFAULT_MAX_UNSTAKED_CONNECTIONS_PER_IPADDR,
max_unstaked_connections: DEFAULT_MAX_UNSTAKED_CONNECTIONS,
max_streams_per_ms: DEFAULT_MAX_STREAMS_PER_MS,
max_connections_per_ipaddr_per_min: DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
Expand All @@ -658,6 +664,7 @@ impl QuicServerParams {
Self {
coalesce_channel_size: 100_000,
num_threads: Self::DEFAULT_NUM_SERVER_THREADS_FOR_TEST,
max_unstaked_connections_per_ipaddr: 1,
..Self::default()
}
}
Expand Down Expand Up @@ -841,6 +848,7 @@ mod test {
staked_nodes,
QuicServerParams {
max_connections_per_peer: 2,
max_unstaked_connections_per_ipaddr: 2,
..QuicServerParams::default_for_tests()
},
cancel.clone(),
Expand Down
Loading