Skip to content

Commit c2b9865

Browse files
mickvandijkeclaude
andcommitted
fix: advertise dialable addresses to DHT and harden NAT traversal
Bundles six NAT-traversal fixes uncovered during a deep audit of the sporadic-failure pattern reported by saorsa-node-lmdb. Root cause: DhtNetworkManager::local_dht_node() built its self-entry from the static NodeConfig::listen_addrs() derivation, which returns wildcard IPs (0.0.0.0 / ::) and the *configured* port (0 for ephemeral binds). Receivers' dialable_addresses() filter then rejected the entry, so DHT-based peer discovery returned zero contactable addresses for any node. Connections only succeeded via side channels (static bootstrap, in-memory cache, relay fallback) — exactly the sporadic pattern. Rewritten as `async fn` that sources addresses, in priority order, from TransportHandle::observed_external_address() (the OBSERVED_ADDRESS reflexive address, which was defined on the adapter and never called outside its own file) and the runtime listen_addrs RwLock, with unspecified IPs substituted via a new primary_local_ip() helper that uses the standard UdpSocket::connect-then-getsockname trick. No new dependencies. Other fixes bundled: - bootstrap_from_peers now records the bootstrap peer as the natural hole-punch coordinator referrer for every node it returned, threading through the existing dial_candidate -> set_hole_punch_preferred_coordinator plumbing that the iterative-lookup path already used. - Relay-established and peer-address-update events drained via a tokio::select! over new TransportHandle::recv_relay_established() / recv_peer_address_update() instead of the previous 1-second polling loop, eliminating the race window where outbound DHT queries advertised no relay address for up to a second after relay setup. - BOOTSTRAP_IDENTITY_TIMEOUT_SECS bumped 5 -> 15 with explanatory comment. Identity exchange (two RTTs + ML-DSA-65 signature, ~3.3kB) blew through 5s sporadically on congested cellular and cross-region links. - Added dial_addresses() helper that loops through every dialable address until one succeeds. Replaced 5 single-address dial sites (bootstrap_from_peers, spawn_bucket_refresh_task, trigger_self_lookup, find_closest_nodes_network parallel-query block, send_dht_request fallback) so a stale NAT binding on entry #1 no longer kills the dial when entry #2 would have worked. Regression tests: tests/dht_self_advertisement.rs adds three #[tokio::test] cases that pin the published self-entry behaviour. All three FAILED on main with concrete evidence (e.g. `[Quic(0.0.0.0:0)]` published as the only address; transport bound to port 55886 but DHT advertised port 0). All PASS after this commit. They use only public APIs (DhtNetworkManager::find_closest_nodes_local_with_self) so no new test surface area was added to the library. Verified: cargo test --lib 269/269 cargo test --test dht_self_advertisement 3/3 cargo test --test two_node_messaging 7/7 cargo test --test node_lifecycle 17/17 cargo clippy --lib -- -D warnings clean The dead hole-punch chain in saorsa-transport's nat_traversal_api.rs (Bug 4 from the audit) is fixed in a separate commit in that crate. The --port 0 default in ant-node (Bug 5) is left as a deliberate decision for the operator config layer. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 05d47b8 commit c2b9865

4 files changed

Lines changed: 610 additions & 128 deletions

File tree

src/dht_network_manager.rs

Lines changed: 204 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use crate::{
3131
use anyhow::Context as _;
3232
use serde::{Deserialize, Serialize};
3333
use std::collections::{BTreeMap, HashMap, HashSet};
34-
use std::net::IpAddr;
34+
use std::net::{IpAddr, SocketAddr, UdpSocket};
3535
use std::sync::{Arc, Mutex};
3636
use std::time::{Duration, Instant, SystemTime};
3737
use tokio::sync::{RwLock, Semaphore, broadcast, oneshot};
@@ -363,6 +363,51 @@ impl Drop for BucketRevalidationGuard {
363363
}
364364
}
365365

