Skip to content

Commit fbd1c6c

Browse files
jeromegndjc
authored andcommitted
Don't pre-allocate memory for every possible stream per connection
1 parent f659258 commit fbd1c6c

4 files changed

Lines changed: 113 additions & 50 deletions

File tree

quinn-proto/src/connection/streams/mod.rs

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,10 @@ use bytes::Bytes;
77
use thiserror::Error;
88
use tracing::trace;
99

10+
use self::state::get_or_insert_recv;
11+
1012
use super::spaces::{Retransmits, ThinRetransmits};
11-
use crate::{frame, Dir, StreamId, VarInt};
13+
use crate::{connection::streams::state::get_or_insert_send, frame, Dir, StreamId, VarInt};
1214

1315
mod recv;
1416
use recv::Recv;
@@ -133,7 +135,7 @@ impl<'a> RecvStream<'a> {
133135
hash_map::Entry::Occupied(s) => s,
134136
hash_map::Entry::Vacant(_) => return Err(UnknownStream { _private: () }),
135137
};
136-
let stream = entry.get_mut();
138+
let stream = get_or_insert_recv(self.state.stream_receive_window)(entry.get_mut());
137139

138140
let (read_credits, stop_sending) = stream.stop()?;
139141
if stop_sending.should_transmit() {
@@ -207,11 +209,16 @@ impl<'a> SendStream<'a> {
207209
}
208210

209211
let limit = self.state.write_limit();
212+
213+
let max_send_data = self.state.initial_max_send_data(self.id);
214+
210215
let stream = self
211216
.state
212217
.send
213218
.get_mut(&self.id)
219+
.map(get_or_insert_send(max_send_data))
214220
.ok_or(WriteError::UnknownStream)?;
221+
215222
if limit == 0 {
216223
trace!(
217224
stream = %self.id, max_data = self.state.max_data, data_sent = self.state.data_sent,
@@ -237,8 +244,9 @@ impl<'a> SendStream<'a> {
237244

238245
/// Check if this stream was stopped, get the reason if it was
239246
pub fn stopped(&mut self) -> Result<Option<VarInt>, UnknownStream> {
240-
match self.state.send.get(&self.id) {
241-
Some(s) => Ok(s.stop_reason),
247+
match self.state.send.get(&self.id).as_ref() {
248+
Some(Some(s)) => Ok(s.stop_reason),
249+
Some(None) => Ok(None),
242250
None => Err(UnknownStream { _private: () }),
243251
}
244252
}
@@ -249,10 +257,12 @@ impl<'a> SendStream<'a> {
249257
///
250258
/// [`StreamEvent::Finished`]: crate::StreamEvent::Finished
251259
pub fn finish(&mut self) -> Result<(), FinishError> {
260+
let max_send_data = self.state.initial_max_send_data(self.id);
252261
let stream = self
253262
.state
254263
.send
255264
.get_mut(&self.id)
265+
.map(get_or_insert_send(max_send_data))
256266
.ok_or(FinishError::UnknownStream)?;
257267

258268
let was_pending = stream.is_pending();
@@ -269,10 +279,12 @@ impl<'a> SendStream<'a> {
269279
/// # Panics
270280
/// - when applied to a receive stream
271281
pub fn reset(&mut self, error_code: VarInt) -> Result<(), UnknownStream> {
282+
let max_send_data = self.state.initial_max_send_data(self.id);
272283
let stream = self
273284
.state
274285
.send
275286
.get_mut(&self.id)
287+
.map(get_or_insert_send(max_send_data))
276288
.ok_or(UnknownStream { _private: () })?;
277289

278290
if matches!(stream.state, SendState::ResetSent) {
@@ -296,10 +308,12 @@ impl<'a> SendStream<'a> {
296308
/// # Panics
297309
/// - when applied to a receive stream
298310
pub fn set_priority(&mut self, priority: i32) -> Result<(), UnknownStream> {
311+
let max_send_data = self.state.initial_max_send_data(self.id);
299312
let stream = self
300313
.state
301314
.send
302315
.get_mut(&self.id)
316+
.map(get_or_insert_send(max_send_data))
303317
.ok_or(UnknownStream { _private: () })?;
304318

305319
stream.priority = priority;
@@ -317,7 +331,7 @@ impl<'a> SendStream<'a> {
317331
.get(&self.id)
318332
.ok_or(UnknownStream { _private: () })?;
319333

320-
Ok(stream.priority)
334+
Ok(stream.as_ref().map(|s| s.priority).unwrap_or_default())
321335
}
322336
}
323337

quinn-proto/src/connection/streams/recv.rs

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use std::mem;
44
use thiserror::Error;
55
use tracing::debug;
66

7+
use super::state::get_or_insert_recv;
78
use super::{Retransmits, ShouldTransmit, StreamHalf, StreamId, StreamsState, UnknownStream};
89
use crate::connection::assembler::{Assembler, Chunk, IllegalOrderedRead};
910
use crate::{frame, TransportError, VarInt};
@@ -18,14 +19,14 @@ pub(super) struct Recv {
1819
}
1920

2021
impl Recv {
21-
pub(super) fn new(initial_max_data: u64) -> Self {
22-
Self {
22+
pub(super) fn new(initial_max_data: u64) -> Box<Self> {
23+
Box::new(Self {
2324
state: RecvState::default(),
2425
assembler: Assembler::new(),
2526
sent_max_stream_data: initial_max_data,
2627
end: 0,
2728
stopped: false,
28-
}
29+
})
2930
}
3031

3132
/// Process a STREAM frame
@@ -215,15 +216,16 @@ impl<'a> Chunks<'a> {
215216
streams: &'a mut StreamsState,
216217
pending: &'a mut Retransmits,
217218
) -> Result<Self, ReadableError> {
218-
let entry = match streams.recv.entry(id) {
219+
let mut entry = match streams.recv.entry(id) {
219220
Entry::Occupied(entry) => entry,
220221
Entry::Vacant(_) => return Err(ReadableError::UnknownStream),
221222
};
222223

223-
let mut recv = match entry.get().stopped {
224-
true => return Err(ReadableError::UnknownStream),
225-
false => entry.remove(),
226-
};
224+
let mut recv =
225+
match get_or_insert_recv(streams.stream_receive_window)(entry.get_mut()).stopped {
226+
true => return Err(ReadableError::UnknownStream),
227+
false => entry.remove().unwrap(), // this can't fail due to the previous get_or_insert_with
228+
};
227229

228230
recv.assembler.ensure_ordering(ordered)?;
229231
Ok(Self {
@@ -313,7 +315,7 @@ impl<'a> Chunks<'a> {
313315
self.pending.max_stream_data.insert(self.id);
314316
}
315317
// Return the stream to storage for future use
316-
self.streams.recv.insert(self.id, rs);
318+
self.streams.recv.insert(self.id, Some(rs));
317319
}
318320

319321
// Issue connection-level flow control credit for any data we read regardless of state
@@ -331,7 +333,7 @@ impl<'a> Drop for Chunks<'a> {
331333
}
332334

333335
enum ChunksState {
334-
Readable(Recv),
336+
Readable(Box<Recv>),
335337
Reset(VarInt),
336338
Finished,
337339
Finalized,

quinn-proto/src/connection/streams/send.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,16 @@ pub(super) struct Send {
1818
}
1919

2020
impl Send {
21-
pub(super) fn new(max_data: VarInt) -> Self {
22-
Self {
21+
pub(super) fn new(max_data: VarInt) -> Box<Self> {
22+
Box::new(Self {
2323
max_data: max_data.into(),
2424
state: SendState::Ready,
2525
pending: SendBuffer::new(),
2626
priority: 0,
2727
fin_pending: false,
2828
connection_blocked: false,
2929
stop_reason: None,
30-
}
30+
})
3131
}
3232

3333
/// Whether the stream has been reset

0 commit comments

Comments
 (0)