Skip to content

Commit d0cd575

Browse files
committed
Add asking of txs when we dial a new node
1 parent 56fc16f commit d0cd575

File tree

3 files changed

+93
-14
lines changed

3 files changed

+93
-14
lines changed

crates/services/p2p/src/p2p_service.rs

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,10 @@ pub enum FuelP2PEvent {
160160
request_id: InboundRequestId,
161161
request_message: RequestMessage,
162162
},
163-
PeerConnected(PeerId),
163+
PeerConnected {
164+
peer_id: PeerId,
165+
is_outgoing: bool,
166+
},
164167
PeerDisconnected(PeerId),
165168
PeerInfoUpdated {
166169
peer_id: PeerId,
@@ -533,11 +536,17 @@ impl FuelP2PService {
533536
PeerReportEvent::PerformDecay => {
534537
self.peer_manager.batch_update_score_with_decay()
535538
}
536-
PeerReportEvent::PeerConnected { peer_id } => {
539+
PeerReportEvent::PeerConnected {
540+
peer_id,
541+
is_outgoing,
542+
} => {
537543
if self.peer_manager.handle_peer_connected(&peer_id) {
538544
let _ = self.swarm.disconnect_peer_id(peer_id);
539545
} else {
540-
return Some(FuelP2PEvent::PeerConnected(peer_id));
546+
return Some(FuelP2PEvent::PeerConnected {
547+
peer_id,
548+
is_outgoing,
549+
});
541550
}
542551
}
543552
PeerReportEvent::PeerDisconnected { peer_id } => {
@@ -925,7 +934,7 @@ mod tests {
925934
}
926935
}
927936
}
928-
if let Some(FuelP2PEvent::PeerConnected(peer_id)) = sentry_node_event {
937+
if let Some(FuelP2PEvent::PeerConnected{peer_id, ..}) = sentry_node_event {
929938
// we connected to the desired reserved node
930939
if peer_id == reserved_node_peer_id {
931940
break
@@ -991,7 +1000,7 @@ mod tests {
9911000
while instance.elapsed().as_secs() < 5 {
9921001
tokio::select! {
9931002
event_from_node_a = node_a.next_event() => {
994-
if let Some(FuelP2PEvent::PeerConnected(_)) = event_from_node_a {
1003+
if let Some(FuelP2PEvent::PeerConnected{..}) = event_from_node_a {
9951004
if node_a.peer_manager().total_peers_connected() > node_a_max_peers_allowed {
9961005
panic!("The node should only connect to max {node_a_max_peers_allowed} peers");
9971006
}
@@ -1000,7 +1009,7 @@ mod tests {
10001009
tracing::info!("Event from the node_a: {:?}", event_from_node_a);
10011010
},
10021011
event_from_node_b = node_b.next_event() => {
1003-
if let Some(FuelP2PEvent::PeerConnected(_)) = event_from_node_b {
1012+
if let Some(FuelP2PEvent::PeerConnected{..}) = event_from_node_b {
10041013
if node_b.peer_manager().total_peers_connected() > node_b_max_peers_allowed {
10051014
panic!("The node should only connect to max {node_b_max_peers_allowed} peers");
10061015
}
@@ -1083,15 +1092,15 @@ mod tests {
10831092
while instance.elapsed().as_secs() < 5 {
10841093
tokio::select! {
10851094
event_from_first_guarded = first_guarded_node.next_event() => {
1086-
if let Some(FuelP2PEvent::PeerConnected(peer_id)) = event_from_first_guarded {
1095+
if let Some(FuelP2PEvent::PeerConnected{peer_id, ..}) = event_from_first_guarded {
10871096
if !first_sentry_set.contains(&peer_id) {
10881097
panic!("The node should only connect to the specified reserved nodes!");
10891098
}
10901099
}
10911100
tracing::info!("Event from the first guarded node: {:?}", event_from_first_guarded);
10921101
},
10931102
event_from_second_guarded = second_guarded_node.next_event() => {
1094-
if let Some(FuelP2PEvent::PeerConnected(peer_id)) = event_from_second_guarded {
1103+
if let Some(FuelP2PEvent::PeerConnected{peer_id, ..}) = event_from_second_guarded {
10951104
if !second_sentry_set.contains(&peer_id) {
10961105
panic!("The node should only connect to the specified reserved nodes!");
10971106
}
@@ -1100,7 +1109,7 @@ mod tests {
11001109
},
11011110
// Poll one of the reserved, sentry nodes
11021111
sentry_node_event = single_sentry_node.next_event() => {
1103-
if let Some(FuelP2PEvent::PeerConnected(peer_id)) = sentry_node_event {
1112+
if let Some(FuelP2PEvent::PeerConnected{peer_id, ..}) = sentry_node_event {
11041113
sentry_node_connections.insert(peer_id);
11051114
}
11061115
}
@@ -1131,7 +1140,7 @@ mod tests {
11311140
loop {
11321141
tokio::select! {
11331142
node_b_event = node_b.next_event() => {
1134-
if let Some(FuelP2PEvent::PeerConnected(_)) = node_b_event {
1143+
if let Some(FuelP2PEvent::PeerConnected{..}) = node_b_event {
11351144
// successfully connected to Node A
11361145
break
11371146
}
@@ -1169,7 +1178,7 @@ mod tests {
11691178
}
11701179
},
11711180
node_b_event = node_b.next_event() => {
1172-
if let Some(FuelP2PEvent::PeerConnected(_)) = node_b_event {
1181+
if let Some(FuelP2PEvent::PeerConnected{..}) = node_b_event {
11731182
panic!("Node B should not connect to Node A!")
11741183
}
11751184
tracing::info!("Node B Event: {:?}", node_b_event);
@@ -1206,7 +1215,7 @@ mod tests {
12061215
},
12071216

12081217
node_c_event = node_c.next_event() => {
1209-
if let Some(FuelP2PEvent::PeerConnected(peer_id)) = node_c_event {
1218+
if let Some(FuelP2PEvent::PeerConnected{peer_id, ..}) = node_c_event {
12101219
// we have connected to Node B!
12111220
if peer_id == node_b.local_peer_id {
12121221
break
@@ -1254,7 +1263,7 @@ mod tests {
12541263
tracing::info!("Node A Event: {:?}", node_a_event);
12551264
},
12561265
node_b_event = node_b.next_event() => {
1257-
if let Some(FuelP2PEvent::PeerConnected(_)) = node_b_event {
1266+
if let Some(FuelP2PEvent::PeerConnected{..}) = node_b_event {
12581267
// we've connected to Peer A
12591268
// let's update our BlockHeight
12601269
node_b.update_block_height(latest_block_height);

crates/services/p2p/src/peer_report.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ const REPUTATION_DECAY_INTERVAL_IN_SECONDS: u64 = 1;
5656
pub enum PeerReportEvent {
5757
PeerConnected {
5858
peer_id: PeerId,
59+
is_outgoing: bool,
5960
},
6061
PeerDisconnected {
6162
peer_id: PeerId,
@@ -131,10 +132,14 @@ impl NetworkBehaviour for Behaviour {
131132
let ConnectionEstablished {
132133
peer_id,
133134
connection_id,
135+
endpoint,
134136
..
135137
} = connection_established;
136138
self.pending_events.push_back(ToSwarm::GenerateEvent(
137-
PeerReportEvent::PeerConnected { peer_id },
139+
PeerReportEvent::PeerConnected {
140+
peer_id,
141+
is_outgoing: endpoint.is_dialer(),
142+
},
138143
));
139144
if self.reserved_nodes_multiaddr.contains_key(&peer_id) {
140145
self.connected_reserved_nodes.insert(peer_id);

crates/services/p2p/src/service.rs

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -454,6 +454,7 @@ where
454454
V: AtomicView + 'static,
455455
V::LatestView: P2pDb,
456456
T: TxPool + 'static,
457+
B: Broadcast + 'static,
457458
{
458459
fn process_request(
459460
&mut self,
@@ -597,6 +598,65 @@ where
597598
})?;
598599
Ok(())
599600
}
601+
602+
// Ask for all transaction ids from the peer and ask only for full transaction if we don't know them
603+
fn get_missing_txs_on_connection(&mut self, peer_id: PeerId) -> anyhow::Result<()> {
604+
let request_sender = self.request_sender.clone();
605+
let txpool_tx_ids = self.tx_pool.get_all_tx_ids();
606+
tokio::spawn(async move {
607+
let (sender, receiver) = oneshot::channel();
608+
609+
let request = TaskRequest::GetAllTxIds {
610+
from_peer: peer_id,
611+
channel: sender,
612+
};
613+
request_sender.send(request).await?;
614+
615+
let (response_from_peer, response) =
616+
receiver.await.map_err(|e| anyhow!("{e}"))?;
617+
assert_eq!(
618+
peer_id, response_from_peer,
619+
"Bug: response from non-requested peer"
620+
);
621+
622+
let Some(mut tx_ids) =
623+
response.map_err(|e| anyhow!("Invalid response from peer {e:?}"))?
624+
else {
625+
return Ok(())
626+
};
627+
628+
tx_ids.retain(|tx_id| !txpool_tx_ids.contains(tx_id));
629+
630+
let (sender, receiver) = oneshot::channel();
631+
let request = TaskRequest::GetFullTransactions {
632+
tx_ids,
633+
from_peer: peer_id,
634+
channel: sender,
635+
};
636+
637+
request_sender.send(request).await?;
638+
639+
let (response_from_peer, response) =
640+
receiver.await.map_err(|e| anyhow!("{e}"))?;
641+
assert_eq!(
642+
peer_id, response_from_peer,
643+
"Bug: response from non-requested peer"
644+
);
645+
646+
let Some(txs) =
647+
response.map_err(|e| anyhow!("Invalid response from peer {e:?}"))?
648+
else {
649+
return Ok(())
650+
};
651+
652+
for _tx in txs.into_iter().flatten() {
653+
// TODO: Send all the txs to the pool
654+
}
655+
656+
Ok::<(), anyhow::Error>(())
657+
});
658+
Ok(())
659+
}
600660
}
601661

602662
fn convert_peer_id(peer_id: &PeerId) -> anyhow::Result<FuelPeerId> {
@@ -796,6 +856,11 @@ where
796856
p2p_event = self.p2p_service.next_event() => {
797857
should_continue = true;
798858
match p2p_event {
859+
Some(FuelP2PEvent::PeerConnected { peer_id, is_outgoing }) => {
860+
if is_outgoing {
861+
self.get_missing_txs_on_connection(peer_id)?;
862+
}
863+
}
799864
Some(FuelP2PEvent::PeerInfoUpdated { peer_id, block_height }) => {
800865
let peer_id: Vec<u8> = peer_id.into();
801866
let block_height_data = BlockHeightHeartbeatData {

0 commit comments

Comments
 (0)