Skip to content

Commit b0af3ad

Browse files
authored
Gracefully handle receiver disconnects (#1410)
* Handle receiver disconnects
* Format
1 parent 190ad20 commit b0af3ad

File tree

3 files changed

+39
-33
lines changed

3 files changed

+39
-33
lines changed

mistralrs-bench/src/main.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -90,9 +90,9 @@ fn run_bench(
9090

9191
for _ in 0..repetitions {
9292
for _ in 0..concurrency {
93-
sender
94-
.blocking_send(req.clone())
95-
.expect("Expected receiver.");
93+
if sender.blocking_send(req.clone()).is_err() {
94+
eprintln!("Receiver disconnected");
95+
}
9696
}
9797
for _ in 0..concurrency {
9898
match rx.blocking_recv() {
@@ -258,9 +258,9 @@ fn warmup_run(mistralrs: Arc<MistralRs>) {
258258
web_search_options: None,
259259
}));
260260

261-
sender
262-
.blocking_send(req.clone())
263-
.expect("Expected receiver.");
261+
if sender.blocking_send(req.clone()).is_err() {
262+
eprintln!("Receiver disconnected");
263+
}
264264

265265
let _ = rx.blocking_recv();
266266
}

mistralrs-core/src/engine/add_request.rs

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,9 @@ impl Engine {
8585
.response
8686
.send(Response::ValidationError(
8787
"Received messages for a model which does not have a chat template. Either use a different model or pass a single string as the prompt".into(),
88-
)).await.expect("Expected receiver.");
88+
))
89+
.await
90+
.unwrap_or_else(|_| warn!("Receiver disconnected"));
8991
return;
9092
}
9193

@@ -110,7 +112,7 @@ impl Engine {
110112
"Received a request incompatible for this model's category.".into(),
111113
))
112114
.await
113-
.expect("Expected receiver.");
115+
.unwrap_or_else(|_| warn!("Receiver disconnected"));
114116
return;
115117
}
116118
}
@@ -178,7 +180,7 @@ impl Engine {
178180
"Completion requests require the pipeline to have a tokenizer".into(),
179181
))
180182
.await
181-
.expect("Expected receiver.");
183+
.unwrap_or_else(|_| warn!("Receiver disconnected"));
182184
return;
183185
};
184186
let prompt = tokenizer
@@ -201,7 +203,7 @@ impl Engine {
201203
"Completion requests w/ raw tokens require the pipeline to have a tokenizer".into(),
202204
))
203205
.await
204-
.expect("Expected receiver.");
206+
.unwrap_or_else(|_| warn!("Receiver disconnected"));
205207
return;
206208
};
207209
let prompt = tokenizer
@@ -217,7 +219,7 @@ impl Engine {
217219
"Received an empty prompt.".into(),
218220
))
219221
.await
220-
.expect("Expected receiver.");
222+
.unwrap_or_else(|_| warn!("Receiver disconnected"));
221223
return;
222224
}
223225

@@ -227,7 +229,9 @@ impl Engine {
227229
.response
228230
.send(Response::ValidationError(
229231
format!("Prompt sequence length is greater than {}, perhaps consider using `truncate_sequence`?", get_mut_arcmutex!(self.pipeline).get_metadata().max_seq_len).into(),
230-
)).await.expect("Expected receiver.");
232+
))
233+
.await
234+
.unwrap_or_else(|_| warn!("Receiver disconnected"));
231235
return;
232236
} else {
233237
let prompt_len = prompt_tokens.len();
@@ -275,7 +279,8 @@ impl Engine {
275279
.send(Response::ValidationError(
276280
format!("Stop token {:?} is also a prefix of other tokens and cannot be used as a stop token.", tok_trie.token_str(*id)).into(),
277281
))
278-
.await .expect("Expected receiver.");
282+
.await
283+
.unwrap_or_else(|_| warn!("Receiver disconnected"));
279284
return;
280285
}
281286
}
@@ -303,7 +308,7 @@ impl Engine {
303308
.into(),
304309
))
305310
.await
306-
.expect("Expected receiver.");
311+
.unwrap_or_else(|_| warn!("Receiver disconnected"));
307312
return;
308313
};
309314
let encoded = tokenizer.encode_fast(stop_txt.to_string(), true);
@@ -359,7 +364,7 @@ impl Engine {
359364
"Number of choices must be greater than 0.".into(),
360365
))
361366
.await
362-
.expect("Expected receiver.");
367+
.unwrap_or_else(|_| warn!("Receiver disconnected"));
363368
return;
364369
}
365370

