Skip to content

Commit 0844ded

Browse files
kamyuentseseanmonstar
authored andcommitted
feat(server): allow creating Server with shared Handle
1. impl Future for Server [WIP] 2. add method bind_handle to Http 3. add an example to use shared Handle in multiple server
1 parent 7b2a205 commit 0844ded

File tree

2 files changed

+261
-7
lines changed

2 files changed

+261
-7
lines changed

examples/multi_server.rs

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
#![deny(warnings)]
2+
extern crate hyper;
3+
extern crate futures;
4+
extern crate tokio_core;
5+
extern crate pretty_env_logger;
6+
7+
use futures::future::FutureResult;
8+
9+
use hyper::{Get, StatusCode};
10+
use tokio_core::reactor::Core;
11+
use hyper::header::ContentLength;
12+
use hyper::server::{Http, Service, Request, Response};
13+
14+
static INDEX1: &'static [u8] = b"The 1st service!";
15+
static INDEX2: &'static [u8] = b"The 2nd service!";
16+
17+
struct Service1;
18+
struct Service2;
19+
20+
impl Service for Service1 {
21+
type Request = Request;
22+
type Response = Response;
23+
type Error = hyper::Error;
24+
type Future = FutureResult<Response, hyper::Error>;
25+
26+
fn call(&self, req: Request) -> Self::Future {
27+
futures::future::ok(match (req.method(), req.path()) {
28+
(&Get, "/") => {
29+
Response::new()
30+
.with_header(ContentLength(INDEX1.len() as u64))
31+
.with_body(INDEX1)
32+
},
33+
_ => {
34+
Response::new()
35+
.with_status(StatusCode::NotFound)
36+
}
37+
})
38+
}
39+
40+
}
41+
42+
impl Service for Service2 {
43+
type Request = Request;
44+
type Response = Response;
45+
type Error = hyper::Error;
46+
type Future = FutureResult<Response, hyper::Error>;
47+
48+
fn call(&self, req: Request) -> Self::Future {
49+
futures::future::ok(match (req.method(), req.path()) {
50+
(&Get, "/") => {
51+
Response::new()
52+
.with_header(ContentLength(INDEX2.len() as u64))
53+
.with_body(INDEX2)
54+
},
55+
_ => {
56+
Response::new()
57+
.with_status(StatusCode::NotFound)
58+
}
59+
})
60+
}
61+
62+
}
63+
64+
65+
fn main() {
66+
pretty_env_logger::init().unwrap();
67+
let addr1 = "127.0.0.1:1337".parse().unwrap();
68+
let addr2 = "127.0.0.1:1338".parse().unwrap();
69+
70+
let mut core = Core::new().unwrap();
71+
let handle = core.handle();
72+
73+
let srv1 = Http::new().bind_handle(&addr1,|| Ok(Service1), &handle).unwrap();
74+
let srv2 = Http::new().bind_handle(&addr2,|| Ok(Service2), &handle).unwrap();
75+
76+
println!("Listening on http://{}", srv1.local_addr().unwrap());
77+
println!("Listening on http://{}", srv2.local_addr().unwrap());
78+
79+
handle.spawn(srv1.shutdown_signal(futures::future::empty::<(), ()>()));
80+
handle.spawn(srv2.shutdown_signal(futures::future::empty::<(), ()>()));
81+
core.run(futures::future::empty::<(), ()>()).unwrap();
82+
}

src/server/mod.rs

Lines changed: 179 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,9 @@ use std::net::SocketAddr;
1616
use std::rc::{Rc, Weak};
1717
use std::time::Duration;
1818

19-
use futures::future;
2019
use futures::task::{self, Task};
20+
use futures::future::{self, Select, Map};
2121
use futures::{Future, Stream, Poll, Async, Sink, StartSend, AsyncSink};
22-
use futures::future::Map;
2322

2423
#[cfg(feature = "compat")]
2524
use http;
@@ -41,6 +40,26 @@ use proto::Body;
4140
pub use proto::response::Response;
4241
pub use proto::request::Request;
4342

