1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -1,5 +1,6 @@
# UNRELEASED

* Optimise read performance, particularly for larger-than-default `read_buffer_size` values.
* Update MSRV to `1.68.2`.

# 0.28.0
210 changes: 210 additions & 0 deletions src/protocol/frame/init_aware_buf.rs
@@ -0,0 +1,210 @@
use bytes::{Buf, BytesMut};
use std::ops::{Deref, DerefMut};

/// Buffer that provides fast & safe [`Self::resize`] & [`Self::truncate`] usage.
///
/// It tracks the initialization state of its spare capacity, so safe usage
/// only requires zeroing uninitialized bytes once rather than on every resize.
///
/// This optimisation is useful for [`std::io::Read`] to safely provide spare
/// capacity as an initialized slice.
///
/// Related, may be obsoleted by: <https://github.com/rust-lang/rust/issues/78485>
#[derive(Debug, Default)]
pub struct InitAwareBuf {
/// The backing buf's length is used as the capacity. This ensures the
/// extra region is always initialized (initially with zeros, otherwise
/// with the last previously set values).
bytes: BytesMut,
/// Length of bytes in use (always <= `bytes.len()`).
len: usize,
}

impl InitAwareBuf {
#[inline]
pub fn with_capacity(capacity: usize) -> Self {
Self { bytes: BytesMut::zeroed(capacity), len: 0 }
}

#[inline]
pub fn len(&self) -> usize {
self.len
}

/// Capacity that may be resized to cheaply.
#[inline]
pub fn capacity(&self) -> usize {
self.bytes.len()
}

#[inline]
pub fn split_to(&mut self, at: usize) -> BytesMut {
assert!(at <= self.len, "split_to out of bounds: {at} <= {}", self.len);
let split = self.bytes.split_to(at);
self.len -= at;
split
}

/// Reserve capacity for `additional` more bytes than the current [`Self::len`].
///
/// `max_additional` sets the maximum number of additional bytes zeroed as extra
/// capacity if available after reserving in the underlying buffer. Has no effect
/// if `max_additional <= additional`.
#[inline]
pub fn reserve(&mut self, additional: usize, max_additional: usize) {
let min_len = self.len + additional;
let cap = self.capacity();
if min_len > cap {
self.bytes.reserve(min_len - cap);
let new_cap = self.bytes.capacity().min(self.len + max_additional.max(additional));
self.bytes.resize(new_cap, 0);
}
}

/// Resizes the buffer to `new_len`.
///
/// If greater than the current length, the new bytes will either be
/// zero or hold the values they were last set to.
#[inline]
pub fn resize(&mut self, new_len: usize) {
if new_len > self.capacity() {
self.bytes.resize(new_len, 0);
}
self.len = new_len;
}

#[inline]
pub fn truncate(&mut self, len: usize) {
if len < self.len {
self.len = len;
}
}

#[inline]
pub fn advance(&mut self, cnt: usize) {
assert!(cnt <= self.len, "cannot advance past len: {cnt} <= {}", self.len);
self.bytes.advance(cnt);
self.len -= cnt;
}
}

impl From<BytesMut> for InitAwareBuf {
#[inline]
fn from(bytes: BytesMut) -> Self {
let len = bytes.len();
Self { bytes, len }
}
}

impl From<InitAwareBuf> for BytesMut {
#[inline]
fn from(mut zb: InitAwareBuf) -> Self {
zb.bytes.truncate(zb.len);
zb.bytes
}
}

impl AsRef<[u8]> for InitAwareBuf {
#[inline]
fn as_ref(&self) -> &[u8] {
&self.bytes[..self.len]
}
}

impl Deref for InitAwareBuf {
type Target = [u8];

#[inline]
fn deref(&self) -> &[u8] {
self.as_ref()
}
}

impl AsMut<[u8]> for InitAwareBuf {
#[inline]
fn as_mut(&mut self) -> &mut [u8] {
&mut self.bytes[..self.len]
}
}

impl DerefMut for InitAwareBuf {
#[inline]
fn deref_mut(&mut self) -> &mut [u8] {
self.as_mut()
}
}
Comment on lines 107 to 135
@paolobarbolini paolobarbolini Nov 23, 2025

I came from the bytes issue. It's taking me some time to review this because of the manual slicing that happens in the other files. I'm not a fan of them, because in a way they leak implementation internals and leave the other modules to do the slicing by themselves. Although it's an internal API, it doesn't feel robust.

Why not copy the read_buf API (both the initial one which you can see in tokio, and the current one in std) by having methods like:

impl InitAwareBuf {
    // the region of memory that contains user data
    pub fn filled(&self) -> &[u8] {}

    // the region of memory that does not contain user data, but has been initialized
    pub fn init_mut(&mut self) -> &mut [u8] {}

    // mark `filled_len` bytes, that were written into the slice returned by `init_mut`, as filled
    pub fn advance_mut(&mut self, filled_len: usize) {}
}

Contributor Author

I envisioned the wrapper as transparent bytes but with extra info, the initialised capacity. Slice access, even mut, is fine as it doesn't grow or shrink the buffer.

This style simplifies the overall change, as usage of the buf, previously a plain `BytesMut`, is fairly unchanged.

We just need to ensure the new wrapper itself is sound.
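The invariant being discussed here, that the backing storage stays fully initialized while only the logical length moves, can be sketched standalone. This is an illustrative model only, with a `Vec<u8>` and a hypothetical `Sketch` type standing in for the wrapper's `BytesMut`, not the PR's actual implementation:

```rust
// Illustrative model of the truncate/resize semantics: the backing
// storage is zeroed once at allocation, after which truncate and
// resize (within capacity) only move the logical length.
struct Sketch {
    bytes: Vec<u8>, // always kept at full, initialized length
    len: usize,     // bytes in use
}

impl Sketch {
    fn with_capacity(cap: usize) -> Self {
        Self { bytes: vec![0; cap], len: 0 } // zeroed once up front
    }
    fn resize(&mut self, new_len: usize) {
        if new_len > self.bytes.len() {
            self.bytes.resize(new_len, 0);
        }
        self.len = new_len; // no re-zeroing within existing capacity
    }
    fn truncate(&mut self, len: usize) {
        if len < self.len {
            self.len = len; // backing bytes keep their values
        }
    }
    fn as_slice(&self) -> &[u8] {
        &self.bytes[..self.len]
    }
    fn as_mut_slice(&mut self) -> &mut [u8] {
        &mut self.bytes[..self.len]
    }
}

fn main() {
    let mut b = Sketch::with_capacity(8);
    b.resize(4);
    b.as_mut_slice().copy_from_slice(&[7, 7, 7, 7]);
    b.truncate(2); // only the logical length shrinks
    b.resize(4);   // regrowing is O(1): bytes 2..4 still hold 7
    assert_eq!(b.as_slice(), &[7, 7, 7, 7]);
}
```

Slice access via `as_slice`/`as_mut_slice` never grows or shrinks the buffer, which is why handing out `&mut [u8]` stays sound.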


#[cfg(test)]
mod test {
use super::*;

#[test]
fn reserve_resize_truncate() {
let mut buf = InitAwareBuf::default();
assert_eq!(buf.len(), 0);
assert_eq!(buf.capacity(), 0);

buf.reserve(64, 0);
assert_eq!(buf.len(), 0);
let new_capacity = buf.capacity();
assert!(new_capacity >= 64);

buf.resize(10);
assert_eq!(buf.len(), 10);
assert_eq!(buf.capacity(), new_capacity);
assert_eq!(&*buf, &[0; 10]);

// write 3 bytes of value 8
buf[0] = 8;
buf[1] = 8;
buf[2] = 8;
// set the remaining bytes to 44
for i in 3..10 {
buf[i] = 44;
}
buf.truncate(3);
assert_eq!(buf.len(), 3);
assert_eq!(buf.capacity(), new_capacity);
assert_eq!(&*buf, &[8; 3]);

// resizing should not need to zero anything now since this range has already been initialized once
buf.resize(10);
assert_eq!(buf.len(), 10);
assert_eq!(buf.capacity(), new_capacity);
assert_eq!(&*buf, &[8, 8, 8, 44, 44, 44, 44, 44, 44, 44]);

buf.truncate(3);
assert_eq!(&*buf, &[8; 3]);

// resizing should only zero the 3 bytes that hadn't previously been initialized
buf.resize(13);
assert_eq!(buf.len(), 13);
assert_eq!(buf.capacity(), new_capacity);
assert_eq!(&*buf, &[8, 8, 8, 44, 44, 44, 44, 44, 44, 44, 0, 0, 0]);
}

#[test]
fn advance() {
let mut buf = InitAwareBuf::from(BytesMut::from(&[0, 1, 2, 3, 4][..]));
assert_eq!(buf.len(), 5);
assert_eq!(buf.capacity(), 5);

buf.advance(2);
assert_eq!(buf.len(), 3);
assert_eq!(buf.capacity(), 3);
assert_eq!(&*buf, &[2, 3, 4]);
}

#[test]
fn split_to() {
let mut buf = InitAwareBuf::from(BytesMut::from(&[0, 1, 2, 3, 4][..]));
assert_eq!(buf.len(), 5);
assert_eq!(buf.capacity(), 5);

let split = buf.split_to(2);
assert_eq!(buf.len(), 3);
assert_eq!(buf.capacity(), 3);
assert_eq!(&*buf, &[2, 3, 4]);
assert_eq!(&*split, &[0, 1]);
}
}
21 changes: 11 additions & 10 deletions src/protocol/frame/mod.rs
@@ -4,6 +4,7 @@ pub mod coding;

#[allow(clippy::module_inception)]
mod frame;
mod init_aware_buf;
mod mask;
mod utf8;

@@ -14,7 +15,7 @@ pub use self::{

use crate::{
error::{CapacityError, Error, ProtocolError, Result},
protocol::frame::mask::apply_mask,
protocol::frame::{init_aware_buf::InitAwareBuf, mask::apply_mask},
Message,
};
use bytes::BytesMut;
@@ -46,7 +47,7 @@ impl<Stream> FrameSocket<Stream> {

/// Extract a stream from the socket.
pub fn into_inner(self) -> (Stream, BytesMut) {
(self.stream, self.codec.in_buffer)
(self.stream, self.codec.in_buffer.into())
}

/// Returns a shared reference to the inner stream.
@@ -103,7 +104,7 @@
#[derive(Debug)]
pub(super) struct FrameCodec {
/// Buffer to read data from the stream.
in_buffer: BytesMut,
in_buffer: InitAwareBuf,
in_buf_max_read: usize,
/// Buffer to send packets to the network.
out_buffer: Vec<u8>,
@@ -123,7 +124,7 @@ impl FrameCodec {
/// Create a new frame codec.
pub(super) fn new(in_buf_len: usize) -> Self {
Self {
in_buffer: BytesMut::with_capacity(in_buf_len),
in_buffer: InitAwareBuf::with_capacity(in_buf_len),
in_buf_max_read: in_buf_len.max(FrameHeader::MAX_SIZE),
out_buffer: <_>::default(),
max_out_buffer_len: usize::MAX,
@@ -134,8 +135,8 @@ impl FrameCodec {

/// Create a new frame codec from partially read data.
pub(super) fn from_partially_read(part: Vec<u8>, min_in_buf_len: usize) -> Self {
let mut in_buffer = BytesMut::from_iter(part);
in_buffer.reserve(min_in_buf_len.saturating_sub(in_buffer.len()));
let mut in_buffer = InitAwareBuf::from(BytesMut::from_iter(part));
in_buffer.reserve(min_in_buf_len.saturating_sub(in_buffer.len()), min_in_buf_len);
Self {
in_buffer,
in_buf_max_read: min_in_buf_len.max(FrameHeader::MAX_SIZE),
@@ -172,7 +173,7 @@ impl FrameCodec {
let mut cursor = Cursor::new(&mut self.in_buffer);
self.header = FrameHeader::parse(&mut cursor)?;
let advanced = cursor.position();
bytes::Buf::advance(&mut self.in_buffer, advanced as _);
self.in_buffer.advance(advanced as _);

if let Some((_, len)) = &self.header {
let len = *len as usize;
@@ -187,9 +188,9 @@

// Reserve full message length only once, even for multiple
// loops or if WouldBlock errors cause multiple fn calls.
self.in_buffer.reserve(len);
self.in_buffer.reserve(len, self.in_buf_max_read);
} else {
self.in_buffer.reserve(FrameHeader::MAX_SIZE);
self.in_buffer.reserve(FrameHeader::MAX_SIZE, self.in_buf_max_read);
}
}

@@ -233,7 +234,7 @@ impl FrameCodec {
fn read_in(&mut self, stream: &mut impl Read) -> io::Result<usize> {
let len = self.in_buffer.len();
debug_assert!(self.in_buffer.capacity() > len);
self.in_buffer.resize(self.in_buffer.capacity().min(len + self.in_buf_max_read), 0);
self.in_buffer.resize(self.in_buffer.capacity().min(len + self.in_buf_max_read));
let size = stream.read(&mut self.in_buffer[len..]);
self.in_buffer.truncate(len + size.as_ref().copied().unwrap_or(0));
size
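The `read_in` flow in the diff above (grow the logical length into already-initialized capacity, read, then truncate back to the bytes actually filled) can be sketched standalone. This is a simplified model with a plain `Vec<u8>` standing in for `InitAwareBuf`, and the function name is illustrative:

```rust
use std::io::Read;

// Model of the read_in flow: extend the logical window by at most
// `max_read` bytes (without exceeding capacity), hand the spare region
// to `Read::read` as a plain `&mut [u8]`, then shrink back to the
// bytes actually read. With InitAwareBuf the `resize` zeroing would
// happen at most once per region; a Vec re-zeroes each time.
fn read_in(stream: &mut impl Read, buf: &mut Vec<u8>, max_read: usize) -> std::io::Result<usize> {
    let len = buf.len();
    let target = buf.capacity().min(len + max_read);
    buf.resize(target, 0);
    let size = stream.read(&mut buf[len..]);
    buf.truncate(len + size.as_ref().copied().unwrap_or(0));
    size
}

fn main() -> std::io::Result<()> {
    // &[u8] implements Read, so a byte slice works as a test stream.
    let mut stream: &[u8] = b"abcdef";
    let mut buf = Vec::with_capacity(16);
    let n = read_in(&mut stream, &mut buf, 4)?;
    assert_eq!(n, 4);
    assert_eq!(&buf[..], b"abcd");
    Ok(())
}
```

Because the slice passed to `read` is always initialized, no `unsafe` is needed to expose spare capacity to an arbitrary `Read` implementation.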