Skip to content

Commit ca1dc4f

Browse files
committed
Don't kill SSE stream if channel fills up (#4500)
## Issue Addressed Closes #4245 ## Proposed Changes - If an SSE channel fills up, send a comment instead of terminating the stream. - Add a CLI flag for scaling up the SSE buffer: `--http-sse-capacity-multiplier N`. ## Additional Info ~~Blocked on #4462. I haven't rebased on that PR yet for initial testing, because it still needs some more work to handle long-running HTTP threads.~~ - [x] Add CLI flag tests.
1 parent 59c24bc commit ca1dc4f

7 files changed

Lines changed: 67 additions & 21 deletions

File tree

beacon_node/beacon_chain/src/events.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,11 @@ pub struct ServerSentEventHandler<T: EthSpec> {
2121
}
2222

2323
impl<T: EthSpec> ServerSentEventHandler<T> {
24-
pub fn new(log: Logger) -> Self {
25-
Self::new_with_capacity(log, DEFAULT_CHANNEL_CAPACITY)
24+
pub fn new(log: Logger, capacity_multiplier: usize) -> Self {
25+
Self::new_with_capacity(
26+
log,
27+
capacity_multiplier.saturating_mul(DEFAULT_CHANNEL_CAPACITY),
28+
)
2629
}
2730

2831
pub fn new_with_capacity(log: Logger, capacity: usize) -> Self {

beacon_node/client/src/builder.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,10 @@ where
157157
let context = runtime_context.service_context("beacon".into());
158158
let spec = chain_spec.ok_or("beacon_chain_start_method requires a chain spec")?;
159159
let event_handler = if self.http_api_config.enabled {
160-
Some(ServerSentEventHandler::new(context.log().clone()))
160+
Some(ServerSentEventHandler::new(
161+
context.log().clone(),
162+
self.http_api_config.sse_capacity_multiplier,
163+
))
161164
} else {
162165
None
163166
};

beacon_node/http_api/src/lib.rs

Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,10 @@ use tokio::sync::{
6565
mpsc::{Sender, UnboundedSender},
6666
oneshot,
6767
};
68-
use tokio_stream::{wrappers::BroadcastStream, StreamExt};
68+
use tokio_stream::{
69+
wrappers::{errors::BroadcastStreamRecvError, BroadcastStream},
70+
StreamExt,
71+
};
6972
use types::{
7073
Attestation, AttestationData, AttestationShufflingId, AttesterSlashing, BeaconStateError,
7174
BlindedPayload, CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName, FullPayload,
@@ -132,6 +135,7 @@ pub struct Config {
132135
pub allow_sync_stalled: bool,
133136
pub spec_fork_name: Option<ForkName>,
134137
pub data_dir: PathBuf,
138+
pub sse_capacity_multiplier: usize,
135139
pub enable_beacon_processor: bool,
136140
}
137141

@@ -146,6 +150,7 @@ impl Default for Config {
146150
allow_sync_stalled: false,
147151
spec_fork_name: None,
148152
data_dir: PathBuf::from(DEFAULT_ROOT_DIR),
153+
sse_capacity_multiplier: 1,
149154
enable_beacon_processor: true,
150155
}
151156
}
@@ -4348,22 +4353,29 @@ pub fn serve<T: BeaconChainTypes>(
43484353
}
43494354
};
43504355

4351-
receivers.push(BroadcastStream::new(receiver).map(|msg| {
4352-
match msg {
4353-
Ok(data) => Event::default()
4354-
.event(data.topic_name())
4355-
.json_data(data)
4356-
.map_err(|e| {
4357-
warp_utils::reject::server_sent_event_error(format!(
4358-
"{:?}",
4359-
e
4360-
))
4361-
}),
4362-
Err(e) => Err(warp_utils::reject::server_sent_event_error(
4363-
format!("{:?}", e),
4364-
)),
4365-
}
4366-
}));
4356+
receivers.push(
4357+
BroadcastStream::new(receiver)
4358+
.map(|msg| {
4359+
match msg {
4360+
Ok(data) => Event::default()
4361+
.event(data.topic_name())
4362+
.json_data(data)
4363+
.unwrap_or_else(|e| {
4364+
Event::default()
4365+
.comment(format!("error - bad json: {e:?}"))
4366+
}),
4367+
// Do not terminate the stream if the channel fills
4368+
// up. Just drop some messages and send a comment to
4369+
// the client.
4370+
Err(BroadcastStreamRecvError::Lagged(n)) => {
4371+
Event::default().comment(format!(
4372+
"error - dropped {n} messages"
4373+
))
4374+
}
4375+
}
4376+
})
4377+
.map(Ok::<_, std::convert::Infallible>),
4378+
);
43674379
}
43684380
} else {
43694381
return Err(warp_utils::reject::custom_server_error(
@@ -4373,7 +4385,7 @@ pub fn serve<T: BeaconChainTypes>(
43734385

43744386
let s = futures::stream::select_all(receivers);
43754387

4376-
Ok::<_, warp::Rejection>(warp::sse::reply(warp::sse::keep_alive().stream(s)))
4388+
Ok(warp::sse::reply(warp::sse::keep_alive().stream(s)))
43774389
})
43784390
},
43794391
);