43+
// The `Server` can be created use its own `Core`, or an shared `Handle`.
44+
enum Reactor {
45+
// Own its `Core`
46+
Core(Core),
47+
// Share `Handle` with others
48+
Handle(Handle),
49+
}
50+
51+
impl Reactor {
52+
/// Returns a handle to the underlying event loop that this server will be
53+
/// running on.
54+
#[inline]
55+
pub fn handle(&self) -> Handle {
56+
match *self {
57+
Reactor::Core(ref core) => core.handle(),
58+
Reactor::Handle(ref handle) => handle.clone(),
59+
}
60+
}
61+
}
62+
4463
/// An instance of the HTTP protocol, and implementation of tokio-proto's
4564
/// `ServerProto` trait.
4665
///
@@ -63,12 +82,23 @@ where B: Stream<Error=::Error>,
6382
{
6483
protocol: Http<B::Item>,
6584
new_service: S,
66-
core: Core,
85+
reactor: Reactor,
6786
listener: TcpListener,
6887
shutdown_timeout: Duration,
6988
no_proto: bool,
7089
}
7190

91+
/// The Future of an Server.
92+
pub struct ServerFuture<F, S, B>
93+
where B: Stream<Error=::Error>,
94+
B::Item: AsRef<[u8]>,
95+
{
96+
server: Server<S, B>,
97+
info: Rc<RefCell<Info>>,
98+
shutdown_signal: F,
99+
shutdown: Option<Select<WaitUntilZero, Timeout>>,
100+
}
101+
72102
impl<B: AsRef<[u8]> + 'static> Http<B> {
73103
/// Creates a new instance of the HTTP protocol, ready to spawn a server or
74104
/// start accepting connections.
@@ -118,7 +148,30 @@ impl<B: AsRef<[u8]> + 'static> Http<B> {
118148

