Skip to content

Commit d809d69

Browse files
authored
Add preconfirmation gossip messages (#2726)
## Linked Issues/PRs <!-- List of related issues/PRs --> Closes #2738 ## Description <!-- List of detailed changes --> This is part of the greater tx confirmation work. This will enable the block producer to gossip a batch of signed transactions around the network for the senders of those transactions to receive. ~In an attempt to cleanup the P2P code, some of the tests were moved to their own files (this makes reviewing slightly more difficult, so I've marked the new tests with comments in those refactored test suites).~ TODO: - [x] Finalize the `TxConfirmations` type - [x] What is the correct type for `signature`? - [x] What are all the statuses for the `txs`? - [x] Default to having the Preconfirmation topic off to minimize traffic, turn on with CLI arg - [ ] ~Constrain peers for gossiping to minimize traffic~ #2740 ## Checklist - [ ] Breaking changes are clearly marked as such in the PR description and changelog - [ ] New behavior is reflected in tests - [ ] [The specification](https://github.com/FuelLabs/fuel-specs/) matches the implemented behavior (link update PR if changes are needed) ### Before requesting review - [ ] I have reviewed the code myself - [ ] I have created follow-up issues caused by this PR and linked them here ### After merging, notify other teams [Add or remove entries as needed] - [ ] [Rust SDK](https://github.com/FuelLabs/fuels-rs/) - [ ] [Sway compiler](https://github.com/FuelLabs/sway/) - [ ] [Platform documentation](https://github.com/FuelLabs/devrel-requests/issues/new?assignees=&labels=new+request&projects=&template=NEW-REQUEST.yml&title=%5BRequest%5D%3A+) (for out-of-organization contributors, the person merging the PR will do this) - [ ] Someone else?
1 parent 27827c5 commit d809d69

File tree

11 files changed

+463
-70
lines changed

11 files changed

+463
-70
lines changed

.changes/added/2726.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add a new gossip-sub message for transaction confirmations

bin/fuel-core/src/cli/run/p2p.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,10 @@ pub struct P2PArgs {
205205
/// Number of threads to read from the tx pool.
206206
#[clap(long = "p2p-txpool-threads", default_value = "0", env)]
207207
pub tx_pool_threads: usize,
208+
209+
/// Subscribe to pre-confirmation gossip topic
210+
#[clap(long = "subscribe-to-pre-confirmations", env)]
211+
subscribe_to_pre_confirmations: bool,
208212
}
209213

210214
#[derive(Debug, Clone, Args)]
@@ -349,6 +353,7 @@ impl P2PArgs {
349353
database_read_threads: self.database_read_threads,
350354
tx_pool_threads: self.tx_pool_threads,
351355
state: NotInitialized,
356+
subscribe_to_pre_confirmations: self.subscribe_to_pre_confirmations,
352357
};
353358
Ok(Some(config))
354359
}

crates/services/p2p/src/codecs/gossipsub.rs

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
1-
use std::io;
2-
3-
use fuel_core_types::fuel_tx::Transaction;
4-
51
use crate::gossipsub::messages::{
62
GossipTopicTag,
73
GossipsubBroadcastRequest,
84
GossipsubMessage,
95
};
6+
use fuel_core_types::{
7+
fuel_tx::Transaction,
8+
services::p2p::PreConfirmationMessage,
9+
};
10+
use std::{
11+
io,
12+
ops::Deref,
13+
};
1014

