Skip to content

Commit dcf6556

Browse files
Format Rust code using rustfmt
1 parent 86496c7 commit dcf6556

File tree

6 files changed

+108
-69
lines changed

6 files changed

+108
-69
lines changed

core/src/tcp/client.rs

Lines changed: 26 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -28,19 +28,19 @@
2828

2929
//! A basic TCP client implementation designed for long-running connections.
3030
31-
use tokio::sync::{mpsc, watch, Semaphore};
31+
use crate::tcp::buffer::ChannelBuffer;
32+
use crate::tcp::util::{DataMsg, Network, ReadyEvent};
33+
use crate::tcp::{NetReceiver, BYTES_CHANNEL_SIZE};
34+
use bp3d_debug::{error, trace};
3235
use std::future::Future;
33-
use std::sync::Arc;
3436
use std::sync::atomic::AtomicBool;
3537
use std::sync::atomic::Ordering::Relaxed;
36-
use bp3d_debug::{error, trace};
38+
use std::sync::Arc;
3739
use tokio::io::AsyncWriteExt;
3840
use tokio::net::{TcpStream, ToSocketAddrs};
3941
use tokio::select;
4042
use tokio::sync::mpsc::error::{SendError, TrySendError};
41-
use crate::tcp::{NetReceiver, BYTES_CHANNEL_SIZE};
42-
use crate::tcp::buffer::ChannelBuffer;
43-
use crate::tcp::util::{DataMsg, Network, ReadyEvent};
43+
use tokio::sync::{mpsc, watch, Semaphore};
4444

