Skip to content
Merged

Io #5

Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
eb11274
Removing redundant libuv bindings
Jun 12, 2013
39a575f
Added libuv UDP function bindings.
Jun 12, 2013
5393e43
Corrected libuv UDP bindings.
Jun 13, 2013
74e7255
Added a utility function to extract the udp handle from udp send requ…
Jun 14, 2013
03fe59a
added bindings to extract udp handle from udp send requests
Jun 14, 2013
a7f92c9
Added a UdpWatcher and UdpSendRequest with associated callbacks
Jun 14, 2013
9687437
added wrappers about uv_ip{4,6}_{port,name}
Jun 17, 2013
b51d188
Added a RtioUdpStream trait
Jun 17, 2013
7e022c5
added a function to convert C's ipv4 data structure into the Rust ipv…
Jun 17, 2013
4744375
added Eq and TotalEq instances for IpAddr
Jun 17, 2013
e42f28c
stated to implement UdpStream
Jun 17, 2013
33ae193
Started to implemented UdpStream
Jun 17, 2013
35f3fa6
Merge remote-tracking branch 'upstream/io' into io
Jun 17, 2013
d777ba0
Wrote the Eq instance of IpAddr in a slightly different way.
Jun 19, 2013
083c692
Changed visibility from being on the impl to being on methods per lan…
Jun 19, 2013
ac49b74
socket based UDP io
Jun 20, 2013
36c0e04
derived instances of Eq and TotalEq for IpAddr rather than implement …
Jun 20, 2013
55dda46
Merge remote-tracking branch 'upstream/io' into io
Jun 20, 2013
794923c
UDP networking with tests
Jun 25, 2013
4870dce
Merge remote-tracking branch 'upstream/io' into io
Jun 25, 2013
1af2016
removed unncessary unsafe block that was stopping compliation.
Jun 25, 2013
f202713
satisfy the formatting check
Jun 25, 2013
2c5cfe1
removed obsolete FIXMEs. formatting changes.
Jun 25, 2013
d0c812f
IPv6 struct
Jun 25, 2013
c5b19f0
changed outdated match on IpAddr
Jun 25, 2013
f604686
converted UvUdpSocket into a newtype struct
Jun 26, 2013
34b1135
Converted UdpSocket into a newtype struct and (dis)connecting uses mo…
Jun 26, 2013
d0dc697
removed unecessary method
Jun 26, 2013
87ecfb7
converted TCP interface to newtype structs
Jun 26, 2013
ce97bd4
cleaned up uv/net
Jun 26, 2013
42f3f06
changed NOTE to TODO
Jun 26, 2013
e6c5779
IPv6 support for UDP and TCP.
Jul 2, 2013
6a1a781
Merge remote-tracking branch 'upstream/io' into io
Jul 2, 2013
b60cf0c
converted TODOs into XXXs
Jul 3, 2013
cf23292
Merge remote-tracking branch 'upstream/io' into io
Jul 8, 2013
6b2abca
renamed finalize to drop in Drop impl for UvUdpSocket
Jul 8, 2013
5e0be46
changed .each() to .iter().advance()
Jul 8, 2013
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions src/libstd/rt/io/net/ip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.

type Port = u16;

#[deriving(Eq, TotalEq)]
pub enum IpAddr {
Ipv4(u8, u8, u8, u8, u16),
Ipv6
Ipv4(u8, u8, u8, u8, Port),
Ipv6(u16, u16, u16, u16, u16, u16, u16, u16, Port)
}
240 changes: 209 additions & 31 deletions src/libstd/rt/io/net/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,11 @@ use rt::rtio::{IoFactory, IoFactoryObject,
RtioTcpStream, RtioTcpStreamObject};
use rt::local::Local;

pub struct TcpStream {
rtstream: ~RtioTcpStreamObject
}
pub struct TcpStream(~RtioTcpStreamObject);