119149
Ok(Server {
120150
new_service: new_service,
121-
core: core,
151+
reactor: Reactor::Core(core),
152+
listener: listener,
153+
protocol: self.clone(),
154+
shutdown_timeout: Duration::new(1, 0),
155+
})
156+
}
157+
158+
/// This method allows the ability to share a `Core` with multiple servers.
159+
///
160+
/// Bind the provided `addr` and return a server with a shared `Core`.
161+
///
162+
/// This is method will bind the `addr` provided with a new TCP listener ready
163+
/// to accept connections. Each connection will be processed with the
164+
/// `new_service` object provided as well, creating a new service per
165+
/// connection.
166+
pub fn bind_handle<S, Bd>(&self, addr: &SocketAddr, new_service: S, handle: &Handle) -> ::Result<Server<S, Bd>>
167+
where S: NewService<Request = Request, Response = Response<Bd>, Error = ::Error> + 'static,
168+
Bd: Stream<Item=B, Error=::Error>,
169+
{
170+
let listener = TcpListener::bind(addr, &handle)?;
171+
172+
Ok(Server {
173+
new_service: new_service,
174+
reactor: Reactor::Handle(handle.clone()),
122175
listener: listener,
123176
protocol: self.clone(),
124177
shutdown_timeout: Duration::new(1, 0),
@@ -544,7 +597,7 @@ impl<S, B> Server<S, B>
544597
/// Returns a handle to the underlying event loop that this server will be
545598
/// running on.
546599
pub fn handle(&self) -> Handle {
547-
self.core.handle()
600+
self.reactor.handle()
548601
}
549602

550603
/// Configure the amount of time this server will wait for a "graceful
@@ -566,6 +619,21 @@ impl<S, B> Server<S, B>
566619
self
567620
}
568621

622+
/// Configure the `shutdown_signal`.
623+
pub fn shutdown_signal<F>(self, signal: F) -> ServerFuture<F, S, B>
624+
where F: Future<Item = (), Error = ()>
625+
{
626+
ServerFuture {
627+
server: self,
628+
info: Rc::new(RefCell::new(Info {
629+
active: 0,
630+
blocker: None,
631+
})),
632+
shutdown_signal: signal,
633+
shutdown: None,
634+
}
635+
}
636+
569637
/// Execute this server infinitely.
570638
///
571639
/// This method does not currently return, but it will return an error if
@@ -590,7 +658,13 @@ impl<S, B> Server<S, B>
590658
pub fn run_until<F>(self, shutdown_signal: F) -> ::Result<()>
591659
where F: Future<Item = (), Error = ()>,
592660
{
593-
let Server { protocol, new_service, mut core, listener, shutdown_timeout, no_proto } = self;
661+
let Server { protocol, new_service, reactor, listener, shutdown_timeout, no_proto } = self;
662+
663+
let mut core = match reactor {
664+
Reactor::Core(core) => core,
665+
_ => panic!("Server does not own its core, use `Handle::spawn()` to run the service!"),
666+
};
667+
594668
let handle = core.handle();
595669

596670
// Mini future to track the number of active services
@@ -649,19 +723,117 @@ impl<S, B> Server<S, B>
649723
}
650724
}
651725

726+
impl<S, B> Future for Server<S, B>
727+
where S: NewService<Request = Request, Response = Response<B>, Error = ::Error> + 'static,
728+
B: Stream<Error=::Error> + 'static,
729+
B::Item: AsRef<[u8]>,
730+
{
731+
type Item = ();
732+
type Error = ();
733+
734+
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
735+
if let Reactor::Core(_) = self.reactor {
736+
panic!("Server owns its core, use `Server::run()` to run the service!")
737+
}
738+
739+
loop {
740+
match self.listener.accept() {
741+
Ok((socket, addr)) => {
742+
// TODO: use the NotifyService
743+
match self.new_service.new_service() {
744+
Ok(srv) => self.protocol.bind_connection(&self.handle(),
745+
socket,
746+
addr,
747+
srv),
748+
Err(e) => debug!("internal error: {:?}", e),
749+
}
750+
}
751+
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(Async::NotReady),
752+
Err(e) => debug!("internal error: {:?}", e),
753+
}
754+
}
755+
}
756+
}
757+
758+
impl<F, S, B> Future for ServerFuture<F, S, B>
759+
where F: Future<Item = (), Error = ()>,
760+
S: NewService<Request = Request, Response = Response<B>, Error = ::Error> + 'static,
761+
B: Stream<Error=::Error> + 'static,
762+
B::Item: AsRef<[u8]>,
763+
{
764+
type Item = ();
765+
type Error = ();
766+
767+
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
768+
loop {
769+
if let Some(ref mut shutdown) = self.shutdown {
770+
match shutdown.poll() {
771+
Ok(Async::Ready(_)) => return Ok(Async::Ready(())),
772+
Ok(Async::NotReady) => return Ok(Async::NotReady),
773+
Err((e, _)) => debug!("internal error: {:?}", e),
774+
}
775+
} else if let Ok(Async::Ready(())) = self.shutdown_signal.poll() {
776+
match Timeout::new(self.server.shutdown_timeout, &self.server.handle()) {
777+
Ok(timeout) => {
778+
let wait = WaitUntilZero { info: self.info.clone() };
779+
self.shutdown = Some(wait.select(timeout))
780+
},
781+
Err(e) => debug!("internal error: {:?}", e),
782+
}
783+
} else {
784+
match self.server.listener.accept() {
785+
Ok((socket, addr)) => {
786+
match self.server.new_service.new_service() {
787+
Ok(inner_srv) => {
788+
let srv = NotifyService {
789+
inner: inner_srv,
790+
info: Rc::downgrade(&self.info),
791+
};
792+
self.info.borrow_mut().active += 1;
793+
self.server.protocol.bind_connection(&self.server.handle(),
794+
socket,
795+
addr,
796+
srv)
797+
},
798+
Err(e) => debug!("internal error: {:?}", e),
799+
}
800+
},
801+
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(Async::NotReady),
802+
Err(e) => debug!("internal error: {:?}", e),
803+
}
804+
}
805+
}
806+
}
807+
}
808+
809+
652810
impl<S: fmt::Debug, B: Stream<Error=::Error>> fmt::Debug for Server<S, B>
653811
where B::Item: AsRef<[u8]>
654812
{
655813
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
656814
f.debug_struct("Server")
657-
.field("core", &"...")
815+
.field("reactor", &"...")
658816
.field("listener", &self.listener)
659817
.field("new_service", &self.new_service)
660818
.field("protocol", &self.protocol)
661819
.finish()
662820
}
663821
}
664822

823+
impl <F, S: fmt::Debug, B: Stream<Error=::Error>> fmt::Debug for ServerFuture<F, S, B>
824+
where B::Item: AsRef<[u8]>,
825+
F: Future<Item = (), Error = ()>
826+
{
827+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
828+
f.debug_struct("ServerFuture")
829+
.field("server", &self.server)
830+
.field("info", &"...")
831+
.field("shutdown_signal", &"...")
832+
.field("shutdown", &"...")
833+
.finish()
834+
}
835+
}
836+
665837
struct NotifyService<S> {
666838
inner: S,
667839
info: Weak<RefCell<Info>>,

0 commit comments

Comments
 (0)