4545
/// The reader trait which is supposed to handle the actual data reading loop.
4646
pub trait Reader {
@@ -86,7 +86,10 @@ pub trait Handler {
8686
/// * `net`: the network context created for this client.
8787
///
8888
/// returns: impl Future<Output=Result<Self::Reader, Error>>+Send+Sized
89-
fn connect(&mut self, net: &mut Network) -> impl Future<Output = std::io::Result<Self::Reader>> + Send;
89+
fn connect(
90+
&mut self,
91+
net: &mut Network,
92+
) -> impl Future<Output = std::io::Result<Self::Reader>> + Send;
9093

9194
/// Called when the client task is about to return.
9295
///
@@ -96,9 +99,7 @@ pub trait Handler {
9699
///
97100
/// returns: impl Future<Output=Result<(), Error>>+Send+Sized
98101
fn disconnect(&mut self, _: &mut Network) -> impl Future<Output = std::io::Result<()>> + Send {
99-
async move {
100-
Ok(())
101-
}
102+
async move { Ok(()) }
102103
}
103104
}
104105

@@ -124,15 +125,15 @@ async unsafe fn handle_data(msg: DataMsg, net: &mut Network) -> std::io::Result<
124125
/// The main builder structure used to create a new TCP client.
125126
pub struct Builder<F> {
126127
factory: F,
127-
event_queue_size: usize
128+
event_queue_size: usize,
128129
}
129130

130131
impl<F: Factory> Builder<F> {
131132
/// Creates a new TCP client from the given main event handler factory.
132133
pub fn new(factory: F) -> Builder<F> {
133134
Self {
134135
factory,
135-
event_queue_size: 4
136+
event_queue_size: 4,
136137
}
137138
}
138139

@@ -212,7 +213,7 @@ impl<F: Factory> Builder<F> {
212213
Ok(ClientApp {
213214
client,
214215
handle,
215-
reply_receiver
216+
reply_receiver,
216217
})
217218
}
218219
}
@@ -223,7 +224,7 @@ pub struct Client<H: Handler> {
223224
request_sender: mpsc::Sender<H::Request>,
224225
data: mpsc::Sender<DataMsg>,
225226
reply_sender: mpsc::Sender<H::Reply>,
226-
is_exiting: AtomicBool
227+
is_exiting: AtomicBool,
227228
}
228229

229230
impl<H: Handler> Client<H> {
@@ -293,12 +294,17 @@ impl<H: Handler> Client<H> {
293294
// SAFETY: It is safe to pass a pointer to msg as long as we wait for all clients to have
294295
// consumed the pointer before returning.
295296
let synchro = Semaphore::new(0);
296-
if (self.data.send(DataMsg {
297-
synchro: &synchro,
298-
buffer: msg.as_ptr(),
299-
buffer_size: msg.len(),
300-
net_id: 0
301-
}).await).is_err() {
297+
if (self
298+
.data
299+
.send(DataMsg {
300+
synchro: &synchro,
301+
buffer: msg.as_ptr(),
302+
buffer_size: msg.len(),
303+
net_id: 0,
304+
})
305+
.await)
306+
.is_err()
307+
{
302308
return Err(crate::tcp::util::SendError::Closed);
303309
}
304310
trace!("Waiting for the async task to acknowledge");

core/src/tcp/server/client.rs

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,16 @@
2626
// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
2727
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
2828

29-
use std::future::Future;
29+
use crate::tcp::buffer::ChannelBuffer;
30+
use crate::tcp::util::{DataMsg, Network, ReadyEvent};
31+
use crate::tcp::{NetReceiver, BYTES_CHANNEL_SIZE};
3032
use bp3d_debug::{error, trace};
33+
use std::future::Future;
3134
use tokio::io::AsyncWriteExt;
3235
use tokio::select;
36+
use tokio::sync::broadcast;
3337
use tokio::sync::mpsc;
3438
use tokio::sync::watch;
35-
use tokio::sync::broadcast;
36-
use crate::tcp::buffer::ChannelBuffer;
37-
use crate::tcp::{NetReceiver, BYTES_CHANNEL_SIZE};
38-
use crate::tcp::util::{DataMsg, Network, ReadyEvent};
3939

4040
/// Represents a client event handler.
4141
pub trait Handler {
@@ -56,17 +56,15 @@ pub trait Handler {
5656
///
5757
/// returns: impl Future<Output=Result<(), Error>>+Send+Sized
5858
fn disconnect(&mut self, _: &mut Network) -> impl Future<Output = std::io::Result<()>> + Send {
59-
async move {
60-
Ok(())
61-
}
59+
async move { Ok(()) }
6260
}
6361
}
6462

6563
pub(crate) struct ClientTask<'a, H> {
6664
pub(crate) net: &'a mut Network,
6765
pub(crate) handler: H,
6866
pub(crate) exit: watch::Receiver<()>,
69-
pub(crate) broadcast: broadcast::Receiver<DataMsg>
67+
pub(crate) broadcast: broadcast::Receiver<DataMsg>,
7068
}
7169

7270
impl<H: Handler + Send + 'static> ClientTask<'_, H> {

core/src/tcp/server/server.rs

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -26,22 +26,22 @@
2626
// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
2727
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
2828

29+
use crate::tcp::server::client::ClientTask;
30+
use crate::tcp::util::{DataMsg, Network};
31+
use bp3d_debug::{debug, trace};
2932
use std::future::Future;
3033
use std::net::Ipv4Addr;
31-
use std::sync::Arc;
32-
use std::sync::atomic::{AtomicBool, AtomicUsize};
3334
use std::sync::atomic::Ordering::Relaxed;
34-
use bp3d_debug::{debug, trace};
35+
use std::sync::atomic::{AtomicBool, AtomicUsize};
36+
use std::sync::Arc;
3537
use tokio::io::AsyncWriteExt;
3638
use tokio::net::{TcpListener, ToSocketAddrs};
3739
use tokio::select;
38-
use tokio::sync::{watch, Semaphore};
3940
use tokio::sync::broadcast;
4041
use tokio::sync::mpsc;
4142
use tokio::sync::mpsc::error::{SendError, TrySendError};
43+
use tokio::sync::{watch, Semaphore};
4244
use tokio::task::JoinSet;
43-
use crate::tcp::server::client::ClientTask;
44-
use crate::tcp::util::{DataMsg, Network};
4545

4646
/// A factory trait which can be used to create the instance of the main server event handler.
4747
pub trait Factory {
@@ -83,7 +83,10 @@ pub trait Handler {
8383
/// * `net`: the network context created for this client.
8484
///
8585
/// returns: impl Future<Output=Result<Self::ClientHandler, Error>>+Send+Sized
86-
fn connect(&mut self, net: &mut Network) -> impl Future<Output = std::io::Result<Self::ClientHandler>> + Send;
86+
fn connect(
87+
&mut self,
88+
net: &mut Network,
89+
) -> impl Future<Output = std::io::Result<Self::ClientHandler>> + Send;
8790

8891
/// Called when a client has disconnected from the server.
8992
///
@@ -96,7 +99,11 @@ pub trait Handler {
9699
/// * `handler`: the client event handler which was associated with the client.
97100
///
98101
/// returns: impl Future<Output=()>+Send+Sized
99-
fn disconnect(&mut self, _: &mut Network, _: Self::ClientHandler) -> impl Future<Output = ()> + Send {
102+
fn disconnect(
103+
&mut self,
104+
_: &mut Network,
105+
_: Self::ClientHandler,
106+
) -> impl Future<Output = ()> + Send {
100107
async move {}
101108
}
102109
}
@@ -106,7 +113,7 @@ struct ServerTask<H: Handler> {
106113
listener: TcpListener,
107114
exit_receiver: watch::Receiver<()>,
108115
request_receiver: mpsc::Receiver<H::Request>,
109-
server: Arc<Server<H>>
116+
server: Arc<Server<H>>,
110117
}
111118

112119
impl<H: Handler + Send + 'static> ServerTask<H> {
@@ -271,14 +278,14 @@ impl<F: Factory> Builder<F> {
271278
listener,
272279
exit_receiver,
273280
request_receiver,
274-
server
281+
server,
275282
};
276283
task.run().await
277284
});
278285
Ok(ServerApp {
279286
handle,
280287
server: motherfuckingrust,
281-
reply_receiver
288+
reply_receiver,
282289
})
283290
}
284291
}
@@ -291,7 +298,7 @@ pub struct Server<H: Handler> {
291298
max_clients: usize,
292299
request_sender: mpsc::Sender<H::Request>,
293300
reply_sender: mpsc::Sender<H::Reply>,
294-
is_exiting: AtomicBool
301+
is_exiting: AtomicBool,
295302
}
296303

297304
impl<H: Handler> Server<H> {
@@ -376,10 +383,10 @@ impl<H: Handler> Server<H> {
376383
synchro: &synchro,
377384
buffer: msg.as_ptr(),
378385
buffer_size: msg.len(),
379-
net_id
386+
net_id,
380387
}) {
381388
Err(_) => return Err(crate::tcp::util::SendError::Closed),
382-
Ok(v) => debug!("Broadcasting to {} client(s)", v)
389+
Ok(v) => debug!("Broadcasting to {} client(s)", v),
383390
}
384391
let clients = self.cur_clients.load(Relaxed);
385392
trace!("Waiting for {} client(s) to acknowledge", clients);
@@ -406,10 +413,10 @@ impl<H: Handler> Server<H> {
406413
synchro: &synchro,
407414
buffer: msg.as_ptr(),
408415
buffer_size: msg.len(),
409-
net_id: 0
416+
net_id: 0,
410417
}) {
411418
Err(_) => return Err(crate::tcp::util::SendError::Closed),
412-
Ok(v) => debug!("Broadcasting to {} client(s)", v)
419+
Ok(v) => debug!("Broadcasting to {} client(s)", v),
413420
}
414421
let clients = self.cur_clients.load(Relaxed);
415422
trace!("Waiting for {} client(s) to acknowledge", clients);

core/src/tcp/util.rs

Lines changed: 38 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -28,27 +28,27 @@
2828

2929
//! Utility module for TCP client or server.
3030
31+
use bp3d_debug::warning;
3132
use std::fmt::{Debug, Display, Formatter};
3233
use std::io::{Error, ErrorKind, IoSlice};
3334
use std::net::SocketAddr;
3435
use std::pin::Pin;
3536
use std::task::{Context, Poll};
36-
use bp3d_debug::warning;
3737
use tokio::io::{AsyncRead, AsyncWrite, Interest, ReadBuf};
3838
use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
3939
use tokio::net::TcpStream;
4040
use tokio::sync::mpsc;
4141

42-
use tokio::sync::Semaphore;
4342
use crate::tcp::buffer::Bytes;
43+
use tokio::sync::Semaphore;
4444

4545
#[derive(Clone, Debug)]
4646
pub(super) struct DataMsg {
4747
pub(super) synchro: *const Semaphore,
4848
pub(super) buffer: *const u8,
4949
pub(super) buffer_size: usize,
5050
#[allow(dead_code)]
51-
pub(super) net_id: usize
51+
pub(super) net_id: usize,
5252
}
5353

5454
unsafe impl Send for DataMsg {}
@@ -60,14 +60,14 @@ pub enum SendError {
6060
IsExiting,
6161

6262
/// The broadcast channel is already closed.
63-
Closed
63+
Closed,
6464
}
6565

6666
impl Display for SendError {
6767
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
6868
match self {
6969
SendError::IsExiting => f.write_str("exit requested"),
70-
SendError::Closed => f.write_str("channel closed")
70+
SendError::Closed => f.write_str("channel closed"),
7171
}
7272
}
7373
}
@@ -83,7 +83,7 @@ pub enum ReadyEvent {
8383
None,
8484

8585
/// Data was submitted to the given ChannelBuffer channel.
86-
Submitted
86+
Submitted,
8787
}
8888

8989
/// Buffered reader/writer for a TCP stream.
@@ -93,7 +93,7 @@ pub struct Network {
9393
reader: OwnedReadHalf,
9494
writer: OwnedWriteHalf,
9595
addr: SocketAddr,
96-
id: usize
96+
id: usize,
9797
}
9898

9999
impl Debug for Network {
@@ -141,8 +141,14 @@ impl Network {
141141
/// # Errors
142142
///
143143
/// Returns an IO error if the operation failed.
144-
pub async fn ready_read<const N: usize>(&self, bytes_sender: &mpsc::Sender<Bytes<N>>) -> std::io::Result<ReadyEvent> {
145-
let ev = self.reader.ready(Interest::ERROR | Interest::READABLE).await?;
144+
pub async fn ready_read<const N: usize>(
145+
&self,
146+
bytes_sender: &mpsc::Sender<Bytes<N>>,
147+
) -> std::io::Result<ReadyEvent> {
148+
let ev = self
149+
.reader
150+
.ready(Interest::ERROR | Interest::READABLE)
151+
.await?;
146152
if ev.is_write_closed() || ev.is_read_closed() || ev.is_error() {
147153
return Ok(ReadyEvent::ConnectionLoss);
148154
}
@@ -170,14 +176,25 @@ impl Network {
170176
}
171177

172178
impl AsyncRead for Network {
173-
fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<std::io::Result<()>> {
179+
fn poll_read(
180+
self: Pin<&mut Self>,
181+
cx: &mut Context<'_>,
182+
buf: &mut ReadBuf<'_>,
183+
) -> Poll<std::io::Result<()>> {
174184
unsafe { self.map_unchecked_mut(|v| &mut v.reader).poll_read(cx, buf) }
175185
}
176186
}
177187

178188
impl AsyncWrite for Network {
179-
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize, Error>> {
180-
unsafe { self.map_unchecked_mut(|v| &mut v.writer).poll_write(cx, buf) }
189+
fn poll_write(
190+
self: Pin<&mut Self>,
191+
cx: &mut Context<'_>,
192+
buf: &[u8],
193+
) -> Poll<Result<usize, Error>> {
194+
unsafe {
195+
self.map_unchecked_mut(|v| &mut v.writer)
196+
.poll_write(cx, buf)
197+
}
181198
}
182199

183200
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
@@ -188,8 +205,15 @@ impl AsyncWrite for Network {
188205
unsafe { self.map_unchecked_mut(|v| &mut v.writer).poll_shutdown(cx) }
189206
}
190207

191-
fn poll_write_vectored(self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[IoSlice<'_>]) -> Poll<Result<usize, Error>> {
192-
unsafe { self.map_unchecked_mut(|v| &mut v.writer).poll_write_vectored(cx, bufs) }
208+
fn poll_write_vectored(
209+
self: Pin<&mut Self>,
210+
cx: &mut Context<'_>,
211+
bufs: &[IoSlice<'_>],
212+
) -> Poll<Result<usize, Error>> {
213+
unsafe {
214+
self.map_unchecked_mut(|v| &mut v.writer)
215+
.poll_write_vectored(cx, bufs)
216+
}
193217
}
194218

195219
fn is_write_vectored(&self) -> bool {

0 commit comments

Comments
 (0)