Skip to content

Commit 7d068b0

Browse files
committed
Add user-data to if task.
1 parent 93c5fb4 commit 7d068b0

File tree

2 files changed

+21
-20
lines changed

2 files changed

+21
-20
lines changed

transports/tcp/src/if_task.rs

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,17 @@ pub enum IfTaskEvent {
1313
}
1414

1515
#[derive(Debug)]
16-
enum Command {
17-
RegisterListener(SocketAddr, mpsc::UnboundedSender<IfTaskEvent>),
18-
PortReuseSocket(SocketAddr, oneshot::Sender<Option<SocketAddr>>),
16+
enum Command<T> {
17+
RegisterListener(SocketAddr, T, mpsc::UnboundedSender<IfTaskEvent>),
18+
PortReuseSocket(SocketAddr, oneshot::Sender<Option<T>>),
1919
}
2020

2121
#[derive(Clone, Debug)]
22-
pub struct IfTaskHandle {
23-
tx: mpsc::UnboundedSender<Command>,
22+
pub struct IfTaskHandle<T> {
23+
tx: mpsc::UnboundedSender<Command<T>>,
2424
}
2525

26-
impl IfTaskHandle {
26+
impl<T: Clone + Send + 'static> IfTaskHandle<T> {
2727
pub async fn new() -> io::Result<Self> {
2828
// channel is unbounded so that `register_listener` doesn't need to be async.
2929
let (tx, rx) = mpsc::unbounded();
@@ -40,14 +40,15 @@ impl IfTaskHandle {
4040
pub fn register_listener(
4141
&self,
4242
socket_addr: &SocketAddr,
43+
user_data: T,
4344
) -> mpsc::UnboundedReceiver<IfTaskEvent> {
4445
let (tx, rx) = mpsc::unbounded();
45-
let cmd = Command::RegisterListener(*socket_addr, tx);
46+
let cmd = Command::RegisterListener(*socket_addr, user_data, tx);
4647
self.tx.unbounded_send(cmd).expect("task paniced");
4748
rx
4849
}
4950

50-
pub async fn port_reuse_socket(&self, socket_addr: &SocketAddr) -> Option<SocketAddr> {
51+
pub async fn port_reuse_socket(&self, socket_addr: &SocketAddr) -> Option<T> {
5152
let (tx, rx) = oneshot::channel();
5253
let cmd = Command::PortReuseSocket(*socket_addr, tx);
5354
self.tx.unbounded_send(cmd).expect("task paniced");
@@ -56,17 +57,17 @@ impl IfTaskHandle {
5657
}
5758

5859
#[derive(Debug)]
59-
struct Task {
60+
struct Task<T> {
6061
/// Command receiver.
61-
rx: mpsc::UnboundedReceiver<Command>,
62+
rx: mpsc::UnboundedReceiver<Command<T>>,
6263
/// Interface watcher.
6364
watcher: IfWatcher,
6465
/// Listening addresses.
65-
listening_addrs: Vec<(SocketAddr, mpsc::UnboundedSender<IfTaskEvent>)>,
66+
listening_addrs: Vec<(SocketAddr, T, mpsc::UnboundedSender<IfTaskEvent>)>,
6667
}
6768

68-
impl Task {
69-
async fn new(rx: mpsc::UnboundedReceiver<Command>) -> io::Result<Self> {
69+
impl<T: Clone + Send + 'static> Task<T> {
70+
async fn new(rx: mpsc::UnboundedReceiver<Command<T>>) -> io::Result<Self> {
7071
Ok(Self {
7172
rx,
7273
watcher: IfWatcher::new().await?,
@@ -86,7 +87,7 @@ impl Task {
8687
continue;
8788
}
8889
};
89-
self.listening_addrs.retain(|(addr, tx)| {
90+
self.listening_addrs.retain(|(addr, _, tx)| {
9091
match event {
9192
IfEvent::Up(inet) => {
9293
if let Some(addr) = iface_match(&inet, &addr) {
@@ -107,20 +108,20 @@ impl Task {
107108
}
108109
cmd = rx.next() => {
109110
match cmd {
110-
Some(Command::RegisterListener(socket_addr, tx)) => {
111+
Some(Command::RegisterListener(socket_addr, user_data, tx)) => {
111112
for iface in self.watcher.iter() {
112113
if let Some(addr) = iface_match(iface, &socket_addr) {
113114
tx.unbounded_send(IfTaskEvent::NewAddress(addr)).ok();
114115
}
115116
}
116-
self.listening_addrs.push((socket_addr, tx));
117+
self.listening_addrs.push((socket_addr, user_data, tx));
117118
}
118119
Some(Command::PortReuseSocket(socket_addr, tx)) => {
119120
let mut reuse_socket = None;
120-
for (addr, _) in &self.listening_addrs {
121+
for (addr, user_data, _) in &self.listening_addrs {
121122
if addr.ip().is_ipv4() == socket_addr.ip().is_ipv4()
122123
&& addr.ip().is_loopback() == socket_addr.ip().is_loopback() {
123-
reuse_socket = Some(*addr);
124+
reuse_socket = Some(user_data.clone());
124125
break;
125126
}
126127
}

transports/tcp/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ pub struct TcpConfig {
6969
/// Port reuse.
7070
port_reuse: bool,
7171
/// The task handle.
72-
handle: IfTaskHandle,
72+
handle: IfTaskHandle<SocketAddr>,
7373
}
7474

7575
impl TcpConfig {
@@ -140,7 +140,7 @@ impl TcpConfig {
140140
let listener = Async::new(socket.into_tcp_listener())?;
141141
let socket_addr = listener.get_ref().local_addr()?;
142142

143-
let rx = self.handle.register_listener(&socket_addr);
143+
let rx = self.handle.register_listener(&socket_addr, socket_addr);
144144
let listen_stream = TcpListenStream {
145145
config: self,
146146
listener,

0 commit comments

Comments
 (0)