Skip to content

Commit 07632a1

Browse files
committed
feat(neon): Add JoinHandle as a result of Channel::send
Part of neon-bindings/rfcs#32
1 parent 24e7d6f commit 07632a1

File tree

4 files changed

+111
-7
lines changed

4 files changed

+111
-7
lines changed

src/event/event_queue.rs

Lines changed: 58 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use std::sync::atomic::{AtomicUsize, Ordering};
2-
use std::sync::Arc;
2+
use std::sync::{mpsc, Arc};
33

44
use neon_runtime::raw::Env;
55
use neon_runtime::tsfn::ThreadsafeFunction;
@@ -104,9 +104,10 @@ impl Channel {
104104

105105
/// Schedules a closure to execute on the JavaScript thread that created this Channel
106106
/// Panics if there is a libuv error
107-
pub fn send<F>(&self, f: F)
107+
pub fn send<T, F>(&self, f: F) -> JoinHandle<T>
108108
where
109-
F: FnOnce(TaskContext) -> NeonResult<()> + Send + 'static,
109+
T: Send + 'static,
110+
F: FnOnce(TaskContext) -> NeonResult<T> + Send + 'static,
110111
{
111112
self.try_send(f).unwrap()
112113
}
@@ -115,21 +116,29 @@ impl Channel {
115116
/// Returns an `Error` if the task could not be scheduled.
116117
///
117118
/// See [`SendError`] for additional details on failure causes.
118-
pub fn try_send<F>(&self, f: F) -> Result<(), SendError>
119+
pub fn try_send<T, F>(&self, f: F) -> Result<JoinHandle<T>, SendError>
119120
where
120-
F: FnOnce(TaskContext) -> NeonResult<()> + Send + 'static,
121+
T: Send + 'static,
122+
F: FnOnce(TaskContext) -> NeonResult<T> + Send + 'static,
121123
{
124+
let (tx, rx) = mpsc::sync_channel(1);
122125
let callback = Box::new(move |env| {
123126
let env = unsafe { std::mem::transmute(env) };
124127

125128
// Note: It is sufficient to use `TaskContext`'s `InheritedHandleScope` because
126129
// N-API creates a `HandleScope` before calling the callback.
127130
TaskContext::with_context(env, move |cx| {
128-
let _ = f(cx);
131+
// Error can be ignored; it only means the user didn't join
132+
let _ = tx.send(f(cx));
129133
});
130134
});
131135

132-
self.state.tsfn.call(callback, None).map_err(|_| SendError)
136+
self.state
137+
.tsfn
138+
.call(callback, None)
139+
.map_err(|_| SendError)?;
140+
141+
Ok(JoinHandle { rx })
133142
}
134143

135144
/// Returns a boolean indicating if this `Channel` will prevent the Node event
@@ -202,6 +211,48 @@ impl Drop for Channel {
202211
}
203212
}
204213

214+
/// An owned permission to join on the result of a closure sent to the JavaScript main
215+
/// thread with [`Channel::send`].
216+
pub struct JoinHandle<T> {
217+
rx: mpsc::Receiver<NeonResult<T>>,
218+
}
219+
220+
impl<T> JoinHandle<T> {
221+
/// Waits for the associated closure to finish executing
222+
///
223+
/// If the closure panics or throws an exception, `Err` is returned
224+
pub fn join(self) -> Result<T, JoinError> {
225+
self.rx
226+
.recv()
227+
// If the sending side dropped without sending, it must have panicked
228+
.map_err(|_| JoinError(JoinErrorType::Panic))?
229+
// If the closure returned `Err`, a JavaScript exception was thrown
230+
.map_err(|_| JoinError(JoinErrorType::Throw))
231+
}
232+
}
233+
234+
#[derive(Debug)]
235+
/// Error returned by [`JoinHandle::join`] indicating the associated closure panicked
236+
/// or threw an exception.
237+
pub struct JoinError(JoinErrorType);
238+
239+
#[derive(Debug)]
240+
enum JoinErrorType {
241+
Panic,
242+
Throw,
243+
}
244+
245+
impl std::fmt::Display for JoinError {
246+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
247+
match &self.0 {
248+
JoinErrorType::Panic => f.write_str("Closure panicked before returning"),
249+
JoinErrorType::Throw => f.write_str("Closure threw an exception"),
250+
}
251+
}
252+
}
253+
254+
impl std::error::Error for JoinError {}
255+
205256
/// Error indicating that a closure was unable to be scheduled to execute on the event loop.
206257
///
207258
/// The most likely cause of a failure is that Node is shutting down. This may occur if the

test/napi/lib/threads.js

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,4 +71,17 @@ const assert = require('chai').assert;
7171
// Asynchronously GC to give the task queue a chance to execute
7272
setTimeout(() => global.gc(), 10);
7373
});
74+
75+
it('should be able to join on the result of a channel', function (cb) {
76+
let msg = "Uninitialized";
77+
78+
addon.channel_join(() => msg, (res) => {
79+
assert.strictEqual(res, "Received: Hello, World!");
80+
cb();
81+
});
82+
83+
setTimeout(() => {
84+
msg = "Hello, World!";
85+
}, 10);
86+
});
7487
});

test/napi/src/js/threads.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use std::cell::RefCell;
22
use std::sync::Arc;
33

44
use neon::prelude::*;
5+
use std::time::Duration;
56

67
pub fn useless_root(mut cx: FunctionContext) -> JsResult<JsObject> {
78
let object = cx.argument::<JsObject>(0)?;
@@ -185,3 +186,41 @@ pub fn drop_global_queue(mut cx: FunctionContext) -> JsResult<JsUndefined> {
185186

186187
Ok(cx.undefined())
187188
}
189+
190+
pub fn channel_join(mut cx: FunctionContext) -> JsResult<JsUndefined> {
191+
let get_message = cx.argument::<JsFunction>(0)?.root(&mut cx);
192+
let callback = cx.argument::<JsFunction>(1)?.root(&mut cx);
193+
let channel = cx.channel();
194+
195+
std::thread::spawn(move || {
196+
// Give a chance for the data to change
197+
std::thread::sleep(Duration::from_millis(20));
198+
199+
// Get the current message
200+
let message = channel
201+
.send(move |mut cx| {
202+
let this = cx.undefined();
203+
204+
get_message
205+
.into_inner(&mut cx)
206+
.call::<_, _, JsValue, _>(&mut cx, this, [])?
207+
.downcast_or_throw::<JsString, _>(&mut cx)
208+
.map(|v| v.value(&mut cx))
209+
})
210+
.join()
211+
.unwrap();
212+
213+
let response = format!("Received: {}", message);
214+
215+
channel.send(move |mut cx| {
216+
let this = cx.undefined();
217+
let args = [cx.string(response)];
218+
219+
callback.into_inner(&mut cx).call(&mut cx, this, args)?;
220+
221+
Ok(())
222+
});
223+
});
224+
225+
Ok(cx.undefined())
226+
}

test/napi/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,7 @@ fn main(mut cx: ModuleContext) -> NeonResult<()> {
255255
cx.export_function("greeter_greet", greeter_greet)?;
256256
cx.export_function("leak_channel", leak_channel)?;
257257
cx.export_function("drop_global_queue", drop_global_queue)?;
258+
cx.export_function("channel_join", channel_join)?;
258259

259260
Ok(())
260261
}

0 commit comments

Comments
 (0)