beacon_node/http_api/src/test_utils.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,7 @@ pub async fn create_api_server_on_port<T: BeaconChainTypes>(
225225
allow_sync_stalled: false,
226226
data_dir: std::path::PathBuf::from(DEFAULT_ROOT_DIR),
227227
spec_fork_name: None,
228+
sse_capacity_multiplier: 1,
228229
enable_beacon_processor: true,
229230
},
230231
chain: Some(chain),

beacon_node/src/cli.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -382,6 +382,15 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
382382
stalled. This is useful for very small testnets. TESTING ONLY. DO NOT USE ON \
383383
MAINNET.")
384384
)
385+
.arg(
386+
Arg::with_name("http-sse-capacity-multiplier")
387+
.long("http-sse-capacity-multiplier")
388+
.takes_value(true)
389+
.default_value("1")
390+
.value_name("N")
391+
.help("Multiplier to apply to the length of HTTP server-sent-event (SSE) channels. \
392+
Increasing this value can prevent messages from being dropped.")
393+
)
385394
.arg(
386395
Arg::with_name("http-enable-beacon-processor")
387396
.long("http-enable-beacon-processor")

beacon_node/src/config.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,9 @@ pub fn get_config<E: EthSpec>(
149149
client_config.http_api.allow_sync_stalled = true;
150150
}
151151

152+
client_config.http_api.sse_capacity_multiplier =
153+
parse_required(cli_args, "http-sse-capacity-multiplier")?;
154+
152155
client_config.http_api.enable_beacon_processor =
153156
parse_required(cli_args, "http-enable-beacon-processor")?;
154157

lighthouse/tests/beacon_node.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2349,3 +2349,18 @@ fn beacon_processor_zero_workers() {
23492349
.flag("beacon-processor-max-workers", Some("0"))
23502350
.run_with_zero_port();
23512351
}
2352+
2353+
#[test]
2354+
fn http_sse_capacity_multiplier_default() {
2355+
CommandLineTest::new()
2356+
.run_with_zero_port()
2357+
.with_config(|config| assert_eq!(config.http_api.sse_capacity_multiplier, 1));
2358+
}
2359+
2360+
#[test]
2361+
fn http_sse_capacity_multiplier_override() {
2362+
CommandLineTest::new()
2363+
.flag("http-sse-capacity-multiplier", Some("10"))
2364+
.run_with_zero_port()
2365+
.with_config(|config| assert_eq!(config.http_api.sse_capacity_multiplier, 10));
2366+
}

0 commit comments

Comments
 (0)