366+
/// IPv4 probe target used by [`primary_local_ip`] to discover the host's
367+
/// primary outbound interface address. Picked from the TEST-NET-3 range
368+
/// (RFC 5737) so the address is guaranteed never to be allocated to a real
369+
/// host. No packets are actually sent — `UdpSocket::connect` only sets the
370+
/// kernel's default route entry on the socket, after which `local_addr`
371+
/// returns the IP the kernel would route to that destination.
372+
const PRIMARY_IP_PROBE_V4: &str = "203.0.113.1:80";
373+
374+
/// IPv6 probe target used by [`primary_local_ip`]. Picked from the
375+
/// 2001:db8::/32 documentation prefix (RFC 3849) for the same reason as
376+
/// [`PRIMARY_IP_PROBE_V4`].
377+
const PRIMARY_IP_PROBE_V6: &str = "[2001:db8::1]:80";
378+
379+
/// Discover the host's primary outbound interface IP for the requested
380+
/// address family.
381+
///
382+
/// Uses the standard "UDP connect to a public address, then read local_addr"
383+
/// trick: opening a UDP socket and "connecting" it to a remote IP causes the
384+
/// kernel to consult its routing table and bind the local end of the socket
385+
/// to the source address it would use for that destination, *without sending
386+
/// any packets*. The chosen IP is then readable via `local_addr()`.
387+
///
388+
/// Returns `None` when:
389+
/// - the host has no usable interface for the requested family,
390+
/// - the kernel returns an unspecified address (rare, indicates no default
391+
/// route),
392+
/// - or any I/O error occurs.
393+
fn primary_local_ip(want_ipv4: bool) -> Option<IpAddr> {
394+
let bind_addr: &str = if want_ipv4 { "0.0.0.0:0" } else { "[::]:0" };
395+
let probe_addr: &str = if want_ipv4 {
396+
PRIMARY_IP_PROBE_V4
397+
} else {
398+
PRIMARY_IP_PROBE_V6
399+
};
400+
401+
let socket = UdpSocket::bind(bind_addr).ok()?;
402+
socket.connect(probe_addr).ok()?;
403+
let local = socket.local_addr().ok()?;
404+
405+
if local.ip().is_unspecified() {
406+
return None;
407+
}
408+
Some(local.ip())
409+
}
410+
366411
impl DhtNetworkManager {
367412
fn new_from_components(
368413
transport: Arc<crate::transport_handle::TransportHandle>,
@@ -590,11 +635,8 @@ impl DhtNetworkManager {
590635
if dht_node.peer_id == this.config.peer_id {
591636
continue;
592637
}
593-
if let Some(addr) =
594-
Self::first_dialable_address(&dht_node.addresses)
595-
{
596-
this.dial_candidate(&dht_node.peer_id, &addr, None).await;
597-
}
638+
this.dial_addresses(&dht_node.peer_id, &dht_node.addresses, None)
639+
.await;
598640
}
599641
}
600642
Err(e) => {
@@ -626,10 +668,11 @@ impl DhtNetworkManager {
626668
if dht_node.peer_id == self_id {
627669
continue;
628670
}
629-
// Dial if not already connected
630-
if let Some(addr) = Self::first_dialable_address(&dht_node.addresses) {
631-
self.dial_candidate(&dht_node.peer_id, &addr, None).await;
632-
}
671+
// Dial if not already connected — try every advertised
672+
// address, not just the first, so a stale NAT binding on
673+
// one entry doesn't kill the dial.
674+
self.dial_addresses(&dht_node.peer_id, &dht_node.addresses, None)
675+
.await;
633676
}
634677
Ok(())
635678
}
@@ -725,21 +768,24 @@ impl DhtNetworkManager {
725768
.first()
726769
.and_then(|a| a.dialable_socket_addr());
727770

771+
// The bootstrap peer is the natural NAT-traversal referrer for
772+
// every node it returns: it has a live connection to us (we just
773+
// queried it) and presumably also to the nodes it tells us about.
774+
// Passing its socket address as the preferred coordinator lets
775+
// hole-punch PUNCH_ME_NOW be relayed through it.
728776
let op = DhtNetworkOperation::FindNode { key };
729777
match self.send_dht_request(peer_id, op, None).await {
730778
Ok(DhtNetworkResult::NodesFound { nodes, .. }) => {
731779
for node in &nodes {
732-
let first = Self::first_dialable_address(&node.addresses);
780+
let dialable = Self::dialable_addresses(&node.addresses);
733781
debug!(
734-
"DHT bootstrap: peer={} num_addresses={} first_dialable={:?}",
782+
"DHT bootstrap: peer={} num_addresses={} dialable={}",
735783
node.peer_id.to_hex(),
736784
node.addresses.len(),
737-
first.as_ref().map(|a| a.to_string())
785+
dialable.len()
738786
);
739-
if seen.insert(node.peer_id)
740-
&& let Some(addr) = first
741-
{
742-
self.dial_candidate(&node.peer_id, &addr, bootstrap_addr)
787+
if seen.insert(node.peer_id) && !dialable.is_empty() {
788+
self.dial_addresses(&node.peer_id, &node.addresses, bootstrap_addr)
743789
.await;
744790
}
745791
}
@@ -923,7 +969,7 @@ impl DhtNetworkManager {
923969
// back to `count`. Self may displace the farthest peer.
924970
let mut nodes = self.find_closest_nodes_local(key, count).await;
925971

926-
nodes.push(self.local_dht_node());
972+
nodes.push(self.local_dht_node().await);
927973

928974
let key_peer = PeerId::from_bytes(*key);
929975
nodes.sort_by(|a, b| {
@@ -972,7 +1018,7 @@ impl DhtNetworkManager {
9721018
// final K-closest result, but we must never send an RPC to ourselves.
9731019
// Seed best_nodes with self and mark self as "queried" so the iterative
9741020
// loop never tries to contact us.
975-
best_nodes.push(self.local_dht_node());
1021+
best_nodes.push(self.local_dht_node().await);
9761022
self.mark_self_queried(&mut queried_nodes);
9771023

9781024
// Candidates sorted by XOR distance to target (closest first).
@@ -1039,16 +1085,21 @@ impl DhtNetworkManager {
10391085
.iter()
10401086
.map(|node| {
10411087
let peer_id = node.peer_id;
1042-
let address = Self::first_dialable_address(&node.addresses);
1088+
let addresses = node.addresses.clone();
10431089
let referrer = referrers.get(&peer_id).copied();
10441090
let op = DhtNetworkOperation::FindNode { key: *key };
10451091
async move {
1046-
if let Some(ref addr) = address {
1047-
self.dial_candidate(&peer_id, addr, referrer).await;
1048-
}
1092+
// Try every dialable address, not just the first.
1093+
// If at least one succeeds the peer is connected and
1094+
// `send_dht_request` will reuse that channel; if all
1095+
// fail, `send_dht_request`'s own fallback will retry
1096+
// with the routing-table addresses.
1097+
self.dial_addresses(&peer_id, &addresses, referrer).await;
1098+
let address_hint = Self::first_dialable_address(&addresses);
10491099
(
10501100
peer_id,
1051-
self.send_dht_request(&peer_id, op, address.as_ref()).await,
1101+
self.send_dht_request(&peer_id, op, address_hint.as_ref())
1102+
.await,
10521103
)
10531104
}
10541105
})
@@ -1234,14 +1285,58 @@ impl DhtNetworkManager {
12341285
/// Build a `DHTNode` representing the local node for inclusion in
12351286
/// K-closest results. The local node always participates in distance
12361287
/// ranking but is never queried over the network.
1237-
fn local_dht_node(&self) -> DHTNode {
1238-
let mut addresses: Vec<MultiAddr> = self.config.node_config.listen_addrs().to_vec();
1239-
if addresses.is_empty() {
1240-
addresses.push(MultiAddr::quic(std::net::SocketAddr::new(
1241-
std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
1242-
0,
1243-
)));
1288+
///
1289+
/// The published address list is sourced — in priority order — from:
1290+
///
1291+
/// 1. The transport's externally-observed reflexive address (set by
1292+
/// OBSERVED_ADDRESS frames received from peers). This is the most
1293+
/// authoritative source because it is the actual post-NAT address
1294+
/// that remote peers see.
1295+
/// 2. The transport's runtime-bound `listen_addrs`, with any unspecified
1296+
/// IP (`0.0.0.0` / `[::]`) substituted for the host's primary outbound
1297+
/// interface address. This makes the entry usable on a LAN even before
1298+
/// OBSERVED_ADDRESS has flowed.
1299+
///
1300+
/// **Why not `NodeConfig::listen_addrs()`:** that helper is a pure
1301+
/// derivation of `(port, ipv6, local)` and returns wildcard IPs and the
1302+
/// configured port — which is `0` for ephemeral binds. The result was
1303+
/// being filtered out by every consumer's [`Self::dialable_addresses`]
1304+
/// (it rejects unspecified IPs), so DHT-based peer discovery for this
1305+
/// node returned no contactable address — the root cause of sporadic
1306+
/// NAT traversal failures.
1307+
async fn local_dht_node(&self) -> DHTNode {
1308+
let mut addresses: Vec<MultiAddr> = Vec::new();
1309+
1310+
// 1. Observed external address (most authoritative).
1311+
if let Some(observed) = self.transport.observed_external_address() {
1312+
addresses.push(MultiAddr::quic(observed));
12441313
}
1314+
1315+
// 2. Runtime-bound listen addresses, with wildcards substituted.
1316+
for la in self.transport.listen_addrs().await {
1317+
let Some(sa) = la.dialable_socket_addr() else {
1318+
continue;
1319+
};
1320+
if sa.port() == 0 {
1321+
// Pre-bind placeholder; never dialable.
1322+
continue;
1323+
}
1324+
1325+
let resolved_ip = if sa.ip().is_unspecified() {
1326+
let Some(ip) = primary_local_ip(sa.is_ipv4()) else {
1327+
continue;
1328+
};
1329+
ip
1330+
} else {
1331+
sa.ip()
1332+
};
1333+
1334+
let resolved = MultiAddr::quic(SocketAddr::new(resolved_ip, sa.port()));
1335+
if !addresses.contains(&resolved) {
1336+
addresses.push(resolved);
1337+
}
1338+
}
1339+
12451340
DHTNode {
12461341
peer_id: self.config.peer_id,
12471342
addresses,
@@ -1287,6 +1382,43 @@ impl DhtNetworkManager {
12871382
Self::dialable_addresses(addresses).into_iter().next()
12881383
}
12891384

1385+
/// Try dialing each dialable address in `addresses` in order until one
1386+
/// succeeds. Returns the channel ID of the first successful dial, or
1387+
/// `None` if every address was rejected, failed, or timed out.
1388+
///
1389+
/// This is the multi-address counterpart of [`Self::dial_candidate`]
1390+
/// and is the right entry point for any code path that has been handed
1391+
/// a `DHTNode` (or any peer entry that exposes multiple addresses) —
1392+
/// using only the first dialable address means a stale NAT binding,
1393+
/// failed relay, or unreachable family kills the connection attempt
1394+
/// even when other published addresses would have worked.
1395+
async fn dial_addresses(
1396+
&self,
1397+
peer_id: &PeerId,
1398+
addresses: &[MultiAddr],
1399+
referrer: Option<SocketAddr>,
1400+
) -> Option<String> {
1401+
let dialable = Self::dialable_addresses(addresses);
1402+
if dialable.is_empty() {
1403+
debug!(
1404+
"dial_addresses: no dialable addresses for {}",
1405+
peer_id.to_hex()
1406+
);
1407+
return None;
1408+
}
1409+
for addr in &dialable {
1410+
if let Some(channel_id) = self.dial_candidate(peer_id, addr, referrer).await {
1411+
return Some(channel_id);
1412+
}
1413+
}
1414+
debug!(
1415+
"dial_addresses: all {} address(es) failed for {}",
1416+
dialable.len(),
1417+
peer_id.to_hex()
1418+
);
1419+
None
1420+
}
1421+
12901422
async fn record_peer_failure(&self, peer_id: &PeerId) {
12911423
if let Some(ref engine) = self.trust_engine {
12921424
engine.update_node_stats(
@@ -1397,23 +1529,39 @@ impl DhtNetworkManager {
13971529
// `peer_to_channel` mapping is only populated after the asynchronous
13981530
// identity-exchange handshake completes. Without waiting, the
13991531
// subsequent `send_message` would fail with `PeerNotFound`.
1400-
let resolved_address: Option<MultiAddr> = if self.transport.is_peer_connected(peer_id).await
1532+
//
1533+
// Build the candidate address list: caller's hint first (if any),
1534+
// then the peer's addresses from the routing table. Trying every
1535+
// candidate — instead of stopping at the first — protects against
1536+
// stale NAT bindings, single-IP-family failures, and recently-relayed
1537+
// peers whose direct address is no longer reachable.
1538+
let candidate_addresses: Vec<MultiAddr> = if self.transport.is_peer_connected(peer_id).await
14011539
{
1402-
None
1403-
} else if let Some(hint) = address_hint {
1404-
Some(hint.clone())
1540+
Vec::new()
14051541
} else {
1406-
self.peer_addresses_for_dial(peer_id)
1407-
.await
1408-
.into_iter()
1409-
.next()
1542+
let mut addrs = Vec::new();
1543+
if let Some(hint) = address_hint {
1544+
addrs.push(hint.clone());
1545+
}
1546+
for addr in self.peer_addresses_for_dial(peer_id).await {
1547+
if !addrs.contains(&addr) {
1548+
addrs.push(addr);
1549+
}
1550+
}
1551+
addrs
14101552
};
1411-
if let Some(ref address) = resolved_address {
1553+
1554+
if !candidate_addresses.is_empty() {
14121555
info!(
1413-
"[STEP 1b] {} -> {}: No open channel, dialling {}",
1414-
local_hex, peer_hex, address
1556+
"[STEP 1b] {} -> {}: No open channel, trying {} dialable address(es)",
1557+
local_hex,
1558+
peer_hex,
1559+
candidate_addresses.len()
14151560
);
1416-
if let Some(channel_id) = self.dial_candidate(peer_id, address, None).await {
1561+
if let Some(channel_id) = self
1562+
.dial_addresses(peer_id, &candidate_addresses, None)
1563+
.await
1564+
{
14171565
let identity_timeout = self.config.request_timeout.min(IDENTITY_EXCHANGE_TIMEOUT);
14181566
match self
14191567
.transport
@@ -1423,11 +1571,10 @@ impl DhtNetworkManager {
14231571
Ok(authenticated) => {
14241572
if &authenticated != peer_id {
14251573
warn!(
1426-
"[STEP 1b] {} -> {}: identity MISMATCH — dialled {} but authenticated as {}. \
1574+
"[STEP 1b] {} -> {}: identity MISMATCH — authenticated as {}. \
14271575
Routing table entry may be stale.",
14281576
local_hex,
14291577
peer_hex,
1430-
address,
14311578
authenticated.to_hex()
14321579
);
14331580
if let Ok(mut ops) = self.active_operations.lock() {
@@ -1462,15 +1609,22 @@ impl DhtNetworkManager {
14621609
}
14631610
} else {
14641611
warn!(
1465-
"[STEP 1b] {} -> {}: dial failed to {}",
1466-
local_hex, peer_hex, address
1612+
"[STEP 1b] {} -> {}: dial failed for all {} candidate address(es)",
1613+
local_hex,
1614+
peer_hex,
1615+
candidate_addresses.len()
14671616
);
14681617
if let Ok(mut ops) = self.active_operations.lock() {
14691618
ops.remove(&message_id);
14701619
}
14711620
self.record_peer_failure(peer_id).await;
14721621
return Err(P2PError::Network(NetworkError::PeerNotFound(
1473-
format!("failed to dial {} at {}", peer_hex, address).into(),
1622+
format!(
1623+
"failed to dial {} at any of {} candidate address(es)",
1624+
peer_hex,
1625+
candidate_addresses.len()
1626+
)
1627+
.into(),
14741628
)));
14751629
}
14761630
}
@@ -1590,7 +1744,8 @@ impl DhtNetworkManager {
15901744
let pid_bytes = *peer_id.to_bytes();
15911745
info!(
15921746
"dial_candidate: setting hole_punch_target_peer_id for {} = {}",
1593-
socket_addr, hex::encode(&pid_bytes[..8])
1747+
socket_addr,
1748+
hex::encode(&pid_bytes[..8])
15941749
);
15951750
self.transport
15961751
.set_hole_punch_target_peer_id(socket_addr, pid_bytes)

0 commit comments

Comments
 (0)