Skip to content

Commit 65a91e6

Browse files
committed
Merge branch 'develop' of github.com:BlockProject3D/tools.net into develop
2 parents e9cd156 + cae9656 commit 65a91e6

File tree

10 files changed

+86
-55
lines changed

10 files changed

+86
-55
lines changed

core/src/tcp/client.rs

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,10 @@
2929
//! A basic TCP client implementation designed for long-running connections.
3030
3131
use crate::tcp::util::buffer::{ChannelBuffer, NetReceiver};
32+
use crate::tcp::util::net::{Network, ReadyEvent};
3233
use crate::tcp::util::DataMsg;
3334
use crate::tcp::BYTES_CHANNEL_SIZE;
35+
use crate::util::barrier;
3436
use bp3d_debug::{error, trace};
3537
use std::future::Future;
3638
use std::sync::Arc;
@@ -39,8 +41,6 @@ use tokio::net::{TcpStream, ToSocketAddrs};
3941
use tokio::select;
4042
use tokio::sync::mpsc::error::{SendError, TrySendError};
4143
use tokio::sync::{mpsc, watch};
42-
use crate::tcp::util::net::{Network, ReadyEvent};
43-
use crate::util::barrier;
4444

4545
/// The reader trait which is supposed to handle the actual data reading loop.
4646
pub trait Reader {
@@ -113,7 +113,10 @@ pub trait Factory {
113113
}
114114

115115
/// SAFETY: DataMsg must point to valid memory (normally ensured by Client structure).
116-
async unsafe fn handle_data(msg: barrier::mpsc::Lock<DataMsg>, net: &mut Network) -> std::io::Result<()> {
116+
async unsafe fn handle_data(
117+
msg: barrier::mpsc::Lock<DataMsg>,
118+
net: &mut Network,
119+
) -> std::io::Result<()> {
117120
trace!({?net} {?msg}, "Received data event");
118121
let slice = std::slice::from_raw_parts(msg.buffer, msg.buffer_size);
119122
net.write_all(slice).await?;
@@ -173,7 +176,7 @@ impl<F: Factory> Builder<F> {
173176
exit: exit_sender,
174177
request_sender,
175178
reply_sender,
176-
data: data_sender
179+
data: data_sender,
177180
});
178181
let mut handler = self.factory.start(&client);
179182
let handle = tokio::spawn(async move {
@@ -221,7 +224,7 @@ pub struct Client<H: Handler> {
221224
exit: watch::Sender<()>,
222225
request_sender: mpsc::Sender<H::Request>,
223226
data: barrier::mpsc::Sender<DataMsg>,
224-
reply_sender: mpsc::Sender<H::Reply>
227+
reply_sender: mpsc::Sender<H::Reply>,
225228
}
226229

227230
impl<H: Handler> Client<H> {
@@ -286,11 +289,13 @@ impl<H: Handler> Client<H> {
286289
/// returns: true if the operation has succeeded, false otherwise.
287290
pub async fn send(&self, msg: &[u8]) -> Result<(), barrier::Error> {
288291
// SAFETY: It is safe to pass a pointer to msg thanks to the barrier synchronization.
289-
self.data.send(DataMsg {
290-
buffer: msg.as_ptr(),
291-
buffer_size: msg.len(),
292-
net_id: 0,
293-
}).await
292+
self.data
293+
.send(DataMsg {
294+
buffer: msg.as_ptr(),
295+
buffer_size: msg.len(),
296+
net_id: 0,
297+
})
298+
.await
294299
}
295300
}
296301

core/src/tcp/server/client.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,16 @@
2727
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
2828

2929
use crate::tcp::util::buffer::{ChannelBuffer, NetReceiver};
30+
use crate::tcp::util::net::{Network, ReadyEvent};
3031
use crate::tcp::util::DataMsg;
3132
use crate::tcp::BYTES_CHANNEL_SIZE;
33+
use crate::util::barrier;
3234
use bp3d_debug::{error, trace};
3335
use std::future::Future;
3436
use tokio::io::AsyncWriteExt;
3537
use tokio::select;
3638
use tokio::sync::mpsc;
3739
use tokio::sync::watch;
38-
use crate::tcp::util::net::{Network, ReadyEvent};
39-
use crate::util::barrier;
4040

4141
/// Represents a client event handler.
4242
pub trait Handler {
@@ -108,7 +108,10 @@ impl<H: Handler + Send + 'static> ClientTask<'_, H> {
108108
}
109109

110110
/// SAFETY: DataMsg must point to valid memory (normally ensured by Server structure).
111-
async unsafe fn handle_broadcast(msg: barrier::broadcast::Lock<DataMsg>, net: &mut Network) -> std::io::Result<()> {
111+
async unsafe fn handle_broadcast(
112+
msg: barrier::broadcast::Lock<DataMsg>,
113+
net: &mut Network,
114+
) -> std::io::Result<()> {
112115
trace!({?net} {?msg}, "Received broadcast event");
113116
if msg.net_id == 0 || msg.net_id == net.id() {
114117
let slice = std::slice::from_raw_parts(msg.buffer, msg.buffer_size);

core/src/tcp/server/server.rs

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,14 @@
2727
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
2828

2929
use crate::tcp::server::client::ClientTask;
30+
use crate::tcp::util::net::Network;
3031
use crate::tcp::util::DataMsg;
32+
use crate::util::barrier;
3133
use bp3d_debug::{debug, trace};
3234
use std::future::Future;
3335
use std::net::Ipv4Addr;
34-
use std::sync::atomic::Ordering::Relaxed;
3536
use std::sync::atomic::AtomicUsize;
37+
use std::sync::atomic::Ordering::Relaxed;
3638
use std::sync::Arc;
3739
use tokio::io::AsyncWriteExt;
3840
use tokio::net::{TcpListener, ToSocketAddrs};
@@ -41,8 +43,6 @@ use tokio::sync::mpsc;
4143
use tokio::sync::mpsc::error::{SendError, TrySendError};
4244
use tokio::sync::watch;
4345
use tokio::task::JoinSet;
44-
use crate::tcp::util::net::Network;
45-
use crate::util::barrier;
4646

4747
/// A factory trait which can be used to create the instance of the main server event handler.
4848
pub trait Factory {
@@ -268,7 +268,7 @@ impl<F: Factory> Builder<F> {
268268
exit: exit_sender,
269269
max_clients: self.max_clients,
270270
cur_clients: AtomicUsize::new(0),
271-
reply_sender
271+
reply_sender,
272272
});
273273
let handler = self.factory.start(&server);
274274
let motherfuckingrust = server.clone();
@@ -297,7 +297,7 @@ pub struct Server<H: Handler> {
297297
cur_clients: AtomicUsize,
298298
max_clients: usize,
299299
request_sender: mpsc::Sender<H::Request>,
300-
reply_sender: mpsc::Sender<H::Reply>
300+
reply_sender: mpsc::Sender<H::Reply>,
301301
}
302302

303303
impl<H: Handler> Server<H> {
@@ -373,11 +373,13 @@ impl<H: Handler> Server<H> {
373373
/// returns: true if the operation has succeeded, false otherwise.
374374
pub async fn send(&self, net_id: usize, msg: &[u8]) -> Result<(), barrier::Error> {
375375
// SAFETY: It is safe to pass a pointer to msg thanks to the barrier synchronization.
376-
self.broadcast.send(DataMsg {
377-
buffer: msg.as_ptr(),
378-
buffer_size: msg.len(),
379-
net_id
380-
}).await?;
376+
self.broadcast
377+
.send(DataMsg {
378+
buffer: msg.as_ptr(),
379+
buffer_size: msg.len(),
380+
net_id,
381+
})
382+
.await?;
381383
trace!("All clients have acknowledged");
382384
Ok(())
383385
}
@@ -391,11 +393,13 @@ impl<H: Handler> Server<H> {
391393
/// returns: true if the operation has succeeded, false otherwise.
392394
pub async fn broadcast(&self, msg: &[u8]) -> Result<(), barrier::Error> {
393395
// SAFETY: It is safe to pass a pointer to msg thanks to the barrier synchronization.
394-
self.broadcast.send(DataMsg {
395-
buffer: msg.as_ptr(),
396-
buffer_size: msg.len(),
397-
net_id: 0
398-
}).await?;
396+
self.broadcast
397+
.send(DataMsg {
398+
buffer: msg.as_ptr(),
399+
buffer_size: msg.len(),
400+
net_id: 0,
401+
})
402+
.await?;
399403
trace!("All clients have acknowledged");
400404
Ok(())
401405
}

core/src/tcp/util/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@
2828

2929
//! Utility module for TCP client or server.
3030
31-
pub mod net;
3231
pub mod buffer;
32+
pub mod net;
3333

3434
use std::fmt::Debug;
3535

core/src/tcp/util/net.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,17 +28,17 @@
2828

2929
//! TCP network stream async reader/writer tools.
3030
31+
use crate::tcp::util::buffer::Bytes;
32+
use bp3d_debug::warning;
3133
use std::fmt::{Debug, Formatter};
3234
use std::io::{Error, ErrorKind, IoSlice};
3335
use std::net::SocketAddr;
3436
use std::pin::Pin;
3537
use std::task::{Context, Poll};
36-
use bp3d_debug::warning;
3738
use tokio::io::{AsyncRead, AsyncWrite, Interest, ReadBuf};
3839
use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
3940
use tokio::net::TcpStream;
4041
use tokio::sync::mpsc;
41-
use crate::tcp::util::buffer::Bytes;
4242

4343
/// The event returned by the ready function in [Network].
4444
pub enum ReadyEvent {

core/src/util/barrier/broadcast.rs

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,28 +28,28 @@
2828

2929
//! A multi-client barrier synchronization primitive.
3030
31+
use crate::util::barrier::{Error, Msg};
32+
use bp3d_debug::trace;
3133
use std::fmt::Debug;
3234
use std::ops::{Deref, DerefMut};
3335
use std::sync::atomic::AtomicBool;
3436
use std::sync::atomic::Ordering::Relaxed;
35-
use bp3d_debug::trace;
3637
use tokio::sync::{broadcast, Semaphore};
37-
use crate::util::barrier::{Error, Msg};
3838

3939
/// Represents a multi-client barrier lock.
4040
pub struct Lock<T> {
41-
msg: Msg<T>
41+
msg: Msg<T>,
4242
}
4343

4444
/// Represents a multi-client sender.
4545
pub struct Sender<T> {
4646
inner: broadcast::Sender<Msg<T>>,
47-
closed: AtomicBool
47+
closed: AtomicBool,
4848
}
4949

5050
/// Represents a multi-client receiver.
5151
pub struct Receiver<T> {
52-
inner: broadcast::Receiver<Msg<T>>
52+
inner: broadcast::Receiver<Msg<T>>,
5353
}
5454

5555
impl<T> Deref for Lock<T> {
@@ -98,7 +98,7 @@ impl<T> Sender<T> {
9898
let semaphore = Semaphore::new(0);
9999
let msg = Msg {
100100
synchro: &semaphore,
101-
inner: msg
101+
inner: msg,
102102
};
103103
let count = self.inner.send(msg).map_err(|_| Error::BrokenPipe)?;
104104
trace!("Waiting for {} client(s) to acknowledge", count);
@@ -108,7 +108,9 @@ impl<T> Sender<T> {
108108

109109
/// Creates and subscribes a new receiver to this sender.
110110
pub fn subscribe(&self) -> Receiver<T> {
111-
Receiver { inner: self.inner.subscribe() }
111+
Receiver {
112+
inner: self.inner.subscribe(),
113+
}
112114
}
113115
}
114116

@@ -119,7 +121,11 @@ impl<T: Clone> Receiver<T> {
119121
///
120122
/// This returns an [Error] if the channel was prematurely closed.
121123
pub async fn recv(&mut self) -> Result<Lock<T>, Error> {
122-
self.inner.recv().await.map_err(|_| Error::BrokenPipe).map(|msg| Lock { msg })
124+
self.inner
125+
.recv()
126+
.await
127+
.map_err(|_| Error::BrokenPipe)
128+
.map(|msg| Lock { msg })
123129
}
124130
}
125131

@@ -132,5 +138,8 @@ impl<T: Clone> Receiver<T> {
132138
/// returns: Sender<T>
133139
pub fn barrier<T: Clone>(len: usize) -> Sender<T> {
134140
let (sender, _) = broadcast::channel(len);
135-
Sender { inner: sender, closed: AtomicBool::new(false) }
141+
Sender {
142+
inner: sender,
143+
closed: AtomicBool::new(false),
144+
}
136145
}

core/src/util/barrier/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,14 @@ pub enum Error {
4141
BrokenPipe,
4242

4343
/// The channel was requested to be closed.
44-
Closed
44+
Closed,
4545
}
4646

4747
impl Display for Error {
4848
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
4949
match self {
5050
Error::BrokenPipe => f.write_str("broken pipe"),
51-
Error::Closed => f.write_str("closed")
51+
Error::Closed => f.write_str("closed"),
5252
}
5353
}
5454
}
@@ -58,7 +58,7 @@ impl std::error::Error for Error {}
5858
#[derive(Clone)]
5959
struct Msg<T> {
6060
synchro: *const Semaphore,
61-
inner: T
61+
inner: T,
6262
}
6363

6464
unsafe impl<T: Send> Send for Msg<T> {}

core/src/util/barrier/mpsc.rs

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,28 +28,28 @@
2828

2929
//! A single-client barrier synchronization primitive.
3030
31+
use crate::util::barrier::{Error, Msg};
32+
use bp3d_debug::trace;
3133
use std::fmt::Debug;
3234
use std::ops::{Deref, DerefMut};
3335
use std::sync::atomic::AtomicBool;
3436
use std::sync::atomic::Ordering::Relaxed;
35-
use bp3d_debug::trace;
3637
use tokio::sync::{mpsc, Semaphore};
37-
use crate::util::barrier::{Error, Msg};
3838

3939
/// Represents a single-client barrier lock.
4040
pub struct Lock<T> {
41-
msg: Msg<T>
41+
msg: Msg<T>,
4242
}
4343

4444
/// Represents a single-client sender.
4545
pub struct Sender<T> {
4646
inner: mpsc::Sender<Msg<T>>,
47-
closed: AtomicBool
47+
closed: AtomicBool,
4848
}
4949

5050
/// Represents a single-client receiver.
5151
pub struct Receiver<T> {
52-
inner: mpsc::Receiver<Msg<T>>
52+
inner: mpsc::Receiver<Msg<T>>,
5353
}
5454

5555
impl<T> Deref for Lock<T> {
@@ -98,7 +98,7 @@ impl<T> Sender<T> {
9898
let semaphore = Semaphore::new(0);
9999
let msg = Msg {
100100
synchro: &semaphore,
101-
inner: msg
101+
inner: msg,
102102
};
103103
self.inner.send(msg).await.map_err(|_| Error::BrokenPipe)?;
104104
trace!("Waiting for client to acknowledge");
@@ -114,7 +114,11 @@ impl<T: Clone> Receiver<T> {
114114
///
115115
/// This returns an [Error] if the channel was prematurely closed.
116116
pub async fn recv(&mut self) -> Result<Lock<T>, Error> {
117-
self.inner.recv().await.ok_or(Error::BrokenPipe).map(|msg| Lock { msg })
117+
self.inner
118+
.recv()
119+
.await
120+
.ok_or(Error::BrokenPipe)
121+
.map(|msg| Lock { msg })
118122
}
119123
}
120124

@@ -127,5 +131,11 @@ impl<T: Clone> Receiver<T> {
127131
/// returns: (Sender<T>, Receiver<T>)
128132
pub fn barrier<T: Clone>(len: usize) -> (Sender<T>, Receiver<T>) {
129133
let (sender, receiver) = mpsc::channel(len);
130-
(Sender { inner: sender, closed: AtomicBool::new(false) }, Receiver { inner: receiver })
134+
(
135+
Sender {
136+
inner: sender,
137+
closed: AtomicBool::new(false),
138+
},
139+
Receiver { inner: receiver },
140+
)
131141
}

testprog/src/client.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,10 @@
2727
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
2828

2929
use bp3d_net::tcp::client::{Client, Factory, Handler, Reader};
30-
use std::sync::Arc;
31-
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
3230
use bp3d_net::tcp::util::buffer::NetReceiver;
3331
use bp3d_net::tcp::util::net::Network;
32+
use std::sync::Arc;
33+
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
3434

3535
#[derive(Clone)]
3636
pub struct EchoClient {

0 commit comments

Comments
 (0)