1115
use super::{
1216
Decode,
@@ -22,16 +26,21 @@ pub struct GossipsubMessageHandler<Codec> {
2226

2327
impl<Codec> GossipsubCodec for GossipsubMessageHandler<Codec>
2428
where
25-
Codec:
26-
Encode<Transaction, Error = io::Error> + Decode<Transaction, Error = io::Error>,
29+
Codec: Encode<Transaction, Error = io::Error>
30+
+ Decode<Transaction, Error = io::Error>
31+
+ Encode<PreConfirmationMessage, Error = io::Error>
32+
+ Decode<PreConfirmationMessage, Error = io::Error>,
2733
{
2834
type RequestMessage = GossipsubBroadcastRequest;
2935
type ResponseMessage = GossipsubMessage;
3036

3137
fn encode(&self, data: Self::RequestMessage) -> Result<Vec<u8>, io::Error> {
3238
match data {
3339
GossipsubBroadcastRequest::NewTx(tx) => {
34-
Ok(self.codec.encode(&tx)?.into_bytes())
40+
Ok(self.codec.encode(tx.deref())?.into_bytes())
41+
}
42+
GossipsubBroadcastRequest::TxPreConfirmations(msg) => {
43+
Ok(self.codec.encode(msg.deref())?.into_bytes())
3544
}
3645
}
3746
}
@@ -45,6 +54,9 @@ where
4554
GossipTopicTag::NewTx => {
4655
GossipsubMessage::NewTx(self.codec.decode(encoded_data)?)
4756
}
57+
GossipTopicTag::TxPreConfirmations => {
58+
GossipsubMessage::TxPreConfirmations(self.codec.decode(encoded_data)?)
59+
}
4860
};
4961

5062
Ok(decoded_response)

crates/services/p2p/src/codecs/postcard.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,10 @@ impl<T> Encode<T> for PostcardCodec
3535
where
3636
T: ?Sized + serde::Serialize,
3737
{
38-
type Encoder<'a> = Cow<'a, [u8]> where T: 'a;
38+
type Encoder<'a>
39+
= Cow<'a, [u8]>
40+
where
41+
T: 'a;
3942
type Error = io::Error;
4043

4144
fn encode<'a>(&self, value: &'a T) -> Result<Self::Encoder<'a>, Self::Error> {

crates/services/p2p/src/config.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,9 @@ pub struct Config<State = Initialized> {
146146
/// with the `NotInitialized` state. But it can be set into the `Initialized` state only with
147147
/// the `init` method.
148148
pub state: State,
149+
150+
/// If true, the node will subscribe to pre-confirmations topic
151+
pub subscribe_to_pre_confirmations: bool,
149152
}
150153

151154
/// The initialized state can be achieved only by the `init` function because `()` is private.
@@ -195,6 +198,7 @@ impl Config<NotInitialized> {
195198
database_read_threads: self.database_read_threads,
196199
tx_pool_threads: self.tx_pool_threads,
197200
state: Initialized(()),
201+
subscribe_to_pre_confirmations: self.subscribe_to_pre_confirmations,
198202
})
199203
}
200204
}
@@ -249,6 +253,7 @@ impl Config<NotInitialized> {
249253
database_read_threads: 0,
250254
tx_pool_threads: 0,
251255
state: NotInitialized,
256+
subscribe_to_pre_confirmations: false,
252257
}
253258
}
254259
}

