Skip to content

Commit 88f65bd

Browse files
Ralithdjc
authored andcommitted
Allow adjustment of per-connection concurrent stream limits
1 parent 214b34a commit 88f65bd

3 files changed

Lines changed: 276 additions & 9 deletions

File tree

quinn-proto/src/connection/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1095,6 +1095,14 @@ impl Connection {
10951095
self.path.congestion.as_ref()
10961096
}
10971097

1098+
/// Modify the number of remotely initiated streams that may be concurrently open
1099+
///
1100+
/// No streams may be opened by the peer unless fewer than `count` are already open. Large
1101+
/// `count`s increase both minimum and worst-case memory consumption.
1102+
pub fn set_max_concurrent_streams(&mut self, dir: Dir, count: VarInt) {
1103+
self.streams.set_max_concurrent(dir, count);
1104+
}
1105+
10981106
fn on_ack_received(
10991107
&mut self,
11001108
now: Instant,

quinn-proto/src/connection/streams/state.rs

Lines changed: 246 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,20 @@ pub struct StreamsState {
2626
pub(super) send: FxHashMap<StreamId, Send>,
2727
pub(super) recv: FxHashMap<StreamId, Recv>,
2828
pub(super) next: [u64; 2],
29-
// Locally initiated
29+
/// Maximum number of locally-initiated streams that may be opened over the lifetime of the
30+
/// connection so far, per direction
3031
pub(super) max: [u64; 2],
31-
// Maximum that can be remotely initiated
32+
/// Maximum number of remotely-initiated streams that may be opened over the lifetime of the
33+
/// connection so far, per direction
3234
max_remote: [u64; 2],
33-
// Lowest that hasn't actually been opened
35+
/// Number of streams that we've given the peer permission to open
36+
allocated_remote_count: [u64; 2],
37+
/// Size of the desired stream flow control window. May be smaller than `allocated_remote_count`
38+
/// due to `set_max_concurrent` calls.
39+
max_concurrent_remote_count: [u64; 2],
40+
/// Whether `max_concurrent_remote_count` has ever changed
41+
flow_control_adjusted: bool,
42+
/// Lowest remotely-initiated stream index that haven't actually been opened by the peer
3443
pub(super) next_remote: [u64; 2],
3544
/// Whether the remote endpoint has opened any streams the application doesn't know about yet,
3645
/// per directionality
@@ -94,6 +103,9 @@ impl StreamsState {
94103
next: [0, 0],
95104
max: [0, 0],
96105
max_remote: [max_remote_bi.into(), max_remote_uni.into()],
106+
allocated_remote_count: [max_remote_bi.into(), max_remote_uni.into()],
107+
max_concurrent_remote_count: [max_remote_bi.into(), max_remote_uni.into()],
108+
flow_control_adjusted: false,
97109
next_remote: [0, 0],
98110
opened: [false, false],
99111
next_reported_remote: [0, 0],
@@ -139,11 +151,18 @@ impl StreamsState {
139151
}
140152
}
141153

142-
fn alloc_remote_stream(&mut self, dir: Dir) {
143-
self.max_remote[dir as usize] += 1;
144-
let id = StreamId::new(!self.side, dir, self.max_remote[dir as usize] - 1);
145-
self.insert(true, id);
146-
self.max_streams_dirty[dir as usize] = true;
154+
/// Ensure we have space for at least a full flow control window of remotely-initiated streams
155+
/// to be open, and notify the peer if the window has moved
156+
fn ensure_remote_streams(&mut self, dir: Dir) {
157+
let new_count = self.max_concurrent_remote_count[dir as usize]
158+
.saturating_sub(self.allocated_remote_count[dir as usize]);
159+
for i in 0..new_count {
160+
let id = StreamId::new(!self.side, dir, self.max_remote[dir as usize] + i);
161+
self.insert(true, id);
162+
}
163+
self.allocated_remote_count[dir as usize] += new_count;
164+
self.max_remote[dir as usize] += new_count;
165+
self.max_streams_dirty[dir as usize] = new_count != 0;
147166
}
148167

149168
pub fn zero_rtt_rejected(&mut self) {
@@ -159,7 +178,13 @@ impl StreamsState {
159178
}
160179
}
161180
self.next[dir as usize] = 0;
181+
182+
// If 0-RTT was rejected, any flow control frames we sent were lost.
183+
if self.flow_control_adjusted {
184+
self.max_streams_dirty[dir as usize] = true;
185+
}
162186
}
187+
163188
self.pending.clear();
164189
self.send_streams = 0;
165190
self.data_sent = 0;
@@ -728,6 +753,12 @@ impl StreamsState {
728753
id.index() >= self.next[id.dir() as usize]
729754
}
730755

756+
pub fn set_max_concurrent(&mut self, dir: Dir, count: VarInt) {
757+
self.flow_control_adjusted = true;
758+
self.max_concurrent_remote_count[dir as usize] = count.into();
759+
self.ensure_remote_streams(dir);
760+
}
761+
731762
pub(super) fn insert(&mut self, remote: bool, id: StreamId) {
732763
let bi = id.dir() == Dir::Bi;
733764
if bi || !remote {
@@ -782,7 +813,8 @@ impl StreamsState {
782813
StreamHalf::Recv => !self.send.contains_key(&id),
783814
};
784815
if fully_free {
785-
self.alloc_remote_stream(id.dir());
816+
self.allocated_remote_count[id.dir() as usize] -= 1;
817+
self.ensure_remote_streams(id.dir());
786818
}
787819
}
788820
if half == StreamHalf::Send {
@@ -1316,4 +1348,209 @@ mod tests {
13161348
assert_eq!(pending.reset_stream, &[(id, 0u32.into())]);
13171349
assert!(!server.can_send_stream_data());
13181350
}
1351+
1352+
#[test]
1353+
fn stream_limit_fixed() {
1354+
let mut client = make(Side::Client);
1355+
// Open streams 0-127
1356+
assert_eq!(
1357+
client.received(
1358+
frame::Stream {
1359+
id: StreamId::new(Side::Server, Dir::Uni, 127),
1360+
offset: 0,
1361+
fin: true,
1362+
data: Bytes::from_static(&[]),
1363+
},
1364+
0
1365+
),
1366+
Ok(ShouldTransmit(false))
1367+
);
1368+
// Try to open stream 128, exceeding limit
1369+
assert_eq!(
1370+
client
1371+
.received(
1372+
frame::Stream {
1373+
id: StreamId::new(Side::Server, Dir::Uni, 128),
1374+
offset: 0,
1375+
fin: true,
1376+
data: Bytes::from_static(&[]),
1377+
},
1378+
0
1379+
)
1380+
.unwrap_err()
1381+
.code,
1382+
TransportErrorCode::STREAM_LIMIT_ERROR
1383+
);
1384+
1385+
// Free stream 127
1386+
let mut pending = Retransmits::default();
1387+
let mut stream = RecvStream {
1388+
id: StreamId::new(Side::Server, Dir::Uni, 127),
1389+
state: &mut client,
1390+
pending: &mut pending,
1391+
};
1392+
stream.stop(0u32.into()).unwrap();
1393+
1394+
assert!(client.max_streams_dirty[Dir::Uni as usize]);
1395+
1396+
// Open stream 128
1397+
assert_eq!(
1398+
client.received(
1399+
frame::Stream {
1400+
id: StreamId::new(Side::Server, Dir::Uni, 128),
1401+
offset: 0,
1402+
fin: true,
1403+
data: Bytes::from_static(&[]),
1404+
},
1405+
0
1406+
),
1407+
Ok(ShouldTransmit(false))
1408+
);
1409+
}
1410+
1411+
#[test]
1412+
fn stream_limit_grows() {
1413+
let mut client = make(Side::Client);
1414+
// Open streams 0-127
1415+
assert_eq!(
1416+
client.received(
1417+
frame::Stream {
1418+
id: StreamId::new(Side::Server, Dir::Uni, 127),
1419+
offset: 0,
1420+
fin: true,
1421+
data: Bytes::from_static(&[]),
1422+
},
1423+
0
1424+
),
1425+
Ok(ShouldTransmit(false))
1426+
);
1427+
// Try to open stream 128, exceeding limit
1428+
assert_eq!(
1429+
client
1430+
.received(
1431+
frame::Stream {
1432+
id: StreamId::new(Side::Server, Dir::Uni, 128),
1433+
offset: 0,
1434+
fin: true,
1435+
data: Bytes::from_static(&[]),
1436+
},
1437+
0
1438+
)
1439+
.unwrap_err()
1440+
.code,
1441+
TransportErrorCode::STREAM_LIMIT_ERROR
1442+
);
1443+
1444+
// Relax limit by one
1445+
client.set_max_concurrent(Dir::Uni, 129u32.into());
1446+
1447+
assert!(client.max_streams_dirty[Dir::Uni as usize]);
1448+
1449+
// Open stream 128
1450+
assert_eq!(
1451+
client.received(
1452+
frame::Stream {
1453+
id: StreamId::new(Side::Server, Dir::Uni, 128),
1454+
offset: 0,
1455+
fin: true,
1456+
data: Bytes::from_static(&[]),
1457+
},
1458+
0
1459+
),
1460+
Ok(ShouldTransmit(false))
1461+
);
1462+
}
1463+
1464+
#[test]
1465+
fn stream_limit_shrinks() {
1466+
let mut client = make(Side::Client);
1467+
// Open streams 0-127
1468+
assert_eq!(
1469+
client.received(
1470+
frame::Stream {
1471+
id: StreamId::new(Side::Server, Dir::Uni, 127),
1472+
offset: 0,
1473+
fin: true,
1474+
data: Bytes::from_static(&[]),
1475+
},
1476+
0
1477+
),
1478+
Ok(ShouldTransmit(false))
1479+
);
1480+
1481+
// Tighten limit by one
1482+
client.set_max_concurrent(Dir::Uni, 127u32.into());
1483+
1484+
// Free stream 127
1485+
let mut pending = Retransmits::default();
1486+
let mut stream = RecvStream {
1487+
id: StreamId::new(Side::Server, Dir::Uni, 127),
1488+
state: &mut client,
1489+
pending: &mut pending,
1490+
};
1491+
stream.stop(0u32.into()).unwrap();
1492+
assert!(!client.max_streams_dirty[Dir::Uni as usize]);
1493+
1494+
// Try to open stream 128, still exceeding limit
1495+
assert_eq!(
1496+
client
1497+
.received(
1498+
frame::Stream {
1499+
id: StreamId::new(Side::Server, Dir::Uni, 128),
1500+
offset: 0,
1501+
fin: true,
1502+
data: Bytes::from_static(&[]),
1503+
},
1504+
0
1505+
)
1506+
.unwrap_err()
1507+
.code,
1508+
TransportErrorCode::STREAM_LIMIT_ERROR
1509+
);
1510+
1511+
// Free stream 126
1512+
assert_eq!(
1513+
client.received_reset(frame::ResetStream {
1514+
id: StreamId::new(Side::Server, Dir::Uni, 126),
1515+
error_code: 0u32.into(),
1516+
final_offset: 0u32.into(),
1517+
}),
1518+
Ok(ShouldTransmit(false))
1519+
);
1520+
let mut pending = Retransmits::default();
1521+
let mut stream = RecvStream {
1522+
id: StreamId::new(Side::Server, Dir::Uni, 126),
1523+
state: &mut client,
1524+
pending: &mut pending,
1525+
};
1526+
stream.stop(0u32.into()).unwrap();
1527+
1528+
assert!(client.max_streams_dirty[Dir::Uni as usize]);
1529+
1530+
// Open stream 128
1531+
assert_eq!(
1532+
client.received(
1533+
frame::Stream {
1534+
id: StreamId::new(Side::Server, Dir::Uni, 128),
1535+
offset: 0,
1536+
fin: true,
1537+
data: Bytes::from_static(&[]),
1538+
},
1539+
0
1540+
),
1541+
Ok(ShouldTransmit(false))
1542+
);
1543+
}
1544+
1545+
#[test]
1546+
fn remote_stream_capacity() {
1547+
let mut client = make(Side::Client);
1548+
for _ in 0..2 {
1549+
client.set_max_concurrent(Dir::Uni, 200u32.into());
1550+
client.set_max_concurrent(Dir::Bi, 201u32.into());
1551+
assert_eq!(client.recv.len(), 200 + 201);
1552+
assert_eq!(client.max_remote[Dir::Uni as usize], 200);
1553+
assert_eq!(client.max_remote[Dir::Bi as usize], 201);
1554+
}
1555+
}
13191556
}

quinn/src/connection.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -534,6 +534,28 @@ impl Connection {
534534
.crypto_session()
535535
.export_keying_material(output, label, context)
536536
}
537+
538+
/// Modify the number of remotely initiated unidirectional streams that may be concurrently open
539+
///
540+
/// No streams may be opened by the peer unless fewer than `count` are already open. Large
541+
/// `count`s increase both minimum and worst-case memory consumption.
542+
pub fn set_max_concurrent_uni_streams(&self, count: VarInt) {
543+
let mut conn = self.0.lock("set_max_concurrent_uni_streams");
544+
conn.inner.set_max_concurrent_streams(Dir::Uni, count);
545+
// May need to send MAX_STREAMS to make progress
546+
conn.wake();
547+
}
548+
549+
/// Modify the number of remotely initiated bidirectional streams that may be concurrently open
550+
///
551+
/// No streams may be opened by the peer unless fewer than `count` are already open. Large
552+
/// `count`s increase both minimum and worst-case memory consumption.
553+
pub fn set_max_concurrent_bi_streams(&self, count: VarInt) {
554+
let mut conn = self.0.lock("set_max_concurrent_bi_streams");
555+
conn.inner.set_max_concurrent_streams(Dir::Bi, count);
556+
// May need to send MAX_STREAMS to make progress
557+
conn.wake();
558+
}
537559
}
538560

539561
impl Clone for Connection {

0 commit comments

Comments
 (0)