diff --git a/.changes/added/2726.md b/.changes/added/2726.md new file mode 100644 index 00000000000..a5afc3ba528 --- /dev/null +++ b/.changes/added/2726.md @@ -0,0 +1 @@ +Add a new gossip-sub message for transaction confirmations \ No newline at end of file diff --git a/bin/fuel-core/src/cli/run/p2p.rs b/bin/fuel-core/src/cli/run/p2p.rs index 14a838ad890..ea82a7b8078 100644 --- a/bin/fuel-core/src/cli/run/p2p.rs +++ b/bin/fuel-core/src/cli/run/p2p.rs @@ -205,6 +205,10 @@ pub struct P2PArgs { /// Number of threads to read from the tx pool. #[clap(long = "p2p-txpool-threads", default_value = "0", env)] pub tx_pool_threads: usize, + + /// Subscribe to pre-confirmation gossip topic + #[clap(long = "subscribe-to-pre-confirmations", env)] + subscribe_to_pre_confirmations: bool, } #[derive(Debug, Clone, Args)] @@ -349,6 +353,7 @@ impl P2PArgs { database_read_threads: self.database_read_threads, tx_pool_threads: self.tx_pool_threads, state: NotInitialized, + subscribe_to_pre_confirmations: self.subscribe_to_pre_confirmations, }; Ok(Some(config)) } diff --git a/crates/services/p2p/src/codecs/gossipsub.rs b/crates/services/p2p/src/codecs/gossipsub.rs index 2dc7ce96bf1..fe44b2ef04b 100644 --- a/crates/services/p2p/src/codecs/gossipsub.rs +++ b/crates/services/p2p/src/codecs/gossipsub.rs @@ -1,12 +1,16 @@ -use std::io; - -use fuel_core_types::fuel_tx::Transaction; - use crate::gossipsub::messages::{ GossipTopicTag, GossipsubBroadcastRequest, GossipsubMessage, }; +use fuel_core_types::{ + fuel_tx::Transaction, + services::p2p::PreConfirmationMessage, +}; +use std::{ + io, + ops::Deref, +}; use super::{ Decode, @@ -22,8 +26,10 @@ pub struct GossipsubMessageHandler { impl GossipsubCodec for GossipsubMessageHandler where - Codec: - Encode + Decode, + Codec: Encode + + Decode + + Encode + + Decode, { type RequestMessage = GossipsubBroadcastRequest; type ResponseMessage = GossipsubMessage; @@ -31,7 +37,10 @@ where fn encode(&self, data: Self::RequestMessage) -> Result, io::Error> { match data { GossipsubBroadcastRequest::NewTx(tx) => { - Ok(self.codec.encode(&tx)?.into_bytes()) + Ok(self.codec.encode(tx.deref())?.into_bytes()) + } + GossipsubBroadcastRequest::TxPreConfirmations(msg) => { + Ok(self.codec.encode(msg.deref())?.into_bytes()) } } } @@ -45,6 +54,9 @@ where GossipTopicTag::NewTx => { GossipsubMessage::NewTx(self.codec.decode(encoded_data)?) } + GossipTopicTag::TxPreConfirmations => { + GossipsubMessage::TxPreConfirmations(self.codec.decode(encoded_data)?) + } }; Ok(decoded_response) diff --git a/crates/services/p2p/src/codecs/postcard.rs b/crates/services/p2p/src/codecs/postcard.rs index 2d10fc37736..296deb34590 100644 --- a/crates/services/p2p/src/codecs/postcard.rs +++ b/crates/services/p2p/src/codecs/postcard.rs @@ -35,7 +35,10 @@ impl Encode for PostcardCodec where T: ?Sized + serde::Serialize, { - type Encoder<'a> = Cow<'a, [u8]> where T: 'a; + type Encoder<'a> + = Cow<'a, [u8]> + where + T: 'a; type Error = io::Error; fn encode<'a>(&self, value: &'a T) -> Result, Self::Error> { diff --git a/crates/services/p2p/src/config.rs b/crates/services/p2p/src/config.rs index 091fa5f8978..6492c3c1fb2 100644 --- a/crates/services/p2p/src/config.rs +++ b/crates/services/p2p/src/config.rs @@ -146,6 +146,9 @@ pub struct Config { /// with the `NotInitialized` state. But it can be set into the `Initialized` state only with /// the `init` method. pub state: State, + + /// If true, the node will subscribe to pre-confirmations topic + pub subscribe_to_pre_confirmations: bool, } /// The initialized state can be achieved only by the `init` function because `()` is private. @@ -195,6 +198,7 @@ impl Config { database_read_threads: self.database_read_threads, tx_pool_threads: self.tx_pool_threads, state: Initialized(()), + subscribe_to_pre_confirmations: self.subscribe_to_pre_confirmations, }) } } @@ -249,6 +253,7 @@ impl Config { database_read_threads: 0, tx_pool_threads: 0, state: NotInitialized, + subscribe_to_pre_confirmations: false, } } } diff --git a/crates/services/p2p/src/gossipsub/config.rs b/crates/services/p2p/src/gossipsub/config.rs index 26a63f4e959..959fad12aa6 100644 --- a/crates/services/p2p/src/gossipsub/config.rs +++ b/crates/services/p2p/src/gossipsub/config.rs @@ -1,4 +1,7 @@ -use super::topics::NEW_TX_GOSSIP_TOPIC; +use super::topics::{ + NEW_TX_GOSSIP_TOPIC, + TX_PRECONFIRMATIONS_GOSSIP_TOPIC, +}; use crate::{ config::{ Config, @@ -49,6 +52,8 @@ const MESH_SIZE: usize = 8; // The weight applied to the score for delivering new transactions. const NEW_TX_GOSSIP_WEIGHT: f64 = 0.05; +const TX_PRECONFIRMATIONS_GOSSIP_WEIGHT: f64 = 0.05; + // The threshold for a peer's score to be considered for greylisting. // If a peer's score falls below this value, they will be greylisted. // Greylisting is a lighter form of banning, where the peer's messages might be ignored or given lower priority, @@ -222,7 +227,13 @@ fn initialize_gossipsub(gossipsub: &mut gossipsub::Behaviour, p2p_config: &Confi .with_peer_score(peer_score_params, peer_score_thresholds) .expect("gossipsub initialized with peer score"); - let topics = vec![(NEW_TX_GOSSIP_TOPIC, NEW_TX_GOSSIP_WEIGHT)]; + let mut topics = vec![(NEW_TX_GOSSIP_TOPIC, NEW_TX_GOSSIP_WEIGHT)]; + if p2p_config.subscribe_to_pre_confirmations { + topics.push(( + TX_PRECONFIRMATIONS_GOSSIP_TOPIC, + TX_PRECONFIRMATIONS_GOSSIP_WEIGHT, + )); + } // subscribe to gossipsub topics with the network name suffix for (topic, weight) in topics { diff --git a/crates/services/p2p/src/gossipsub/messages.rs b/crates/services/p2p/src/gossipsub/messages.rs index 07070685991..c6f93d8eed0 100644 --- a/crates/services/p2p/src/gossipsub/messages.rs +++ b/crates/services/p2p/src/gossipsub/messages.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use fuel_core_types::fuel_tx::Transaction; +use fuel_core_types::services::p2p::PreConfirmationMessage; use serde::{ Deserialize, Serialize, @@ -12,6 +13,7 @@ use serde::{ #[derive(Debug, Clone, Copy, Eq, PartialEq)] pub enum GossipTopicTag { NewTx, + TxPreConfirmations, } /// Takes `Arc` and wraps it in a matching GossipsubBroadcastRequest @@ -20,9 +22,11 @@ pub enum GossipTopicTag { #[derive(Debug, Clone)] pub enum GossipsubBroadcastRequest { NewTx(Arc), + TxPreConfirmations(Arc), } #[derive(Serialize, Deserialize, Debug, Clone)] pub enum GossipsubMessage { NewTx(Transaction), + TxPreConfirmations(PreConfirmationMessage), } diff --git a/crates/services/p2p/src/gossipsub/topics.rs b/crates/services/p2p/src/gossipsub/topics.rs index ac8e9efb105..276965793d7 100644 --- a/crates/services/p2p/src/gossipsub/topics.rs +++ b/crates/services/p2p/src/gossipsub/topics.rs @@ -10,6 +10,7 @@ use super::messages::{ }; pub const NEW_TX_GOSSIP_TOPIC: &str = "new_tx"; +pub const TX_PRECONFIRMATIONS_GOSSIP_TOPIC: &str = "tx_preconfirmations"; /// Holds used Gossipsub Topics /// Each field contains TopicHash of existing topics @@ -17,15 +18,18 @@ pub const NEW_TX_GOSSIP_TOPIC: &str = "new_tx"; #[derive(Debug)] pub struct GossipsubTopics { new_tx_topic: TopicHash, + tx_confirmations_topic: TopicHash, } impl GossipsubTopics { pub fn new(network_name: &str) -> Self { let new_tx_topic: Sha256Topic = Topic::new(format!("{NEW_TX_GOSSIP_TOPIC}/{network_name}")); - + let tx_confirmations_topic: Sha256Topic = + Topic::new(format!("{TX_PRECONFIRMATIONS_GOSSIP_TOPIC}/{network_name}")); Self { new_tx_topic: new_tx_topic.hash(), + tx_confirmations_topic: tx_confirmations_topic.hash(), } } @@ -36,6 +40,9 @@ impl GossipsubTopics { ) -> Option { match incoming_topic { hash if hash == &self.new_tx_topic => Some(GossipTopicTag::NewTx), + hash if hash == &self.tx_confirmations_topic => { + Some(GossipTopicTag::TxPreConfirmations) + } _ => None, } } @@ -48,6 +55,9 @@ impl GossipsubTopics { ) -> TopicHash { match outgoing_request { GossipsubBroadcastRequest::NewTx(_) => self.new_tx_topic.clone(), + GossipsubBroadcastRequest::TxPreConfirmations(_) => { + self.tx_confirmations_topic.clone() + } } } } diff --git a/crates/services/p2p/src/p2p_service.rs b/crates/services/p2p/src/p2p_service.rs index 87459c85697..2840af08e54 100644 --- a/crates/services/p2p/src/p2p_service.rs +++ b/crates/services/p2p/src/p2p_service.rs @@ -125,7 +125,7 @@ pub struct FuelP2PService { /// to the peer that requested it. inbound_requests_table: HashMap>, - /// `PostcardCodec` as GossipsubCodec for encoding and decoding of Gossipsub messages + /// `PostcardCodec` as GossipsubCodec for encoding and decoding of Gossipsub messages gossipsub_codec: GossipsubMessageHandler, /// Stores additional p2p network info @@ -857,27 +857,33 @@ impl FuelP2PService { } } -#[allow(clippy::cast_possible_truncation)] #[cfg(test)] mod tests { + #![allow(non_snake_case)] + #![allow(clippy::cast_possible_truncation)] + use super::{ FuelP2PService, PublishError, }; use crate::{ - codecs::{ - gossipsub::GossipsubMessageHandler, - request_response::RequestResponseMessageHandler, - }, config::Config, gossipsub::{ messages::{ + GossipTopicTag, GossipsubBroadcastRequest, GossipsubMessage, }, - topics::NEW_TX_GOSSIP_TOPIC, + topics::{ + NEW_TX_GOSSIP_TOPIC, + TX_PRECONFIRMATIONS_GOSSIP_TOPIC, + }, + }, + p2p_service::{ + FuelP2PEvent, + GossipsubMessageHandler, + RequestResponseMessageHandler, }, - p2p_service::FuelP2PEvent, peer_manager::PeerInfo, request_response::messages::{ RequestMessage, @@ -906,6 +912,7 @@ mod tests { services::p2p::{ GossipsubMessageAcceptance, NetworkableTransactionPool, + PreConfirmationMessage, Transactions, }, }; @@ -929,7 +936,10 @@ mod tests { use rand::Rng; use std::{ collections::HashSet, - ops::Range, + ops::{ + Deref, + Range, + }, sync::Arc, time::Duration, }; @@ -940,7 +950,6 @@ mod tests { watch, }; use tracing_attributes::instrument; - type P2PService = FuelP2PService; /// helper function for building FuelP2PService @@ -1112,9 +1121,9 @@ mod tests { assert_eq!(node_a.peer_manager().total_peers_connected(), 0); } }) - .await - // Then - .expect_err("The node should not connect to itself"); + .await + // Then + .expect_err("The node should not connect to itself"); assert_eq!(node_b.peer_manager().total_peers_connected(), 0); } @@ -1449,7 +1458,7 @@ mod tests { #[tokio::test] #[instrument] - async fn gossipsub_broadcast_tx_with_accept() { + async fn gossipsub_broadcast_tx_with_accept__new_tx() { for _ in 0..100 { tokio::time::timeout( Duration::from_secs(5), @@ -1468,7 +1477,26 @@ mod tests { #[tokio::test] #[instrument] - async fn gossipsub_broadcast_tx_with_reject() { + async fn gossipsub_broadcast_tx_with_accept__tx_confirmations() { + for _ in 0..100 { + tokio::time::timeout( + Duration::from_secs(20), + gossipsub_broadcast( + GossipsubBroadcastRequest::TxPreConfirmations(Arc::new( + PreConfirmationMessage::default_test_confirmation(), + )), + GossipsubMessageAcceptance::Accept, + None, + ), + ) + .await + .unwrap(); + } + } + + #[tokio::test] + #[instrument] + async fn gossipsub_broadcast_tx_with_reject__new_tx() { for _ in 0..100 { tokio::time::timeout( Duration::from_secs(5), @@ -1485,6 +1513,25 @@ mod tests { } } + #[tokio::test] + #[instrument] + async fn gossipsub_broadcast_tx_with_reject__tx_confirmations() { + for _ in 0..100 { + tokio::time::timeout( + Duration::from_secs(5), + gossipsub_broadcast( + GossipsubBroadcastRequest::TxPreConfirmations(Arc::new( + PreConfirmationMessage::default_test_confirmation(), + )), + GossipsubMessageAcceptance::Reject, + None, + ), + ) + .await + .unwrap(); + } + } + #[tokio::test] #[instrument] #[ignore] @@ -1617,13 +1664,25 @@ mod tests { p2p_config.max_functional_peers_connected = connection_limit; } - let selected_topic: Sha256Topic = { - let topic = match broadcast_request { - GossipsubBroadcastRequest::NewTx(_) => NEW_TX_GOSSIP_TOPIC, + p2p_config.subscribe_to_pre_confirmations = true; + + let (selected_topic, selected_tag): (Sha256Topic, GossipTopicTag) = { + let (topic, tag) = match broadcast_request { + GossipsubBroadcastRequest::NewTx(_) => { + (NEW_TX_GOSSIP_TOPIC, GossipTopicTag::NewTx) + } + GossipsubBroadcastRequest::TxPreConfirmations(_) => ( + TX_PRECONFIRMATIONS_GOSSIP_TOPIC, + GossipTopicTag::TxPreConfirmations, + ), }; - Topic::new(format!("{}/{}", topic, p2p_config.network_name)) + ( + Topic::new(format!("{}/{}", topic, p2p_config.network_name)), + tag, + ) }; + tracing::info!("Selected Topic: {:?}", selected_topic); let mut message_sent = false; @@ -1657,16 +1716,21 @@ mod tests { tokio::select! { node_a_event = node_a.next_event() => { - if let Some(FuelP2PEvent::NewSubscription { peer_id, .. }) = &node_a_event { - if peer_id == &node_b.local_peer_id { + if let Some(FuelP2PEvent::NewSubscription { peer_id, tag }) = &node_a_event { + if tag != &selected_tag { + tracing::info!("Wrong tag, expected: {:?}, actual: {:?}", selected_tag, tag); + } else if peer_id == &node_b.local_peer_id { a_connected_to_b = true; } } tracing::info!("Node A Event: {:?}", node_a_event); }, node_b_event = node_b.next_event() => { - if let Some(FuelP2PEvent::NewSubscription { peer_id, .. }) = &node_b_event { - if peer_id == &node_c.local_peer_id { + if let Some(FuelP2PEvent::NewSubscription { peer_id,tag, }) = &node_b_event { + tracing::info!("New subscription for peer_id: {:?} with tag: {:?}", peer_id, tag); + if tag != &selected_tag { + tracing::info!("Wrong tag, expected: {:?}, actual: {:?}", selected_tag, tag); + } else if peer_id == &node_c.local_peer_id { b_connected_to_c = true; } } @@ -1682,15 +1746,7 @@ mod tests { panic!("Wrong Topic"); } - // received value should match sent value - match &message { - GossipsubMessage::NewTx(tx) => { - if tx != &Transaction::default_test_tx() { - tracing::error!("Wrong p2p message {:?}", message); - panic!("Wrong GossipsubMessage") - } - } - } + check_message_matches_request(&message, &broadcast_request); // Node B received the correct message // If we try to publish it again we will get `PublishError::Duplicate` @@ -1729,6 +1785,22 @@ mod tests { } } + fn check_message_matches_request( + message: &GossipsubMessage, + expected: &GossipsubBroadcastRequest, + ) { + match (message, expected) { + (GossipsubMessage::NewTx(received), GossipsubBroadcastRequest::NewTx(requested)) => { + assert_eq!(requested.deref(), received, "Both messages were `NewTx`s, but the received message did not match the requested message"); + } + ( + GossipsubMessage::TxPreConfirmations(received), + GossipsubBroadcastRequest::TxPreConfirmations(requested), + ) => assert_eq!(requested.deref(), received, "Both messages were `Confirmations`, but the received message did not match the requested message"), + _ => panic!("Message does not match the expected request, expected: {:?}, actual: {:?}", expected, message), + } + } + fn arbitrary_headers_for_range(range: Range) -> Vec { let mut blocks = Vec::new(); for i in range { @@ -2160,16 +2232,16 @@ mod tests { #[tokio::test] async fn gossipsub_peer_limit_works() { tokio::time::timeout( - Duration::from_secs(5), - gossipsub_broadcast( - GossipsubBroadcastRequest::NewTx(Arc::new( - Transaction::default_test_tx(), - )), - GossipsubMessageAcceptance::Accept, - Some(1) // limit to 1 peer, therefore the function will timeout, as it will not be able to propagate the message - ), - ) - .await.expect_err("Should have timed out"); + Duration::from_secs(5), + gossipsub_broadcast( + GossipsubBroadcastRequest::NewTx(Arc::new( + Transaction::default_test_tx(), + )), + GossipsubMessageAcceptance::Accept, + Some(1) // limit to 1 peer, therefore the function will timeout, as it will not be able to propagate the message + ), + ) + .await.expect_err("Should have timed out"); } #[tokio::test] diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index 9126f0e3aba..7668c5a25bc 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -63,6 +63,7 @@ use fuel_core_types::{ PeerReport, }, BlockHeightHeartbeatData, + ConfirmationsGossipData, GossipData, GossipsubMessageAcceptance, GossipsubMessageInfo, @@ -79,6 +80,7 @@ use futures::{ use libp2p::{ gossipsub::{ MessageAcceptance, + MessageId, PublishError, }, request_response::InboundRequestId, @@ -371,6 +373,11 @@ pub trait Broadcast: Send { fn tx_broadcast(&self, transaction: TransactionGossipData) -> anyhow::Result<()>; + fn pre_confirmation_broadcast( + &self, + confirmations: ConfirmationsGossipData, + ) -> anyhow::Result<()>; + fn new_tx_subscription_broadcast(&self, peer_id: FuelPeerId) -> anyhow::Result<()>; } @@ -397,6 +404,14 @@ impl Broadcast for SharedState { Ok(()) } + fn pre_confirmation_broadcast( + &self, + confirmations: ConfirmationsGossipData, + ) -> anyhow::Result<()> { + self.confirmations_broadcast.send(confirmations)?; + Ok(()) + } + fn new_tx_subscription_broadcast(&self, peer_id: FuelPeerId) -> anyhow::Result<()> { self.new_tx_subscription_broadcast.send(peer_id)?; Ok(()) @@ -443,6 +458,28 @@ pub struct Task { cached_view: Arc, } +impl Task { + pub(crate) fn broadcast_gossip_message( + &self, + message: GossipsubMessage, + message_id: MessageId, + peer_id: PeerId, + ) { + let message_id = message_id.0; + + match message { + GossipsubMessage::NewTx(transaction) => { + let next_transaction = GossipData::new(transaction, peer_id, message_id); + let _ = self.broadcast.tx_broadcast(next_transaction); + } + GossipsubMessage::TxPreConfirmations(confirmations) => { + let data = GossipData::new(confirmations, peer_id, message_id); + let _ = self.broadcast.pre_confirmation_broadcast(data); + } + } + } +} + #[derive(Default, Clone)] pub struct HeartbeatPeerReputationConfig { old_heartbeat_penalty: AppScore, @@ -990,14 +1027,8 @@ where let _ = self.broadcast.block_height_broadcast(block_height_data); } Some(FuelP2PEvent::GossipsubMessage { message, message_id, peer_id,.. }) => { - let message_id = message_id.0; - - match message { - GossipsubMessage::NewTx(transaction) => { - let next_transaction = GossipData::new(transaction, peer_id, message_id); - let _ = self.broadcast.tx_broadcast(next_transaction); - }, - } + tracing::info!("Received gossip message from peer {:?}", peer_id); + self.broadcast_gossip_message(message, message_id, peer_id); }, Some(FuelP2PEvent::InboundRequestMessage { request_message, request_id }) => { let res = self.process_request(request_message, request_id); @@ -1052,6 +1083,8 @@ pub struct SharedState { new_tx_subscription_broadcast: broadcast::Sender, /// Sender of p2p transaction used for subscribing. tx_broadcast: broadcast::Sender, + /// Sender of p2p tx confirmations used for subscribing. + confirmations_broadcast: broadcast::Sender, /// Sender of reserved peers connection updates. reserved_peers_broadcast: broadcast::Sender, /// Used for communicating with the `Task`. @@ -1281,6 +1314,12 @@ impl SharedState { self.tx_broadcast.subscribe() } + pub fn subscribe_confirmations( + &self, + ) -> broadcast::Receiver { + self.confirmations_broadcast.subscribe() + } + pub fn subscribe_block_height( &self, ) -> broadcast::Receiver { @@ -1323,6 +1362,7 @@ pub fn build_shared_state( ) -> (SharedState, Receiver) { let (request_sender, request_receiver) = mpsc::channel(CHANNEL_SIZE); let (tx_broadcast, _) = broadcast::channel(CHANNEL_SIZE); + let (confirmations_broadcast, _) = broadcast::channel(CHANNEL_SIZE); let (new_tx_subscription_broadcast, _) = broadcast::channel(CHANNEL_SIZE); let (block_height_broadcast, _) = broadcast::channel(CHANNEL_SIZE); @@ -1339,6 +1379,7 @@ pub fn build_shared_state( request_sender, new_tx_subscription_broadcast, tx_broadcast, + confirmations_broadcast, reserved_peers_broadcast, block_height_broadcast, max_txs_per_request: config.max_txs_per_request, @@ -1407,15 +1448,17 @@ fn report_message( warn!(target: "fuel-p2p", "Failed to read PeerId from received GossipsubMessageId: {}", msg_id); } } - #[cfg(test)] -pub mod tests { +pub mod task_tests { #![allow(non_snake_case)] use crate::ports::P2pDb; use super::*; - use crate::peer_manager::heartbeat_data::HeartbeatData; + use crate::{ + gossipsub::topics::TX_PRECONFIRMATIONS_GOSSIP_TOPIC, + peer_manager::heartbeat_data::HeartbeatData, + }; use fuel_core_services::{ Service, State, @@ -1424,8 +1467,10 @@ pub mod tests { use fuel_core_types::{ blockchain::consensus::Genesis, fuel_types::BlockHeight, + services::p2p::PreConfirmationMessage, }; use futures::FutureExt; + use libp2p::gossipsub::TopicHash; use std::{ collections::VecDeque, time::SystemTime, @@ -1615,6 +1660,7 @@ pub mod tests { struct FakeBroadcast { pub peer_reports: mpsc::Sender<(FuelPeerId, AppScore, String)>, + pub confirmation_gossip_broadcast: mpsc::Sender, } impl Broadcast for FakeBroadcast { @@ -1646,6 +1692,14 @@ pub mod tests { todo!() } + fn pre_confirmation_broadcast( + &self, + confirmations: ConfirmationsGossipData, + ) -> anyhow::Result<()> { + self.confirmation_gossip_broadcast.try_send(confirmations)?; + Ok(()) + } + fn new_tx_subscription_broadcast( &self, _peer_id: FuelPeerId, @@ -1686,6 +1740,7 @@ pub mod tests { let (report_sender, mut report_receiver) = mpsc::channel(100); let broadcast = FakeBroadcast { peer_reports: report_sender, + confirmation_gossip_broadcast: mpsc::channel(100).0, }; // Less than actual @@ -1779,6 +1834,7 @@ pub mod tests { let (report_sender, mut report_receiver) = mpsc::channel(100); let broadcast = FakeBroadcast { peer_reports: report_sender, + confirmation_gossip_broadcast: mpsc::channel(100).0, }; // Greater than actual @@ -1870,6 +1926,7 @@ pub mod tests { let (request_sender, request_receiver) = mpsc::channel(100); let broadcast = FakeBroadcast { peer_reports: mpsc::channel(100).0, + confirmation_gossip_broadcast: mpsc::channel(100).0, }; let mut task = Task { chain_id: Default::default(), @@ -1905,4 +1962,109 @@ pub mod tests { .expect("Should process the block height even under p2p pressure"); } } + + fn arb_tx_confirmation_gossip_message() -> FuelP2PEvent { + let peer_id = PeerId::random(); + let message_id = vec![1, 2, 3, 4, 5].into(); + let topic_hash = TopicHash::from_raw(TX_PRECONFIRMATIONS_GOSSIP_TOPIC); + let confirmations = PreConfirmationMessage::default_test_confirmation(); + let message = GossipsubMessage::TxPreConfirmations(confirmations); + FuelP2PEvent::GossipsubMessage { + peer_id, + message_id, + topic_hash, + message, + } + } + + #[tokio::test] + async fn run__gossip_message_from_p2p_service_is_broadcasted__tx_confirmations() { + // given + let gossip_message_event = arb_tx_confirmation_gossip_message(); + let events = vec![gossip_message_event.clone()]; + let event_stream = futures::stream::iter(events); + let p2p_service = FakeP2PService { + peer_info: vec![], + next_event_stream: Box::pin(event_stream), + }; + let (confirmations_sender, mut confirmations_receiver) = mpsc::channel(100); + let broadcast = FakeBroadcast { + peer_reports: mpsc::channel(100).0, + confirmation_gossip_broadcast: confirmations_sender, + }; + let (request_sender, request_receiver) = mpsc::channel(100); + let mut task = Task { + chain_id: Default::default(), + response_timeout: Default::default(), + p2p_service, + view_provider: FakeDB, + next_block_height: FakeBlockImporter.next_block_height(), + tx_pool: FakeTxPool, + request_receiver, + request_sender, + db_heavy_task_processor: SyncProcessor::new("Test", 1, 1).unwrap(), + tx_pool_heavy_task_processor: AsyncProcessor::new("Test", 1, 1).unwrap(), + broadcast, + max_headers_per_request: 0, + max_txs_per_request: 100, + heartbeat_check_interval: Duration::from_secs(0), + heartbeat_max_avg_interval: Default::default(), + heartbeat_max_time_since_last: Default::default(), + next_check_time: Instant::now(), + heartbeat_peer_reputation_config: Default::default(), + cached_view: Arc::new(CachedView::new(100, false)), + }; + + // when + let mut watcher = StateWatcher::started(); + let _ = task.run(&mut watcher).await; + tokio::time::sleep(Duration::from_millis(100)).await; + + // then + let actual = confirmations_receiver.try_recv().unwrap().data.unwrap(); + let FuelP2PEvent::GossipsubMessage { message, .. } = gossip_message_event else { + panic!("Expected GossipsubMessage event"); + }; + let GossipsubMessage::TxPreConfirmations(expected) = message else { + panic!("Expected Confirmations message"); + }; + assert_eq!(expected, actual); + } +} + +#[cfg(test)] +pub mod broadcast_tests { + + #![allow(non_snake_case)] + + use super::*; + use fuel_core_types::services::p2p::PreConfirmationMessage; + + fn arb_shared_state() -> SharedState { + let config = Config::default("test network"); + let (shared_state, _) = build_shared_state(config); + shared_state + } + + #[tokio::test] + async fn shared_state__broadcast__tx_confirmations() { + // given + let broadcast = arb_shared_state(); + let confirmations = PreConfirmationMessage::default_test_confirmation(); + let confirmations_gossip_data = ConfirmationsGossipData { + data: Some(confirmations.clone()), + peer_id: FuelPeerId::from(PeerId::random().to_bytes().to_vec()), + message_id: vec![1, 2, 3, 4], + }; + let mut confirmations_receiver = broadcast.subscribe_confirmations(); + + // when + broadcast + .pre_confirmation_broadcast(confirmations_gossip_data) + .unwrap(); + + // then + let actual = confirmations_receiver.try_recv().unwrap().data.unwrap(); + assert_eq!(confirmations, actual); + } } diff --git a/crates/types/src/services/p2p.rs b/crates/types/src/services/p2p.rs index e6659a7db54..6c662e34021 100644 --- a/crates/types/src/services/p2p.rs +++ b/crates/types/src/services/p2p.rs @@ -6,8 +6,18 @@ use serde::{ Serialize, }; +use super::txpool::ArcPoolTx; +#[cfg(feature = "serde")] +use super::txpool::PoolTransaction; use crate::{ - fuel_tx::Transaction, + fuel_crypto::{ + PublicKey, + Signature, + }, + fuel_tx::{ + Transaction, + TxId, + }, fuel_types::BlockHeight, }; use std::{ @@ -20,11 +30,7 @@ use std::{ str::FromStr, time::SystemTime, }; - -use super::txpool::ArcPoolTx; - -#[cfg(feature = "serde")] -use super::txpool::PoolTransaction; +use tai64::Tai64; /// Contains types and logic for Peer Reputation pub mod peer_reputation; @@ -72,6 +78,108 @@ pub struct GossipData { /// Transactions gossiped by peers for inclusion into a block pub type TransactionGossipData = GossipData; +/// Transactions that have been confirmed by block producer +pub type ConfirmationsGossipData = GossipData; + +/// A value and an associated signature +#[derive(Debug, Clone, PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +pub struct Sealed { + /// The actual value + pub entity: Entity, + /// Seal + pub signature: Signature, +} + +/// A key that will be used to sign a pre-confirmations +#[derive(Debug, Clone, PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +pub struct DelegatePreConfirmationKey { + /// The public key of the person who is allowed to create pre-confirmations. + public_key: PublicKey, + /// The time at which the key will expire. Used to indicate to the recipient which key + /// to use to verify the pre-confirmations--serves the second purpose of being a nonce of + /// each key + expiration: Tai64, +} + +/// A pre-confirmation is a message that is sent by the block producer to give the _final_ +/// status of a transaction +#[derive(Debug, Clone, PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +pub struct Preconfirmation { + /// The ID of the transaction that is being pre-confirmed + tx_id: TxId, + /// The status of the transaction that is being pre-confirmed + status: PreconfirmationStatus, +} + +/// Status of a transaction that has been pre-confirmed by block producer +#[derive(Debug, Clone, PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +pub enum PreconfirmationStatus { + /// Transaction was squeezed out by the tx pool + SqueezedOutByBlockProducer { + /// Reason the transaction was squeezed out + reason: String, + }, + /// Transaction has been confirmed and will be included in block_height + SuccessByBlockProducer { + /// The block height at which the transaction will be included + block_height: BlockHeight, + }, + /// Transaction will not be included in a block, rejected at `block_height` + FailureByBlockProducer { + /// The block height at which the transaction will be rejected + block_height: BlockHeight, + }, +} + +/// A collection of pre-confirmations that have been signed by a delegate +#[derive(Debug, Clone, PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +pub struct Preconfirmations { + /// The expiration time of the key used to sign + expiration: Tai64, + /// The transactions which have been pre-confirmed + preconfirmations: Vec, +} + +/// A signed key delegation +pub type SignedByBlockProducerDelegation = Sealed; + +/// A signed pre-confirmation +pub type SignedPreconfirmationByDelegate = Sealed; + +/// The possible messages sent by the parties pre-confirming transactinos +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum PreConfirmationMessage { + /// Notification of key delegation + Delegate(SignedByBlockProducerDelegation), + /// Notification of pre-confirmations + Preconfirmations(SignedPreconfirmationByDelegate), +} + +#[cfg(feature = "test-helpers")] +impl PreConfirmationMessage { + /// Test helper for creating arbitrary, meaningless `TxConfirmations` data + pub fn default_test_confirmation() -> Self { + Self::Preconfirmations(SignedPreconfirmationByDelegate { + entity: Preconfirmations { + expiration: Tai64::UNIX_EPOCH, + preconfirmations: vec![Preconfirmation { + tx_id: TxId::default(), + status: PreconfirmationStatus::SuccessByBlockProducer { + block_height: BlockHeight::new(0), + }, + }], + }, + signature: Signature::default(), + }) + } +} + #[derive(Default, Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] /// The source of some network data. pub struct SourcePeer {