Skip to content

Commit 91b1d9d

Browse files
authored
Merge branch 'master' into feature/fuel-core-0.35.0-benchamrks
2 parents b95a57d + 5963a4e commit 91b1d9d

File tree

24 files changed

+1175
-99
lines changed

24 files changed

+1175
-99
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
66

77
## [Unreleased]
88

9+
### Added
10+
- [2131](https://github.com/FuelLabs/fuel-core/pull/2131): Add flow in TxPool in order to ask to newly connected peers to share their transaction pool
11+
912
### Changed
1013

1114
#### Breaking

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bin/fuel-core/src/cli/run/p2p.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,10 @@ pub struct P2PArgs {
6666
#[clap(long = "max-headers-per-request", default_value = "100", env)]
6767
pub max_headers_per_request: usize,
6868

69+
/// Max number of txs in a single txs request response
70+
#[clap(long = "max-txs-per-request", default_value = "10000", env)]
71+
pub max_txs_per_request: usize,
72+
6973
/// Addresses of the bootstrap nodes
7074
/// They should contain PeerId within their `Multiaddr`
7175
#[clap(long = "bootstrap-nodes", value_delimiter = ',', env)]
@@ -304,6 +308,7 @@ impl P2PArgs {
304308
tcp_port: self.peering_port,
305309
max_block_size: self.max_block_size,
306310
max_headers_per_request: self.max_headers_per_request,
311+
max_txs_per_request: self.max_txs_per_request,
307312
bootstrap_nodes: self.bootstrap_nodes,
308313
reserved_nodes: self.reserved_nodes,
309314
reserved_nodes_only_mode: self.reserved_nodes_only_mode,

crates/fuel-core/src/database.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ use crate::state::{
7676
};
7777
#[cfg(feature = "rocksdb")]
7878
use std::path::Path;
79-
use fuel_core_gas_price_service::fuel_gas_price_updater::fuel_core_storage_adapter::storage::{ GasPriceMetadata};
79+
use fuel_core_gas_price_service::fuel_gas_price_updater::fuel_core_storage_adapter::storage::GasPriceMetadata;
8080
use crate::database::database_description::gas_price::GasPriceDatabase;
8181

8282
// Storages implementation

crates/fuel-core/src/p2p_test_helpers.rs

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ use fuel_core_p2p::{
2121
codecs::postcard::PostcardCodec,
2222
network_service::FuelP2PService,
2323
p2p_service::FuelP2PEvent,
24+
request_response::messages::{
25+
RequestMessage,
26+
ResponseMessage,
27+
},
2428
service::to_message_acceptance,
2529
};
2630
use fuel_core_poa::{
@@ -153,16 +157,30 @@ impl Bootstrap {
153157
}
154158
event = bootstrap.next_event() => {
155159
// The bootstrap node only forwards data without validating it.
156-
if let Some(FuelP2PEvent::GossipsubMessage {
157-
peer_id,
158-
message_id,
159-
..
160-
}) = event {
161-
bootstrap.report_message_validation_result(
162-
&message_id,
160+
match event {
161+
Some(FuelP2PEvent::GossipsubMessage {
163162
peer_id,
164-
to_message_acceptance(&GossipsubMessageAcceptance::Accept)
165-
)
163+
message_id,
164+
..
165+
}) => {
166+
bootstrap.report_message_validation_result(
167+
&message_id,
168+
peer_id,
169+
to_message_acceptance(&GossipsubMessageAcceptance::Accept)
170+
)
171+
},
172+
Some(FuelP2PEvent::InboundRequestMessage {
173+
request_id,
174+
request_message
175+
}) => {
176+
if request_message == RequestMessage::TxPoolAllTransactionsIds {
177+
let _ = bootstrap.send_response_msg(
178+
request_id,
179+
ResponseMessage::TxPoolAllTransactionsIds(Some(vec![])),
180+
);
181+
}
182+
}
183+
_ => {}
166184
}
167185
}
168186
}

crates/fuel-core/src/service/adapters/p2p.rs

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,26 @@
1-
use super::BlockImporterAdapter;
1+
use super::{
2+
BlockImporterAdapter,
3+
TxPoolAdapter,
4+
};
25
use crate::database::OnChainIterableKeyValueView;
36
use fuel_core_p2p::ports::{
47
BlockHeightImporter,
58
P2pDb,
9+
TxPool,
610
};
711
use fuel_core_services::stream::BoxStream;
812
use fuel_core_storage::Result as StorageResult;
13+
use fuel_core_txpool::types::TxId;
914
use fuel_core_types::{
1015
blockchain::{
1116
consensus::Genesis,
1217
SealedBlockHeader,
1318
},
1419
fuel_types::BlockHeight,
15-
services::p2p::Transactions,
20+
services::p2p::{
21+
NetworkableTransactionPool,
22+
Transactions,
23+
},
1624
};
1725
use std::ops::Range;
1826

@@ -49,3 +57,21 @@ impl BlockHeightImporter for BlockImporterAdapter {
4957
)
5058
}
5159
}
60+
61+
impl TxPool for TxPoolAdapter {
62+
fn get_tx_ids(&self, max_txs: usize) -> Vec<TxId> {
63+
self.service.get_tx_ids(max_txs)
64+
}
65+
66+
fn get_full_txs(&self, tx_ids: Vec<TxId>) -> Vec<Option<NetworkableTransactionPool>> {
67+
self.service
68+
.find(tx_ids)
69+
.into_iter()
70+
.map(|tx_info| {
71+
tx_info.map(|tx| {
72+
NetworkableTransactionPool::PoolTransaction(tx.tx().clone())
73+
})
74+
})
75+
.collect()
76+
}
77+
}

crates/fuel-core/src/service/adapters/sync.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,9 +74,7 @@ impl PeerToPeerPort for P2PAdapter {
7474
data: range,
7575
} = range;
7676
if let Some(service) = &self.service {
77-
service
78-
.get_transactions_from_peer(peer_id.into(), range)
79-
.await
77+
service.get_transactions_from_peer(peer_id, range).await
8078
} else {
8179
Err(anyhow::anyhow!("No P2P service available"))
8280
}

crates/fuel-core/src/service/adapters/txpool.rs

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use fuel_core_txpool::{
2828
GasPriceProvider,
2929
MemoryPool,
3030
},
31+
types::TxId,
3132
Result as TxPoolResult,
3233
};
3334
use fuel_core_types::{
@@ -52,6 +53,7 @@ use fuel_core_types::{
5253
p2p::{
5354
GossipsubMessageAcceptance,
5455
GossipsubMessageInfo,
56+
PeerId,
5557
TransactionGossipData,
5658
},
5759
},
@@ -65,6 +67,7 @@ impl BlockImporter for BlockImporterAdapter {
6567
}
6668

6769
#[cfg(feature = "p2p")]
70+
#[async_trait::async_trait]
6871
impl fuel_core_txpool::ports::PeerToPeer for P2PAdapter {
6972
type GossipedTransaction = TransactionGossipData;
7073

@@ -91,6 +94,21 @@ impl fuel_core_txpool::ports::PeerToPeer for P2PAdapter {
9194
}
9295
}
9396

97+
fn subscribe_new_peers(&self) -> BoxStream<PeerId> {
98+
use tokio_stream::{
99+
wrappers::BroadcastStream,
100+
StreamExt,
101+
};
102+
if let Some(service) = &self.service {
103+
Box::pin(
104+
BroadcastStream::new(service.subscribe_new_peers())
105+
.filter_map(|result| result.ok()),
106+
)
107+
} else {
108+
Box::pin(fuel_core_services::stream::pending())
109+
}
110+
}
111+
94112
fn notify_gossip_transaction_validity(
95113
&self,
96114
message_info: GossipsubMessageInfo,
@@ -102,9 +120,32 @@ impl fuel_core_txpool::ports::PeerToPeer for P2PAdapter {
102120
Ok(())
103121
}
104122
}
123+
124+
async fn request_tx_ids(&self, peer_id: PeerId) -> anyhow::Result<Vec<TxId>> {
125+
if let Some(service) = &self.service {
126+
service.get_all_transactions_ids_from_peer(peer_id).await
127+
} else {
128+
Ok(vec![])
129+
}
130+
}
131+
132+
async fn request_txs(
133+
&self,
134+
peer_id: PeerId,
135+
tx_ids: Vec<TxId>,
136+
) -> anyhow::Result<Vec<Option<Transaction>>> {
137+
if let Some(service) = &self.service {
138+
service
139+
.get_full_transactions_from_peer(peer_id, tx_ids)
140+
.await
141+
} else {
142+
Ok(vec![])
143+
}
144+
}
105145
}
106146

107147
#[cfg(not(feature = "p2p"))]
148+
#[async_trait::async_trait]
108149
impl fuel_core_txpool::ports::PeerToPeer for P2PAdapter {
109150
type GossipedTransaction = TransactionGossipData;
110151

@@ -126,6 +167,22 @@ impl fuel_core_txpool::ports::PeerToPeer for P2PAdapter {
126167
) -> anyhow::Result<()> {
127168
Ok(())
128169
}
170+
171+
fn subscribe_new_peers(&self) -> BoxStream<PeerId> {
172+
Box::pin(fuel_core_services::stream::pending())
173+
}
174+
175+
async fn request_tx_ids(&self, _peer_id: PeerId) -> anyhow::Result<Vec<TxId>> {
176+
Ok(vec![])
177+
}
178+
179+
async fn request_txs(
180+
&self,
181+
_peer_id: PeerId,
182+
_tx_ids: Vec<TxId>,
183+
) -> anyhow::Result<Vec<Option<Transaction>>> {
184+
Ok(vec![])
185+
}
129186
}
130187

131188
impl fuel_core_txpool::ports::TxPoolDb for OnChainIterableKeyValueView {

crates/fuel-core/src/service/sub_services.rs

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ pub type PoAService = fuel_core_poa::Service<
7575
SystemTime,
7676
>;
7777
#[cfg(feature = "p2p")]
78-
pub type P2PService = fuel_core_p2p::service::Service<Database>;
78+
pub type P2PService = fuel_core_p2p::service::Service<Database, TxPoolAdapter>;
7979
pub type TxPoolSharedState = fuel_core_txpool::service::SharedState<
8080
P2PAdapter,
8181
Database,
@@ -169,14 +169,10 @@ pub fn init_sub_services(
169169
};
170170

171171
#[cfg(feature = "p2p")]
172-
let mut network = config.p2p.clone().map(|p2p_config| {
173-
fuel_core_p2p::service::new_service(
174-
chain_id,
175-
p2p_config,
176-
database.on_chain().clone(),
177-
importer_adapter.clone(),
178-
)
179-
});
172+
let p2p_externals = config
173+
.p2p
174+
.clone()
175+
.map(fuel_core_p2p::service::build_shared_state);
180176

181177
#[cfg(feature = "p2p")]
182178
let p2p_adapter = {
@@ -192,7 +188,7 @@ pub fn init_sub_services(
192188
invalid_transactions: -100.,
193189
};
194190
P2PAdapter::new(
195-
network.as_ref().map(|network| network.shared.clone()),
191+
p2p_externals.as_ref().map(|ext| ext.0.clone()),
196192
peer_report_config,
197193
)
198194
};
@@ -229,6 +225,21 @@ pub fn init_sub_services(
229225
);
230226
let tx_pool_adapter = TxPoolAdapter::new(txpool.shared.clone());
231227

228+
#[cfg(feature = "p2p")]
229+
let mut network = config.p2p.clone().zip(p2p_externals).map(
230+
|(p2p_config, (shared_state, request_receiver))| {
231+
fuel_core_p2p::service::new_service(
232+
chain_id,
233+
p2p_config,
234+
shared_state,
235+
request_receiver,
236+
database.on_chain().clone(),
237+
importer_adapter.clone(),
238+
tx_pool_adapter.clone(),
239+
)
240+
},
241+
);
242+
232243
let block_producer = fuel_core_producer::Producer {
233244
config: config.block_producer.clone(),
234245
view_provider: database.on_chain().clone(),

crates/services/p2p/src/config.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ pub const MAX_RESPONSE_SIZE: usize = 18 * 1024 * 1024;
4949
/// Maximum number of blocks per request.
5050
pub const MAX_HEADERS_PER_REQUEST: usize = 100;
5151

52+
/// Maximum number of transactions ids asked per request.
53+
pub const MAX_TXS_PER_REQUEST: usize = 10000;
54+
5255
#[derive(Clone, Debug)]
5356
pub struct Config<State = Initialized> {
5457
/// The keypair used for handshake during communication with other p2p nodes.
@@ -73,6 +76,9 @@ pub struct Config<State = Initialized> {
7376
pub max_block_size: usize,
7477
pub max_headers_per_request: usize,
7578

79+
// Maximum of txs id asked in a single request
80+
pub max_txs_per_request: usize,
81+
7682
// `DiscoveryBehaviour` related fields
7783
pub bootstrap_nodes: Vec<Multiaddr>,
7884
pub enable_mdns: bool,
@@ -151,6 +157,7 @@ impl Config<NotInitialized> {
151157
tcp_port: self.tcp_port,
152158
max_block_size: self.max_block_size,
153159
max_headers_per_request: self.max_headers_per_request,
160+
max_txs_per_request: self.max_txs_per_request,
154161
bootstrap_nodes: self.bootstrap_nodes,
155162
enable_mdns: self.enable_mdns,
156163
max_peers_connected: self.max_peers_connected,
@@ -200,6 +207,7 @@ impl Config<NotInitialized> {
200207
tcp_port: 0,
201208
max_block_size: MAX_RESPONSE_SIZE,
202209
max_headers_per_request: MAX_HEADERS_PER_REQUEST,
210+
max_txs_per_request: MAX_TXS_PER_REQUEST,
203211
bootstrap_nodes: vec![],
204212
enable_mdns: false,
205213
max_peers_connected: 50,

0 commit comments

Comments
 (0)