crates/services/p2p/src/gossipsub/config.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
use super::topics::NEW_TX_GOSSIP_TOPIC;
1+
use super::topics::{
2+
NEW_TX_GOSSIP_TOPIC,
3+
TX_PRECONFIRMATIONS_GOSSIP_TOPIC,
4+
};
25
use crate::{
36
config::{
47
Config,
@@ -49,6 +52,8 @@ const MESH_SIZE: usize = 8;
4952
// The weight applied to the score for delivering new transactions.
5053
const NEW_TX_GOSSIP_WEIGHT: f64 = 0.05;
5154

55+
const TX_PRECONFIRMATIONS_GOSSIP_WEIGHT: f64 = 0.05;
56+
5257
// The threshold for a peer's score to be considered for greylisting.
5358
// If a peer's score falls below this value, they will be greylisted.
5459
// Greylisting is a lighter form of banning, where the peer's messages might be ignored or given lower priority,
@@ -222,7 +227,13 @@ fn initialize_gossipsub(gossipsub: &mut gossipsub::Behaviour, p2p_config: &Confi
222227
.with_peer_score(peer_score_params, peer_score_thresholds)
223228
.expect("gossipsub initialized with peer score");
224229

225-
let topics = vec![(NEW_TX_GOSSIP_TOPIC, NEW_TX_GOSSIP_WEIGHT)];
230+
let mut topics = vec![(NEW_TX_GOSSIP_TOPIC, NEW_TX_GOSSIP_WEIGHT)];
231+
if p2p_config.subscribe_to_pre_confirmations {
232+
topics.push((
233+
TX_PRECONFIRMATIONS_GOSSIP_TOPIC,
234+
TX_PRECONFIRMATIONS_GOSSIP_WEIGHT,
235+
));
236+
}
226237

227238
// subscribe to gossipsub topics with the network name suffix
228239
for (topic, weight) in topics {

crates/services/p2p/src/gossipsub/messages.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use std::sync::Arc;
22

33
use fuel_core_types::fuel_tx::Transaction;
44

5+
use fuel_core_types::services::p2p::PreConfirmationMessage;
56
use serde::{
67
Deserialize,
78
Serialize,
@@ -12,6 +13,7 @@ use serde::{
1213
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
1314
pub enum GossipTopicTag {
1415
NewTx,
16+
TxPreConfirmations,
1517
}
1618

1719
/// Takes `Arc<T>` and wraps it in a matching GossipsubBroadcastRequest
@@ -20,9 +22,11 @@ pub enum GossipTopicTag {
2022
#[derive(Debug, Clone)]
2123
pub enum GossipsubBroadcastRequest {
2224
NewTx(Arc<Transaction>),
25+
TxPreConfirmations(Arc<PreConfirmationMessage>),
2326
}
2427

2528
#[derive(Serialize, Deserialize, Debug, Clone)]
2629
pub enum GossipsubMessage {
2730
NewTx(Transaction),
31+
TxPreConfirmations(PreConfirmationMessage),
2832
}

crates/services/p2p/src/gossipsub/topics.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,22 +10,26 @@ use super::messages::{
1010
};
1111

1212
pub const NEW_TX_GOSSIP_TOPIC: &str = "new_tx";
13+
pub const TX_PRECONFIRMATIONS_GOSSIP_TOPIC: &str = "tx_preconfirmations";
1314

1415
/// Holds used Gossipsub Topics
1516
/// Each field contains TopicHash of existing topics
1617
/// in order to avoid converting topics to TopicHash on each received message
1718
#[derive(Debug)]
1819
pub struct GossipsubTopics {
1920
new_tx_topic: TopicHash,
21+
tx_confirmations_topic: TopicHash,
2022
}
2123

2224
impl GossipsubTopics {
2325
pub fn new(network_name: &str) -> Self {
2426
let new_tx_topic: Sha256Topic =
2527
Topic::new(format!("{NEW_TX_GOSSIP_TOPIC}/{network_name}"));
26-
28+
let tx_confirmations_topic: Sha256Topic =
29+
Topic::new(format!("{TX_PRECONFIRMATIONS_GOSSIP_TOPIC}/{network_name}"));
2730
Self {
2831
new_tx_topic: new_tx_topic.hash(),
32+
tx_confirmations_topic: tx_confirmations_topic.hash(),
2933
}
3034
}
3135

@@ -36,6 +40,9 @@ impl GossipsubTopics {
3640
) -> Option<GossipTopicTag> {
3741
match incoming_topic {
3842
hash if hash == &self.new_tx_topic => Some(GossipTopicTag::NewTx),
43+
hash if hash == &self.tx_confirmations_topic => {
44+
Some(GossipTopicTag::TxPreConfirmations)
45+
}
3946
_ => None,
4047
}
4148
}
@@ -48,6 +55,9 @@ impl GossipsubTopics {
4855
) -> TopicHash {
4956
match outgoing_request {
5057
GossipsubBroadcastRequest::NewTx(_) => self.new_tx_topic.clone(),
58+
GossipsubBroadcastRequest::TxPreConfirmations(_) => {
59+
self.tx_confirmations_topic.clone()
60+
}
5161
}
5262
}
5363
}

0 commit comments

Comments
 (0)