Skip to content

Commit c9667f0

Browse files
authored
Merge pull request #81 from helius-labs/raw-codec-grpc-defaults
Eliminate double-serialization and fix gRPC flow control in JS SDK
2 parents dd3e0c2 + a40feff commit c9667f0

File tree

4 files changed

+218
-106
lines changed

4 files changed

+218
-106
lines changed

javascript/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "laserstream-napi"
3-
version = "0.2.9"
3+
version = "0.3.1"
44
edition = "2021"
55

66
[lib]

javascript/napi-src/lib.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,16 @@ static STREAM_REGISTRY: LazyLock<Mutex<HashMap<String, Arc<stream::StreamInner>>
3636
static SIGNAL_HANDLERS_REGISTERED: AtomicBool = AtomicBool::new(false);
3737
static ACTIVE_STREAM_COUNT: std::sync::atomic::AtomicU32 = std::sync::atomic::AtomicU32::new(0);
3838

39-
// Simple wrapper that contains the protobuf bytes
40-
pub struct SubscribeUpdateBytes(pub Vec<u8>);
39+
// Simple wrapper that contains the protobuf bytes as ref-counted bytes::Bytes.
40+
// Using Bytes instead of Vec<u8> avoids an eager copy on the hot path — the .to_vec()
41+
// is deferred to ToNapiValue when the JS event loop actually picks up the callback.
42+
pub struct SubscribeUpdateBytes(pub bytes::Bytes);
4143

4244
impl ToNapiValue for SubscribeUpdateBytes {
4345
unsafe fn to_napi_value(env: napi::sys::napi_env, val: Self) -> napi::Result<napi::sys::napi_value> {
44-
// Create a Uint8Array from the protobuf bytes (zero-copy)
46+
// Create a Uint8Array from the protobuf bytes
4547
let env = unsafe { napi::Env::from_raw(env) };
46-
let buffer = env.create_buffer_with_data(val.0)?;
48+
let buffer = env.create_buffer_with_data(val.0.to_vec())?;
4749
unsafe { Ok(buffer.into_unknown().raw()) }
4850
}
4951
}
@@ -182,7 +184,7 @@ impl LaserstreamClient {
182184
// Threadsafe function that forwards protobuf bytes to JS
183185
// Use bounded queue to prevent unbounded memory growth when callbacks are slow
184186
let ts_callback: ThreadsafeFunction<SubscribeUpdateBytes, ErrorStrategy::CalleeHandled> =
185-
callback.create_threadsafe_function(100000, |ctx| {
187+
callback.create_threadsafe_function(1000, |ctx| {
186188
let bytes_wrapper: SubscribeUpdateBytes = ctx.value;
187189
let js_uint8array = unsafe { SubscribeUpdateBytes::to_napi_value(ctx.env.raw(), bytes_wrapper)? };
188190
Ok(vec![unsafe { napi::JsUnknown::from_raw(ctx.env.raw(), js_uint8array)? }])
@@ -209,7 +211,7 @@ impl LaserstreamClient {
209211

210212
// Threadsafe function that forwards protobuf bytes to JS
211213
let ts_callback: ThreadsafeFunction<SubscribePreprocessedUpdateBytes, ErrorStrategy::CalleeHandled> =
212-
callback.create_threadsafe_function(100000, |ctx| {
214+
callback.create_threadsafe_function(1000, |ctx| {
213215
let bytes_wrapper: SubscribePreprocessedUpdateBytes = ctx.value;
214216
let js_uint8array = unsafe { SubscribePreprocessedUpdateBytes::to_napi_value(ctx.env.raw(), bytes_wrapper)? };
215217
Ok(vec![unsafe { napi::JsUnknown::from_raw(ctx.env.raw(), js_uint8array)? }])

0 commit comments

Comments
 (0)