diff --git a/crates/core/src/node/testing_impl/in_memory.rs b/crates/core/src/node/testing_impl/in_memory.rs index adde6de93..937892665 100644 --- a/crates/core/src/node/testing_impl/in_memory.rs +++ b/crates/core/src/node/testing_impl/in_memory.rs @@ -125,12 +125,12 @@ where self.op_manager.ring.seed_contract(key); } if let Some(subscribers) = contract_subscribers.get(&key) { - // add contract subscribers + // add contract subscribers (test setup - no upstream_addr) for subscriber in subscribers { if self .op_manager .ring - .add_subscriber(&key, subscriber.clone()) + .add_subscriber(&key, subscriber.clone(), None) .is_err() { tracing::warn!("Max subscribers for contract {} reached", key); diff --git a/crates/core/src/operations/connect.rs b/crates/core/src/operations/connect.rs index 48e52c286..4efe65545 100644 --- a/crates/core/src/operations/connect.rs +++ b/crates/core/src/operations/connect.rs @@ -690,7 +690,7 @@ impl ConnectOp { match self.state.as_mut() { Some(ConnectState::WaitingForResponses(state)) => { tracing::info!( - acceptor = %response.acceptor.peer(), + acceptor_pub_key = %response.acceptor.pub_key(), acceptor_loc = ?response.acceptor.location, "connect: joiner received ConnectResponse" ); @@ -830,11 +830,21 @@ impl Operation for ConnectOp { }; // Route through upstream (where the request came from) since we may // not have a direct connection to the target - if let Some(upstream) = source_addr { - network_bridge - .send(upstream, NetMessage::V1(NetMessageV1::Connect(msg))) - .await?; - } + let Some(upstream) = source_addr else { + tracing::warn!( + tx = %self.id, + "ObservedAddress message has no upstream - was this locally initiated?" 
+ ); + // No upstream to route through - this shouldn't happen for relayed connections + return Ok(OperationResult { + return_msg: None, + target_addr: None, + state: Some(OpEnum::Connect(Box::new(self))), + }); + }; + network_bridge + .send(upstream, NetMessage::V1(NetMessageV1::Connect(msg))) + .await?; } if let Some(peer) = actions.expect_connection_from { @@ -876,14 +886,20 @@ impl Operation for ConnectOp { }; // Route the response through upstream (where the request came from) // since we may not have a direct connection to the joiner - if let Some(upstream) = source_addr { - network_bridge - .send( - upstream, - NetMessage::V1(NetMessageV1::Connect(response_msg)), - ) - .await?; - } + let Some(upstream) = source_addr else { + tracing::warn!( + tx = %self.id, + "ConnectResponse has no upstream - was this locally initiated?" + ); + // No upstream to route through - this shouldn't happen for relayed connections + return Ok(store_operation_state(&mut self)); + }; + network_bridge + .send( + upstream, + NetMessage::V1(NetMessageV1::Connect(response_msg)), + ) + .await?; return Ok(store_operation_state(&mut self)); } @@ -970,14 +986,14 @@ impl Operation for ConnectOp { let mut updated_payload = payload.clone(); updated_payload.acceptor.peer_addr = PeerAddr::Known(acceptor_addr); tracing::debug!( - acceptor = %updated_payload.acceptor.peer(), + acceptor_pub_key = %updated_payload.acceptor.pub_key(), acceptor_addr = %acceptor_addr, "connect: filled acceptor address from source_addr" ); updated_payload } else { tracing::warn!( - acceptor = %payload.acceptor.peer(), + acceptor_pub_key = %payload.acceptor.pub_key(), "connect: response received without source_addr, cannot fill acceptor address" ); payload.clone() @@ -988,7 +1004,7 @@ impl Operation for ConnectOp { tracing::debug!( upstream_addr = %upstream_addr, - acceptor = %forward_payload.acceptor.peer(), + acceptor_pub_key = %forward_payload.acceptor.pub_key(), "connect: forwarding response towards joiner" ); // 
Forward response toward the joiner via upstream diff --git a/crates/core/src/operations/get.rs b/crates/core/src/operations/get.rs index 4da895306..073fc58d1 100644 --- a/crates/core/src/operations/get.rs +++ b/crates/core/src/operations/get.rs @@ -549,9 +549,14 @@ impl Operation for GetOp { ); // Use sender_from_addr (looked up from source_addr) instead of message field - let sender = sender_from_addr.clone().expect( - "RequestGet requires sender lookup from connection - source_addr should resolve to known peer", - ); + let Some(sender) = sender_from_addr.clone() else { + tracing::warn!( + tx = %id, + %key, + "GET: RequestGet without sender lookup - cannot process" + ); + return Err(OpError::invalid_transition(self.id)); + }; // Check if operation is already completed if matches!(self.state, Some(GetState::Finished { .. })) { @@ -702,9 +707,14 @@ impl Operation for GetOp { let this_peer = target.clone(); // Use sender_from_addr (looked up from source_addr) instead of message field - let sender = sender_from_addr.clone().expect( - "SeekNode requires sender lookup from connection - source_addr should resolve to known peer", - ); + let Some(sender) = sender_from_addr.clone() else { + tracing::warn!( + tx = %id, + %key, + "GET: SeekNode without sender lookup - cannot process" + ); + return Err(OpError::invalid_transition(self.id)); + }; if htl == 0 { let sender_display = sender.peer().to_string(); @@ -855,9 +865,14 @@ impl Operation for GetOp { let key = *key; // Use sender_from_addr for logging - let sender = sender_from_addr.clone().expect( - "ReturnGet requires sender lookup from connection - source_addr should resolve to known peer", - ); + let Some(sender) = sender_from_addr.clone() else { + tracing::warn!( + tx = %id, + %key, + "GET: ReturnGet without sender lookup - cannot process" + ); + return Err(OpError::invalid_transition(self.id)); + }; tracing::info!( tx = %id, @@ -1113,9 +1128,14 @@ impl Operation for GetOp { let key = *key; // Use sender_from_addr for 
logging - let sender = sender_from_addr.clone().expect( - "ReturnGet requires sender lookup from connection - source_addr should resolve to known peer", - ); + let Some(sender) = sender_from_addr.clone() else { + tracing::warn!( + tx = %id, + %key, + "GET: ReturnGet without sender lookup - cannot process" + ); + return Err(OpError::invalid_transition(self.id)); + }; tracing::info!(tx = %id, %key, "Received get response with state: {:?}", self.state.as_ref().unwrap()); diff --git a/crates/core/src/operations/subscribe.rs b/crates/core/src/operations/subscribe.rs index 4deeea336..4140ea7a8 100644 --- a/crates/core/src/operations/subscribe.rs +++ b/crates/core/src/operations/subscribe.rs @@ -11,6 +11,7 @@ use crate::{ message::{InnerMessage, NetMessage, Transaction}, node::{NetworkBridge, OpManager, PeerId}, ring::{Location, PeerKeyLocation, RingError}, + transport::ObservedAddr, }; use freenet_stdlib::{ client_api::{ContractResponse, ErrorKind, HostResponse}, @@ -274,7 +275,11 @@ async fn complete_local_subscription( key: ContractKey, ) -> Result<(), OpError> { let subscriber = op_manager.ring.connection_manager.own_location(); - if let Err(err) = op_manager.ring.add_subscriber(&key, subscriber.clone()) { + // Local subscription - no upstream_addr needed since it's our own peer + if let Err(err) = op_manager + .ring + .add_subscriber(&key, subscriber.clone(), None) + { tracing::warn!( %key, tx = %id, @@ -305,7 +310,7 @@ pub(crate) struct SubscribeOp { state: Option, /// The address we received this operation's message from. /// Used for connection-based routing: responses are sent back to this address. 
- upstream_addr: Option, + upstream_addr: Option<ObservedAddr>, } impl SubscribeOp { @@ -359,11 +364,16 @@ impl Operation for SubscribeOp { } Ok(None) => { // new request to subscribe to a contract, initialize the machine + tracing::debug!( + tx = %id, + ?source_addr, + "subscribe: load_or_init creating new op with source_addr as upstream_addr" + ); Ok(OpInitialization { op: Self { state: Some(SubscribeState::ReceivedRequest), id, - upstream_addr: source_addr, // Connection-based routing: store who sent us this request + upstream_addr: source_addr.map(ObservedAddr::new), // Connection-based routing: store who sent us this request }, source_addr, }) @@ -403,28 +413,29 @@ impl Operation for SubscribeOp { target: _, subscriber, } => { - // Fill in subscriber's external address from transport layer if unknown. - // This is the key step where the first recipient (gateway) determines the - // subscriber's external address from the actual packet source address. + // ALWAYS use the transport-level source address when available. + // This is critical for NAT peers: they may embed a "known" but wrong address + // (e.g., 127.0.0.1:31337 for loopback). The transport address is the only + // reliable way to route responses back through the NAT. 
let mut subscriber = subscriber.clone(); - if subscriber.peer_addr.is_unknown() { - if let Some(addr) = source_addr { - subscriber.set_addr(addr); - tracing::debug!( - tx = %id, - %key, - subscriber_addr = %addr, - "subscribe: filled subscriber address from source_addr" - ); - } - } tracing::debug!( tx = %id, %key, - subscriber = %subscriber.peer(), + subscriber_orig = %subscriber.peer(), + source_addr = ?source_addr, "subscribe: processing RequestSub" ); + + if let Some(addr) = source_addr { + subscriber.set_addr(addr); + tracing::debug!( + tx = %id, + %key, + subscriber_updated = %subscriber.peer(), + "subscribe: updated subscriber address from transport source" + ); + } let own_loc = op_manager.ring.connection_manager.own_location(); if !matches!( @@ -451,9 +462,10 @@ impl Operation for SubscribeOp { "subscribe: handling RequestSub locally (contract available)" ); + // Use upstream_addr for NAT routing - subscriber may embed wrong address if op_manager .ring - .add_subscriber(key, subscriber.clone()) + .add_subscriber(key, subscriber.clone(), self.upstream_addr) .is_err() { tracing::warn!( @@ -520,6 +532,13 @@ impl Operation for SubscribeOp { subscribed: true, }; + tracing::debug!( + tx = %id, + %key, + upstream_addr = ?self.upstream_addr, + "subscribe: creating ReturnSub with upstream_addr" + ); + return build_op_result( self.id, None, @@ -722,9 +741,13 @@ impl Operation for SubscribeOp { subscribers_before = ?before_direct, "subscribe: attempting to register direct subscriber" ); + // Pass None: subscriber address was already corrected by Gateway at the + // start of the subscribe flow. Using self.upstream_addr here would + // incorrectly overwrite with the forwarder's address instead of the + // original subscriber's Gateway-corrected address. 
if op_manager .ring - .add_subscriber(key, subscriber.clone()) + .add_subscriber(key, subscriber.clone(), None) .is_err() { tracing::warn!( @@ -872,9 +895,10 @@ impl Operation for SubscribeOp { subscribers_before = ?before_upstream, "subscribe: attempting to register upstream link" ); + // upstream_subscriber was stored in op state, no transport address available if op_manager .ring - .add_subscriber(key, upstream_subscriber.clone()) + .add_subscriber(key, upstream_subscriber.clone(), None) .is_err() { tracing::warn!( @@ -904,7 +928,15 @@ impl Operation for SubscribeOp { subscribers_before = ?before_provider, "subscribe: registering provider/subscription source" ); - if op_manager.ring.add_subscriber(key, sender.clone()).is_err() { + // Pass None: sender was already looked up from source_addr (line ~866), + // so it has the correct transport address. Using self.upstream_addr + // would incorrectly use the original requester's address instead of + // the provider's address. + if op_manager + .ring + .add_subscriber(key, sender.clone(), None) + .is_err() + { // concurrently it reached max number of subscribers for this contract tracing::debug!( tx = %id, @@ -961,17 +993,26 @@ fn build_op_result( id: Transaction, state: Option, msg: Option, - upstream_addr: Option, + upstream_addr: Option, ) -> Result { // For response messages (ReturnSub), use upstream_addr directly for routing. // This is more reliable than extracting from the message's target field, which // may have been looked up from connection_manager (subject to race conditions). // For forward messages (SeekNode, RequestSub, FetchRouting), use the message's target. let target_addr = match &msg { - Some(SubscribeMsg::ReturnSub { .. }) => upstream_addr, + // Convert ObservedAddr to SocketAddr at the transport boundary + Some(SubscribeMsg::ReturnSub { .. 
}) => upstream_addr.map(|a| a.socket_addr()), _ => msg.as_ref().and_then(|m| m.target_addr()), }; + tracing::debug!( + tx = %id, + msg_type = ?msg.as_ref().map(|m| std::any::type_name_of_val(m)), + ?upstream_addr, + ?target_addr, + "build_op_result: computed target_addr" + ); + let output_op = state.map(|state| SubscribeOp { id, state: Some(state), diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index 0f07eab79..4fcebb84f 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -1005,10 +1005,10 @@ pub(crate) async fn request_update( .closest_potentially_caching(&key, [sender.peer().clone()].as_slice()); if let Some(target) = remote_target { - // Subscribe to the contract + // Subscribe on behalf of the requesting peer (no upstream_addr - direct registration) op_manager .ring - .add_subscriber(&key, sender.clone()) + .add_subscriber(&key, sender.clone(), None) .map_err(|_| RingError::NoCachingPeers(key))?; target diff --git a/crates/core/src/ring/mod.rs b/crates/core/src/ring/mod.rs index 1a8a04f29..7229a8af2 100644 --- a/crates/core/src/ring/mod.rs +++ b/crates/core/src/ring/mod.rs @@ -20,7 +20,7 @@ use crate::topology::rate::Rate; use crate::topology::TopologyAdjustment; use crate::tracing::{NetEventLog, NetEventRegister}; -use crate::transport::TransportPublicKey; +use crate::transport::{ObservedAddr, TransportPublicKey}; use crate::util::Contains; use crate::{ config::GlobalExecutor, @@ -327,12 +327,18 @@ impl Ring { } /// Will return an error in case the max number of subscribers has been added. + /// + /// The `upstream_addr` parameter is the transport-level address from which the subscribe + /// message was received. This is used instead of the address embedded in `subscriber` + /// because NAT peers may embed incorrect addresses in their messages. 
pub fn add_subscriber( &self, contract: &ContractKey, subscriber: PeerKeyLocation, + upstream_addr: Option<ObservedAddr>, ) -> Result<(), ()> { - self.seeding_manager.add_subscriber(contract, subscriber) + self.seeding_manager + .add_subscriber(contract, subscriber, upstream_addr) } /// Remove a subscriber by peer ID from a specific contract diff --git a/crates/core/src/ring/seeding.rs b/crates/core/src/ring/seeding.rs index 5b3c940b8..3cb08a362 100644 --- a/crates/core/src/ring/seeding.rs +++ b/crates/core/src/ring/seeding.rs @@ -1,4 +1,5 @@ use super::{Location, PeerKeyLocation, Score}; +use crate::transport::ObservedAddr; use dashmap::{mapref::one::Ref as DmRef, DashMap}; use freenet_stdlib::prelude::ContractKey; use tracing::{info, warn}; @@ -102,11 +103,23 @@ impl SeedingManager { } /// Will return an error in case the max number of subscribers has been added. + /// + /// The `upstream_addr` parameter is the transport-level address from which the subscribe + /// message was received. This is used instead of the address embedded in `subscriber` + /// because NAT peers may embed incorrect (e.g., loopback) addresses in their messages. + /// The transport address is the only reliable way to route back to them. 
pub fn add_subscriber( &self, contract: &ContractKey, subscriber: PeerKeyLocation, + upstream_addr: Option<ObservedAddr>, ) -> Result<(), ()> { + // Use the transport-level address if available, otherwise fall back to the embedded address + let subscriber = if let Some(addr) = upstream_addr { + PeerKeyLocation::new(subscriber.pub_key.clone(), addr.socket_addr()) + } else { + subscriber + }; let mut subs = self .subscribers .entry(*contract) @@ -255,15 +268,15 @@ mod tests { Location::try_from(0.3).unwrap(), ); - // Add subscribers + // Add subscribers (test setup - no upstream_addr) assert!(seeding_manager - .add_subscriber(&contract_key, peer_loc1.clone()) + .add_subscriber(&contract_key, peer_loc1.clone(), None) .is_ok()); assert!(seeding_manager - .add_subscriber(&contract_key, peer_loc2.clone()) + .add_subscriber(&contract_key, peer_loc2.clone(), None) .is_ok()); assert!(seeding_manager - .add_subscriber(&contract_key, peer_loc3.clone()) + .add_subscriber(&contract_key, peer_loc3.clone(), None) .is_ok()); // Verify all subscribers are present diff --git a/crates/core/src/transport/mod.rs b/crates/core/src/transport/mod.rs index d833a27cf..e30deee56 100644 --- a/crates/core/src/transport/mod.rs +++ b/crates/core/src/transport/mod.rs @@ -33,6 +33,40 @@ pub(crate) use self::{ peer_connection::PeerConnection, }; +/// Address observed at the transport layer (from UDP packet source). +/// +/// This is the "ground truth" for NAT scenarios - it's the actual address we see +/// at the network layer, not what the peer claims in protocol messages. +/// +/// Using a newtype instead of raw `SocketAddr` makes the address semantics explicit +/// and prevents accidental confusion with advertised/claimed addresses. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct ObservedAddr(SocketAddr); + +impl ObservedAddr { + /// Create a new observed address from a socket address. + pub fn new(addr: SocketAddr) -> Self { + Self(addr) + } + + /// Get the underlying socket address. 
+ pub fn socket_addr(&self) -> SocketAddr { + self.0 + } +} + +impl std::fmt::Display for ObservedAddr { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + +impl From<SocketAddr> for ObservedAddr { + fn from(addr: SocketAddr) -> Self { + Self(addr) + } +} + #[derive(Debug, thiserror::Error)] pub(crate) enum TransportError { #[error("transport handler channel closed, socket likely closed")]