-
Notifications
You must be signed in to change notification settings - Fork 212
server: Extend methods with optional connection context #1295
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
a21beb2
ae302d7
a6987c4
78a8098
85a1c8d
5914b8f
aa1b26e
8203e17
dbc94f6
828086e
a520dfb
415b02f
7b3d55b
24cd480
c217812
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,61 @@ | ||
| // Copyright 2019-2021 Parity Technologies (UK) Ltd. | ||
| // | ||
| // Permission is hereby granted, free of charge, to any | ||
| // person obtaining a copy of this software and associated | ||
| // documentation files (the "Software"), to deal in the | ||
| // Software without restriction, including without | ||
| // limitation the rights to use, copy, modify, merge, | ||
| // publish, distribute, sublicense, and/or sell copies of | ||
| // the Software, and to permit persons to whom the Software | ||
| // is furnished to do so, subject to the following | ||
| // conditions: | ||
| // | ||
| // The above copyright notice and this permission notice | ||
| // shall be included in all copies or substantial portions | ||
| // of the Software. | ||
| // | ||
| // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF | ||
| // ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED | ||
| // TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A | ||
| // PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT | ||
| // SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY | ||
| // CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION | ||
| // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR | ||
| // IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER | ||
| // DEALINGS IN THE SOFTWARE. | ||
|
|
||
| //! The context of a JSON-RPC server implementation. | ||
|
|
||
| /// The context of a JSON-RPC server that is passed to methods and subscriptions | ||
| /// that enabled the `with_context` attribute. | ||
| #[derive(Debug, Clone, Copy, PartialEq, Eq)] | ||
| pub struct ConnectionContext { | ||
| /// The ID of the connection. | ||
| connection_id: ConnectionId, | ||
| /// The maximum response size. | ||
| max_response_size: MaxResponseSize, | ||
| } | ||
|
|
||
| impl ConnectionContext { | ||
| /// Create a new context. | ||
| pub fn new(connection_id: ConnectionId, max_response_size: MaxResponseSize) -> Self { | ||
| Self { connection_id: connection_id, max_response_size } | ||
| } | ||
|
|
||
| /// Get the connection ID. | ||
| pub fn connection_id(&self) -> ConnectionId { | ||
| self.connection_id | ||
| } | ||
|
|
||
| /// Get the maximum response size. | ||
| pub fn max_response_size(&self) -> MaxResponseSize { | ||
| self.max_response_size | ||
| } | ||
| } | ||
|
|
||
| /// Connection ID, used for stateful protocol such as WebSockets. | ||
| /// For stateless protocols such as http it's unused, so feel free to set it some hardcoded value. | ||
| pub type ConnectionId = usize; | ||
|
|
||
| /// Max response size. | ||
| pub type MaxResponseSize = usize; | ||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -39,6 +39,7 @@ use crate::server::subscription::{ | |||||||||||||||||||
| SubNotifResultOrError, Subscribers, Subscription, SubscriptionCloseResponse, SubscriptionKey, SubscriptionPermit, | ||||||||||||||||||||
| SubscriptionState, | ||||||||||||||||||||
| }; | ||||||||||||||||||||
| use crate::server::ConnectionContext; | ||||||||||||||||||||
| use crate::server::{ResponsePayload, LOG_TARGET}; | ||||||||||||||||||||
| use crate::traits::ToRpcParams; | ||||||||||||||||||||
| use futures_util::{future::BoxFuture, FutureExt}; | ||||||||||||||||||||
|
|
@@ -56,22 +57,15 @@ use super::IntoResponse; | |||||||||||||||||||
| /// implemented as a function pointer to a `Fn` function taking four arguments: | ||||||||||||||||||||
| /// the `id`, `params`, a channel the function uses to communicate the result (or error) | ||||||||||||||||||||
| /// back to `jsonrpsee`, and the connection ID (useful for the websocket transport). | ||||||||||||||||||||
| pub type SyncMethod = Arc<dyn Send + Sync + Fn(Id, Params, MaxResponseSize) -> MethodResponse>; | ||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. another option is to add new RawMethod = Arc<dyn Send + Sync + Fn(Id, Params, ConnectionContext) -> MethodResponse>then let folks spawn futures and stuff them selves if they need and just provide a new attribute in the procs macros as you already did. then register_blocking, async, etc are not needed.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. then we don't have break the existing APIs
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That makes sense, I've implemented that in: https://github.com/paritytech/jsonrpsee/pull/1297/files#diff-dc533588250ea236f580d7fdaf87cdfce2ec80da031de9067b1088465a555998R64 One downside with doing a
This makes me wonder if I should also add the counter-part RawAsyncMethod and another At some point in the future , if we choose to implement |
||||||||||||||||||||
| pub type SyncMethod = Arc<dyn Send + Sync + Fn(Id, Params, ConnectionContext) -> MethodResponse>; | ||||||||||||||||||||
| /// Similar to [`SyncMethod`], but represents an asynchronous handler. | ||||||||||||||||||||
| pub type AsyncMethod<'a> = | ||||||||||||||||||||
| Arc<dyn Send + Sync + Fn(Id<'a>, Params<'a>, ConnectionId, MaxResponseSize) -> BoxFuture<'a, MethodResponse>>; | ||||||||||||||||||||
| Arc<dyn Send + Sync + Fn(Id<'a>, Params<'a>, ConnectionContext) -> BoxFuture<'a, MethodResponse>>; | ||||||||||||||||||||
| /// Method callback for subscriptions. | ||||||||||||||||||||
| pub type SubscriptionMethod<'a> = | ||||||||||||||||||||
| Arc<dyn Send + Sync + Fn(Id, Params, MethodSink, SubscriptionState) -> BoxFuture<'a, MethodResponse>>; | ||||||||||||||||||||
| // Method callback to unsubscribe. | ||||||||||||||||||||
| type UnsubscriptionMethod = Arc<dyn Send + Sync + Fn(Id, Params, ConnectionId, MaxResponseSize) -> MethodResponse>; | ||||||||||||||||||||
|
|
||||||||||||||||||||
| /// Connection ID, used for stateful protocol such as WebSockets. | ||||||||||||||||||||
| /// For stateless protocols such as http it's unused, so feel free to set it some hardcoded value. | ||||||||||||||||||||
| pub type ConnectionId = usize; | ||||||||||||||||||||
|
|
||||||||||||||||||||
| /// Max response size. | ||||||||||||||||||||
| pub type MaxResponseSize = usize; | ||||||||||||||||||||
| type UnsubscriptionMethod = Arc<dyn Send + Sync + Fn(Id, Params, ConnectionContext) -> MethodResponse>; | ||||||||||||||||||||
|
|
||||||||||||||||||||
| /// Raw response from an RPC | ||||||||||||||||||||
| /// A tuple containing: | ||||||||||||||||||||
|
|
@@ -351,10 +345,11 @@ impl Methods { | |||||||||||||||||||
| let id = req.id.clone(); | ||||||||||||||||||||
| let params = Params::new(req.params.as_ref().map(|params| params.as_ref().get())); | ||||||||||||||||||||
|
|
||||||||||||||||||||
| let connection_context = ConnectionContext::new(0, usize::MAX); | ||||||||||||||||||||
| let response = match self.method(&req.method) { | ||||||||||||||||||||
| None => MethodResponse::error(req.id, ErrorObject::from(ErrorCode::MethodNotFound)), | ||||||||||||||||||||
| Some(MethodCallback::Sync(cb)) => (cb)(id, params, usize::MAX), | ||||||||||||||||||||
| Some(MethodCallback::Async(cb)) => (cb)(id.into_owned(), params.into_owned(), 0, usize::MAX).await, | ||||||||||||||||||||
| Some(MethodCallback::Sync(cb)) => (cb)(id, params, connection_context), | ||||||||||||||||||||
| Some(MethodCallback::Async(cb)) => (cb)(id.into_owned(), params.into_owned(), connection_context).await, | ||||||||||||||||||||
| Some(MethodCallback::Subscription(cb)) => { | ||||||||||||||||||||
| let conn_state = | ||||||||||||||||||||
| SubscriptionState { conn_id: 0, id_provider: &RandomIntegerIdProvider, subscription_permit }; | ||||||||||||||||||||
|
|
@@ -368,7 +363,7 @@ impl Methods { | |||||||||||||||||||
|
|
||||||||||||||||||||
| res | ||||||||||||||||||||
| } | ||||||||||||||||||||
| Some(MethodCallback::Unsubscription(cb)) => (cb)(id, params, 0, usize::MAX), | ||||||||||||||||||||
| Some(MethodCallback::Unsubscription(cb)) => (cb)(id, params, connection_context), | ||||||||||||||||||||
| }; | ||||||||||||||||||||
|
|
||||||||||||||||||||
| let is_success = response.is_success(); | ||||||||||||||||||||
|
|
@@ -500,7 +495,7 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> { | |||||||||||||||||||
| /// use jsonrpsee_core::server::RpcModule; | ||||||||||||||||||||
| /// | ||||||||||||||||||||
| /// let mut module = RpcModule::new(()); | ||||||||||||||||||||
| /// module.register_method("say_hello", |_params, _ctx| "lo").unwrap(); | ||||||||||||||||||||
| /// module.register_method("say_hello", |_params, _connection_ctx, _ctx| "lo").unwrap(); | ||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. /cc @jsdw so basically this is the grumble we expose now two context's here one however, the
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah ok I see! My general grumble is that I think we should have some way to pass generic data from middleware or whatever into RPC calls. If we had that then we wouldn't need to expose eg the internal connection ID as a hardcoded thing, because the user could just create their own connection ID counter in middleware and pass that in. Methods might want access to a bunch of other stuff from middleware that we haven't yet exposed or forseen, and I wouldn't want us to always have to work around it by trying to provide those things in a hardcoded way; I'd rather that we foudn a way to solve the problem generically :) A couple of ideas for how:
But @niklasad1 you've mentioned that maybe this is possible via low level APIs so perhaps there's a way to achieve this already?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For the first idea, I've been exploring with a custom type as field to the I might be missing something here, but the trick part with this comes from the fact that the jsonrpsee/server/src/middleware/rpc/mod.rs Lines 42 to 50 in 736dd65
For this to be useful outside the method call; we'd probably need to extend the
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is tricky and I don't think it's a nice way to solve currently generic right now. so the flow is like this: let shared_state = Arc::new(Mutex::(HashMap::new::<ConnectionId, State>()));
let rpc_api = ChainHeadRpc::new(shared_state.clone());
// one could also write middleware but no way to share that middleware with the actual RPC implementation
// other then inject connection ID as params in the JSON-RPC call itself
// such as having an extra param: Option<ConnectionId>
start_rpc(rpc_api, shared_state);
#[rpc]
trait ChainHeadRpcApi {
#[method(name = "chainHead_unstable_call")]
fn f(&self, params: Vec<u8>) -> String;
}
impl ChainHeadRpcApi for ChainHeadRpc {
fn f(&self, params: Vec<u8>) -> String {
// no way to know which connection made the call
// but we can access the HashMap as shared state here
todo!();
}
}
fn start_rpc(rpc_api, shared_state) {
for conn in server.accept() {
// could also enable middleware but the state can't
// really be shared per connection in the rpc_api...
shared_state.lock.insert(conn.id, State);
tokio::spawn(upgrade_to_tower_service(conn));
}
}
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thus, it would be neat to have a have closure similar for the context/user defined data in the rpc API such that users can decide whether the state for rpc impl should be shared or per connection... |
||||||||||||||||||||
| /// ``` | ||||||||||||||||||||
| pub fn register_method<R, F>( | ||||||||||||||||||||
| &mut self, | ||||||||||||||||||||
|
|
@@ -510,13 +505,14 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> { | |||||||||||||||||||
| where | ||||||||||||||||||||
| Context: Send + Sync + 'static, | ||||||||||||||||||||
| R: IntoResponse + 'static, | ||||||||||||||||||||
| F: Fn(Params, &Context) -> R + Send + Sync + 'static, | ||||||||||||||||||||
| F: Fn(Params, ConnectionContext, &Context) -> R + Send + Sync + 'static, | ||||||||||||||||||||
| { | ||||||||||||||||||||
| let ctx = self.ctx.clone(); | ||||||||||||||||||||
| self.methods.verify_and_insert( | ||||||||||||||||||||
| method_name, | ||||||||||||||||||||
| MethodCallback::Sync(Arc::new(move |id, params, max_response_size| { | ||||||||||||||||||||
| let rp = callback(params, &*ctx).into_response(); | ||||||||||||||||||||
| MethodCallback::Sync(Arc::new(move |id, params, connection_context| { | ||||||||||||||||||||
| let max_response_size = connection_context.max_response_size(); | ||||||||||||||||||||
| let rp = callback(params, connection_context, &*ctx).into_response(); | ||||||||||||||||||||
| MethodResponse::response(id, rp, max_response_size) | ||||||||||||||||||||
| })), | ||||||||||||||||||||
| ) | ||||||||||||||||||||
|
|
@@ -542,17 +538,18 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> { | |||||||||||||||||||
| where | ||||||||||||||||||||
| R: IntoResponse + 'static, | ||||||||||||||||||||
| Fut: Future<Output = R> + Send, | ||||||||||||||||||||
| Fun: (Fn(Params<'static>, Arc<Context>) -> Fut) + Clone + Send + Sync + 'static, | ||||||||||||||||||||
| Fun: (Fn(Params<'static>, ConnectionContext, Arc<Context>) -> Fut) + Clone + Send + Sync + 'static, | ||||||||||||||||||||
| { | ||||||||||||||||||||
| let ctx = self.ctx.clone(); | ||||||||||||||||||||
| self.methods.verify_and_insert( | ||||||||||||||||||||
| method_name, | ||||||||||||||||||||
| MethodCallback::Async(Arc::new(move |id, params, _, max_response_size| { | ||||||||||||||||||||
| MethodCallback::Async(Arc::new(move |id, params, connection_context| { | ||||||||||||||||||||
| let ctx = ctx.clone(); | ||||||||||||||||||||
| let callback = callback.clone(); | ||||||||||||||||||||
| let max_response_size = connection_context.max_response_size(); | ||||||||||||||||||||
|
|
||||||||||||||||||||
| let future = async move { | ||||||||||||||||||||
| let rp = callback(params, ctx).await.into_response(); | ||||||||||||||||||||
| let rp = callback(params, connection_context, ctx).await.into_response(); | ||||||||||||||||||||
| MethodResponse::response(id, rp, max_response_size) | ||||||||||||||||||||
| }; | ||||||||||||||||||||
| future.boxed() | ||||||||||||||||||||
|
|
@@ -571,17 +568,18 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> { | |||||||||||||||||||
| where | ||||||||||||||||||||
| Context: Send + Sync + 'static, | ||||||||||||||||||||
| R: IntoResponse + 'static, | ||||||||||||||||||||
| F: Fn(Params, Arc<Context>) -> R + Clone + Send + Sync + 'static, | ||||||||||||||||||||
| F: Fn(Params, ConnectionContext, Arc<Context>) -> R + Clone + Send + Sync + 'static, | ||||||||||||||||||||
| { | ||||||||||||||||||||
| let ctx = self.ctx.clone(); | ||||||||||||||||||||
| let callback = self.methods.verify_and_insert( | ||||||||||||||||||||
| method_name, | ||||||||||||||||||||
| MethodCallback::Async(Arc::new(move |id, params, _, max_response_size| { | ||||||||||||||||||||
| MethodCallback::Async(Arc::new(move |id, params, connection_context| { | ||||||||||||||||||||
| let ctx = ctx.clone(); | ||||||||||||||||||||
| let callback = callback.clone(); | ||||||||||||||||||||
| let max_response_size = connection_context.max_response_size(); | ||||||||||||||||||||
|
|
||||||||||||||||||||
| tokio::task::spawn_blocking(move || { | ||||||||||||||||||||
| let rp = callback(params, ctx).into_response(); | ||||||||||||||||||||
| let rp = callback(params, connection_context, ctx).into_response(); | ||||||||||||||||||||
| MethodResponse::response(id, rp, max_response_size) | ||||||||||||||||||||
| }) | ||||||||||||||||||||
| .map(|result| match result { | ||||||||||||||||||||
|
|
@@ -899,7 +897,10 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> { | |||||||||||||||||||
| let subscribers = subscribers.clone(); | ||||||||||||||||||||
| self.methods.mut_callbacks().insert( | ||||||||||||||||||||
| unsubscribe_method_name, | ||||||||||||||||||||
| MethodCallback::Unsubscription(Arc::new(move |id, params, conn_id, max_response_size| { | ||||||||||||||||||||
| MethodCallback::Unsubscription(Arc::new(move |id, params, connection_context| { | ||||||||||||||||||||
| let conn_id = connection_context.connection_id(); | ||||||||||||||||||||
| let max_response_size = connection_context.max_response_size(); | ||||||||||||||||||||
|
|
||||||||||||||||||||
| let sub_id = match params.one::<RpcSubscriptionId>() { | ||||||||||||||||||||
| Ok(sub_id) => sub_id, | ||||||||||||||||||||
| Err(_) => { | ||||||||||||||||||||
|
|
||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would rather re-name this to
PerConnectionContextor something to easily distinguish from the other context