From 73f074ad0594e931b3857e6ebafb6a5355dab5a2 Mon Sep 17 00:00:00 2001 From: Alex Butler Date: Sun, 23 Nov 2025 21:01:20 +0000 Subject: [PATCH 1/3] Optimise read buf initialization performance Particularly improves large read_buffer_size performance --- CHANGELOG.md | 1 + src/protocol/frame/init_aware_buf.rs | 221 +++++++++++++++++++++++++++ src/protocol/frame/mod.rs | 15 +- 3 files changed, 230 insertions(+), 7 deletions(-) create mode 100644 src/protocol/frame/init_aware_buf.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index b00bc226..449e70e7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ # UNRELEASED +* Optimise read performance particularly for larger than default `read_buffer_size`s. * Update MSRV to `1.68.2`. # 0.28.0 diff --git a/src/protocol/frame/init_aware_buf.rs b/src/protocol/frame/init_aware_buf.rs new file mode 100644 index 00000000..9d3451bf --- /dev/null +++ b/src/protocol/frame/init_aware_buf.rs @@ -0,0 +1,221 @@ +use bytes::{Buf, BytesMut}; +use std::{ + ops::{Deref, DerefMut}, + ptr, +}; + +/// [`BytesMut`] wrapper that tracks initialization state of its spare capacity. +/// +/// Supports safe & efficient repeated calls to [`Self::resize`] + [`Self::truncate`]. +/// +/// This optimisation is useful for [`std::io::Read`] to safely provide spare +/// capacity as an initialized slice. +/// +/// Related, may be obsoleted by: +#[derive(Debug, Default)] +pub struct InitAwareBuf { + bytes: BytesMut, + /// Capacity that has been initialized. + init_cap: usize, +} + +impl InitAwareBuf { + #[inline] + pub fn with_capacity(capacity: usize) -> Self { + Self { bytes: BytesMut::with_capacity(capacity), init_cap: 0 } + } + + #[inline] + pub fn len(&self) -> usize { + self.bytes.len() + } + + #[inline] + pub fn capacity(&self) -> usize { + self.bytes.capacity() + } + + #[inline] + pub fn split_to(&mut self, at: usize) -> BytesMut { + let split = self.bytes.split_to(at); + self.init_cap -= at; + split + } + + #[inline] + pub fn reserve(&mut self, additional: usize) { + // Increasing capacity doesn't change `init_cap` + self.bytes.reserve(additional); + } + + /// Sets the length of the buffer to `len`. If above the current + /// initialized capacity any uninitialized bytes will be zeroed. + /// + /// This is more efficient that [`BytesMut::resize`] as spare capacity + /// is only initialized **once** past the initialized_capacity. This + /// allow the method to be efficiently called after truncating. + /// + /// # Panics + /// Panics if `len > capacity`. + #[inline] + pub fn resize(&mut self, len: usize) { + if len <= self.init_cap { + // SAFETY: init_cap tracks initialised bytes. + unsafe { + self.bytes.set_len(len); + } + } else { + assert!(len <= self.capacity()); + let cur_len = self.bytes.len(); + let spare = self.bytes.spare_capacity_mut(); + let already_init = self.init_cap - cur_len; + let zeroes = len - self.init_cap; + debug_assert!(already_init + zeroes <= spare.len()); + unsafe { + // SAFETY: spare capacity is sufficient for `zeroes` extra bytes + ptr::write_bytes(spare[already_init..].as_mut_ptr().cast::(), 0, zeroes); + // SAFETY: len has been initialized + self.bytes.set_len(len); + } + self.init_cap = len; + } + } + + #[inline] + pub fn truncate(&mut self, len: usize) { + // truncating doesn't change `init_cap` + self.bytes.truncate(len); + } + + #[inline] + pub fn advance(&mut self, cnt: usize) { + self.bytes.advance(cnt); + self.init_cap -= cnt; + } +} + +impl From for InitAwareBuf { + #[inline] + fn from(bytes: BytesMut) -> Self { + let init_cap = bytes.len(); + Self { bytes, init_cap } + } +} + +impl From for BytesMut { + #[inline] + fn from(value: InitAwareBuf) -> Self { + value.bytes + } +} + +impl AsRef<[u8]> for InitAwareBuf { + #[inline] + fn as_ref(&self) -> &[u8] { + &self.bytes + } +} + +impl Deref for InitAwareBuf { + type Target = [u8]; + + #[inline] + fn deref(&self) -> &[u8] { + &self.bytes + } +} + +impl AsMut<[u8]> for InitAwareBuf { + #[inline] + fn as_mut(&mut self) -> &mut [u8] { + &mut self.bytes + } +} + +impl DerefMut for InitAwareBuf { + #[inline] + fn deref_mut(&mut self) -> &mut [u8] { + &mut self.bytes + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn reserve_resize_truncate() { + let mut buf = InitAwareBuf::default(); + assert_eq!(buf.len(), 0); + assert_eq!(buf.init_cap, 0); + assert_eq!(buf.capacity(), 0); + + buf.reserve(64); + assert_eq!(buf.len(), 0); + assert_eq!(buf.init_cap, 0); + let new_capacity = buf.capacity(); + assert!(new_capacity >= 64); + + buf.resize(10); + assert_eq!(buf.len(), 10); + assert_eq!(buf.init_cap, 10); + assert_eq!(buf.capacity(), new_capacity); + assert_eq!(&*buf, &[0; 10]); + + // write 3 bytes =8 + buf[0] = 8; + buf[1] = 8; + buf[2] = 8; + // mark the other bytes as =44 + for i in 3..10 { + buf[i] = 44; + } + buf.truncate(3); + assert_eq!(buf.len(), 3); + assert_eq!(buf.init_cap, 10); + assert_eq!(buf.capacity(), new_capacity); + assert_eq!(&*buf, &[8; 3]); + + // resizing should need do nothing now since this has already been initialized once + buf.resize(10); + assert_eq!(buf.len(), 10); + assert_eq!(buf.init_cap, 10); + assert_eq!(buf.capacity(), new_capacity); + assert_eq!(&*buf, &[8, 8, 8, 44, 44, 44, 44, 44, 44, 44]); + + buf.truncate(3); + assert_eq!(&*buf, &[8; 3]); + + // resizing should only init to zero the 3 bytes that hadn't previously been + buf.resize(13); + assert_eq!(buf.len(), 13); + assert_eq!(buf.init_cap, 13); + assert_eq!(buf.capacity(), new_capacity); + assert_eq!(&*buf, &[8, 8, 8, 44, 44, 44, 44, 44, 44, 44, 0, 0, 0]); + } + + #[test] + fn advance() { + let mut buf = InitAwareBuf::from(BytesMut::from(&[0, 1, 2, 3, 4][..])); + assert_eq!(buf.len(), 5); + assert_eq!(buf.init_cap, 5); + + buf.advance(2); + assert_eq!(buf.len(), 3); + assert_eq!(buf.init_cap, 3); + assert_eq!(&*buf, &[2, 3, 4]); + } + + #[test] + fn split_to() { + let mut buf = InitAwareBuf::from(BytesMut::from(&[0, 1, 2, 3, 4][..])); + assert_eq!(buf.len(), 5); + assert_eq!(buf.init_cap, 5); + + let split = buf.split_to(2); + assert_eq!(buf.len(), 3); + assert_eq!(buf.init_cap, 3); + assert_eq!(&*buf, &[2, 3, 4]); + assert_eq!(&*split, &[0, 1]); + } +} diff --git a/src/protocol/frame/mod.rs b/src/protocol/frame/mod.rs index ac8f209c..055d2034 100644 --- a/src/protocol/frame/mod.rs +++ b/src/protocol/frame/mod.rs @@ -4,6 +4,7 @@ pub mod coding; #[allow(clippy::module_inception)] mod frame; +mod init_aware_buf; mod mask; mod utf8; @@ -14,7 +15,7 @@ pub use self::{ use crate::{ error::{CapacityError, Error, ProtocolError, Result}, - protocol::frame::mask::apply_mask, + protocol::frame::{init_aware_buf::InitAwareBuf, mask::apply_mask}, Message, }; use bytes::BytesMut; @@ -46,7 +47,7 @@ impl FrameSocket { /// Extract a stream from the socket. pub fn into_inner(self) -> (Stream, BytesMut) { - (self.stream, self.codec.in_buffer) + (self.stream, self.codec.in_buffer.into()) } /// Returns a shared reference to the inner stream. @@ -103,7 +104,7 @@ where #[derive(Debug)] pub(super) struct FrameCodec { /// Buffer to read data from the stream. - in_buffer: BytesMut, + in_buffer: InitAwareBuf, in_buf_max_read: usize, /// Buffer to send packets to the network. out_buffer: Vec, @@ -123,7 +124,7 @@ impl FrameCodec { /// Create a new frame codec. pub(super) fn new(in_buf_len: usize) -> Self { Self { - in_buffer: BytesMut::with_capacity(in_buf_len), + in_buffer: InitAwareBuf::with_capacity(in_buf_len), in_buf_max_read: in_buf_len.max(FrameHeader::MAX_SIZE), out_buffer: <_>::default(), max_out_buffer_len: usize::MAX, @@ -137,7 +138,7 @@ impl FrameCodec { let mut in_buffer = BytesMut::from_iter(part); in_buffer.reserve(min_in_buf_len.saturating_sub(in_buffer.len())); Self { - in_buffer, + in_buffer: in_buffer.into(), in_buf_max_read: min_in_buf_len.max(FrameHeader::MAX_SIZE), out_buffer: <_>::default(), max_out_buffer_len: usize::MAX, @@ -172,7 +173,7 @@ impl FrameCodec { let mut cursor = Cursor::new(&mut self.in_buffer); self.header = FrameHeader::parse(&mut cursor)?; let advanced = cursor.position(); - bytes::Buf::advance(&mut self.in_buffer, advanced as _); + self.in_buffer.advance(advanced as _); if let Some((_, len)) = &self.header { let len = *len as usize; @@ -233,7 +234,7 @@ impl FrameCodec { fn read_in(&mut self, stream: &mut impl Read) -> io::Result { let len = self.in_buffer.len(); debug_assert!(self.in_buffer.capacity() > len); - self.in_buffer.resize(self.in_buffer.capacity().min(len + self.in_buf_max_read), 0); + self.in_buffer.resize(self.in_buffer.capacity().min(len + self.in_buf_max_read)); let size = stream.read(&mut self.in_buffer[len..]); self.in_buffer.truncate(len + size.as_ref().copied().unwrap_or(0)); size From 2fddc07de0a2a3b68200bfb72775805a149ff972 Mon Sep 17 00:00:00 2001 From: Alex Butler Date: Sun, 30 Nov 2025 02:42:26 +0000 Subject: [PATCH 2/3] Rework as using underlying bytes.len as capacity --- src/protocol/frame/init_aware_buf.rs | 119 ++++++++++++--------------- src/protocol/frame/mod.rs | 10 +-- 2 files changed, 59 insertions(+), 70 deletions(-) diff --git a/src/protocol/frame/init_aware_buf.rs b/src/protocol/frame/init_aware_buf.rs index 9d3451bf..95f091eb 100644 --- a/src/protocol/frame/init_aware_buf.rs +++ b/src/protocol/frame/init_aware_buf.rs @@ -1,12 +1,10 @@ use bytes::{Buf, BytesMut}; -use std::{ - ops::{Deref, DerefMut}, - ptr, -}; +use std::ops::{Deref, DerefMut}; -/// [`BytesMut`] wrapper that tracks initialization state of its spare capacity. +/// Buffer that provides fast & safe [`Self::resize`] & [`Self::truncate`] usage. /// -/// Supports safe & efficient repeated calls to [`Self::resize`] + [`Self::truncate`]. +/// It is aware of the initialization state of its spare capacity avoiding the +/// need to zero uninitialized bytes on resizing more than once for safe usage. /// /// This optimisation is useful for [`std::io::Read`] to safely provide spare /// capacity as an initialized slice. @@ -14,105 +12,102 @@ use std::{ /// Related, may be obsoleted by: #[derive(Debug, Default)] pub struct InitAwareBuf { + /// Backing buf length is used as capacity. This ensure this extra region + /// is always initialized (initially with zero, but otherwise the last previously + /// set value). bytes: BytesMut, - /// Capacity that has been initialized. - init_cap: usize, + /// Length of bytes in use (always <= bytes.len). + len: usize, } impl InitAwareBuf { #[inline] pub fn with_capacity(capacity: usize) -> Self { - Self { bytes: BytesMut::with_capacity(capacity), init_cap: 0 } + Self { bytes: BytesMut::zeroed(capacity), len: 0 } } #[inline] pub fn len(&self) -> usize { - self.bytes.len() + self.len } + /// Capacity that may be resized to cheaply. #[inline] pub fn capacity(&self) -> usize { - self.bytes.capacity() + self.bytes.len() } #[inline] pub fn split_to(&mut self, at: usize) -> BytesMut { + assert!(at <= self.len, "split_to out of bounds: {at} <= {}", self.len); let split = self.bytes.split_to(at); - self.init_cap -= at; + self.len -= at; split } + /// Reserve capacity for `min_additional` more bytes than the current [`Self::len`]. + /// + /// `max_additional` sets the maximum number of additional bytes zeroed as extra + /// capacity if available after reserving in the underlying buffer. Has no effect + /// if `max_additional <= additional`. #[inline] - pub fn reserve(&mut self, additional: usize) { - // Increasing capacity doesn't change `init_cap` - self.bytes.reserve(additional); + pub fn reserve(&mut self, additional: usize, max_additional: usize) { + let min_len = self.len + additional; + let cap = self.capacity(); + if min_len > cap { + self.bytes.reserve(min_len - cap); + let new_cap = self.bytes.capacity().min(self.len + max_additional.max(additional)); + self.bytes.resize(new_cap, 0); + } } - /// Sets the length of the buffer to `len`. If above the current - /// initialized capacity any uninitialized bytes will be zeroed. - /// - /// This is more efficient that [`BytesMut::resize`] as spare capacity - /// is only initialized **once** past the initialized_capacity. This - /// allow the method to be efficiently called after truncating. + /// Resizes the buffer to `new_len`. /// - /// # Panics - /// Panics if `len > capacity`. + /// If greater the new bytes will be either initialized to zero or as + /// they were last set to. #[inline] - pub fn resize(&mut self, len: usize) { - if len <= self.init_cap { - // SAFETY: init_cap tracks initialised bytes. - unsafe { - self.bytes.set_len(len); - } - } else { - assert!(len <= self.capacity()); - let cur_len = self.bytes.len(); - let spare = self.bytes.spare_capacity_mut(); - let already_init = self.init_cap - cur_len; - let zeroes = len - self.init_cap; - debug_assert!(already_init + zeroes <= spare.len()); - unsafe { - // SAFETY: spare capacity is sufficient for `zeroes` extra bytes - ptr::write_bytes(spare[already_init..].as_mut_ptr().cast::(), 0, zeroes); - // SAFETY: len has been initialized - self.bytes.set_len(len); - } - self.init_cap = len; + pub fn resize(&mut self, new_len: usize) { + if new_len > self.capacity() { + self.bytes.resize(new_len, 0); } + self.len = new_len; } #[inline] pub fn truncate(&mut self, len: usize) { - // truncating doesn't change `init_cap` - self.bytes.truncate(len); + if len < self.len { + self.len = len; + } } #[inline] pub fn advance(&mut self, cnt: usize) { + assert!(cnt <= self.len, "cannot advance past len: {cnt} <= {}", self.len); self.bytes.advance(cnt); - self.init_cap -= cnt; + self.len -= cnt; } } impl From for InitAwareBuf { #[inline] fn from(bytes: BytesMut) -> Self { - let init_cap = bytes.len(); - Self { bytes, init_cap } + let len = bytes.len(); + Self { bytes, len } } } impl From for BytesMut { #[inline] - fn from(value: InitAwareBuf) -> Self { - value.bytes + fn from(mut zb: InitAwareBuf) -> Self { + zb.bytes.truncate(zb.len); + zb.bytes } } impl AsRef<[u8]> for InitAwareBuf { #[inline] fn as_ref(&self) -> &[u8] { - &self.bytes + &self.bytes[..self.len] } } @@ -121,21 +116,21 @@ impl Deref for InitAwareBuf { #[inline] fn deref(&self) -> &[u8] { - &self.bytes + self.as_ref() } } impl AsMut<[u8]> for InitAwareBuf { #[inline] fn as_mut(&mut self) -> &mut [u8] { - &mut self.bytes + &mut self.bytes[..self.len] } } impl DerefMut for InitAwareBuf { #[inline] fn deref_mut(&mut self) -> &mut [u8] { - &mut self.bytes + self.as_mut() } } @@ -147,18 +142,15 @@ mod test { fn reserve_resize_truncate() { let mut buf = InitAwareBuf::default(); assert_eq!(buf.len(), 0); - assert_eq!(buf.init_cap, 0); assert_eq!(buf.capacity(), 0); - buf.reserve(64); + buf.reserve(64, 0); assert_eq!(buf.len(), 0); - assert_eq!(buf.init_cap, 0); let new_capacity = buf.capacity(); assert!(new_capacity >= 64); buf.resize(10); assert_eq!(buf.len(), 10); - assert_eq!(buf.init_cap, 10); assert_eq!(buf.capacity(), new_capacity); assert_eq!(&*buf, &[0; 10]); @@ -172,14 +164,12 @@ mod test { } buf.truncate(3); assert_eq!(buf.len(), 3); - assert_eq!(buf.init_cap, 10); assert_eq!(buf.capacity(), new_capacity); assert_eq!(&*buf, &[8; 3]); // resizing should need do nothing now since this has already been initialized once buf.resize(10); assert_eq!(buf.len(), 10); - assert_eq!(buf.init_cap, 10); assert_eq!(buf.capacity(), new_capacity); assert_eq!(&*buf, &[8, 8, 8, 44, 44, 44, 44, 44, 44, 44]); @@ -189,7 +179,6 @@ mod test { // resizing should only init to zero the 3 bytes that hadn't previously been buf.resize(13); assert_eq!(buf.len(), 13); - assert_eq!(buf.init_cap, 13); assert_eq!(buf.capacity(), new_capacity); assert_eq!(&*buf, &[8, 8, 8, 44, 44, 44, 44, 44, 44, 44, 0, 0, 0]); } @@ -198,11 +187,11 @@ mod test { fn advance() { let mut buf = InitAwareBuf::from(BytesMut::from(&[0, 1, 2, 3, 4][..])); assert_eq!(buf.len(), 5); - assert_eq!(buf.init_cap, 5); + assert_eq!(buf.capacity(), 5); buf.advance(2); assert_eq!(buf.len(), 3); - assert_eq!(buf.init_cap, 3); + assert_eq!(buf.capacity(), 3); assert_eq!(&*buf, &[2, 3, 4]); } @@ -210,11 +199,11 @@ mod test { fn split_to() { let mut buf = InitAwareBuf::from(BytesMut::from(&[0, 1, 2, 3, 4][..])); assert_eq!(buf.len(), 5); - assert_eq!(buf.init_cap, 5); + assert_eq!(buf.capacity(), 5); let split = buf.split_to(2); assert_eq!(buf.len(), 3); - assert_eq!(buf.init_cap, 3); + assert_eq!(buf.capacity(), 3); assert_eq!(&*buf, &[2, 3, 4]); assert_eq!(&*split, &[0, 1]); } diff --git a/src/protocol/frame/mod.rs b/src/protocol/frame/mod.rs index 055d2034..53a6d521 100644 --- a/src/protocol/frame/mod.rs +++ b/src/protocol/frame/mod.rs @@ -135,10 +135,10 @@ impl FrameCodec { /// Create a new frame codec from partially read data. pub(super) fn from_partially_read(part: Vec, min_in_buf_len: usize) -> Self { - let mut in_buffer = BytesMut::from_iter(part); - in_buffer.reserve(min_in_buf_len.saturating_sub(in_buffer.len())); + let mut in_buffer = InitAwareBuf::from(BytesMut::from_iter(part)); + in_buffer.reserve(min_in_buf_len.saturating_sub(in_buffer.len()), min_in_buf_len); Self { - in_buffer: in_buffer.into(), + in_buffer, in_buf_max_read: min_in_buf_len.max(FrameHeader::MAX_SIZE), out_buffer: <_>::default(), max_out_buffer_len: usize::MAX, @@ -188,9 +188,9 @@ impl FrameCodec { // Reserve full message length only once, even for multiple // loops or if WouldBlock errors cause multiple fn calls. - self.in_buffer.reserve(len); + self.in_buffer.reserve(len, self.in_buf_max_read); } else { - self.in_buffer.reserve(FrameHeader::MAX_SIZE); + self.in_buffer.reserve(FrameHeader::MAX_SIZE, self.in_buf_max_read); } } From ef3ea34a51640a9186aec98e9eb44a58a3226bd4 Mon Sep 17 00:00:00 2001 From: Alex Butler Date: Mon, 12 Jan 2026 15:17:42 +0000 Subject: [PATCH 3/3] Update src/protocol/frame/init_aware_buf.rs Co-authored-by: Daniel Abramov --- src/protocol/frame/init_aware_buf.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/protocol/frame/init_aware_buf.rs b/src/protocol/frame/init_aware_buf.rs index 95f091eb..41adb5ac 100644 --- a/src/protocol/frame/init_aware_buf.rs +++ b/src/protocol/frame/init_aware_buf.rs @@ -45,7 +45,7 @@ impl InitAwareBuf { split } - /// Reserve capacity for `min_additional` more bytes than the current [`Self::len`]. + /// Reserve capacity for `additional` more bytes than the current [`Self::len`]. /// /// `max_additional` sets the maximum number of additional bytes zeroed as extra /// capacity if available after reserving in the underlying buffer. Has no effect