Skip to content
This repository was archived by the owner on Jan 22, 2026. It is now read-only.

Commit 0f5c529

Browse files
committed
Partially refactor input port opening
1 parent 33431fd commit 0f5c529

File tree

1 file changed

+81
-17
lines changed
  • lib/protoflow-zeromq/src

1 file changed

+81
-17
lines changed

lib/protoflow-zeromq/src/lib.rs

Lines changed: 81 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ pub use protoflow_core::prelude;
99
extern crate std;
1010

1111
use protoflow_core::{
12-
prelude::{Arc, BTreeMap, Bytes, String, ToString, Vec},
12+
prelude::{vec, Arc, BTreeMap, Bytes, String, ToString, Vec},
1313
InputPortID, OutputPortID, PortError, PortResult, PortState, Transport,
1414
};
1515

@@ -74,7 +74,7 @@ enum ZmqInputPortState {
7474
Open(
7575
// TODO: hide these
7676
Arc<SyncSender<ZmqTransportEvent>>,
77-
Arc<Mutex<Receiver<ZmqInputPortEvent>>>,
77+
Arc<Mutex<Receiver<ZmqTransportEvent>>>,
7878
),
7979
Connected(
8080
// channels for requests from public close
@@ -85,7 +85,7 @@ enum ZmqInputPortState {
8585
Arc<Mutex<Receiver<ZmqInputPortEvent>>>,
8686
// internal channels for events
8787
Arc<SyncSender<ZmqTransportEvent>>,
88-
Arc<Mutex<Receiver<ZmqInputPortEvent>>>,
88+
Arc<Mutex<Receiver<ZmqTransportEvent>>>,
8989
// vec of the connected port ids
9090
Vec<OutputPortID>,
9191
),
@@ -405,6 +405,83 @@ impl ZmqTransport {
405405

406406
Ok((to_worker_send, from_worker_recv))
407407
}
408+
409+
fn start_input_worker(&self, input_port_id: InputPortID) -> Result<(), PortError> {
410+
let topic = format!("{}:", input_port_id);
411+
412+
let (to_worker_send, to_worker_recv) = sync_channel(1);
413+
let to_worker_send = Arc::new(to_worker_send);
414+
let to_worker_recv = Arc::new(Mutex::new(to_worker_recv));
415+
416+
{
417+
let mut inputs = self.inputs.write();
418+
let state = ZmqInputPortState::Open(to_worker_send.clone(), to_worker_recv.clone());
419+
let state = RwLock::new(state);
420+
inputs.insert(input_port_id, state);
421+
}
422+
423+
let inputs = self.inputs.clone();
424+
tokio::task::spawn(async move {
425+
let input = &to_worker_recv;
426+
427+
let inputs = inputs;
428+
429+
loop {
430+
let event: ZmqTransportEvent = input.lock().recv().expect("input worker recv");
431+
use ZmqTransportEvent::*;
432+
match event {
433+
Connect(output_port_id, input_port_id) => {
434+
let inputs = inputs.read();
435+
let Some(input_state) = inputs.get(&input_port_id) else {
436+
todo!();
437+
};
438+
let input_state = input_state.write();
439+
440+
use ZmqInputPortState::*;
441+
match &*input_state {
442+
Open(_, _) => {
443+
let (req_send, req_recv) = sync_channel(1);
444+
let req_send = Arc::new(req_send);
445+
let req_recv = Arc::new(Mutex::new(req_recv));
446+
447+
let (msgs_send, msgs_recv) = sync_channel(1);
448+
449+
let msgs_send = Arc::new(msgs_send);
450+
let msgs_recv = Arc::new(Mutex::new(msgs_recv));
451+
452+
let mut input_state = input_state;
453+
454+
*input_state = ZmqInputPortState::Connected(
455+
req_send,
456+
req_recv,
457+
msgs_send,
458+
msgs_recv,
459+
to_worker_send.clone(),
460+
input.clone(),
461+
vec![output_port_id],
462+
);
463+
}
464+
Connected(_, _, _, _, _, _, _) => todo!(),
465+
Closed => todo!(),
466+
}
467+
}
468+
AckConnection(output_port_id, input_port_id) => todo!(),
469+
Message(output_port_id, input_port_id, _, bytes) => todo!(),
470+
AckMessage(output_port_id, input_port_id, _) => todo!(),
471+
CloseOutput(output_port_id, input_port_id) => todo!(),
472+
CloseInput(input_port_id) => todo!(),
473+
};
474+
}
475+
});
476+
477+
// send sub request
478+
self.tokio
479+
.block_on(
480+
self.sub_queue
481+
.send(ZmqSubscriptionRequest::Subscribe(topic)),
482+
)
483+
.map_err(|e| PortError::Other(e.to_string()))
484+
}
408485
}
409486

410487
impl Transport for ZmqTransport {
@@ -430,20 +507,7 @@ impl Transport for ZmqTransport {
430507
let new_id = InputPortID::try_from(-(inputs.len() as isize + 1))
431508
.map_err(|e| PortError::Other(e.to_string()))?;
432509

433-
let (_, receiver) = self
434-
.subscribe_for_input_port(new_id)
435-
.map_err(|e| PortError::Other(e.to_string()))?;
436-
437-
loop {
438-
let msg = receiver
439-
.lock()
440-
.recv()
441-
.map_err(|e| PortError::Other(e.to_string()))?;
442-
match msg {
443-
ZmqInputPortEvent::Opened => break Ok(new_id),
444-
_ => continue, // TODO
445-
}
446-
}
510+
self.start_input_worker(new_id).map(|_| new_id)
447511
}
448512

449513
fn open_output(&self) -> PortResult<OutputPortID> {

0 commit comments

Comments
 (0)