Skip to content

Commit 0eedf62

Browse files
committed
refactor(transport): replace handshake pipeline (#2065)
1 parent 8cac58b commit 0eedf62

File tree

6 files changed

+109
-532
lines changed

6 files changed

+109
-532
lines changed

crates/core/src/message.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -367,7 +367,6 @@ pub(crate) enum NodeEvent {
367367
/// Register expectation for an inbound connection from the given peer.
368368
ExpectPeerConnection {
369369
peer: PeerId,
370-
courtesy: bool,
371370
},
372371
}
373372

@@ -445,11 +444,8 @@ impl Display for NodeEvent {
445444
"Local subscribe complete (tx: {tx}, key: {key}, subscribed: {subscribed})"
446445
)
447446
}
448-
NodeEvent::ExpectPeerConnection { peer, courtesy } => {
449-
write!(
450-
f,
451-
"ExpectPeerConnection (from {peer}, courtesy: {courtesy})"
452-
)
447+
NodeEvent::ExpectPeerConnection { peer } => {
448+
write!(f, "ExpectPeerConnection (from {peer})")
453449
}
454450
}
455451
}

crates/core/src/node/network_bridge/p2p_protoc.rs

Lines changed: 17 additions & 120 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,7 @@ use std::{
1414
sync::Arc,
1515
};
1616
use tokio::net::UdpSocket;
17-
use tokio::sync::mpsc::{self, Receiver, Sender};
18-
use tokio::sync::mpsc::error::TryRecvError;
17+
use tokio::sync::mpsc::{self, error::TryRecvError, Receiver, Sender};
1918
use tokio::time::timeout;
2019
use tracing::Instrument;
2120

