
A proper Body trait #1438

@seanmonstar

Description


Currently (v0.11), hyper allows using any impl Stream<Item=impl AsRef<[u8]>, Error=hyper::Error> as the send body for requests and responses. That should likely change, since it prevents using new features from HTTP/2. While we can upgrade the current receive stream with inherent methods for new things, hyper has no way to call such methods on a user-provided send stream. Here are some new features we'd like to support:

  • HTTP trailers: while defined in HTTP/1.1, they were rarely used, but HTTP/2 makes them easy to use, and newer protocols such as gRPC rely on them.
  • Push promises: while a design isn't settled yet, a push promise can arrive after the headers, and a user may wish to send new push promises after getting partway through a body.
  • Flow control? To start with, hyper will manage flow control internally, but we may want to add the ability for users to control that themselves. If so, they would be new methods on the trait with default implementations.
  • Other new frame types that get added to HTTP/2.
  • Being able to adjust the reset code sent before dropping the stream.

To accomplish this, hyper probably needs to define its own trait, so that new features can be added later.
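Part of why an owned trait helps: a trait that hyper defines can later gain methods with default implementations, and existing implementors generally keep compiling, whereas hyper cannot add methods to `futures::Stream`. A minimal std-only sketch of that evolution, where `SendBody`, `next_chunk`, `poll_push_promise`, and `Chunks` are hypothetical names, not hyper APIs:

```rust
/// Version 1 of a hypothetical crate-owned body trait.
pub trait SendBody {
    fn next_chunk(&mut self) -> Option<Vec<u8>>;

    /// Added in a later release (hypothetical). Because it has a default
    /// body, types that implemented `SendBody` before it existed still compile.
    fn poll_push_promise(&mut self) -> Option<String> {
        None
    }
}

/// A user type written against "version 1": it only implements `next_chunk`.
pub struct Chunks(pub Vec<Vec<u8>>);

impl SendBody for Chunks {
    fn next_chunk(&mut self) -> Option<Vec<u8>> {
        // Hand out chunks front to back until none are left.
        if self.0.is_empty() {
            None
        } else {
            Some(self.0.remove(0))
        }
    }
}
```

The default of `None` reads as "this body has no push promises", so only bodies that care about a new feature need to override the new method.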

The Body trait

/// Replaces the `Stream<Item=impl AsRef<[u8]>, Error=hyper::Error>`.
///
/// This trait represents a streaming body of a `Request` or `Response`.
pub trait Body {
    /// A buffer of bytes representing a single chunk of a body.
    type Data: AsRef<[u8]> + 'static;
    
    /// See https://github.com/hyperium/hyper/issues/1431 for more about the `User` error.
    type Error: Into<hyper::error::User>;
    
    /// Poll for a stream of `Data` buffers.
    fn poll_data(&mut self) -> Poll<Option<Self::Data>, Self::Error>;
    
    /// Poll for an optional *single* `HeaderMap` of trailers. This should
    /// only be called after `poll_data` has ended.
    ///
    /// Note: Trailers aren't currently used for HTTP/1, only for HTTP/2.
    fn poll_trailers(&mut self) -> Poll<Option<HeaderMap>, Self::Error> {
        Ok(Async::Ready(None))
    }
    
    /// A hint that the `Body` is complete, and doesn't need to be polled more.
    ///
    /// This can be useful to determine if there is any body or trailers
    /// without having to poll. An empty `Body` could return `true` and hyper
    /// would be able to know that only the headers need to be sent. Or, it can
    /// also be checked after each `poll_data` call, to allow hyper to try to end
    /// the underlying stream with the last chunk, instead of needing to send an
    /// extra `DATA` frame just to mark the stream as finished.
    ///
    /// As a hint, it is used to try to optimize, and thus is OK for a default
    /// implementation to return `false`.
    fn is_end_stream(&self) -> bool {
        false
    }
    
    /// Return a length of the total bytes that will be streamed, if known.
    ///
    /// If an exact size of bytes is known, this would allow hyper to send a
    /// `Content-Length` header automatically, not needing to fall back to
    /// `Transfer-Encoding: chunked`.
    ///
    /// This does not need to be kept up to date after polls; it will only be
    /// called once to create the headers.
    fn content_length(&self) -> Option<u64> {
        None
    }
    
    /// A callback if there is an error trying to *write* data or trailers
    /// that have been polled from this `Body`.
    ///
    /// This can be things like declaring a certain size in `content_length`,
    /// or having manually set a `Content-Length` header, and the polled
    /// data happens to be bigger. This error wouldn't necessarily close
    /// the request/response message, but could be useful to know.
    /// 
    /// Additionally, after returning a `Response<impl Body>` in a server
    /// context, there isn't any other mechanism to learn of errors that
    /// may occur trying to flush the body's data.
    ///
    /// The default implementation is to just log the error.
    fn on_error(&mut self, err: hyper::Error) {
        debug!("Body::on_error: {}", err);
    }
    
    // fn into_stream?
    //
    // This could be a simple wrapper that turns any `Body` into a
    // `Stream<Item=Self::Data, Error=Self::Error>`. Besides allowing
    // a user to easily convert a body into a `Stream` when something wants
    // one, it could do so correctly, by making sure that once `poll_data`
    // has reached the end, `poll_trailers` is also called, to allow any
    // `Body` implementation to know when it was finished properly.
    //
    // Alternatively, instead of being on the trait itself, this could
    // just be a type in the `hyper::body` module, `IntoStream<impl Body>`,
    // and it can be imported directly and a constructor used...
}
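To make the hooks concrete, here is a minimal std-only model of the proposed trait with a single-buffer body. It assumes stand-ins for futures 0.1's `Async`/`Poll` and models `HeaderMap` as a `HashMap`; `OnceBody` and `framing_header` are hypothetical names, `on_error` is left out, and the `'static` bound is dropped for brevity:

```rust
use std::collections::HashMap;

// Std-only stand-ins (assumptions, not hyper or futures APIs): `Async` and
// `Poll` mimic futures 0.1, and `HeaderMap` is modeled as a plain map.
#[derive(Debug, PartialEq)]
pub enum Async<T> {
    Ready(T),
    NotReady,
}
pub type Poll<T, E> = Result<Async<T>, E>;
pub type HeaderMap = HashMap<String, String>;

/// A trimmed-down copy of the proposed trait (no `on_error`).
pub trait Body {
    type Data: AsRef<[u8]>;
    type Error;

    fn poll_data(&mut self) -> Poll<Option<Self::Data>, Self::Error>;

    fn poll_trailers(&mut self) -> Poll<Option<HeaderMap>, Self::Error> {
        Ok(Async::Ready(None))
    }

    fn is_end_stream(&self) -> bool {
        false
    }

    fn content_length(&self) -> Option<u64> {
        None
    }
}

/// A hypothetical body that yields a single buffer, then ends.
pub struct OnceBody {
    buf: Option<Vec<u8>>,
    len: u64,
}

impl OnceBody {
    pub fn new(buf: Vec<u8>) -> Self {
        let len = buf.len() as u64;
        OnceBody { buf: Some(buf), len }
    }
}

impl Body for OnceBody {
    type Data = Vec<u8>;
    type Error = ();

    fn poll_data(&mut self) -> Poll<Option<Vec<u8>>, ()> {
        // `take` hands out the buffer once; later polls see `None` (the end).
        Ok(Async::Ready(self.buf.take()))
    }

    fn is_end_stream(&self) -> bool {
        // Once the buffer has been taken there is nothing left to poll.
        self.buf.is_none()
    }

    fn content_length(&self) -> Option<u64> {
        // The exact size is known up front.
        Some(self.len)
    }
}

/// Hypothetical helper: how a sender could pick HTTP/1.1 framing
/// from `content_length`.
pub fn framing_header<B: Body>(body: &B) -> String {
    match body.content_length() {
        Some(n) => format!("Content-Length: {}", n),
        None => "Transfer-Encoding: chunked".to_string(),
    }
}
```

Because `is_end_stream` flips to `true` as soon as the only buffer is handed out, a sender checking it right after `poll_data` could mark that chunk as the last one instead of sending an extra empty `DATA` frame.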

Some questions about the design that I still have:

  • Does the signature of poll_trailers() make it look like a stream of trailers? It's meant to behave like a future, yielding once, not multiple HeaderMaps. It's probably fine: Iterator::next returns Option<T>, but that doesn't make everything else returning Option<T> iterator-like. Documentation should be enough.

  • Does content_length pull its weight?

  • Should there be a trait method into_stream? Having it on the trait means fewer imports are needed, but does it conflict with Body trait objects? It could just be IntoStream::new(body)...

  • Does on_error make sense to have? Additionally, should it also be used to allow sending a reset error on a receive stream? For instance, if you've received the default body in a Request in the server, and after polling for a while some other part of your app is on fire, should you be able to call body.on_error(User::internal_error()) and have hyper try to send that reset error? Currently, all you can do is drop the body, and hyper will send a CANCEL reset.

  • An alternative to separate poll_* methods is to use a single method, and return a non-exhaustive enum of the frames that a user can receive. This might reduce complication around knowing what order to call methods in, especially as new events are added, like push promises.

      match body.poll_next().await? {
          Some(Frame::Data(chunk)) => {}
          Some(Frame::Trailers(trailers)) => {}
          Some(_) => {
              // new, unknown frame type
              // HTTP/2 requires new frames to be ignorable by
              // those that don't understand them
          }
          None => {
              // the body has ended
          }
      }

    Then, when hyper adds push promise support, a user just needs to update their match block to handle them. If they are added in a new method, it might be confusing when to call poll_data vs poll_push_promise, etc.
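The ordering concern can be sketched as an adapter that turns separate `poll_data`/`poll_trailers` calls into one stream of frames. This is a synchronous, std-only sketch: `Frame`, `Frames`, `next_frame`, and `GrpcLike` are hypothetical names, and `Poll`, errors, and `NotReady` are omitted so the ordering logic stays visible:

```rust
use std::collections::HashMap;

// Std-only stand-ins (assumptions): a synchronous `Body` with separate
// `poll_data`/`poll_trailers`; `Poll`, errors and `NotReady` are omitted.
pub type HeaderMap = HashMap<String, String>;

pub trait Body {
    fn poll_data(&mut self) -> Option<Vec<u8>>;
    fn poll_trailers(&mut self) -> Option<HeaderMap> {
        None
    }
}

/// Hypothetical single-method frame type. `#[non_exhaustive]` forces matches
/// in other crates to include a wildcard arm, so variants such as a push
/// promise could be added later without breaking them.
#[non_exhaustive]
#[derive(Debug, PartialEq)]
pub enum Frame {
    Data(Vec<u8>),
    Trailers(HeaderMap),
}

/// Hypothetical adapter that drives a `Body` in the required order:
/// data until it ends, then trailers exactly once, then `None` forever.
pub struct Frames<B> {
    body: B,
    data_done: bool,
    trailers_done: bool,
}

impl<B: Body> Frames<B> {
    pub fn new(body: B) -> Self {
        Frames { body, data_done: false, trailers_done: false }
    }

    pub fn next_frame(&mut self) -> Option<Frame> {
        if !self.data_done {
            if let Some(chunk) = self.body.poll_data() {
                return Some(Frame::Data(chunk));
            }
            // `poll_data` has ended; never call it again.
            self.data_done = true;
        }
        if !self.trailers_done {
            // Trailers are polled at most once.
            self.trailers_done = true;
            if let Some(trailers) = self.body.poll_trailers() {
                return Some(Frame::Trailers(trailers));
            }
        }
        None
    }
}

/// A hypothetical gRPC-like body: one message, then a `grpc-status` trailer.
pub struct GrpcLike {
    msg: Option<Vec<u8>>,
}

impl Body for GrpcLike {
    fn poll_data(&mut self) -> Option<Vec<u8>> {
        self.msg.take()
    }

    fn poll_trailers(&mut self) -> Option<HeaderMap> {
        let mut t = HeaderMap::new();
        t.insert("grpc-status".to_string(), "0".to_string());
        Some(t)
    }
}
```

The same adapter also covers the `into_stream` idea: it guarantees `poll_trailers` is called once `poll_data` has ended, so a body learns it was read to completion.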

The default body type

Since the trait name is Body, the v0.11 hyper::Body needs a new name. So far, I haven't thought of any that I feel great about. In Java, you often find interfaces or abstract classes with the good name, and DefaultFoo as the implementation. 🤷‍♂️

/// What `hyper::Body` is in v0.11. Don't love the name...
///
/// This type serves two purposes:
/// - It is the body type that is *received* from hyper, such as in
///   client responses and server requests.
/// - It is a simple to use default for applications to create and
///   give to hyper as a send stream.
pub struct DefaultBody {
    // ...
}

impl Body for DefaultBody {
    type Data = hyper::Chunk;
    type Error = hyper::Error;
    
    // ...
}

impl DefaultBody {
    /// Creates a channel with a sender that can be safely passed to
    /// other tasks or threads. Data can be pushed onto the sender,
    /// and the associated `DefaultBody` receives it.
    pub fn channel() -> (DefaultBodySender, Self) {
        // ...
    }
    
    /// Creates an empty body.
    pub fn empty() -> Self {
        // ...
    }
    
    /// Create a body that yields just a single buffer.
    pub fn once<T>(buf: T) -> Self
    where
        T: Into<Chunk>,
    {
        // ...
    }
    
    /// Wraps any `Stream` as a `hyper::Body`.
    ///
    /// Internally, this boxes the stream, so as to not require
    /// a type parameter on `Self`. If boxing is undesirable,
    /// consider implementing `hyper::Body` on a custom type.
    pub fn wrap_stream<T>(stream: T) -> Self
    where
        T: Stream + 'static,
        T::Item: Into<Chunk>,
        T::Error: Into<hyper::error::User>,
    {
        // ...
    }
}
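A rough std-only sketch of what `DefaultBody::channel()` could look like, built on `std::sync::mpsc`. The `Async`/`Poll` types are stand-ins for futures 0.1, chunks are `Vec<u8>` rather than `hyper::Chunk`, and `send_data` is a hypothetical method name:

```rust
use std::sync::mpsc::{self, Receiver, Sender, TryRecvError};

// Std-only stand-ins (assumptions): futures 0.1-style `Async`/`Poll`.
#[derive(Debug, PartialEq)]
pub enum Async<T> {
    Ready(T),
    NotReady,
}
pub type Poll<T, E> = Result<Async<T>, E>;

/// Hypothetical sending half; it can be moved to another thread.
pub struct DefaultBodySender {
    tx: Sender<Vec<u8>>,
}

impl DefaultBodySender {
    /// Returns the chunk back if the receiving body has been dropped.
    pub fn send_data(&self, chunk: Vec<u8>) -> Result<(), Vec<u8>> {
        self.tx.send(chunk).map_err(|e| e.0)
    }
}

/// Hypothetical receiving half, standing in for `DefaultBody`.
pub struct DefaultBody {
    rx: Receiver<Vec<u8>>,
}

impl DefaultBody {
    pub fn channel() -> (DefaultBodySender, DefaultBody) {
        let (tx, rx) = mpsc::channel();
        (DefaultBodySender { tx }, DefaultBody { rx })
    }

    /// Non-blocking, like `Body::poll_data`. A real implementation would
    /// also arrange for the task to be woken when data arrives.
    pub fn poll_data(&mut self) -> Poll<Option<Vec<u8>>, ()> {
        match self.rx.try_recv() {
            Ok(chunk) => Ok(Async::Ready(Some(chunk))),
            Err(TryRecvError::Empty) => Ok(Async::NotReady),
            // Every sender is gone: treated as a normal end of the body.
            Err(TryRecvError::Disconnected) => Ok(Async::Ready(None)),
        }
    }
}
```

The final case is the behavior discussed next: once every sender is dropped, the body reports a clean end, whether or not the sending side actually finished its work.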

Some things I'd like to add to the (naming, bleck) DefaultBodySender are:

  • The ability to send new frames as hyper gains support for them, like push promises.

  • The ability to make closing the stream an explicit action. Right now, with an mpsc channel, if the sender is in another thread and that thread panics, the sender is dropped, and the receiver just assumes the stream is finished. hyper understands that as "all good, end the writing". However, this can be bad! If the stream had a Content-Length, the remote will realize the body was interrupted. But if it didn't, the remote will just assume that the ended stream is the full response. This can result in the remote treating partial content as the full content.

    I'd like for the default sender to have an option to mark that being dropped without having called close() should signal an error has occurred. This could either be a boolean flag you enable on the sender, or it could be a new method that returns a guard type to be used in places where panics may happen.
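One way the "boolean flag" option could work, sketched with `std::sync::mpsc`: when the flag is set, the sender's `Drop` impl sends an abort message unless `close()` was called. Since `Drop` also runs while a panicking thread unwinds (with the default `panic = "unwind"`), a panic would surface as an error on the body. `BodySender`, `ChannelBody`, `Msg`, `Aborted`, `require_close`, and `channel` are all hypothetical names:

```rust
use std::sync::mpsc::{self, Receiver, Sender, TryRecvError};

// Std-only stand-ins (assumptions): futures 0.1-style `Async`/`Poll`.
#[derive(Debug, PartialEq)]
pub enum Async<T> {
    Ready(T),
    NotReady,
}
pub type Poll<T, E> = Result<Async<T>, E>;

/// Messages carried over the channel (hypothetical).
enum Msg {
    Data(Vec<u8>),
    Abort,
}

/// Hypothetical error: the sender was dropped without `close()`.
#[derive(Debug, PartialEq)]
pub struct Aborted;

pub struct BodySender {
    tx: Sender<Msg>,
    require_close: bool,
}

impl BodySender {
    /// Opt in: dropping this sender without `close()` becomes an error.
    pub fn require_close(&mut self) {
        self.require_close = true;
    }

    pub fn send_data(&self, chunk: Vec<u8>) -> Result<(), ()> {
        self.tx.send(Msg::Data(chunk)).map_err(|_| ())
    }

    /// Explicitly, and successfully, ends the body.
    pub fn close(mut self) {
        // Disarm the guard; dropping `self` then just disconnects the channel.
        self.require_close = false;
    }
}

impl Drop for BodySender {
    fn drop(&mut self) {
        // Runs on ordinary drops and during panic unwinding alike.
        if self.require_close {
            let _ = self.tx.send(Msg::Abort);
        }
    }
}

pub struct ChannelBody {
    rx: Receiver<Msg>,
}

impl ChannelBody {
    pub fn poll_data(&mut self) -> Poll<Option<Vec<u8>>, Aborted> {
        match self.rx.try_recv() {
            Ok(Msg::Data(chunk)) => Ok(Async::Ready(Some(chunk))),
            Ok(Msg::Abort) => Err(Aborted),
            Err(TryRecvError::Empty) => Ok(Async::NotReady),
            // Without the flag, a plain drop still looks like a clean end.
            Err(TryRecvError::Disconnected) => Ok(Async::Ready(None)),
        }
    }
}

pub fn channel() -> (BodySender, ChannelBody) {
    let (tx, rx) = mpsc::channel();
    (BodySender { tx, require_close: false }, ChannelBody { rx })
}
```

A guard-type variant would move the same `Drop` logic into a separate value held only around code that may panic.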

Metadata


Assignees

No one assigned

    Labels

    A-body (Area: body streaming), C-feature (Category: feature. This is adding a new feature.)