@@ -378,7 +383,7 @@ impl Engine {
378383
format!("Invalid grammar. {}", err).into(),
379384
))
380385
.await
381-
.expect("Expected receiver.");
386+
.unwrap_or_else(|_| warn!("Receiver disconnected"));
382387
return;
383388
}
384389
};
@@ -434,7 +439,7 @@ impl Engine {
434439
.into(),
435440
))
436441
.await
437-
.expect("Expected receiver.");
442+
.unwrap_or_else(|_| warn!("Receiver disconnected"));
438443
return;
439444
}
440445
}
@@ -455,7 +460,7 @@ impl Engine {
455460
.into(),
456461
))
457462
.await
458-
.expect("Expected receiver.");
463+
.unwrap_or_else(|_| warn!("Receiver disconnected"));
459464
return;
460465
}
461466
}
@@ -587,7 +592,7 @@ impl Engine {
587592
.response
588593
.send(Err(e))
589594
.await
590-
.expect("Expected receiver.");
595+
.unwrap_or_else(|_| warn!("Receiver disconnected"));
591596
return;
592597
}
593598
};
@@ -609,7 +614,7 @@ impl Engine {
609614
"Pipeline does not include a toksnizer.",
610615
)))
611616
.await
612-
.expect("Expected receiver.");
617+
.unwrap_or_else(|_| warn!("Receiver disconnected"));
613618
return;
614619
}
615620
};
@@ -621,7 +626,7 @@ impl Engine {
621626
.response
622627
.send(Err(anyhow::Error::msg(e)))
623628
.await
624-
.expect("Expected receiver.");
629+
.unwrap_or_else(|_| warn!("Receiver disconnected"));
625630
return;
626631
}
627632
};
@@ -646,7 +651,7 @@ impl Engine {
646651
"Pipeline does not include a toksnizer.",
647652
)))
648653
.await
649-
.expect("Expected receiver.");
654+
.unwrap_or_else(|_| warn!("Receiver disconnected"));
650655
return;
651656
}
652657
};
@@ -658,7 +663,7 @@ impl Engine {
658663
.response
659664
.send(Err(anyhow::Error::msg(e)))
660665
.await
661-
.expect("Expected receiver.");
666+
.unwrap_or_else(|_| warn!("Receiver disconnected"));
662667
return;
663668
}
664669
};

mistralrs-core/src/utils/mod.rs

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,9 @@ macro_rules! handle_seq_error {
2929
Ok(v) => v,
3030
Err(e) => {
3131
use $crate::response::Response;
32-
$response
33-
.send(Response::InternalError(e.into()))
34-
.await
35-
.expect("Expected receiver.");
32+
if let Err(_) = $response.send(Response::InternalError(e.into())).await {
33+
tracing::warn!("Receiver disconnected");
34+
}
3635
return;
3736
}
3837
}
@@ -47,10 +46,9 @@ macro_rules! handle_seq_error_ok {
4746
Ok(v) => v,
4847
Err(e) => {
4948
use $crate::response::Response;
50-
$response
51-
.send(Response::InternalError(e.into()))
52-
.await
53-
.expect("Expected receiver.");
49+
if let Err(_) = $response.send(Response::InternalError(e.into())).await {
50+
tracing::warn!("Receiver disconnected");
51+
}
5452
return Ok(());
5553
}
5654
}
@@ -66,10 +64,13 @@ macro_rules! handle_seq_error_stateaware_ok {
6664
Err(e) => {
6765
use $crate::response::Response;
6866
use $crate::sequence::SequenceState;
69-
$seq.responder()
67+
if let Err(_) = $seq
68+
.responder()
7069
.send(Response::InternalError(e.into()))
7170
.await
72-
.expect("Expected receiver.");
71+
{
72+
tracing::warn!("Receiver disconnected");
73+
}
7374
$seq.set_state(SequenceState::Error);
7475
return Ok(());
7576
}

0 commit comments

Comments (0)