impl TcpStream {
fn new(s: ~RtioTcpStreamObject) -> TcpStream {
TcpStream {
rtstream: s
}
TcpStream(s)
}

pub fn connect(addr: IpAddr) -> Option<TcpStream> {
Expand All @@ -38,22 +34,19 @@ impl TcpStream {
};

match stream {
Ok(s) => {
Some(TcpStream::new(s))
}
Ok(s) => Some(TcpStream::new(s)),
Err(ioerr) => {
rtdebug!("failed to connect: %?", ioerr);
io_error::cond.raise(ioerr);
return None;
None
}
}
}
}

impl Reader for TcpStream {
fn read(&mut self, buf: &mut [u8]) -> Option<uint> {
let bytes_read = self.rtstream.read(buf);
match bytes_read {
match (**self).read(buf) {
Ok(read) => Some(read),
Err(ioerr) => {
// EOF is indicated by returning None
Expand All @@ -70,8 +63,7 @@ impl Reader for TcpStream {

impl Writer for TcpStream {
fn write(&mut self, buf: &[u8]) {
let res = self.rtstream.write(buf);
match res {
match (**self).write(buf) {
Ok(_) => (),
Err(ioerr) => {
io_error::cond.raise(ioerr);
Expand All @@ -82,9 +74,7 @@ impl Writer for TcpStream {
fn flush(&mut self) { fail!() }
}

pub struct TcpListener {
rtlistener: ~RtioTcpListenerObject,
}
pub struct TcpListener(~RtioTcpListenerObject);

impl TcpListener {
pub fn bind(addr: IpAddr) -> Option<TcpListener> {
Expand All @@ -93,11 +83,7 @@ impl TcpListener {
(*io).tcp_bind(addr)
};
match listener {
Ok(l) => {
Some(TcpListener {
rtlistener: l
})
}
Ok(l) => Some(TcpListener(l)),
Err(ioerr) => {
io_error::cond.raise(ioerr);
return None;
Expand All @@ -108,8 +94,7 @@ impl TcpListener {

impl Listener<TcpStream> for TcpListener {
fn accept(&mut self) -> Option<TcpStream> {
let rtstream = self.rtlistener.accept();
match rtstream {
match (**self).accept() {
Ok(s) => {
Some(TcpStream::new(s))
}
Expand Down Expand Up @@ -163,7 +148,7 @@ mod test {
}

#[test]
fn smoke_test() {
fn smoke_test_ip4() {
do run_in_newsched_task {
let addr = next_test_ip4();

Expand All @@ -183,7 +168,27 @@ mod test {
}

#[test]
fn read_eof() {
fn smoke_test_ip6() {
do run_in_newsched_task {
let addr = next_test_ip6();

do spawntask_immediately {
let mut listener = TcpListener::bind(addr);
let mut stream = listener.accept();
let mut buf = [0];
stream.read(buf);
assert!(buf[0] == 99);
}

do spawntask_immediately {
let mut stream = TcpStream::connect(addr);
stream.write([99]);
}
}
}

#[test]
fn read_eof_ip4() {
do run_in_newsched_task {
let addr = next_test_ip4();

Expand All @@ -203,7 +208,27 @@ mod test {
}

#[test]
fn read_eof_twice() {
fn read_eof_ip6() {
do run_in_newsched_task {
let addr = next_test_ip6();

do spawntask_immediately {
let mut listener = TcpListener::bind(addr);
let mut stream = listener.accept();
let mut buf = [0];
let nread = stream.read(buf);
assert!(nread.is_none());
}

do spawntask_immediately {
let _stream = TcpStream::connect(addr);
// Close
}
}
}

#[test]
fn read_eof_twice_ip4() {
do run_in_newsched_task {
let addr = next_test_ip4();

Expand All @@ -225,7 +250,29 @@ mod test {
}

#[test]
fn write_close() {
fn read_eof_twice_ip6() {
do run_in_newsched_task {
let addr = next_test_ip6();

do spawntask_immediately {
let mut listener = TcpListener::bind(addr);
let mut stream = listener.accept();
let mut buf = [0];
let nread = stream.read(buf);
assert!(nread.is_none());
let nread = stream.read(buf);
assert!(nread.is_none());
}

do spawntask_immediately {
let _stream = TcpStream::connect(addr);
// Close
}
}
}

#[test]
fn write_close_ip4() {
do run_in_newsched_task {
let addr = next_test_ip4();

Expand Down Expand Up @@ -254,7 +301,36 @@ mod test {
}

#[test]
fn multiple_connect_serial() {
fn write_close_ip6() {
do run_in_newsched_task {
let addr = next_test_ip6();

do spawntask_immediately {
let mut listener = TcpListener::bind(addr);
let mut stream = listener.accept();
let buf = [0];
loop {
let mut stop = false;
do io_error::cond.trap(|e| {
// NB: ECONNRESET on linux, EPIPE on mac
assert!(e.kind == ConnectionReset || e.kind == BrokenPipe);
stop = true;
}).in {
stream.write(buf);
}
if stop { break }
}
}

do spawntask_immediately {
let _stream = TcpStream::connect(addr);
// Close
}
}
}

#[test]
fn multiple_connect_serial_ip4() {
do run_in_newsched_task {
let addr = next_test_ip4();
let max = 10;
Expand All @@ -279,7 +355,32 @@ mod test {
}

#[test]
fn multiple_connect_interleaved_greedy_schedule() {
fn multiple_connect_serial_ip6() {
do run_in_newsched_task {
let addr = next_test_ip6();
let max = 10;

do spawntask_immediately {
let mut listener = TcpListener::bind(addr);
for max.times {
let mut stream = listener.accept();
let mut buf = [0];
stream.read(buf);
assert_eq!(buf[0], 99);
}
}

do spawntask_immediately {
for max.times {
let mut stream = TcpStream::connect(addr);
stream.write([99]);
}
}
}
}

#[test]
fn multiple_connect_interleaved_greedy_schedule_ip4() {
do run_in_newsched_task {
let addr = next_test_ip4();
static MAX: int = 10;
Expand Down Expand Up @@ -318,7 +419,46 @@ mod test {
}

#[test]
fn multiple_connect_interleaved_lazy_schedule() {
fn multiple_connect_interleaved_greedy_schedule_ip6() {
do run_in_newsched_task {
let addr = next_test_ip6();
static MAX: int = 10;

do spawntask_immediately {
let mut listener = TcpListener::bind(addr);
for int::range(0, MAX) |i| {
let stream = Cell::new(listener.accept());
rtdebug!("accepted");
// Start another task to handle the connection
do spawntask_immediately {
let mut stream = stream.take();
let mut buf = [0];
stream.read(buf);
assert!(buf[0] == i as u8);
rtdebug!("read");
}
}
}

connect(0, addr);

fn connect(i: int, addr: IpAddr) {
if i == MAX { return }

do spawntask_immediately {
rtdebug!("connecting");
let mut stream = TcpStream::connect(addr);
// Connect again before writing
connect(i + 1, addr);
rtdebug!("writing");
stream.write([i as u8]);
}
}
}
}

#[test]
fn multiple_connect_interleaved_lazy_schedule_ip4() {
do run_in_newsched_task {
let addr = next_test_ip4();
static MAX: int = 10;
Expand Down Expand Up @@ -355,5 +495,43 @@ mod test {
}
}
}
#[test]
fn multiple_connect_interleaved_lazy_schedule_ip6() {
do run_in_newsched_task {
let addr = next_test_ip6();
static MAX: int = 10;

do spawntask_immediately {
let mut listener = TcpListener::bind(addr);
for int::range(0, MAX) |_| {
let stream = Cell::new(listener.accept());
rtdebug!("accepted");
// Start another task to handle the connection
do spawntask_later {
let mut stream = stream.take();
let mut buf = [0];
stream.read(buf);
assert!(buf[0] == 99);
rtdebug!("read");
}
}
}

connect(0, addr);

fn connect(i: int, addr: IpAddr) {
if i == MAX { return }

do spawntask_later {
rtdebug!("connecting");
let mut stream = TcpStream::connect(addr);
// Connect again before writing
connect(i + 1, addr);
rtdebug!("writing");
stream.write([99]);
}
}
}
}

}
Loading