Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 58 additions & 7 deletions src/event/event_queue.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::{mpsc, Arc};

use neon_runtime::raw::Env;
use neon_runtime::tsfn::ThreadsafeFunction;
Expand Down Expand Up @@ -104,9 +104,10 @@ impl Channel {

/// Schedules a closure to execute on the JavaScript thread that created this Channel
/// Panics if there is a libuv error
pub fn send<F>(&self, f: F)
pub fn send<T, F>(&self, f: F) -> JoinHandle<T>
where
F: FnOnce(TaskContext) -> NeonResult<()> + Send + 'static,
T: Send + 'static,
F: FnOnce(TaskContext) -> NeonResult<T> + Send + 'static,
{
self.try_send(f).unwrap()
}
Expand All @@ -115,21 +116,29 @@ impl Channel {
/// Returns an `Error` if the task could not be scheduled.
///
/// See [`SendError`] for additional details on failure causes.
pub fn try_send<F>(&self, f: F) -> Result<(), SendError>
pub fn try_send<T, F>(&self, f: F) -> Result<JoinHandle<T>, SendError>
where
F: FnOnce(TaskContext) -> NeonResult<()> + Send + 'static,
T: Send + 'static,
F: FnOnce(TaskContext) -> NeonResult<T> + Send + 'static,
{
let (tx, rx) = mpsc::sync_channel(1);
let callback = Box::new(move |env| {
let env = unsafe { std::mem::transmute(env) };

// Note: It is sufficient to use `TaskContext`'s `InheritedHandleScope` because
// N-API creates a `HandleScope` before calling the callback.
TaskContext::with_context(env, move |cx| {
let _ = f(cx);
// Error can be ignored; it only means the user didn't join
let _ = tx.send(f(cx));
});
});

self.state.tsfn.call(callback, None).map_err(|_| SendError)
self.state
.tsfn
.call(callback, None)
.map_err(|_| SendError)?;

Ok(JoinHandle { rx })
}

/// Returns a boolean indicating if this `Channel` will prevent the Node event
Expand Down Expand Up @@ -202,6 +211,48 @@ impl Drop for Channel {
}
}

/// An owned permission to join on the result of a closure sent to the JavaScript main
/// thread with [`Channel::send`].
pub struct JoinHandle<T> {
rx: mpsc::Receiver<NeonResult<T>>,
}

impl<T> JoinHandle<T> {
/// Waits for the associated closure to finish executing
///
/// If the closure panics or throws an exception, `Err` is returned
pub fn join(self) -> Result<T, JoinError> {
self.rx
.recv()
// If the sending side dropped without sending, it must have panicked
.map_err(|_| JoinError(JoinErrorType::Panic))?
// If the closure returned `Err`, a JavaScript exception was thrown
.map_err(|_| JoinError(JoinErrorType::Throw))
}
}

#[derive(Debug)]
/// Error returned by [`JoinHandle::join`] indicating the associated closure panicked
/// or threw an exception.
pub struct JoinError(JoinErrorType);

#[derive(Debug)]
enum JoinErrorType {
Panic,
Throw,
}

impl std::fmt::Display for JoinError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match &self.0 {
JoinErrorType::Panic => f.write_str("Closure panicked before returning"),
JoinErrorType::Throw => f.write_str("Closure threw an exception"),
}
}
}

impl std::error::Error for JoinError {}

/// Error indicating that a closure was unable to be scheduled to execute on the event loop.
///
/// The most likely cause of a failure is that Node is shutting down. This may occur if the
Expand Down
22 changes: 22 additions & 0 deletions test/napi/lib/threads.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,26 @@ const assert = require('chai').assert;
// Asynchronously GC to give the task queue a chance to execute
setTimeout(() => global.gc(), 10);
});

it('should be able to join on the result of a channel', function (cb) {
// `msg` is closed over by multiple functions. A function that returns the
// current value is passed to the Neon function `addon.channel_join`. Additionally,
// the value is modified after `10ms` in a timeout.
let msg = "Uninitialized";

// The `addon.channel_join` function will wait 100ms before fetching the current
// value of `msg` using the first closure. The second closure is called
// after fetching and processing the message. We expect the message to already
// have been changed.
addon.channel_join(() => msg, (res) => {
assert.strictEqual(res, "Received: Hello, World!");
cb();
});

// Change the value of `msg` after 10ms. This should happen before `addon.channel_join`
// fetches it.
setTimeout(() => {
msg = "Hello, World!";
}, 10);
});
});
44 changes: 44 additions & 0 deletions test/napi/src/js/threads.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::cell::RefCell;
use std::sync::Arc;
use std::time::Duration;

use neon::prelude::*;

Expand Down Expand Up @@ -185,3 +186,46 @@ pub fn drop_global_queue(mut cx: FunctionContext) -> JsResult<JsUndefined> {

Ok(cx.undefined())
}

pub fn channel_join(mut cx: FunctionContext) -> JsResult<JsUndefined> {
// Function to fetch a message for processing
let get_message = cx.argument::<JsFunction>(0)?.root(&mut cx);
// Callback into JavaScript with completion
let callback = cx.argument::<JsFunction>(1)?.root(&mut cx);
let channel = cx.channel();

// Spawn a Rust thread to stop blocking the event loop
std::thread::spawn(move || {
// Give a chance for the data to change
std::thread::sleep(Duration::from_millis(100));

// Get the current message
let message = channel
.send(move |mut cx| {
let this = cx.undefined();

get_message
.into_inner(&mut cx)
.call::<_, _, JsValue, _>(&mut cx, this, [])?
.downcast_or_throw::<JsString, _>(&mut cx)
.map(|v| v.value(&mut cx))
})
.join()
.unwrap();

// Process the message
let response = format!("Received: {}", message);

// Call back to JavaScript with the response
channel.send(move |mut cx| {
let this = cx.undefined();
let args = [cx.string(response)];

callback.into_inner(&mut cx).call(&mut cx, this, args)?;

Ok(())
});
});

Ok(cx.undefined())
}
1 change: 1 addition & 0 deletions test/napi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ fn main(mut cx: ModuleContext) -> NeonResult<()> {
cx.export_function("greeter_greet", greeter_greet)?;
cx.export_function("leak_channel", leak_channel)?;
cx.export_function("drop_global_queue", drop_global_queue)?;
cx.export_function("channel_join", channel_join)?;

Ok(())
}