diff --git a/src/driver/socket.rs b/src/driver/socket.rs index cbfdff60..9f9e3c10 100644 --- a/src/driver/socket.rs +++ b/src/driver/socket.rs @@ -92,7 +92,7 @@ impl Socket { Self::bind_internal(addr, libc::AF_UNIX.into(), socket_type.into()) } - pub(crate) fn from_std(socket: std::net::UdpSocket) -> Socket { + pub(crate) fn from_std(socket: T) -> Socket { let fd = SharedFd::new(socket.into_raw_fd()); Self::from_shared_fd(fd) } diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index 6511c279..a33c9569 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -50,6 +50,21 @@ impl TcpStream { Ok(tcp_stream) } + /// Creates new `TcpStream` from a previously bound `std::net::TcpStream`. + /// + /// This function is intended to be used to wrap a TCP stream from the + /// standard library in the tokio-uring equivalent. The conversion assumes nothing + /// about the underlying socket; it is left up to the user to decide what socket + /// options are appropriate for their use case. + /// + /// This can be used in conjunction with socket2's `Socket` interface to + /// configure a socket before it's handed off, such as setting options like + /// `reuse_address` or binding to multiple addresses. + pub fn from_std(socket: std::net::TcpStream) -> Self { + let inner = Socket::from_std(socket); + Self { inner } + } + pub(crate) fn from_socket(inner: Socket) -> Self { Self { inner } } @@ -154,7 +169,6 @@ impl TcpStream { /// This function will cause all pending and future I/O on the specified portions to return /// immediately with an appropriate value. pub fn shutdown(&self, how: std::net::Shutdown) -> io::Result<()> { - // TODO same method for unix stream for consistency. self.inner.shutdown(how) } } diff --git a/src/net/udp.rs b/src/net/udp.rs index 8a8bc680..49a4eb62 100644 --- a/src/net/udp.rs +++ b/src/net/udp.rs @@ -1,9 +1,13 @@ use crate::{ buf::{IoBuf, IoBufMut}, - driver::Socket, + driver::{SharedFd, Socket}, }; use socket2::SockAddr; -use std::{io, net::SocketAddr}; +use std::{ + io, + net::SocketAddr, + os::unix::prelude::{AsRawFd, FromRawFd, RawFd}, +}; /// A UDP socket. /// @@ -95,7 +99,7 @@ impl UdpSocket { /// Creates new `UdpSocket` from a previously bound `std::net::UdpSocket`. /// /// This function is intended to be used to wrap a UDP socket from the - /// standard library in the Tokio equivalent. The conversion assumes nothing + /// standard library in the tokio-uring equivalent. The conversion assumes nothing /// about the underlying socket; it is left up to the user to decide what socket /// options are appropriate for their use case. /// @@ -141,11 +145,13 @@ impl UdpSocket { /// }) /// } /// ``` - pub fn from_std(socket: std::net::UdpSocket) -> UdpSocket { - let inner_socket = Socket::from_std(socket); - Self { - inner: inner_socket, - } + pub fn from_std(socket: std::net::UdpSocket) -> Self { + let inner = Socket::from_std(socket); + Self { inner } + } + + pub(crate) fn from_socket(inner: Socket) -> Self { + Self { inner } } /// Connects this UDP socket to a remote address, allowing the `write` and @@ -186,4 +192,24 @@ impl UdpSocket { pub async fn write(&self, buf: T) -> crate::BufResult { self.inner.write(buf).await } + + /// Shuts down the read, write, or both halves of this connection. + /// + /// This function will cause all pending and future I/O on the specified portions to return + /// immediately with an appropriate value. + pub fn shutdown(&self, how: std::net::Shutdown) -> io::Result<()> { + self.inner.shutdown(how) + } +} + +impl FromRawFd for UdpSocket { + unsafe fn from_raw_fd(fd: RawFd) -> Self { + UdpSocket::from_socket(Socket::from_shared_fd(SharedFd::new(fd))) + } +} + +impl AsRawFd for UdpSocket { + fn as_raw_fd(&self) -> RawFd { + self.inner.as_raw_fd() + } } diff --git a/src/net/unix/stream.rs b/src/net/unix/stream.rs index f312af93..da2fdfc9 100644 --- a/src/net/unix/stream.rs +++ b/src/net/unix/stream.rs @@ -1,9 +1,13 @@ use crate::{ buf::{IoBuf, IoBufMut}, - driver::Socket, + driver::{SharedFd, Socket}, }; use socket2::SockAddr; -use std::{io, path::Path}; +use std::{ + io, + os::unix::prelude::{AsRawFd, FromRawFd, RawFd}, + path::Path, +}; /// A Unix stream between two local sockets on a Unix OS. /// @@ -48,6 +52,25 @@ impl UnixStream { Ok(unix_stream) } + /// Creates new `UnixStream` from a previously bound `std::os::unix::net::UnixStream`. + /// + /// This function is intended to be used to wrap a TCP stream from the + /// standard library in the tokio-uring equivalent. The conversion assumes nothing + /// about the underlying socket; it is left up to the user to decide what socket + /// options are appropriate for their use case. + /// + /// This can be used in conjunction with socket2's `Socket` interface to + /// configure a socket before it's handed off, such as setting options like + /// `reuse_address` or binding to multiple addresses. + pub fn from_std(socket: std::os::unix::net::UnixStream) -> UnixStream { + let inner = Socket::from_std(socket); + Self { inner } + } + + pub(crate) fn from_socket(inner: Socket) -> Self { + Self { inner } + } + /// Read some data from the stream into the buffer, returning the original buffer and /// quantity of data read. pub async fn read(&self, buf: T) -> crate::BufResult { @@ -59,4 +82,71 @@ impl UnixStream { pub async fn write(&self, buf: T) -> crate::BufResult { self.inner.write(buf).await } + + /// Attempts to write an entire buffer to the stream. + /// + /// This method will continuously call [`write`] until there is no more data to be + /// written or an error is returned. This method will not return until the entire + /// buffer has been successfully written or an error has occurred. + /// + /// If the buffer contains no data, this will never call [`write`]. + /// + /// # Errors + /// + /// This function will return the first error that [`write`] returns. + /// + /// [`write`]: Self::write + pub async fn write_all(&self, mut buf: T) -> crate::BufResult<(), T> { + // This function is copied from the TcpStream impl. + let mut n = 0; + while n < buf.bytes_init() { + let res = self.write(buf.slice(n..)).await; + match res { + (Ok(0), slice) => { + return ( + Err(std::io::Error::new( + std::io::ErrorKind::WriteZero, + "failed to write whole buffer", + )), + slice.into_inner(), + ) + } + (Ok(m), slice) => { + n += m; + buf = slice.into_inner(); + } + + // This match on an EINTR error is not performed because this + // crate's design ensures we are not calling the 'wait' option + // in the ENTER syscall. Only an Enter with 'wait' can generate + // an EINTR according to the io_uring man pages. + // (Err(ref e), slice) if e.kind() == std::io::ErrorKind::Interrupted => { + // buf = slice.into_inner(); + // }, + (Err(e), slice) => return (Err(e), slice.into_inner()), + } + } + + (Ok(()), buf) + } + + /// Shuts down the read, write, or both halves of this connection. + /// + /// This function will cause all pending and future I/O on the specified portions to return + /// immediately with an appropriate value. + pub fn shutdown(&self, how: std::net::Shutdown) -> io::Result<()> { + self.inner.shutdown(how) + } +} + +impl FromRawFd for UnixStream { + unsafe fn from_raw_fd(fd: RawFd) -> Self { + UnixStream::from_socket(Socket::from_shared_fd(SharedFd::new(fd))) + } +} + +impl AsRawFd for UnixStream { + fn as_raw_fd(&self) -> RawFd { + self.inner.as_raw_fd() + } }