This repository was archived by the owner on Nov 15, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2.6k
BEEFY subscription fires multiple times for the same commitment #10684
Closed
Wizdave97
wants to merge
26
commits into
paritytech:master
from
Wizdave97:david/beefy-subscribe-justification-rpc
+86
−22
Closed
Changes from 8 commits
Commits
Show all changes
26 commits
Select commit
Hold shift + click to select a range
04e9166
prevent duplicate justification notifications
Wizdave97 acbc825
Merge branch 'master' of https://github.com/paritytech/substrate into…
Wizdave97 8cc6ac7
minor fix
Wizdave97 e98492b
clean up
Wizdave97 2ca50e3
fix failing tests
Wizdave97 1029b26
minor fix
Wizdave97 0d40e9a
add comments to test
Wizdave97 5cbf9d2
fix typo
Wizdave97 b20926c
minor fix
Wizdave97 307a147
Merge branch 'master' of https://github.com/paritytech/substrate into…
Wizdave97 9147724
Merge branch 'master' of https://github.com/paritytech/substrate into…
Wizdave97 5253c95
Merge branch 'master' of https://github.com/paritytech/substrate into…
Wizdave97 c6a8a2f
Merge branch 'paritytech:master' into david/beefy-subscribe-justifica…
Wizdave97 fe0f7ec
updated test case
Wizdave97 2be30bb
remove unreliable tests
Wizdave97 61088bd
Merge branch 'paritytech:master' into david/beefy-subscribe-justifica…
Wizdave97 c302c39
minor fix
Wizdave97 b213cce
Merge branch 'david/beefy-subscribe-justification-rpc' of github.com:…
Wizdave97 d0846ed
fix
Wizdave97 925ce6b
Merge branch 'paritytech:master' into david/beefy-subscribe-justifica…
Wizdave97 74e2b6c
Merge branch 'master' of https://github.com/paritytech/substrate into…
Wizdave97 99adcdf
add justification dedup test case
Wizdave97 11aba7c
Merge branch 'david/beefy-subscribe-justification-rpc' of github.com:…
Wizdave97 c3b7f22
fix tests
Wizdave97 d02bb2f
Merge branch 'master' of https://github.com/paritytech/substrate into…
Wizdave97 854cd0a
fix test assertion string
Wizdave97 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,9 +23,9 @@ | |
| use parking_lot::RwLock; | ||
| use std::sync::Arc; | ||
|
|
||
| use sp_runtime::traits::Block as BlockT; | ||
| use sp_runtime::traits::{Block as BlockT, NumberFor}; | ||
|
|
||
| use futures::{task::SpawnError, FutureExt, SinkExt, StreamExt, TryFutureExt}; | ||
| use futures::{future, task::SpawnError, FutureExt, SinkExt, StreamExt, TryFutureExt}; | ||
| use jsonrpc_derive::rpc; | ||
| use jsonrpc_pubsub::{manager::SubscriptionManager, typed::Subscriber, SubscriptionId}; | ||
| use log::warn; | ||
|
|
@@ -120,6 +120,7 @@ pub struct BeefyRpcHandler<Block: BlockT> { | |
| signed_commitment_stream: BeefySignedCommitmentStream<Block>, | ||
| beefy_best_block: Arc<RwLock<Option<Block::Hash>>>, | ||
| manager: SubscriptionManager, | ||
| beefy_best_block_num: Arc<RwLock<Option<NumberFor<Block>>>>, | ||
| } | ||
|
|
||
| impl<Block: BlockT> BeefyRpcHandler<Block> { | ||
|
|
@@ -133,13 +134,17 @@ impl<Block: BlockT> BeefyRpcHandler<Block> { | |
| E: futures::task::Spawn + Send + Sync + 'static, | ||
| { | ||
| let beefy_best_block = Arc::new(RwLock::new(None)); | ||
| let beefy_best_block_num = Arc::new(RwLock::new(None)); | ||
|
|
||
| let stream = best_block_stream.subscribe(); | ||
| let closure_clone = beefy_best_block.clone(); | ||
| let beefy_block_num_clone = beefy_best_block_num.clone(); | ||
| let future = stream.for_each(move |best_beefy| { | ||
| let async_clone = closure_clone.clone(); | ||
| let async_block_num_clone = beefy_block_num_clone.clone(); | ||
| async move { | ||
| *async_clone.write() = Some(best_beefy); | ||
| *async_clone.write() = Some(best_beefy.0); | ||
| *async_block_num_clone.write() = Some(best_beefy.1) | ||
| } | ||
| }); | ||
|
|
||
|
|
@@ -151,7 +156,7 @@ impl<Block: BlockT> BeefyRpcHandler<Block> { | |
| })?; | ||
|
|
||
| let manager = SubscriptionManager::new(Arc::new(executor)); | ||
| Ok(Self { signed_commitment_stream, beefy_best_block, manager }) | ||
| Ok(Self { signed_commitment_stream, beefy_best_block, manager, beefy_best_block_num }) | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -166,9 +171,19 @@ where | |
| _metadata: Self::Metadata, | ||
| subscriber: Subscriber<notification::EncodedSignedCommitment>, | ||
| ) { | ||
| let beefy_block_num = self.beefy_best_block_num.clone(); | ||
| let stream = self | ||
| .signed_commitment_stream | ||
| .subscribe() | ||
| .filter(move |x| { | ||
| let best_block_clone = beefy_block_num.clone(); | ||
| let best_block = best_block_clone.read(); | ||
| if let Some(best_block) = *best_block { | ||
| future::ready(x.commitment.block_number > best_block) | ||
| } else { | ||
| future::ready(true) | ||
| } | ||
| }) | ||
| .map(|x| Ok::<_, ()>(Ok(notification::EncodedSignedCommitment::new::<Block>(x)))); | ||
|
|
||
| self.manager.add(subscriber, |sink| { | ||
|
|
@@ -257,7 +272,7 @@ mod tests { | |
| let (io, _) = setup_io_handler_with_best_block_stream(stream); | ||
|
|
||
| let hash = BlakeTwo256::hash(b"42"); | ||
| let r: Result<(), ()> = sender.notify(|| Ok(hash)); | ||
| let r: Result<(), ()> = sender.notify(|| Ok((hash, 0))); | ||
| r.unwrap(); | ||
|
|
||
| // Verify RPC `beefy_getFinalizedHead` returns expected hash. | ||
|
|
@@ -319,7 +334,10 @@ mod tests { | |
| // Unsubscribe again and fail | ||
| assert_eq!( | ||
| io.handle_request_sync(&unsub_req, meta), | ||
| Some(r#"{"jsonrpc":"2.0","error":{"code":-32602,"message":"Invalid subscription id."},"id":1}"#.into()), | ||
| Some( | ||
| r#"{"jsonrpc":"2.0","error":{"code":-32602,"message":"Invalid subscription id."},"id":1}"# | ||
| .into() | ||
| ), | ||
| ); | ||
| } | ||
|
|
||
|
|
@@ -341,18 +359,17 @@ mod tests { | |
| r#"{"jsonrpc":"2.0","method":"beefy_unsubscribeJustifications","params":["FOO"],"id":1}"#, | ||
| meta.clone() | ||
| ), | ||
| Some(r#"{"jsonrpc":"2.0","error":{"code":-32602,"message":"Invalid subscription id."},"id":1}"#.into()) | ||
| Some( | ||
| r#"{"jsonrpc":"2.0","error":{"code":-32602,"message":"Invalid subscription id."},"id":1}"# | ||
| .into() | ||
| ) | ||
| ); | ||
| } | ||
|
|
||
| fn create_commitment() -> BeefySignedCommitment<Block> { | ||
| fn create_commitment(block_number: u64) -> BeefySignedCommitment<Block> { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. block numbers are actually u32
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The |
||
| let payload = Payload::new(known_payload_ids::MMR_ROOT_ID, "Hello World!".encode()); | ||
| BeefySignedCommitment::<Block> { | ||
| commitment: beefy_primitives::Commitment { | ||
| payload, | ||
| block_number: 5, | ||
| validator_set_id: 0, | ||
| }, | ||
| commitment: beefy_primitives::Commitment { payload, block_number, validator_set_id: 0 }, | ||
| signatures: vec![], | ||
| } | ||
| } | ||
|
|
@@ -371,7 +388,7 @@ mod tests { | |
| let sub_id: String = serde_json::from_value(resp["result"].take()).unwrap(); | ||
|
|
||
| // Notify with commitment | ||
| let commitment = create_commitment(); | ||
| let commitment = create_commitment(5); | ||
| let r: Result<(), ()> = commitment_sender.notify(|| Ok(commitment.clone())); | ||
| r.unwrap(); | ||
|
|
||
|
|
@@ -393,4 +410,74 @@ mod tests { | |
| assert_eq!(recv_sub_id, sub_id); | ||
| assert_eq!(recv_commitment, commitment); | ||
| } | ||
|
|
||
| #[test] | ||
| fn should_not_send_signed_commitment_multiple_times() { | ||
| let (sender, stream) = BeefyBestBlockStream::<Block>::channel(); | ||
| let (io, commitment_sender) = setup_io_handler_with_best_block_stream(stream); | ||
| let (meta, receiver) = setup_session(); | ||
|
|
||
| // Subscribe | ||
| let sub_request = | ||
| r#"{"jsonrpc":"2.0","method":"beefy_subscribeJustifications","params":[],"id":1}"#; | ||
| let resp = io.handle_request_sync(sub_request, meta.clone()); | ||
| let resp: Output = serde_json::from_str(&resp.unwrap()).unwrap(); | ||
|
|
||
| let sub_id = match resp { | ||
| Output::Success(success) => success.result, | ||
| _ => panic!(), | ||
| }; | ||
|
|
||
| // Notify with commitment | ||
| let commitment = create_commitment(5); | ||
| let r: Result<(), ()> = commitment_sender.notify(|| Ok(commitment.clone())); | ||
| r.unwrap(); | ||
| let hash = BlakeTwo256::hash(b"43"); | ||
| let r: Result<(), ()> = sender.notify(|| Ok((hash, 5))); | ||
| r.unwrap(); | ||
|
|
||
| // This commitment should be filtered out | ||
| let r: Result<(), ()> = commitment_sender.notify(|| Ok(commitment.clone())); | ||
| r.unwrap(); | ||
|
|
||
| let commitment_1 = create_commitment(6); | ||
|
|
||
| let r: Result<(), ()> = commitment_sender.notify(|| Ok(commitment_1.clone())); | ||
| r.unwrap(); | ||
|
|
||
| // Inspect what we received | ||
| // We should have received only two commitments | ||
| let recvs = futures::executor::block_on(receiver.take(2).collect::<Vec<_>>()); | ||
| let recv: Notification = serde_json::from_str(&recvs[0]).unwrap(); | ||
| let mut json_map = match recv.params { | ||
| Params::Map(json_map) => json_map, | ||
| _ => panic!(), | ||
| }; | ||
|
acatangiu marked this conversation as resolved.
Outdated
|
||
|
|
||
| let recv_sub_id: String = serde_json::from_value(json_map["subscription"].take()).unwrap(); | ||
| let recv_commitment: sp_core::Bytes = | ||
| serde_json::from_value(json_map["result"].take()).unwrap(); | ||
| let recv_commitment: BeefySignedCommitment<Block> = | ||
| Decode::decode(&mut &recv_commitment[..]).unwrap(); | ||
|
|
||
| assert_eq!(recv.method, "beefy_justifications"); | ||
| assert_eq!(recv_sub_id, sub_id); | ||
| assert_eq!(recv_commitment, commitment); | ||
|
|
||
| let recv: Notification = serde_json::from_str(&recvs[1]).unwrap(); | ||
| let mut json_map = match recv.params { | ||
| Params::Map(json_map) => json_map, | ||
| _ => panic!(), | ||
| }; | ||
|
|
||
| let recv_sub_id: String = serde_json::from_value(json_map["subscription"].take()).unwrap(); | ||
| let recv_commitment: sp_core::Bytes = | ||
| serde_json::from_value(json_map["result"].take()).unwrap(); | ||
| let recv_commitment: BeefySignedCommitment<Block> = | ||
| Decode::decode(&mut &recv_commitment[..]).unwrap(); | ||
|
|
||
| assert_eq!(recv.method, "beefy_justifications"); | ||
| assert_eq!(recv_sub_id, sub_id); | ||
| assert_eq!(recv_commitment, commitment_1); | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.