Skip to content
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
d88ec30
create example
Ruben2424 Mar 9, 2023
44587d6
make anonymous futures structs
Ruben2424 May 21, 2023
55753a9
remove exec and fix warnings
Ruben2424 Jun 6, 2023
ad57f1c
remove bound
Ruben2424 Jun 6, 2023
4132aa6
fmt
Ruben2424 Jun 6, 2023
4138669
use right future
Ruben2424 Jun 6, 2023
ce4619e
fix features ci
Ruben2424 Jun 6, 2023
4ac835a
fix ffi
Ruben2424 Jun 8, 2023
27eb1e2
Merge branch 'master' into issue-3017
Ruben2424 Jun 8, 2023
ed95666
move client example to single_threaded example
Ruben2424 Jun 8, 2023
beb3ee4
Merge branch 'issue-3017' of https://github.com/Ruben2424/hyper into …
Ruben2424 Jun 8, 2023
c4a51b3
remove from cargo toml
Ruben2424 Jun 8, 2023
7cbfc72
use pin_project_lite
Ruben2424 Jun 9, 2023
d3821a9
deny warnings
Ruben2424 Jun 9, 2023
2394949
error bounds
Ruben2424 Jun 9, 2023
8e0f907
fix test
Ruben2424 Jun 9, 2023
6d44c29
sealed ExecutorClient
Ruben2424 Jun 9, 2023
c7d59fe
better error message
Ruben2424 Jun 9, 2023
0269396
make it work also for io types
Ruben2424 Jun 9, 2023
3a78a57
improve example
Ruben2424 Jun 12, 2023
be5c654
fmt
Ruben2424 Jun 12, 2023
b39d5d4
Merge branch 'master' into issue-3017
Ruben2424 Jun 16, 2023
15be46f
fix merge fail
Ruben2424 Jun 16, 2023
8391604
fix error bounds
Ruben2424 Jun 16, 2023
a276efc
Merge branch 'master' into issue-3017
Ruben2424 Jun 19, 2023
3dd0579
Merge branch 'master' into issue-3017
Ruben2424 Jun 29, 2023
98b0599
Merge branch 'issue-3017' of https://github.com/Ruben2424/hyper into …
Ruben2424 Jun 29, 2023
25ff95c
Merge branch 'master' into issue-3017
Ruben2424 Jun 29, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 157 additions & 14 deletions examples/single_threaded.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
#![deny(warnings)]

use http_body_util::BodyExt;
use hyper::server::conn::http2;
use std::cell::Cell;
use std::net::SocketAddr;
use std::rc::Rc;
use tokio::io::{self, AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio::net::TcpListener;

use hyper::body::{Body as HttpBody, Bytes, Frame};
use hyper::service::service_fn;
use hyper::Request;
use hyper::{Error, Response};
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::thread;
use tokio::net::TcpStream;

struct Body {
// Our Body type is !Send and !Sync:
Expand Down Expand Up @@ -40,28 +45,57 @@ impl HttpBody for Body {
}
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
fn main() {
pretty_env_logger::init();

// Configure a runtime that runs everything on the current thread
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("build runtime");

// Combine it with a `LocalSet, which means it can spawn !Send futures...
let local = tokio::task::LocalSet::new();
local.block_on(&rt, run())
let server = thread::spawn(move || {
// Configure a runtime for the server that runs everything on the current thread
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("build runtime");

// Combine it with a `LocalSet, which means it can spawn !Send futures...
let local = tokio::task::LocalSet::new();
local.block_on(&rt, server()).unwrap();
});

let client = thread::spawn(move || {
// Configure a runtime for the client that runs everything on the current thread
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("build runtime");

// Combine it with a `LocalSet, which means it can spawn !Send futures...
let local = tokio::task::LocalSet::new();
local
.block_on(
&rt,
client("http://localhost:3000".parse::<hyper::Uri>().unwrap()),
)
.unwrap();
});

server.join().unwrap();
client.join().unwrap();
}

async fn run() -> Result<(), Box<dyn std::error::Error>> {
let addr: SocketAddr = ([127, 0, 0, 1], 3000).into();
async fn server() -> Result<(), Box<dyn std::error::Error>> {
let mut stdout = io::stdout();

let addr: SocketAddr = ([127, 0, 0, 1], 3000).into();
// Using a !Send request counter is fine on 1 thread...
let counter = Rc::new(Cell::new(0));

let listener = TcpListener::bind(addr).await?;
println!("Listening on http://{}", addr);

stdout
.write_all(format!("Listening on http://{}", addr).as_bytes())
.await
.unwrap();
stdout.flush().await.unwrap();

loop {
let (stream, _) = listener.accept().await?;

Expand All @@ -80,12 +114,121 @@ async fn run() -> Result<(), Box<dyn std::error::Error>> {
.serve_connection(stream, service)
.await
{
println!("Error serving connection: {:?}", err);
let mut stdout = io::stdout();
stdout
.write_all(format!("Error serving connection: {:?}", err).as_bytes())
.await
.unwrap();
stdout.flush().await.unwrap();
}
});
}
}

struct IOTypeNotSend {
_marker: PhantomData<*const ()>,
stream: TcpStream,
}

impl IOTypeNotSend {
fn new(stream: TcpStream) -> Self {
Self {
_marker: PhantomData,
stream,
}
}
}

impl AsyncWrite for IOTypeNotSend {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, std::io::Error>> {
Pin::new(&mut self.stream).poll_write(cx, buf)
}

fn poll_flush(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
Pin::new(&mut self.stream).poll_flush(cx)
}

fn poll_shutdown(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
Pin::new(&mut self.stream).poll_shutdown(cx)
}
}

impl AsyncRead for IOTypeNotSend {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
Pin::new(&mut self.stream).poll_read(cx, buf)
}
}

async fn client(url: hyper::Uri) -> Result<(), Box<dyn std::error::Error>> {
let host = url.host().expect("uri has no host");
let port = url.port_u16().unwrap_or(80);
let addr = format!("{}:{}", host, port);
let stream = TcpStream::connect(addr).await?;

let stream = IOTypeNotSend::new(stream);

let (mut sender, conn) = hyper::client::conn::http2::handshake(LocalExec, stream).await?;

tokio::task::spawn_local(async move {
if let Err(err) = conn.await {
let mut stdout = io::stdout();
stdout
.write_all(format!("Connection failed: {:?}", err).as_bytes())
.await
.unwrap();
stdout.flush().await.unwrap();
}
});

let authority = url.authority().unwrap().clone();

// Make 4 requests
for _ in 0..4 {
let req = Request::builder()
.uri(url.clone())
.header(hyper::header::HOST, authority.as_str())
.body(Body::from("test".to_string()))?;

let mut res = sender.send_request(req).await?;

let mut stdout = io::stdout();
stdout
.write_all(format!("Response: {}\n", res.status()).as_bytes())
.await
.unwrap();
stdout
.write_all(format!("Headers: {:#?}\n", res.headers()).as_bytes())
.await
.unwrap();
stdout.flush().await.unwrap();

// Print the response body
while let Some(next) = res.frame().await {
let frame = next?;
if let Some(chunk) = frame.data_ref() {
stdout.write_all(&chunk).await.unwrap();
}
}
stdout.write_all(b"\n-----------------\n").await.unwrap();
stdout.flush().await.unwrap();
}
Ok(())
}

// NOTE: This part is only needed for HTTP/2. HTTP/1 doesn't need an executor.
//
// Since the Server needs to spawn some background tasks, we needed
Expand Down
Loading