@@ -630,14 +629,14 @@ impl P2pConnManager {
630629
)
631630
.await?;
632631
}
633-
NodeEvent::ExpectPeerConnection { peer, courtesy } => {
634-
tracing::debug!(%peer, courtesy, "ExpectPeerConnection event received; registering inbound expectation via handshake driver");
632+
NodeEvent::ExpectPeerConnection { peer } => {
633+
tracing::debug!(%peer, "ExpectPeerConnection event received; registering inbound expectation via handshake driver");
635634
state.outbound_handler.expect_incoming(peer.addr);
636635
if let Err(error) = handshake_cmd_sender
637636
.send(HandshakeCommand::ExpectInbound {
638637
peer: peer.clone(),
639638
transaction: None,
640-
courtesy,
639+
courtesy: false,
641640
})
642641
.await
643642
{
@@ -751,12 +750,8 @@ impl P2pConnManager {
751750

752751
// Collect node information
753752
if config.include_node_info {
754-
// Prefer the runtime's current ring location; fall back to derivation from the peer's
755-
// advertised address if we don't have one yet.
756-
let current_location =
757-
op_manager.ring.connection_manager.own_location().location;
758-
759-
let (addr, fallback_location) = if let Some(peer_id) =
753+
// Calculate location and adress if is set
754+
let (addr, location) = if let Some(peer_id) =
760755
op_manager.ring.connection_manager.get_peer_key()
761756
{
762757
let location = Location::from_address(&peer_id.addr);
@@ -765,15 +760,11 @@ impl P2pConnManager {
765760
(None, None)
766761
};
767762

768-
let location_str = current_location
769-
.or(fallback_location)
770-
.map(|loc| format!("{:.6}", loc.as_f64()));
771-
772763
// Always include basic node info, but only include address/location if available
773764
response.node_info = Some(NodeInfo {
774765
peer_id: ctx.key_pair.public().to_string(),
775766
is_gateway: self.is_gateway,
776-
location: location_str,
767+
location: location.map(|loc| format!("{:.6}", loc.0)),
777768
listening_address: addr
778769
.map(|peer_addr| peer_addr.to_string()),
779770
uptime_seconds: 0, // TODO: implement actual uptime tracking
@@ -1267,12 +1258,6 @@ impl P2pConnManager {
12671258
"connect_peer: registered new pending connection"
12681259
);
12691260
state.outbound_handler.expect_incoming(peer_addr);
1270-
let loc_hint = Location::from_address(&peer.addr);
1271-
self.bridge
1272-
.op_manager
1273-
.ring
1274-
.connection_manager
1275-
.register_outbound_pending(&peer, Some(loc_hint));
12761261
}
12771262
}
12781263

@@ -1365,7 +1350,6 @@ impl P2pConnManager {
13651350
}
13661351
}
13671352

1368-
let mut derived_courtesy = courtesy;
13691353
let peer_id = peer.unwrap_or_else(|| {
13701354
tracing::info!(
13711355
remote = %remote_addr,
@@ -1385,31 +1369,15 @@ impl P2pConnManager {
13851369
)
13861370
});
13871371

1388-
if !derived_courtesy {
1389-
derived_courtesy = self
1390-
.bridge
1391-
.op_manager
1392-
.ring
1393-
.connection_manager
1394-
.take_pending_courtesy_by_addr(&remote_addr);
1395-
}
1396-
13971372
tracing::info!(
13981373
remote = %peer_id.addr,
1399-
courtesy = derived_courtesy,
1374+
courtesy,
14001375
transaction = ?transaction,
14011376
"Inbound connection established"
14021377
);
14031378

1404-
self.handle_successful_connection(
1405-
peer_id,
1406-
connection,
1407-
state,
1408-
select_stream,
1409-
None,
1410-
derived_courtesy,
1411-
)
1412-
.await?;
1379+
self.handle_successful_connection(peer_id, connection, state, select_stream, None)
1380+
.await?;
14131381
}
14141382
HandshakeEvent::OutboundEstablished {
14151383
transaction,
@@ -1423,15 +1391,8 @@ impl P2pConnManager {
14231391
transaction = %transaction,
14241392
"Outbound connection established"
14251393
);
1426-
self.handle_successful_connection(
1427-
peer,
1428-
connection,
1429-
state,
1430-
select_stream,
1431-
None,
1432-
courtesy,
1433-
)
1434-
.await?;
1394+
self.handle_successful_connection(peer, connection, state, select_stream, None)
1395+
.await?;
14351396
}
14361397
HandshakeEvent::OutboundFailed {
14371398
transaction,
@@ -1546,7 +1507,6 @@ impl P2pConnManager {
15461507
state: &mut EventListenerState,
15471508
select_stream: &mut priority_select::ProductionPrioritySelectStream,
15481509
remaining_checks: Option<usize>,
1549-
courtesy: bool,
15501510
) -> anyhow::Result<()> {
15511511
let pending_txs = state
15521512
.awaiting_connection_txs
@@ -1622,41 +1582,18 @@ impl P2pConnManager {
16221582
}
16231583

16241584
if newly_inserted {
1625-
let loc = self
1585+
let pending_loc = self
16261586
.bridge
16271587
.op_manager
16281588
.ring
16291589
.connection_manager
1630-
.pending_location_hint(&peer_id)
1631-
.unwrap_or_else(|| Location::from_address(&peer_id.addr));
1632-
let eviction_candidate = self
1633-
.bridge
1590+
.prune_in_transit_connection(&peer_id);
1591+
let loc = pending_loc.unwrap_or_else(|| Location::from_address(&peer_id.addr));
1592+
self.bridge
16341593
.op_manager
16351594
.ring
1636-
.add_connection(loc, peer_id.clone(), false, courtesy)
1595+
.add_connection(loc, peer_id.clone(), false)
16371596
.await;
1638-
if let Some(victim) = eviction_candidate {
1639-
if victim == peer_id {
1640-
tracing::debug!(
1641-
%peer_id,
1642-
"Courtesy eviction candidate matched current connection; skipping drop"
1643-
);
1644-
} else {
1645-
tracing::info!(
1646-
%victim,
1647-
%peer_id,
1648-
courtesy_limit = true,
1649-
"Courtesy connection budget exceeded; dropping oldest courtesy peer"
1650-
);
1651-
if let Err(error) = self.bridge.drop_connection(&victim).await {
1652-
tracing::warn!(
1653-
%victim,
1654-
?error,
1655-
"Failed to drop courtesy connection after hitting budget"
1656-
);
1657-
}
1658-
}
1659-
}
16601597
}
16611598
Ok(())
16621599
}
@@ -1714,46 +1651,6 @@ impl P2pConnManager {
17141651
}
17151652
}
17161653

1717-
if let Some(sender_peer) = extract_sender_from_message(&peer_conn.msg) {
1718-
if sender_peer.peer.addr == remote_addr
1719-
|| sender_peer.peer.addr.ip().is_unspecified()
1720-
{
1721-
let mut new_peer_id = sender_peer.peer.clone();
1722-
if new_peer_id.addr.ip().is_unspecified() {
1723-
new_peer_id.addr = remote_addr;
1724-
if let Some(sender_mut) =
1725-
extract_sender_from_message_mut(&mut peer_conn.msg)
1726-
{
1727-
if sender_mut.peer.addr.ip().is_unspecified() {
1728-
sender_mut.peer.addr = remote_addr;
1729-
}
1730-
}
1731-
}
1732-
if let Some(existing_key) = self
1733-
.connections
1734-
.keys()
1735-
.find(|peer| {
1736-
peer.addr == remote_addr && peer.pub_key != new_peer_id.pub_key
1737-
})
1738-
.cloned()
1739-
{
1740-
if let Some(channel) = self.connections.remove(&existing_key) {
1741-
tracing::info!(
1742-
remote = %remote_addr,
1743-
old_peer = %existing_key,
1744-
new_peer = %new_peer_id,
1745-
"Updating provisional peer identity after inbound message"
1746-
);
1747-
self.bridge
1748-
.op_manager
1749-
.ring
1750-
.update_connection_identity(&existing_key, new_peer_id.clone());
1751-
self.connections.insert(new_peer_id, channel);
1752-
}
1753-
}
1754-
}
1755-
}
1756-
17571654
// Check if we need to establish a connection back to the sender
17581655
let should_connect = !self.connections.keys().any(|peer| peer.addr == remote_addr)
17591656
&& !state.awaiting_connection.contains_key(&remote_addr);

crates/core/src/node/testing_impl.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -940,8 +940,8 @@ where
940940
NodeEvent::QueryNodeDiagnostics { .. } => {
941941
unimplemented!()
942942
}
943-
NodeEvent::ExpectPeerConnection { peer, courtesy } => {
944-
tracing::debug!(%peer, courtesy, "ExpectPeerConnection ignored in testing impl");
943+
NodeEvent::ExpectPeerConnection { peer } => {
944+
tracing::debug!(%peer, "ExpectPeerConnection ignored in testing impl");
945945
continue;
946946
}
947947
},

0 commit comments

Comments
 (0)