Skip to content

Commit d3b1b3a

Browse files
lexnvniklasad1
andauthored
server: Register raw method with connection ID (#1297)
* server: Add a raw method Signed-off-by: Alexandru Vasile <[email protected]> * server: Register raw methods, blocking or unblocking Signed-off-by: Alexandru Vasile <[email protected]> * proc-macros: Add with-context attribute Signed-off-by: Alexandru Vasile <[email protected]> * server: Register sync and nonblocking methods for raw API Signed-off-by: Alexandru Vasile <[email protected]> * examples: Add with context example Signed-off-by: Alexandru Vasile <[email protected]> * core: Adjust docs for the raw method registering Signed-off-by: Alexandru Vasile <[email protected]> * proc-macros: Cargo fmt Signed-off-by: Alexandru Vasile <[email protected]> * server: Request Arc<Context> for the raw method callback Signed-off-by: Alexandru Vasile <[email protected]> * proc-macros: Per method raw-method attribute Signed-off-by: Alexandru Vasile <[email protected]> * examples: Add server raw method Signed-off-by: Alexandru Vasile <[email protected]> * tests/ui: Check correct proc-macro behavior Signed-off-by: Alexandru Vasile <[email protected]> * tests/ui: Negative test for async with raw methods Signed-off-by: Alexandru Vasile <[email protected]> * tests/ui: Negative test for blocking with raw methods Signed-off-by: Alexandru Vasile <[email protected]> * tests/proc-macros: Ensure unique connection IDs from different clients Signed-off-by: Alexandru Vasile <[email protected]> * tests/integration: Ensure unique connection IDs from different clients Signed-off-by: Alexandru Vasile <[email protected]> * proc-macros: Apply cargo fmt Signed-off-by: Alexandru Vasile <[email protected]> * Register raw method as async method Signed-off-by: Alexandru Vasile <[email protected]> * Fix testing Signed-off-by: Alexandru Vasile <[email protected]> * core: Fix documentation Signed-off-by: Alexandru Vasile <[email protected]> * server: Rename raw method to `module.register_async_with_details` Signed-off-by: Alexandru Vasile <[email protected]> * server: Add connection details wrapper Signed-off-by: Alexandru Vasile <[email protected]> * server: Add asyncWithDetails and connection details Signed-off-by: Alexandru Vasile <[email protected]> * proc-macros: Provide connection details to methods Signed-off-by: Alexandru Vasile <[email protected]> * Update core/src/server/rpc_module.rs Co-authored-by: Niklas Adolfsson <[email protected]> * server: Remove connection details builder Signed-off-by: Alexandru Vasile <[email protected]> * server: Refactor `.register_async_with_details` to `.register_async_method_with_details` Signed-off-by: Alexandru Vasile <[email protected]> * proc-macro: Clarify comment Signed-off-by: Alexandru Vasile <[email protected]> * core: Doc hidden for async with details Signed-off-by: Alexandru Vasile <[email protected]> * Rename example Signed-off-by: Alexandru Vasile <[email protected]> * Update core/src/server/rpc_module.rs Co-authored-by: Niklas Adolfsson <[email protected]> * core: Remove doc(hidden) from ConnectionDetails::id Signed-off-by: Alexandru Vasile <[email protected]> * Update core/src/server/rpc_module.rs --------- Signed-off-by: Alexandru Vasile <[email protected]> Co-authored-by: Niklas Adolfsson <[email protected]>
1 parent 3f16132 commit d3b1b3a

14 files changed

Lines changed: 369 additions & 18 deletions

core/src/server/rpc_module.rs

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,11 @@ pub type SyncMethod = Arc<dyn Send + Sync + Fn(Id, Params, MaxResponseSize) -> M
6060
/// Similar to [`SyncMethod`], but represents an asynchronous handler.
6161
pub type AsyncMethod<'a> =
6262
Arc<dyn Send + Sync + Fn(Id<'a>, Params<'a>, ConnectionId, MaxResponseSize) -> BoxFuture<'a, MethodResponse>>;
63+
64+
/// Similar to [`AsyncMethod`], but represents an asynchronous handler with connection details.
65+
#[doc(hidden)]
66+
pub type AsyncMethodWithDetails<'a> =
67+
Arc<dyn Send + Sync + Fn(Id<'a>, Params<'a>, ConnectionDetails, MaxResponseSize) -> BoxFuture<'a, MethodResponse>>;
6368
/// Method callback for subscriptions.
6469
pub type SubscriptionMethod<'a> =
6570
Arc<dyn Send + Sync + Fn(Id, Params, MethodSink, SubscriptionState) -> BoxFuture<'a, MethodResponse>>;
@@ -79,6 +84,27 @@ pub type MaxResponseSize = usize;
7984
/// - a [`mpsc::UnboundedReceiver<String>`] to receive future subscription results
8085
pub type RawRpcResponse = (String, mpsc::Receiver<String>);
8186

87+
/// The connection details exposed to the server methods.
88+
#[derive(Debug, Clone)]
89+
#[allow(missing_copy_implementations)]
90+
#[doc(hidden)]
91+
pub struct ConnectionDetails {
92+
id: ConnectionId,
93+
}
94+
95+
impl ConnectionDetails {
96+
/// Construct a new [`ConnectionDetails`].
97+
#[doc(hidden)]
98+
pub fn _new(id: ConnectionId) -> ConnectionDetails {
99+
Self { id }
100+
}
101+
102+
/// Get the connection ID.
103+
pub fn id(&self) -> ConnectionId {
104+
self.id
105+
}
106+
}
107+
82108
/// The error that can occur when [`Methods::call`] or [`Methods::subscribe`] is invoked.
83109
#[derive(thiserror::Error, Debug)]
84110
pub enum MethodsError {
@@ -131,6 +157,9 @@ pub enum MethodCallback {
131157
Sync(SyncMethod),
132158
/// Asynchronous method handler.
133159
Async(AsyncMethod<'static>),
160+
/// Asynchronous method handler with details.
161+
#[doc(hidden)]
162+
AsyncWithDetails(AsyncMethodWithDetails<'static>),
134163
/// Subscription method handler.
135164
Subscription(SubscriptionMethod<'static>),
136165
/// Unsubscription method handler.
@@ -184,6 +213,7 @@ impl Debug for MethodCallback {
184213
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
185214
match self {
186215
Self::Async(_) => write!(f, "Async"),
216+
Self::AsyncWithDetails(_) => write!(f, "AsyncWithDetails"),
187217
Self::Sync(_) => write!(f, "Sync"),
188218
Self::Subscription(_) => write!(f, "Subscription"),
189219
Self::Unsubscription(_) => write!(f, "Unsubscription"),
@@ -355,6 +385,9 @@ impl Methods {
355385
None => MethodResponse::error(req.id, ErrorObject::from(ErrorCode::MethodNotFound)),
356386
Some(MethodCallback::Sync(cb)) => (cb)(id, params, usize::MAX),
357387
Some(MethodCallback::Async(cb)) => (cb)(id.into_owned(), params.into_owned(), 0, usize::MAX).await,
388+
Some(MethodCallback::AsyncWithDetails(cb)) => {
389+
(cb)(id.into_owned(), params.into_owned(), ConnectionDetails::_new(0), usize::MAX).await
390+
}
358391
Some(MethodCallback::Subscription(cb)) => {
359392
let conn_state =
360393
SubscriptionState { conn_id: 0, id_provider: &RandomIntegerIdProvider, subscription_permit };
@@ -598,6 +631,43 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
598631
Ok(callback)
599632
}
600633

634+
/// Register a new raw RPC method, which computes the response with the given callback.
635+
///
636+
/// ## Examples
637+
///
638+
/// ```
639+
/// use jsonrpsee_core::server::RpcModule;
640+
///
641+
/// let mut module = RpcModule::new(());
642+
/// module.register_async_method_with_details("say_hello", |_params, _connection_details, _ctx| async { "lo" }).unwrap();
643+
/// ```
644+
#[doc(hidden)]
645+
pub fn register_async_method_with_details<R, Fun, Fut>(
646+
&mut self,
647+
method_name: &'static str,
648+
callback: Fun,
649+
) -> Result<&mut MethodCallback, RegisterMethodError>
650+
where
651+
R: IntoResponse + 'static,
652+
Fut: Future<Output = R> + Send,
653+
Fun: (Fn(Params<'static>, ConnectionDetails, Arc<Context>) -> Fut) + Clone + Send + Sync + 'static,
654+
{
655+
let ctx = self.ctx.clone();
656+
self.methods.verify_and_insert(
657+
method_name,
658+
MethodCallback::AsyncWithDetails(Arc::new(move |id, params, connection_details, max_response_size| {
659+
let ctx = ctx.clone();
660+
let callback = callback.clone();
661+
662+
let future = async move {
663+
let rp = callback(params, connection_details, ctx).await.into_response();
664+
MethodResponse::response(id, rp, max_response_size)
665+
};
666+
future.boxed()
667+
})),
668+
)
669+
}
670+
601671
/// Register a new publish/subscribe interface using JSON-RPC notifications.
602672
///
603673
/// It implements the [ethereum pubsub specification](https://geth.ethereum.org/docs/rpc/pubsub)
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
2+
//
3+
// Permission is hereby granted, free of charge, to any
4+
// person obtaining a copy of this software and associated
5+
// documentation files (the "Software"), to deal in the
6+
// Software without restriction, including without
7+
// limitation the rights to use, copy, modify, merge,
8+
// publish, distribute, sublicense, and/or sell copies of
9+
// the Software, and to permit persons to whom the Software
10+
// is furnished to do so, subject to the following
11+
// conditions:
12+
//
13+
// The above copyright notice and this permission notice
14+
// shall be included in all copies or substantial portions
15+
// of the Software.
16+
//
17+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
18+
// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
19+
// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
20+
// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
21+
// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
22+
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
23+
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
24+
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
25+
// DEALINGS IN THE SOFTWARE.
26+
27+
use std::net::SocketAddr;
28+
29+
use jsonrpsee::core::{async_trait, client::Subscription};
30+
use jsonrpsee::proc_macros::rpc;
31+
use jsonrpsee::server::{PendingSubscriptionSink, Server, SubscriptionMessage};
32+
use jsonrpsee::types::ErrorObjectOwned;
33+
use jsonrpsee::ws_client::WsClientBuilder;
34+
use jsonrpsee::ConnectionDetails;
35+
36+
#[rpc(server, client)]
37+
pub trait Rpc {
38+
/// Raw method with connection ID.
39+
#[method(name = "connectionIdMethod", raw_method)]
40+
async fn raw_method(&self, first_param: usize, second_param: u16) -> Result<usize, ErrorObjectOwned>;
41+
42+
/// Normal method call example.
43+
#[method(name = "normalMethod")]
44+
fn normal_method(&self, first_param: usize, second_param: u16) -> Result<usize, ErrorObjectOwned>;
45+
46+
/// Subscriptions expose the connection ID on the subscription sink.
47+
#[subscription(name = "subscribeSync" => "sync", item = usize)]
48+
fn sub(&self, first_param: usize);
49+
}
50+
51+
pub struct RpcServerImpl;
52+
53+
#[async_trait]
54+
impl RpcServer for RpcServerImpl {
55+
async fn raw_method(
56+
&self,
57+
connection_details: ConnectionDetails,
58+
_first_param: usize,
59+
_second_param: u16,
60+
) -> Result<usize, ErrorObjectOwned> {
61+
// Return the connection ID from which this method was called.
62+
Ok(connection_details.id())
63+
}
64+
65+
fn normal_method(&self, _first_param: usize, _second_param: u16) -> Result<usize, ErrorObjectOwned> {
66+
// The normal method does not have access to the connection ID.
67+
Ok(usize::MAX)
68+
}
69+
70+
fn sub(&self, pending: PendingSubscriptionSink, _first_param: usize) {
71+
tokio::spawn(async move {
72+
// The connection ID can be obtained before or after accepting the subscription
73+
let pending_connection_id = pending.connection_id();
74+
let sink = pending.accept().await.unwrap();
75+
let sink_connection_id = sink.connection_id();
76+
77+
assert_eq!(pending_connection_id, sink_connection_id);
78+
79+
let msg = SubscriptionMessage::from_json(&sink_connection_id).unwrap();
80+
sink.send(msg).await.unwrap();
81+
});
82+
}
83+
}
84+
85+
#[tokio::main]
86+
async fn main() -> anyhow::Result<()> {
87+
tracing_subscriber::FmtSubscriber::builder()
88+
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
89+
.try_init()
90+
.expect("setting default subscriber failed");
91+
92+
let server_addr = run_server().await?;
93+
let url = format!("ws://{}", server_addr);
94+
95+
let client = WsClientBuilder::default().build(&url).await?;
96+
let connection_id_first = client.raw_method(1, 2).await.unwrap();
97+
98+
// Second call from the same connection ID.
99+
assert_eq!(client.raw_method(1, 2).await.unwrap(), connection_id_first);
100+
101+
// Second client will increment the connection ID.
102+
let client_second = WsClientBuilder::default().build(&url).await?;
103+
let connection_id_second = client_second.raw_method(1, 2).await.unwrap();
104+
assert_ne!(connection_id_first, connection_id_second);
105+
106+
let mut sub: Subscription<usize> = RpcClient::sub(&client, 0).await.unwrap();
107+
assert_eq!(connection_id_first, sub.next().await.transpose().unwrap().unwrap());
108+
109+
let mut sub: Subscription<usize> = RpcClient::sub(&client_second, 0).await.unwrap();
110+
assert_eq!(connection_id_second, sub.next().await.transpose().unwrap().unwrap());
111+
112+
Ok(())
113+
}
114+
115+
async fn run_server() -> anyhow::Result<SocketAddr> {
116+
let server = Server::builder().build("127.0.0.1:0").await?;
117+
118+
let addr = server.local_addr()?;
119+
let handle = server.start(RpcServerImpl.into_rpc());
120+
121+
// In this example we don't care about doing shutdown so let's it run forever.
122+
// You may use the `ServerHandle` to shut it down or manage it yourself.
123+
tokio::spawn(handle.stopped());
124+
125+
Ok(addr)
126+
}

proc-macros/src/render_server.rs

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,15 @@ impl RpcDescription {
6161
fn render_methods(&self) -> Result<TokenStream2, syn::Error> {
6262
let methods = self.methods.iter().map(|method| {
6363
let docs = &method.docs;
64-
let method_sig = &method.signature;
64+
let mut method_sig = method.signature.clone();
65+
66+
if method.raw_method {
67+
let context_ty = self.jrps_server_item(quote! { ConnectionDetails });
68+
// Add `ConnectionDetails` as the second parameter to the signature.
69+
let context: syn::FnArg = syn::parse_quote!(connection_details: #context_ty);
70+
method_sig.sig.inputs.insert(1, context);
71+
}
72+
6573
quote! {
6674
#docs
6775
#method_sig
@@ -132,23 +140,32 @@ impl RpcDescription {
132140

133141
check_name(&rpc_method_name, rust_method_name.span());
134142

135-
if method.signature.sig.asyncness.is_some() {
143+
if method.raw_method {
136144
handle_register_result(quote! {
137-
rpc.register_async_method(#rpc_method_name, |params, context| async move {
145+
rpc.register_async_method_with_details(#rpc_method_name, |params, connection_details, context| async move {
138146
#parsing
139-
#into_response::into_response(context.as_ref().#rust_method_name(#params_seq).await)
147+
#into_response::into_response(context.as_ref().#rust_method_name(connection_details, #params_seq).await)
140148
})
141149
})
142150
} else {
143-
let register_kind =
144-
if method.blocking { quote!(register_blocking_method) } else { quote!(register_method) };
151+
if method.signature.sig.asyncness.is_some() {
152+
handle_register_result(quote! {
153+
rpc.register_async_method(#rpc_method_name, |params, context| async move {
154+
#parsing
155+
#into_response::into_response(context.as_ref().#rust_method_name(#params_seq).await)
156+
})
157+
})
158+
} else {
159+
let register_kind =
160+
if method.blocking { quote!(register_blocking_method) } else { quote!(register_method) };
145161

146-
handle_register_result(quote! {
147-
rpc.#register_kind(#rpc_method_name, |params, context| {
148-
#parsing
149-
#into_response::into_response(context.#rust_method_name(#params_seq))
162+
handle_register_result(quote! {
163+
rpc.#register_kind(#rpc_method_name, |params, context| {
164+
#parsing
165+
#into_response::into_response(context.#rust_method_name(#params_seq))
166+
})
150167
})
151-
})
168+
}
152169
}
153170
})
154171
.collect::<Vec<_>>();

proc-macros/src/rpc_macro.rs

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,17 +48,19 @@ pub struct RpcMethod {
4848
pub returns: Option<syn::Type>,
4949
pub signature: syn::TraitItemFn,
5050
pub aliases: Vec<String>,
51+
pub raw_method: bool,
5152
}
5253

5354
impl RpcMethod {
5455
pub fn from_item(attr: Attribute, mut method: syn::TraitItemFn) -> syn::Result<Self> {
55-
let [aliases, blocking, name, param_kind] =
56-
AttributeMeta::parse(attr)?.retain(["aliases", "blocking", "name", "param_kind"])?;
56+
let [aliases, blocking, name, param_kind, raw_method] =
57+
AttributeMeta::parse(attr)?.retain(["aliases", "blocking", "name", "param_kind", "raw_method"])?;
5758

5859
let aliases = parse_aliases(aliases)?;
5960
let blocking = optional(blocking, Argument::flag)?.is_some();
6061
let name = name?.string()?;
6162
let param_kind = parse_param_kind(param_kind)?;
63+
let raw_method = optional(raw_method, Argument::flag)?.is_some();
6264

6365
let sig = method.sig.clone();
6466
let docs = extract_doc_comments(&method.attrs);
@@ -98,7 +100,18 @@ impl RpcMethod {
98100
// We've analyzed attributes and don't need them anymore.
99101
method.attrs.clear();
100102

101-
Ok(Self { aliases, blocking, name, params, param_kind, returns, signature: method, docs, deprecated })
103+
Ok(Self {
104+
aliases,
105+
blocking,
106+
name,
107+
params,
108+
param_kind,
109+
returns,
110+
signature: method,
111+
docs,
112+
deprecated,
113+
raw_method,
114+
})
102115
}
103116
}
104117

@@ -212,7 +225,6 @@ impl RpcDescription {
212225
let namespace = optional(namespace, Argument::string)?;
213226
let client_bounds = optional(client_bounds, Argument::group)?;
214227
let server_bounds = optional(server_bounds, Argument::group)?;
215-
216228
if !needs_server && !needs_client {
217229
return Err(syn::Error::new_spanned(&item.ident, "Either 'server' or 'client' attribute must be applied"));
218230
}
@@ -260,6 +272,28 @@ impl RpcDescription {
260272
is_method = true;
261273

262274
let method_data = RpcMethod::from_item(attr.clone(), method.clone())?;
275+
276+
if method_data.blocking && method_data.raw_method {
277+
return Err(syn::Error::new_spanned(
278+
method,
279+
"Methods cannot be blocking when used with `raw_method`; remove `blocking` attribute or `raw_method` attribute",
280+
));
281+
}
282+
283+
if !needs_server && method_data.raw_method {
284+
return Err(syn::Error::new_spanned(
285+
&item.ident,
286+
"Attribute 'raw_method' must be specified with 'server'",
287+
));
288+
}
289+
290+
if method.sig.asyncness.is_none() && method_data.raw_method {
291+
return Err(syn::Error::new_spanned(
292+
method,
293+
"Methods must be asynchronous when used with `raw_method`; use `async fn` instead of `fn`",
294+
));
295+
}
296+
263297
methods.push(method_data);
264298
}
265299
if let Some(attr) = find_attr(&method.attrs, "subscription") {
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
//! Example of using proc macro to generate working client.
2+
3+
use jsonrpsee::{core::RpcResult, proc_macros::rpc, types::ErrorObjectOwned};
4+
5+
#[rpc(server)]
6+
pub trait Rpc {
7+
#[method(name = "foo", raw_method)]
8+
async fn async_method(&self, param_a: u8, param_b: String) -> RpcResult<u16>;
9+
10+
#[method(name = "bar")]
11+
fn sync_method(&self) -> Result<u16, ErrorObjectOwned>;
12+
}
13+
14+
fn main() {}

0 commit comments

Comments
 (0)