Skip to content
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
a64cc83
Add skeleton for transaction exchanges
AurelienFT Aug 27, 2024
8a1a0ff
Merge branch 'master' into send_tx_new_peer_subscription
AurelienFT Aug 27, 2024
433f7f3
Re-organize creation of P2P adapter to be more flexible (separation c…
AurelienFT Aug 27, 2024
c957f12
Remove unwrap and fix config
AurelienFT Aug 27, 2024
cb031b3
Handle all requests related to txpool in p2p
AurelienFT Aug 27, 2024
24e2f4e
Add high level functions to get tx ids and full txs
AurelienFT Aug 27, 2024
56fc16f
Add tests on the two new request/response in P2P and update CHANGELOG.md
AurelienFT Aug 27, 2024
d0cd575
Add asking of txs when we dial a new node
AurelienFT Aug 28, 2024
dacaf1b
format
AurelienFT Aug 28, 2024
09d2024
Merge branch 'master' into send_tx_new_peer_subscription
AurelienFT Aug 28, 2024
368a3cb
Revert "Add asking of txs when we dial a new node"
AurelienFT Aug 28, 2024
cb8379a
Add workflow -> subscription send to TxPool -> TxPool asks for Txs
AurelienFT Aug 28, 2024
56d60d2
Merge branch 'master' into send_tx_new_peer_subscription
AurelienFT Aug 28, 2024
3c40f3e
Fix test compilation without p2p feature
AurelienFT Aug 28, 2024
27ce3eb
format
AurelienFT Aug 28, 2024
cae2242
Fix mock p2p
AurelienFT Aug 28, 2024
c6fd186
Add management of request all ids in p2p test helpers.
AurelienFT Aug 28, 2024
33c1684
Add tests for txpool asking transactions after p2p subscription
AurelienFT Aug 29, 2024
00087d7
Remove option type and specify txpool in message names
AurelienFT Aug 29, 2024
ecfd8bc
Merge branch 'master' into send_tx_new_peer_subscription
AurelienFT Sep 2, 2024
ecaba40
Nits formatting modifications
AurelienFT Sep 2, 2024
5682844
Merge branch 'master' into send_tx_new_peer_subscription
AurelienFT Sep 3, 2024
2af3dab
Use top level import in p2p adapter
AurelienFT Sep 4, 2024
5f582e4
Merge branch 'master' into send_tx_new_peer_subscription
AurelienFT Sep 9, 2024
e1abcf1
Use PeerId type and remove unused Results
AurelienFT Sep 9, 2024
4a7aac3
Fix compil error in tests
AurelienFT Sep 9, 2024
1483bca
Add results in get txs function from libp2p
AurelienFT Sep 12, 2024
f962d8e
Merge branch 'master' into send_tx_new_peer_subscription
xgreenx Sep 12, 2024
cddf78a
Merge branch 'master' into send_tx_new_peer_subscription
AurelienFT Sep 13, 2024
4818fad
remove deadcode and add chekc length.
AurelienFT Sep 13, 2024
479f684
Add check on the subscription
AurelienFT Sep 16, 2024
268d794
Add heavy async processor pool
AurelienFT Sep 16, 2024
9b5de0d
Add a special type for transaction from pool in p2p to avoid clone
AurelienFT Sep 16, 2024
5e16d87
remove global serialize pool transaction
AurelienFT Sep 16, 2024
31162c8
Merge branch 'master' into send_tx_new_peer_subscription
AurelienFT Sep 16, 2024
cf71721
update cargo lock
AurelienFT Sep 16, 2024
d2f5b58
Merge branch 'send_tx_new_peer_subscription' of github.com:FuelLabs/f…
AurelienFT Sep 16, 2024
2687ec3
Add some clean code modifications in txpool and p2p
AurelienFT Sep 16, 2024
31a352b
use serde behind a feature
AurelienFT Sep 17, 2024
6dcef3d
serde behind a feature
AurelienFT Sep 17, 2024
eff906f
Merge branch 'refs/heads/master' into send_tx_new_peer_subscription
xgreenx Sep 18, 2024
eac2060
Submit cargo lock
xgreenx Sep 18, 2024
c27c256
Update crates/services/txpool/src/service.rs
AurelienFT Sep 18, 2024
f6ba46a
Use heavy task processor for p2p -> txpool interactions
AurelienFT Sep 18, 2024
de29489
Fix clippy warning
AurelienFT Sep 18, 2024
f4a45e8
Merge branch 'master' into send_tx_new_peer_subscription
xgreenx Sep 18, 2024
b6537bf
Merge branch 'master' into send_tx_new_peer_subscription
AurelienFT Sep 18, 2024
82a9b86
Check size when sending and add documentation port
AurelienFT Sep 18, 2024
b2be250
Change limit tx management
AurelienFT Sep 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

## [Unreleased]

### Added
- [2131](https://github.com/FuelLabs/fuel-core/pull/2131): Add flow in TxPool in order to ask to newly connected peers to share their transaction pool

## [Version 0.35.0]

### Added
Expand Down
5 changes: 5 additions & 0 deletions bin/fuel-core/src/cli/run/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ pub struct P2PArgs {
#[clap(long = "max-headers-per-request", default_value = "100", env)]
pub max_headers_per_request: usize,

/// Max number of txs in a single txs request response
#[clap(long = "max-txs-per-request", default_value = "10000", env)]
pub max_txs_per_request: usize,

/// Addresses of the bootstrap nodes
/// They should contain PeerId within their `Multiaddr`
#[clap(long = "bootstrap-nodes", value_delimiter = ',', env)]
Expand Down Expand Up @@ -304,6 +308,7 @@ impl P2PArgs {
tcp_port: self.peering_port,
max_block_size: self.max_block_size,
max_headers_per_request: self.max_headers_per_request,
max_txs_per_request: self.max_txs_per_request,
bootstrap_nodes: self.bootstrap_nodes,
reserved_nodes: self.reserved_nodes,
reserved_nodes_only_mode: self.reserved_nodes_only_mode,
Expand Down
2 changes: 1 addition & 1 deletion crates/fuel-core/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ use crate::state::{
};
#[cfg(feature = "rocksdb")]
use std::path::Path;
use fuel_core_gas_price_service::fuel_gas_price_updater::fuel_core_storage_adapter::storage::{ GasPriceMetadata};
use fuel_core_gas_price_service::fuel_gas_price_updater::fuel_core_storage_adapter::storage::GasPriceMetadata;
use crate::database::database_description::gas_price::GasPriceDatabase;

// Storages implementation
Expand Down
36 changes: 27 additions & 9 deletions crates/fuel-core/src/p2p_test_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ use fuel_core_p2p::{
codecs::postcard::PostcardCodec,
network_service::FuelP2PService,
p2p_service::FuelP2PEvent,
request_response::messages::{
RequestMessage,
ResponseMessage,
},
service::to_message_acceptance,
};
use fuel_core_poa::{
Expand Down Expand Up @@ -153,16 +157,30 @@ impl Bootstrap {
}
event = bootstrap.next_event() => {
// The bootstrap node only forwards data without validating it.
if let Some(FuelP2PEvent::GossipsubMessage {
peer_id,
message_id,
..
}) = event {
bootstrap.report_message_validation_result(
&message_id,
match event {
Some(FuelP2PEvent::GossipsubMessage {
peer_id,
to_message_acceptance(&GossipsubMessageAcceptance::Accept)
)
message_id,
..
}) => {
bootstrap.report_message_validation_result(
&message_id,
peer_id,
to_message_acceptance(&GossipsubMessageAcceptance::Accept)
)
},
Some(FuelP2PEvent::InboundRequestMessage {
request_id,
request_message
}) => {
if request_message == RequestMessage::AllTransactionsIds {
let _ = bootstrap.send_response_msg(
request_id,
ResponseMessage::AllTransactionsIds(Some(vec![])),
);
}
}
_ => {}
}
}
}
Expand Down
24 changes: 23 additions & 1 deletion crates/fuel-core/src/service/adapters/p2p.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
use super::BlockImporterAdapter;
use super::{
BlockImporterAdapter,
TxPoolAdapter,
};
use crate::database::OnChainIterableKeyValueView;
use fuel_core_p2p::ports::{
BlockHeightImporter,
P2pDb,
TxPool,
};
use fuel_core_services::stream::BoxStream;
use fuel_core_storage::Result as StorageResult;
Expand All @@ -11,6 +15,7 @@ use fuel_core_types::{
consensus::Genesis,
SealedBlockHeader,
},
fuel_tx::Transaction,
fuel_types::BlockHeight,
services::p2p::Transactions,
};
Expand Down Expand Up @@ -49,3 +54,20 @@ impl BlockHeightImporter for BlockImporterAdapter {
)
}
}

impl TxPool for TxPoolAdapter {
fn get_all_tx_ids(&self) -> Vec<fuel_core_txpool::types::TxId> {
self.service.get_all_tx_ids()
}

fn get_full_txs(
&self,
tx_ids: Vec<fuel_core_txpool::types::TxId>,
) -> Vec<Option<Transaction>> {
self.service
.find(tx_ids)
.into_iter()
.map(|tx_info| tx_info.map(|tx| tx.tx().as_ref().into()))
.collect()
}
}
74 changes: 74 additions & 0 deletions crates/fuel-core/src/service/adapters/txpool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ impl BlockImporter for BlockImporterAdapter {
}

#[cfg(feature = "p2p")]
#[async_trait::async_trait]
impl fuel_core_txpool::ports::PeerToPeer for P2PAdapter {
type GossipedTransaction = TransactionGossipData;

Expand All @@ -91,6 +92,21 @@ impl fuel_core_txpool::ports::PeerToPeer for P2PAdapter {
}
}

fn new_tx_subscription(&self) -> BoxStream<Vec<u8>> {
use tokio_stream::{
wrappers::BroadcastStream,
StreamExt,
};
if let Some(service) = &self.service {
Box::pin(
BroadcastStream::new(service.subscribe_new_tx_subscription())
.filter_map(|result| result.ok()),
)
} else {
fuel_core_services::stream::IntoBoxStream::into_boxed(tokio_stream::pending())
}
}

fn notify_gossip_transaction_validity(
&self,
message_info: GossipsubMessageInfo,
Expand All @@ -102,9 +118,48 @@ impl fuel_core_txpool::ports::PeerToPeer for P2PAdapter {
Ok(())
}
}

async fn request_tx_ids(
&self,
peer_id: Vec<u8>,
) -> anyhow::Result<Vec<fuel_core_txpool::types::TxId>> {
if let Some(service) = &self.service {
match service.get_all_transactions_ids_from_peer(peer_id).await {
Ok(txs) => Ok(txs.unwrap_or_default()),
Err(e) => {
tracing::error!("Error getting tx ids from peer: {:?}", e);
Ok(vec![])
}
}
} else {
Ok(vec![])
}
}

async fn request_txs(
&self,
peer_id: Vec<u8>,
tx_ids: Vec<fuel_core_txpool::types::TxId>,
) -> anyhow::Result<Vec<Option<Transaction>>> {
if let Some(service) = &self.service {
match service
.get_full_transactions_from_peer(peer_id, tx_ids)
.await
{
Ok(txs) => Ok(txs.unwrap_or_default()),
Err(e) => {
tracing::error!("Error getting tx ids from peer: {:?}", e);
Ok(vec![])
}
}
} else {
Ok(vec![])
}
}
}

#[cfg(not(feature = "p2p"))]
#[async_trait::async_trait]
impl fuel_core_txpool::ports::PeerToPeer for P2PAdapter {
type GossipedTransaction = TransactionGossipData;

Expand All @@ -126,6 +181,25 @@ impl fuel_core_txpool::ports::PeerToPeer for P2PAdapter {
) -> anyhow::Result<()> {
Ok(())
}

fn new_tx_subscription(&self) -> BoxStream<Vec<u8>> {
Box::pin(fuel_core_services::stream::pending())
}

async fn request_tx_ids(
&self,
_peer_id: Vec<u8>,
) -> anyhow::Result<Vec<fuel_core_txpool::types::TxId>> {
Ok(vec![])
}

async fn request_txs(
&self,
_peer_id: Vec<u8>,
_tx_ids: Vec<fuel_core_txpool::types::TxId>,
) -> anyhow::Result<Vec<Option<Transaction>>> {
Ok(vec![])
}
}

impl fuel_core_txpool::ports::TxPoolDb for OnChainIterableKeyValueView {
Expand Down
31 changes: 21 additions & 10 deletions crates/fuel-core/src/service/sub_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ pub type PoAService = fuel_core_poa::Service<
InDirectoryPredefinedBlocks,
>;
#[cfg(feature = "p2p")]
pub type P2PService = fuel_core_p2p::service::Service<Database>;
pub type P2PService = fuel_core_p2p::service::Service<Database, TxPoolAdapter>;
pub type TxPoolSharedState = fuel_core_txpool::service::SharedState<
P2PAdapter,
Database,
Expand Down Expand Up @@ -167,14 +167,10 @@ pub fn init_sub_services(
};

#[cfg(feature = "p2p")]
let mut network = config.p2p.clone().map(|p2p_config| {
fuel_core_p2p::service::new_service(
chain_id,
p2p_config,
database.on_chain().clone(),
importer_adapter.clone(),
)
});
let p2p_externals = config
.p2p
.clone()
.map(fuel_core_p2p::service::build_shared_state);

#[cfg(feature = "p2p")]
let p2p_adapter = {
Expand All @@ -190,7 +186,7 @@ pub fn init_sub_services(
invalid_transactions: -100.,
};
P2PAdapter::new(
network.as_ref().map(|network| network.shared.clone()),
p2p_externals.as_ref().map(|ext| ext.0.clone()),
peer_report_config,
)
};
Expand Down Expand Up @@ -227,6 +223,21 @@ pub fn init_sub_services(
);
let tx_pool_adapter = TxPoolAdapter::new(txpool.shared.clone());

#[cfg(feature = "p2p")]
let mut network = config.p2p.clone().zip(p2p_externals).map(
|(p2p_config, (shared_state, request_receiver))| {
fuel_core_p2p::service::new_service(
chain_id,
p2p_config,
shared_state,
request_receiver,
database.on_chain().clone(),
importer_adapter.clone(),
tx_pool_adapter.clone(),
)
},
);

let block_producer = fuel_core_producer::Producer {
config: config.block_producer.clone(),
view_provider: database.on_chain().clone(),
Expand Down
8 changes: 8 additions & 0 deletions crates/services/p2p/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ pub const MAX_RESPONSE_SIZE: usize = 18 * 1024 * 1024;
/// Maximum number of blocks per request.
pub const MAX_HEADERS_PER_REQUEST: usize = 100;

/// Maximum number of transactions ids asked per request.
pub const MAX_TXS_PER_REQUEST: usize = 10000;

#[derive(Clone, Debug)]
pub struct Config<State = Initialized> {
/// The keypair used for handshake during communication with other p2p nodes.
Expand All @@ -73,6 +76,9 @@ pub struct Config<State = Initialized> {
pub max_block_size: usize,
pub max_headers_per_request: usize,

// Maximum of txs id asked in a single request
pub max_txs_per_request: usize,

// `DiscoveryBehaviour` related fields
pub bootstrap_nodes: Vec<Multiaddr>,
pub enable_mdns: bool,
Expand Down Expand Up @@ -151,6 +157,7 @@ impl Config<NotInitialized> {
tcp_port: self.tcp_port,
max_block_size: self.max_block_size,
max_headers_per_request: self.max_headers_per_request,
max_txs_per_request: self.max_txs_per_request,
bootstrap_nodes: self.bootstrap_nodes,
enable_mdns: self.enable_mdns,
max_peers_connected: self.max_peers_connected,
Expand Down Expand Up @@ -200,6 +207,7 @@ impl Config<NotInitialized> {
tcp_port: 0,
max_block_size: MAX_RESPONSE_SIZE,
max_headers_per_request: MAX_HEADERS_PER_REQUEST,
max_txs_per_request: MAX_TXS_PER_REQUEST,
bootstrap_nodes: vec![],
enable_mdns: false,
max_peers_connected: 50,
Expand Down
Loading