Skip to content

Commit 571b010

Browse files
committed
lightway-core: io: Add a CoW abstraction for sending buffer
Depending on the specific implementation of the trait they may or may not need an owned version of the buffer. Since we have one already in the core in some paths we can expose that through the stack and avoid some extra allocations. In the UDP send path we can easily and cheaply freeze the `BytesMut` buffer to get an owned `Bytes` (with a little more overhead while `aggressive_send` is enabled). However in the TCP path the use of `SendBuffer` makes this harder (if not impossible) to achieve (cheaply at least). So add `CowBytes` which can contain either a `Bytes` or a `&[u8]` slice. Note that right now every consumer still uses the byte slice.
1 parent f624e71 commit 571b010

8 files changed

Lines changed: 58 additions & 25 deletions

File tree

lightway-client/src/io/outside/tcp.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::{net::SocketAddr, sync::Arc};
44
use tokio::net::TcpStream;
55

66
use super::OutsideIO;
7-
use lightway_core::{IOCallbackResult, OutsideIOSendCallback, OutsideIOSendCallbackArg};
7+
use lightway_core::{CowBytes, IOCallbackResult, OutsideIOSendCallback, OutsideIOSendCallbackArg};
88

99
pub struct Tcp(tokio::net::TcpStream, SocketAddr);
1010

@@ -58,8 +58,8 @@ impl OutsideIO for Tcp {
5858
}
5959

