Skip to content

Commit 45df788

Browse files
Use poll_recv_many for receiving messages to publish to the connection
1 parent a1876c4 commit 45df788

2 files changed

Lines changed: 21 additions & 5 deletions

File tree

async-nats/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ regex = "1.9.1"
2323
serde = { version = "1.0.184", features = ["derive"] }
2424
serde_json = "1.0.104"
2525
serde_repr = "0.1.16"
26-
tokio = { version = "1.29.0", features = ["macros", "rt", "fs", "net", "sync", "time", "io-util"] }
26+
tokio = { version = "1.36", features = ["macros", "rt", "fs", "net", "sync", "time", "io-util"] }
2727
url = { version = "2"}
2828
tokio-rustls = "0.25"
2929
rustls-pemfile = "2"

async-nats/src/lib.rs

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -443,6 +443,7 @@ impl ConnectionHandler {
443443
struct ProcessFut<'a> {
444444
handler: &'a mut ConnectionHandler,
445445
receiver: &'a mut mpsc::Receiver<Command>,
446+
recv_buf: &'a mut Vec<Command>,
446447
}
447448

448449
enum ExitReason {
@@ -451,6 +452,8 @@ impl ConnectionHandler {
451452
}
452453

453454
impl<'a> ProcessFut<'a> {
455+
const RECV_CHUNK_SIZE: usize = 16;
456+
454457
#[cold]
455458
fn ping(&mut self) -> Poll<ExitReason> {
456459
self.handler.pending_pings += 1;
@@ -519,13 +522,24 @@ impl ConnectionHandler {
519522
let mut made_progress = true;
520523
loop {
521524
while !self.handler.connection.is_write_buf_full() {
522-
match self.receiver.poll_recv(cx) {
525+
debug_assert!(self.recv_buf.is_empty());
526+
527+
let Self {
528+
recv_buf,
529+
handler,
530+
receiver,
531+
} = &mut *self;
532+
match receiver.poll_recv_many(cx, recv_buf, Self::RECV_CHUNK_SIZE) {
523533
Poll::Pending => break,
524-
Poll::Ready(Some(cmd)) => {
534+
Poll::Ready(1..) => {
525535
made_progress = true;
526-
self.handler.handle_command(cmd);
536+
537+
for cmd in recv_buf.drain(..) {
538+
handler.handle_command(cmd);
539+
}
527540
}
528-
Poll::Ready(None) => return Poll::Ready(ExitReason::Closed),
541+
// TODO: replace `_` with `0` after bumping MSRV to 1.75
542+
Poll::Ready(_) => return Poll::Ready(ExitReason::Closed),
529543
}
530544
}
531545

@@ -578,10 +592,12 @@ impl ConnectionHandler {
578592
}
579593
}
580594

595+
let mut recv_buf = Vec::with_capacity(ProcessFut::RECV_CHUNK_SIZE);
581596
loop {
582597
let process = ProcessFut {
583598
handler: self,
584599
receiver,
600+
recv_buf: &mut recv_buf,
585601
};
586602
match process.await {
587603
ExitReason::Disconnected(err) => {

0 commit comments

Comments
 (0)