Skip to content

Commit 199c724

Browse files
committed
Drop NewConnection in favor of simpler API
1 parent 399c595 commit 199c724

13 files changed

Lines changed: 67 additions & 352 deletions

File tree

bench/src/bin/bulk.rs

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -65,22 +65,17 @@ async fn server(mut incoming: quinn::Incoming, opt: Opt) -> Result<()> {
6565
// Handle only the expected amount of clients
6666
for _ in 0..opt.clients {
6767
let handshake = incoming.next().await.unwrap();
68-
let quinn::NewConnection {
69-
mut bi_streams,
70-
connection,
71-
..
72-
} = handshake.await.context("handshake failed")?;
68+
let connection = handshake.await.context("handshake failed")?;
7369

7470
server_tasks.push(tokio::spawn(async move {
7571
loop {
76-
let (mut send_stream, mut recv_stream) = match bi_streams.next().await {
77-
None => break,
78-
Some(Err(quinn::ConnectionError::ApplicationClosed(_))) => break,
79-
Some(Err(e)) => {
72+
let (mut send_stream, mut recv_stream) = match connection.accept_bi().await {
73+
Err(quinn::ConnectionError::ApplicationClosed(_)) => break,
74+
Err(e) => {
8075
eprintln!("accepting stream failed: {:?}", e);
8176
break;
8277
}
83-
Some(Ok(stream)) => stream,
78+
Ok(stream) => stream,
8479
};
8580
trace!("stream established");
8681

bench/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ pub async fn connect_client(
7070
let mut client_config = quinn::ClientConfig::new(Arc::new(crypto));
7171
client_config.transport_config(Arc::new(transport_config(&opt)));
7272

73-
let quinn::NewConnection { connection, .. } = endpoint
73+
let connection = endpoint
7474
.connect_with(client_config, server_addr, "localhost")
7575
.unwrap()
7676
.await

perf/src/bin/perf_client.rs

Lines changed: 5 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -119,24 +119,17 @@ async fn run(opt: Opt) -> Result<()> {
119119

120120
let stream_stats = OpenStreamStats::default();
121121

122-
let quinn::NewConnection {
123-
connection,
124-
uni_streams,
125-
..
126-
} = endpoint
122+
let connection = endpoint
127123
.connect_with(cfg, addr, host_name)?
128124
.await
129125
.context("connecting")?;
130126

131127
info!("established");
132128

133-
let acceptor = UniAcceptor(Arc::new(tokio::sync::Mutex::new(uni_streams)));
134-
135129
let drive_fut = async {
136130
tokio::try_join!(
137131
drive_uni(
138132
connection.clone(),
139-
acceptor,
140133
stream_stats.clone(),
141134
opt.uni_requests,
142135
opt.upload_size,
@@ -236,7 +229,6 @@ async fn drain_stream(
236229

237230
async fn drive_uni(
238231
connection: quinn::Connection,
239-
acceptor: UniAcceptor,
240232
stream_stats: OpenStreamStats,
241233
concurrency: u64,
242234
upload: u64,
@@ -247,12 +239,12 @@ async fn drive_uni(
247239
loop {
248240
let permit = sem.clone().acquire_owned().await.unwrap();
249241
let send = connection.open_uni().await?;
250-
let acceptor = acceptor.clone();
251242
let stream_stats = stream_stats.clone();
252243

253244
debug!("sending request on {}", send.id());
245+
let connection = connection.clone();
254246
tokio::spawn(async move {
255-
if let Err(e) = request_uni(send, acceptor, upload, download, stream_stats).await {
247+
if let Err(e) = request_uni(send, connection, upload, download, stream_stats).await {
256248
error!("sending request failed: {:#}", e);
257249
}
258250

@@ -263,19 +255,13 @@ async fn drive_uni(
263255

264256
async fn request_uni(
265257
send: quinn::SendStream,
266-
acceptor: UniAcceptor,
258+
conn: quinn::Connection,
267259
upload: u64,
268260
download: u64,
269261
stream_stats: OpenStreamStats,
270262
) -> Result<()> {
271263
request(send, upload, download, stream_stats.clone()).await?;
272-
let recv = {
273-
let mut guard = acceptor.0.lock().await;
274-
guard
275-
.next()
276-
.await
277-
.ok_or_else(|| anyhow::anyhow!("End of stream"))
278-
}??;
264+
let recv = conn.accept_uni().await?;
279265
drain_stream(recv, download, stream_stats).await?;
280266
Ok(())
281267
}
@@ -348,9 +334,6 @@ async fn request_bi(
348334
Ok(())
349335
}
350336

351-
#[derive(Clone)]
352-
struct UniAcceptor(Arc<tokio::sync::Mutex<quinn::IncomingUniStreams>>);
353-
354337
struct SkipServerVerification;
355338

356339
impl SkipServerVerification {

perf/src/bin/perf_server.rs

Lines changed: 7 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -103,16 +103,11 @@ async fn run(opt: Opt) -> Result<()> {
103103
}
104104

105105
async fn handle(handshake: quinn::Connecting, opt: Arc<Opt>) -> Result<()> {
106-
let quinn::NewConnection {
107-
uni_streams,
108-
bi_streams,
109-
connection,
110-
..
111-
} = handshake.await.context("handshake failed")?;
106+
let connection = handshake.await.context("handshake failed")?;
112107
debug!("{} connected", connection.remote_address());
113108
tokio::try_join!(
114-
drive_uni(connection.clone(), uni_streams),
115-
drive_bi(bi_streams),
109+
drive_uni(connection.clone()),
110+
drive_bi(connection.clone()),
116111
conn_stats(connection, opt)
117112
)?;
118113
Ok(())
@@ -129,12 +124,8 @@ async fn conn_stats(connection: quinn::Connection, opt: Arc<Opt>) -> Result<()>
129124
Ok(())
130125
}
131126

132-
async fn drive_uni(
133-
connection: quinn::Connection,
134-
mut streams: quinn::IncomingUniStreams,
135-
) -> Result<()> {
136-
while let Some(stream) = streams.next().await {
137-
let stream = stream?;
127+
async fn drive_uni(connection: quinn::Connection) -> Result<()> {
128+
while let Ok(stream) = connection.accept_uni().await {
138129
let connection = connection.clone();
139130
tokio::spawn(async move {
140131
if let Err(e) = handle_uni(connection, stream).await {
@@ -152,9 +143,8 @@ async fn handle_uni(connection: quinn::Connection, stream: quinn::RecvStream) ->
152143
Ok(())
153144
}
154145

155-
async fn drive_bi(mut streams: quinn::IncomingBiStreams) -> Result<()> {
156-
while let Some(stream) = streams.next().await {
157-
let (send, recv) = stream?;
146+
async fn drive_bi(connection: quinn::Connection) -> Result<()> {
147+
while let Ok((send, recv)) = connection.accept_bi().await {
158148
tokio::spawn(async move {
159149
if let Err(e) = handle_bi(send, recv).await {
160150
error!("request failed: {:#}", e);

quinn/examples/client.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -108,14 +108,11 @@ async fn run(options: Opt) -> Result<()> {
108108
.ok_or_else(|| anyhow!("no hostname specified"))?;
109109

110110
eprintln!("connecting to {} at {}", host, remote);
111-
let new_conn = endpoint
111+
let conn = endpoint
112112
.connect(remote, host)?
113113
.await
114114
.map_err(|e| anyhow!("failed to connect: {}", e))?;
115115
eprintln!("connected at {:?}", start.elapsed());
116-
let quinn::NewConnection {
117-
connection: conn, ..
118-
} = new_conn;
119116
let (mut send, recv) = conn
120117
.open_bi()
121118
.await

quinn/examples/connection.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,29 +12,25 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
1212
// accept a single connection
1313
tokio::spawn(async move {
1414
let incoming_conn = incoming.next().await.unwrap();
15-
let new_conn = incoming_conn.await.unwrap();
15+
let conn = incoming_conn.await.unwrap();
1616
println!(
1717
"[server] connection accepted: addr={}",
18-
new_conn.connection.remote_address()
18+
conn.remote_address()
1919
);
2020
// Dropping all handles associated with a connection implicitly closes it
2121
});
2222

2323
let endpoint = make_client_endpoint("0.0.0.0:0".parse().unwrap(), &[&server_cert])?;
2424
// connect to server
25-
let quinn::NewConnection {
26-
connection,
27-
mut uni_streams,
28-
..
29-
} = endpoint
25+
let connection = endpoint
3026
.connect(server_addr, "localhost")
3127
.unwrap()
3228
.await
3329
.unwrap();
3430
println!("[client] connected: addr={}", connection.remote_address());
3531

3632
// Waiting for a stream will complete with an error when the server closes the connection
37-
let _ = uni_streams.next().await;
33+
let _ = connection.accept_uni().await;
3834

3935
// Give the server has a chance to clean up
4036
endpoint.wait_idle().await;

quinn/examples/insecure_connection.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@ async fn run_server(addr: SocketAddr) {
2323
let (mut incoming, _server_cert) = make_server_endpoint(addr).unwrap();
2424
// accept a single connection
2525
let incoming_conn = incoming.next().await.unwrap();
26-
let new_conn = incoming_conn.await.unwrap();
26+
let conn = incoming_conn.await.unwrap();
2727
println!(
2828
"[server] connection accepted: addr={}",
29-
new_conn.connection.remote_address()
29+
conn.remote_address()
3030
);
3131
}
3232

@@ -36,7 +36,7 @@ async fn run_client(server_addr: SocketAddr) -> Result<(), Box<dyn Error>> {
3636
endpoint.set_default_client_config(client_cfg);
3737

3838
// connect to server
39-
let quinn::NewConnection { connection, .. } = endpoint
39+
let connection = endpoint
4040
.connect(server_addr, "localhost")
4141
.unwrap()
4242
.await

quinn/examples/server.rs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -161,11 +161,7 @@ async fn run(options: Opt) -> Result<()> {
161161
}
162162

163163
async fn handle_connection(root: Arc<Path>, conn: quinn::Connecting) -> Result<()> {
164-
let quinn::NewConnection {
165-
connection,
166-
mut bi_streams,
167-
..
168-
} = conn.await?;
164+
let connection = conn.await?;
169165
let span = info_span!(
170166
"connection",
171167
remote = %connection.remote_address(),
@@ -180,7 +176,8 @@ async fn handle_connection(root: Arc<Path>, conn: quinn::Connecting) -> Result<(
180176
info!("established");
181177

182178
// Each stream initiated by the client constitutes a new request.
183-
while let Some(stream) = bi_streams.next().await {
179+
loop {
180+
let stream = connection.accept_bi().await;
184181
let stream = match stream {
185182
Err(quinn::ConnectionError::ApplicationClosed { .. }) => {
186183
info!("connection closed");
@@ -201,7 +198,6 @@ async fn handle_connection(root: Arc<Path>, conn: quinn::Connecting) -> Result<(
201198
.instrument(info_span!("request")),
202199
);
203200
}
204-
Ok(())
205201
}
206202
.instrument(span)
207203
.await?;

quinn/examples/single_socket.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ fn run_server(addr: SocketAddr) -> Result<Vec<u8>, Box<dyn Error>> {
4141
let (mut incoming, server_cert) = make_server_endpoint(addr)?;
4242
// accept a single connection
4343
tokio::spawn(async move {
44-
let quinn::NewConnection { connection, .. } = incoming.next().await.unwrap().await.unwrap();
44+
let connection = incoming.next().await.unwrap().await.unwrap();
4545
println!(
4646
"[server] incoming connection: addr={}",
4747
connection.remote_address()
@@ -54,6 +54,6 @@ fn run_server(addr: SocketAddr) -> Result<Vec<u8>, Box<dyn Error>> {
5454
/// Attempt QUIC connection with the given server address.
5555
async fn run_client(endpoint: &Endpoint, server_addr: SocketAddr) {
5656
let connect = endpoint.connect(server_addr, "localhost").unwrap();
57-
let quinn::NewConnection { connection, .. } = connect.await.unwrap();
57+
let connection = connect.await.unwrap();
5858
println!("[client] connected: addr={}", connection.remote_address());
5959
}

0 commit comments

Comments
 (0)