6060
impl OutsideIOSendCallback for Tcp {
61-
fn send(&self, buf: &[u8]) -> IOCallbackResult<usize> {
62-
match self.0.try_write(buf) {
61+
fn send(&self, buf: CowBytes) -> IOCallbackResult<usize> {
62+
match self.0.try_write(buf.as_bytes()) {
6363
Ok(nr) => IOCallbackResult::Ok(nr),
6464
Err(err) if matches!(err.kind(), std::io::ErrorKind::WouldBlock) => {
6565
IOCallbackResult::WouldBlock

lightway-client/src/io/outside/udp.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use tokio::net::UdpSocket;
55

66
use super::OutsideIO;
77
use lightway_app_utils::sockopt;
8-
use lightway_core::{IOCallbackResult, OutsideIOSendCallback, OutsideIOSendCallbackArg};
8+
use lightway_core::{CowBytes, IOCallbackResult, OutsideIOSendCallback, OutsideIOSendCallbackArg};
99

1010
pub struct Udp {
1111
sock: tokio::net::UdpSocket,
@@ -67,8 +67,8 @@ impl OutsideIO for Udp {
6767
}
6868

6969
impl OutsideIOSendCallback for Udp {
70-
fn send(&self, buf: &[u8]) -> IOCallbackResult<usize> {
71-
match self.sock.try_send_to(buf, self.peer_addr) {
70+
fn send(&self, buf: CowBytes) -> IOCallbackResult<usize> {
71+
match self.sock.try_send_to(buf.as_bytes(), self.peer_addr) {
7272
Ok(nr) => IOCallbackResult::Ok(nr),
7373
Err(err) if matches!(err.kind(), std::io::ErrorKind::WouldBlock) => {
7474
IOCallbackResult::WouldBlock

lightway-core/src/connection/io_adapter.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ use more_asserts::*;
55
use wolfssl::IOCallbackResult;
66

77
use crate::{
8-
plugin::PluginList, wire, ConnectionType, OutsideIOSendCallbackArg, PluginResult, Version,
8+
plugin::PluginList, wire, ConnectionType, CowBytes, OutsideIOSendCallbackArg, PluginResult,
9+
Version,
910
};
1011

1112
pub(crate) struct SendBuffer {
@@ -164,26 +165,28 @@ impl WolfSSLIOAdapter {
164165
}
165166
}
166167

168+
let b = b.freeze();
169+
167170
// Send header + buf. If we are in aggressive mode we send it
168171
// a total of three times. On any send error we return
169172
// immediately without the remaining tries, otherwise we
170173
// return the result of the final attempt.
171174

172175
if self.aggressive_send {
173-
match self.io.send(&b[..]) {
176+
match self.io.send(CowBytes::Owned(b.clone())) {
174177
IOCallbackResult::Ok(_) => {}
175178
wb @ IOCallbackResult::WouldBlock => return wb,
176179
err @ IOCallbackResult::Err(_) => return err,
177180
}
178181

179-
match self.io.send(&b[..]) {
182+
match self.io.send(CowBytes::Owned(b.clone())) {
180183
IOCallbackResult::Ok(_) => {}
181184
wb @ IOCallbackResult::WouldBlock => return wb,
182185
err @ IOCallbackResult::Err(_) => return err,
183186
}
184187
}
185188

186-
match self.io.send(&b[..]) {
189+
match self.io.send(CowBytes::Owned(b)) {
187190
IOCallbackResult::Ok(n) => {
188191
// We've sent `n` bytes successfully out of
189192
// `wire::Header::WIRE_SIZE` + `b.len()` that we
@@ -250,7 +253,7 @@ impl WolfSSLIOAdapter {
250253
debug_assert_le!(send_buffer.original_len(), buf.len());
251254
}
252255

253-
match self.io.send(send_buffer.as_bytes()) {
256+
match self.io.send(CowBytes::Borrowed(send_buffer.as_bytes())) {
254257
IOCallbackResult::Ok(n) if n == send_buffer.actual_len() => {
255258
// We've now sent everything we were originally
256259
// asked to, so signal completion of that original
@@ -335,7 +338,8 @@ mod tests {
335338
}
336339

337340
impl OutsideIOSendCallback for FakeOutsideIOSend {
338-
fn send(&self, buf: &[u8]) -> IOCallbackResult<usize> {
341+
fn send(&self, buf: CowBytes) -> IOCallbackResult<usize> {
342+
let buf = buf.as_bytes();
339343
let (fakes, sent) = &mut *self.0.lock().unwrap();
340344
match fakes.pop_front() {
341345
Some(IOCallbackResult::Ok(n)) => {

lightway-core/src/io.rs

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::{net::SocketAddr, sync::Arc};
22

3-
use bytes::BytesMut;
3+
use bytes::{Bytes, BytesMut};
44
use wolfssl::IOCallbackResult;
55

66
/// Application provided callback used to send inside data.
@@ -20,6 +20,33 @@ pub trait InsideIOSendCallback<AppState> {
2020
/// Convenience type to use as function arguments
2121
pub type InsideIOSendCallbackArg<AppState> = Arc<dyn InsideIOSendCallback<AppState> + Send + Sync>;
2222

23+
/// A byte buffer to be sent, may be owned or borrowed.
24+
pub enum CowBytes<'a> {
25+
/// An owned buffer
26+
Owned(Bytes),
27+
/// A borrowed buffer
28+
Borrowed(&'a [u8]),
29+
}
30+
31+
impl<'a> CowBytes<'a> {
32+
/// Convert this buffer into an owned `Bytes`. Cheap if this
33+
/// instance if `::Owned`, but copied if not.
34+
pub fn into_owned(self) -> Bytes {
35+
match self {
36+
CowBytes::Owned(b) => b,
37+
CowBytes::Borrowed(b) => Bytes::copy_from_slice(b),
38+
}
39+
}
40+
41+
/// Gain access to the underlying byte buffer.
42+
pub fn as_bytes(&self) -> &[u8] {
43+
match self {
44+
CowBytes::Owned(b) => b.as_ref(),
45+
CowBytes::Borrowed(b) => b,
46+
}
47+
}
48+
}
49+
2350
/// Application provided callback used to send outside data.
2451
pub trait OutsideIOSendCallback {
2552
/// Called when Lightway wishes to send some outside data
@@ -30,7 +57,7 @@ pub trait OutsideIOSendCallback {
3057
/// [`IOCallbackResult::WouldBlock`].
3158
///
3259
/// This is the same method as [`wolfssl::IOCallbacks::send`].
33-
fn send(&self, buf: &[u8]) -> IOCallbackResult<usize>;
60+
fn send(&self, buf: CowBytes) -> IOCallbackResult<usize>;
3461

3562
/// Get the peer's [`SocketAddr`]
3663
fn peer_addr(&self) -> SocketAddr;

lightway-core/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ pub use context::{
3838
ServerAuthArg, ServerAuthHandle, ServerAuthResult, ServerContext, ServerContextBuilder,
3939
};
4040
pub use io::{
41-
InsideIOSendCallback, InsideIOSendCallbackArg, OutsideIOSendCallback, OutsideIOSendCallbackArg,
41+
CowBytes, InsideIOSendCallback, InsideIOSendCallbackArg, OutsideIOSendCallback,
42+
OutsideIOSendCallbackArg,
4243
};
4344
pub use packet::OutsidePacket;
4445
pub use plugin::{

lightway-core/tests/connection.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,8 @@ impl TestSock for TestDatagramSock {
106106
}
107107

108108
impl OutsideIOSendCallback for TestDatagramSock {
109-
fn send(&self, buf: &[u8]) -> IOCallbackResult<usize> {
109+
fn send(&self, buf: CowBytes) -> IOCallbackResult<usize> {
110+
let buf = buf.as_bytes();
110111
match self.0.try_send(buf) {
111112
Ok(nr) => IOCallbackResult::Ok(nr),
112113
Err(err) if matches!(err.kind(), std::io::ErrorKind::WouldBlock) => {
@@ -156,8 +157,8 @@ impl TestSock for TestStreamSock {
156157
}
157158

158159
impl OutsideIOSendCallback for TestStreamSock {
159-
fn send(&self, buf: &[u8]) -> IOCallbackResult<usize> {
160-
match self.0.try_write(buf) {
160+
fn send(&self, buf: CowBytes) -> IOCallbackResult<usize> {
161+
match self.0.try_write(buf.as_bytes()) {
161162
Ok(nr) => IOCallbackResult::Ok(nr),
162163
Err(err) if matches!(err.kind(), std::io::ErrorKind::WouldBlock) => {
163164
IOCallbackResult::WouldBlock

lightway-server/src/io/outside/tcp.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use anyhow::{anyhow, Result};
44
use async_trait::async_trait;
55
use bytes::BytesMut;
66
use lightway_core::{
7-
ConnectionType, IOCallbackResult, OutsideIOSendCallback, OutsidePacket, Version,
7+
ConnectionType, CowBytes, IOCallbackResult, OutsideIOSendCallback, OutsidePacket, Version,
88
MAX_OUTSIDE_MTU,
99
};
1010
use socket2::SockRef;
@@ -21,8 +21,8 @@ struct TcpStream {
2121
}
2222

2323
impl OutsideIOSendCallback for TcpStream {
24-
fn send(&self, buf: &[u8]) -> IOCallbackResult<usize> {
25-
match self.sock.try_write(buf) {
24+
fn send(&self, buf: CowBytes) -> IOCallbackResult<usize> {
25+
match self.sock.try_write(buf.as_bytes()) {
2626
Ok(nr) => IOCallbackResult::Ok(nr),
2727
Err(err) if matches!(err.kind(), std::io::ErrorKind::WouldBlock) => {
2828
IOCallbackResult::WouldBlock

lightway-server/src/io/outside/udp.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ use bytes::BytesMut;
1111
use bytesize::ByteSize;
1212
use lightway_app_utils::sockopt::socket_enable_pktinfo;
1313
use lightway_core::{
14-
ConnectionType, Header, IOCallbackResult, OutsideIOSendCallback, OutsidePacket, SessionId,
15-
Version, MAX_OUTSIDE_MTU,
14+
ConnectionType, CowBytes, Header, IOCallbackResult, OutsideIOSendCallback, OutsidePacket,
15+
SessionId, Version, MAX_OUTSIDE_MTU,
1616
};
1717
use socket2::{MaybeUninitSlice, MsgHdr, MsgHdrMut, SockAddr, SockRef};
1818
use tokio::io::Interest;
@@ -87,9 +87,9 @@ struct UdpSocket {
8787
}
8888

8989
impl OutsideIOSendCallback for UdpSocket {
90-
fn send(&self, buf: &[u8]) -> IOCallbackResult<usize> {
90+
fn send(&self, buf: CowBytes) -> IOCallbackResult<usize> {
9191
let peer_addr = self.peer_addr.read().unwrap();
92-
send_to_socket(&self.sock, buf, &peer_addr.1, self.reply_pktinfo)
92+
send_to_socket(&self.sock, buf.as_bytes(), &peer_addr.1, self.reply_pktinfo)
9393
}
9494

9595
fn peer_addr(&self) -> SocketAddr {

0 commit comments

Comments
 (0)