Skip to content

Commit 11a7d16

Browse files
sanityclaude
andcommitted
fix: use ObservedAddr newtype for NAT routing in seeding
Apply ObservedAddr newtype for consistent NAT routing: - Use ObservedAddr in seeding subscriber tracking - Update subscribe/update operations for address handling 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
1 parent bb74066 commit 11a7d16

File tree

6 files changed

+118
-32
lines changed

6 files changed

+118
-32
lines changed

crates/core/src/node/testing_impl/in_memory.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,12 +125,12 @@ where
125125
self.op_manager.ring.seed_contract(key);
126126
}
127127
if let Some(subscribers) = contract_subscribers.get(&key) {
128-
// add contract subscribers
128+
// add contract subscribers (test setup - no upstream_addr)
129129
for subscriber in subscribers {
130130
if self
131131
.op_manager
132132
.ring
133-
.add_subscriber(&key, subscriber.clone())
133+
.add_subscriber(&key, subscriber.clone(), None)
134134
.is_err()
135135
{
136136
tracing::warn!("Max subscribers for contract {} reached", key);

crates/core/src/operations/subscribe.rs

Lines changed: 55 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use crate::{
1111
message::{InnerMessage, NetMessage, Transaction},
1212
node::{NetworkBridge, OpManager, PeerId},
1313
ring::{Location, PeerKeyLocation, RingError},
14+
transport::ObservedAddr,
1415
};
1516
use freenet_stdlib::{
1617
client_api::{ContractResponse, ErrorKind, HostResponse},
@@ -274,7 +275,11 @@ async fn complete_local_subscription(
274275
key: ContractKey,
275276
) -> Result<(), OpError> {
276277
let subscriber = op_manager.ring.connection_manager.own_location();
277-
if let Err(err) = op_manager.ring.add_subscriber(&key, subscriber.clone()) {
278+
// Local subscription - no upstream_addr needed since it's our own peer
279+
if let Err(err) = op_manager
280+
.ring
281+
.add_subscriber(&key, subscriber.clone(), None)
282+
{
278283
tracing::warn!(
279284
%key,
280285
tx = %id,
@@ -305,7 +310,7 @@ pub(crate) struct SubscribeOp {
305310
state: Option<SubscribeState>,
306311
/// The address we received this operation's message from.
307312
/// Used for connection-based routing: responses are sent back to this address.
308-
upstream_addr: Option<std::net::SocketAddr>,
313+
upstream_addr: Option<ObservedAddr>,
309314
}
310315

311316
impl SubscribeOp {
@@ -359,11 +364,16 @@ impl Operation for SubscribeOp {
359364
}
360365
Ok(None) => {
361366
// new request to subscribe to a contract, initialize the machine
367+
tracing::info!(
368+
tx = %id,
369+
?source_addr,
370+
"subscribe: load_or_init creating new op with source_addr as upstream_addr"
371+
);
362372
Ok(OpInitialization {
363373
op: Self {
364374
state: Some(SubscribeState::ReceivedRequest),
365375
id,
366-
upstream_addr: source_addr, // Connection-based routing: store who sent us this request
376+
upstream_addr: source_addr.map(ObservedAddr::new), // Connection-based routing: store who sent us this request
367377
},
368378
source_addr,
369379
})
@@ -403,20 +413,19 @@ impl Operation for SubscribeOp {
403413
target: _,
404414
subscriber,
405415
} => {
406-
// Fill in subscriber's external address from transport layer if unknown.
407-
// This is the key step where the first recipient (gateway) determines the
408-
// subscriber's external address from the actual packet source address.
416+
// ALWAYS use the transport-level source address when available.
417+
// This is critical for NAT peers: they may embed a "known" but wrong address
418+
// (e.g., 127.0.0.1:31337 for loopback). The transport address is the only
419+
// reliable way to route responses back through the NAT.
409420
let mut subscriber = subscriber.clone();
410-
if subscriber.peer_addr.is_unknown() {
411-
if let Some(addr) = source_addr {
412-
subscriber.set_addr(addr);
413-
tracing::debug!(
414-
tx = %id,
415-
%key,
416-
subscriber_addr = %addr,
417-
"subscribe: filled subscriber address from source_addr"
418-
);
419-
}
421+
if let Some(addr) = source_addr {
422+
subscriber.set_addr(addr);
423+
tracing::debug!(
424+
tx = %id,
425+
%key,
426+
subscriber_addr = %addr,
427+
"subscribe: using transport source_addr for subscriber"
428+
);
420429
}
421430

422431
tracing::debug!(
@@ -451,9 +460,10 @@ impl Operation for SubscribeOp {
451460
"subscribe: handling RequestSub locally (contract available)"
452461
);
453462

463+
// Use upstream_addr for NAT routing - subscriber may embed wrong address
454464
if op_manager
455465
.ring
456-
.add_subscriber(key, subscriber.clone())
466+
.add_subscriber(key, subscriber.clone(), self.upstream_addr)
457467
.is_err()
458468
{
459469
tracing::warn!(
@@ -523,6 +533,13 @@ impl Operation for SubscribeOp {
523533
subscribed: true,
524534
};
525535

536+
tracing::info!(
537+
tx = %id,
538+
%key,
539+
upstream_addr = ?self.upstream_addr,
540+
"subscribe: creating ReturnSub with upstream_addr"
541+
);
542+
526543
return build_op_result(
527544
self.id,
528545
None,
@@ -725,9 +742,10 @@ impl Operation for SubscribeOp {
725742
subscribers_before = ?before_direct,
726743
"subscribe: attempting to register direct subscriber"
727744
);
745+
// Use upstream_addr for NAT routing - subscriber may embed wrong address
728746
if op_manager
729747
.ring
730-
.add_subscriber(key, subscriber.clone())
748+
.add_subscriber(key, subscriber.clone(), self.upstream_addr)
731749
.is_err()
732750
{
733751
tracing::warn!(
@@ -875,9 +893,10 @@ impl Operation for SubscribeOp {
875893
subscribers_before = ?before_upstream,
876894
"subscribe: attempting to register upstream link"
877895
);
896+
// upstream_subscriber was stored in op state, no transport address available
878897
if op_manager
879898
.ring
880-
.add_subscriber(key, upstream_subscriber.clone())
899+
.add_subscriber(key, upstream_subscriber.clone(), None)
881900
.is_err()
882901
{
883902
tracing::warn!(
@@ -907,7 +926,12 @@ impl Operation for SubscribeOp {
907926
subscribers_before = ?before_provider,
908927
"subscribe: registering provider/subscription source"
909928
);
910-
if op_manager.ring.add_subscriber(key, sender.clone()).is_err() {
929+
// Use upstream_addr for NAT routing - sender may embed wrong address
930+
if op_manager
931+
.ring
932+
.add_subscriber(key, sender.clone(), self.upstream_addr)
933+
.is_err()
934+
{
911935
// concurrently it reached max number of subscribers for this contract
912936
tracing::debug!(
913937
tx = %id,
@@ -964,17 +988,26 @@ fn build_op_result(
964988
id: Transaction,
965989
state: Option<SubscribeState>,
966990
msg: Option<SubscribeMsg>,
967-
upstream_addr: Option<std::net::SocketAddr>,
991+
upstream_addr: Option<ObservedAddr>,
968992
) -> Result<OperationResult, OpError> {
969993
// For response messages (ReturnSub), use upstream_addr directly for routing.
970994
// This is more reliable than extracting from the message's target field, which
971995
// may have been looked up from connection_manager (subject to race conditions).
972996
// For forward messages (SeekNode, RequestSub, FetchRouting), use the message's target.
973997
let target_addr = match &msg {
974-
Some(SubscribeMsg::ReturnSub { .. }) => upstream_addr,
998+
// Convert ObservedAddr to SocketAddr at the transport boundary
999+
Some(SubscribeMsg::ReturnSub { .. }) => upstream_addr.map(|a| a.socket_addr()),
9751000
_ => msg.as_ref().and_then(|m| m.target_addr()),
9761001
};
9771002

1003+
tracing::info!(
1004+
tx = %id,
1005+
msg_type = ?msg.as_ref().map(|m| std::any::type_name_of_val(m)),
1006+
?upstream_addr,
1007+
?target_addr,
1008+
"build_op_result: computed target_addr"
1009+
);
1010+
9781011
let output_op = state.map(|state| SubscribeOp {
9791012
id,
9801013
state: Some(state),

crates/core/src/operations/update.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1005,10 +1005,10 @@ pub(crate) async fn request_update(
10051005
.closest_potentially_caching(&key, [sender.peer().clone()].as_slice());
10061006

10071007
if let Some(target) = remote_target {
1008-
// Subscribe to the contract
1008+
// Subscribe to the contract - sender is ourselves, no upstream_addr needed
10091009
op_manager
10101010
.ring
1011-
.add_subscriber(&key, sender.clone())
1011+
.add_subscriber(&key, sender.clone(), None)
10121012
.map_err(|_| RingError::NoCachingPeers(key))?;
10131013

10141014
target

crates/core/src/ring/mod.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use crate::topology::rate::Rate;
2020
use crate::topology::TopologyAdjustment;
2121
use crate::tracing::{NetEventLog, NetEventRegister};
2222

23-
use crate::transport::TransportPublicKey;
23+
use crate::transport::{ObservedAddr, TransportPublicKey};
2424
use crate::util::Contains;
2525
use crate::{
2626
config::GlobalExecutor,
@@ -338,12 +338,18 @@ impl Ring {
338338
}
339339

340340
/// Will return an error in case the max number of subscribers has been added.
341+
///
342+
/// The `upstream_addr` parameter is the transport-level address from which the subscribe
343+
/// message was received. This is used instead of the address embedded in `subscriber`
344+
/// because NAT peers may embed incorrect addresses in their messages.
341345
pub fn add_subscriber(
342346
&self,
343347
contract: &ContractKey,
344348
subscriber: PeerKeyLocation,
349+
upstream_addr: Option<ObservedAddr>,
345350
) -> Result<(), ()> {
346-
self.seeding_manager.add_subscriber(contract, subscriber)
351+
self.seeding_manager
352+
.add_subscriber(contract, subscriber, upstream_addr)
347353
}
348354

349355
/// Remove a subscriber by peer ID from a specific contract

crates/core/src/ring/seeding.rs

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use super::{Location, PeerKeyLocation, Score};
2+
use crate::transport::ObservedAddr;
23
use dashmap::{mapref::one::Ref as DmRef, DashMap};
34
use freenet_stdlib::prelude::ContractKey;
45
use tracing::{info, warn};
@@ -102,11 +103,23 @@ impl SeedingManager {
102103
}
103104

104105
/// Will return an error in case the max number of subscribers has been added.
106+
///
107+
/// The `upstream_addr` parameter is the transport-level address from which the subscribe
108+
/// message was received. This is used instead of the address embedded in `subscriber`
109+
/// because NAT peers may embed incorrect (e.g., loopback) addresses in their messages.
110+
/// The transport address is the only reliable way to route back to them.
105111
pub fn add_subscriber(
106112
&self,
107113
contract: &ContractKey,
108114
subscriber: PeerKeyLocation,
115+
upstream_addr: Option<ObservedAddr>,
109116
) -> Result<(), ()> {
117+
// Use the transport-level address if available, otherwise fall back to the embedded address
118+
let subscriber = if let Some(addr) = upstream_addr {
119+
PeerKeyLocation::new(subscriber.pub_key.clone(), addr.socket_addr())
120+
} else {
121+
subscriber
122+
};
110123
let mut subs = self
111124
.subscribers
112125
.entry(*contract)
@@ -255,15 +268,15 @@ mod tests {
255268
Location::try_from(0.3).unwrap(),
256269
);
257270

258-
// Add subscribers
271+
// Add subscribers (test setup - no upstream_addr)
259272
assert!(seeding_manager
260-
.add_subscriber(&contract_key, peer_loc1.clone())
273+
.add_subscriber(&contract_key, peer_loc1.clone(), None)
261274
.is_ok());
262275
assert!(seeding_manager
263-
.add_subscriber(&contract_key, peer_loc2.clone())
276+
.add_subscriber(&contract_key, peer_loc2.clone(), None)
264277
.is_ok());
265278
assert!(seeding_manager
266-
.add_subscriber(&contract_key, peer_loc3.clone())
279+
.add_subscriber(&contract_key, peer_loc3.clone(), None)
267280
.is_ok());
268281

269282
// Verify all subscribers are present

crates/core/src/transport/mod.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,40 @@ pub(crate) use self::{
3333
peer_connection::PeerConnection,
3434
};
3535

36+
/// Address observed at the transport layer (from UDP packet source).
37+
///
38+
/// This is the "ground truth" for NAT scenarios - it's the actual address we see
39+
/// at the network layer, not what the peer claims in protocol messages.
40+
///
41+
/// Using a newtype instead of raw `SocketAddr` makes the address semantics explicit
42+
/// and prevents accidental confusion with advertised/claimed addresses.
43+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
44+
pub struct ObservedAddr(SocketAddr);
45+
46+
impl ObservedAddr {
47+
/// Create a new observed address from a socket address.
48+
pub fn new(addr: SocketAddr) -> Self {
49+
Self(addr)
50+
}
51+
52+
/// Get the underlying socket address.
53+
pub fn socket_addr(&self) -> SocketAddr {
54+
self.0
55+
}
56+
}
57+
58+
impl std::fmt::Display for ObservedAddr {
59+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
60+
write!(f, "{}", self.0)
61+
}
62+
}
63+
64+
impl From<SocketAddr> for ObservedAddr {
65+
fn from(addr: SocketAddr) -> Self {
66+
Self(addr)
67+
}
68+
}
69+
3670
#[derive(Debug, thiserror::Error)]
3771
pub(crate) enum TransportError {
3872
#[error("transport handler channel closed, socket likely closed")]

0 commit comments

Comments
 (0)