diff --git a/crates/core/src/node/network_bridge/p2p_protoc.rs b/crates/core/src/node/network_bridge/p2p_protoc.rs
index 50b521aa4..c81be75a6 100644
--- a/crates/core/src/node/network_bridge/p2p_protoc.rs
+++ b/crates/core/src/node/network_bridge/p2p_protoc.rs
@@ -603,6 +603,53 @@ impl P2pConnManager {
                         }
                     }
                 }
+                ConnEvent::OutboundMessageWithTarget { target_addr, msg } => {
+                    // This variant uses an explicit target address from OperationResult.target_addr,
+                    // which is critical for NAT scenarios where the address in the message
+                    // differs from the actual transport address we should send to.
+                    tracing::info!(
+                        tx = %msg.id(),
+                        msg_type = %msg,
+                        target_addr = %target_addr,
+                        msg_target = ?msg.target().map(|t| t.addr()),
+                        "Sending outbound message with explicit target address (NAT routing)"
+                    );
+
+                    // Look up the connection using the explicit target address
+                    let peer_connection = ctx.connections.get(&target_addr);
+
+                    match peer_connection {
+                        Some(peer_connection) => {
+                            if let Err(e) =
+                                peer_connection.sender.send(Left(msg.clone())).await
+                            {
+                                tracing::error!(
+                                    tx = %msg.id(),
+                                    target_addr = %target_addr,
+                                    "Failed to send message to peer: {}", e
+                                );
+                            } else {
+                                tracing::info!(
+                                    tx = %msg.id(),
+                                    target_addr = %target_addr,
+                                    "Message successfully sent to peer connection via explicit address"
+                                );
+                            }
+                        }
+                        None => {
+                            // No existing connection - this is unexpected for NAT scenarios
+                            // since we should have the connection from the original request
+                            tracing::error!(
+                                tx = %msg.id(),
+                                target_addr = %target_addr,
+                                msg_target = ?msg.target().map(|t| t.addr()),
+                                connections = ?ctx.connections.keys().collect::<Vec<_>>(),
+                                "No connection found for explicit target address - NAT routing failed"
+                            );
+                            ctx.bridge.op_manager.completed(*msg.id());
+                        }
+                    }
+                }
                 ConnEvent::TransportClosed { remote_addr, error } => {
                     tracing::debug!(
                         remote = %remote_addr,
@@ -2266,8 +2313,19 @@ impl P2pConnManager {

     fn handle_bridge_msg(&self, msg: Option) -> EventResult {
         match msg {
-            Some(Left((_target, msg))) => {
-                EventResult::Event(ConnEvent::OutboundMessage(*msg).into())
+            Some(Left((target, msg))) => {
+                // Use OutboundMessageWithTarget to preserve the target address from
+                // OperationResult.target_addr. This is critical for NAT scenarios where
+                // the address in the message differs from the actual transport address.
+                // The PeerId.addr contains the address that was used to look up the peer
+                // in P2pBridge::send(), which is the correct transport address.
+                EventResult::Event(
+                    ConnEvent::OutboundMessageWithTarget {
+                        target_addr: target.addr,
+                        msg: *msg,
+                    }
+                    .into(),
+                )
             }
             Some(Right(action)) => EventResult::Event(ConnEvent::NodeAction(action).into()),
             None => EventResult::Event(ConnEvent::ClosedChannel(ChannelCloseReason::Bridge).into()),
@@ -2398,6 +2456,12 @@ enum EventResult {
 pub(super) enum ConnEvent {
     InboundMessage(IncomingMessage),
     OutboundMessage(NetMessage),
+    /// Outbound message with explicit target address from OperationResult.target_addr.
+    /// Used when the target address differs from what's in the message (NAT scenarios).
+    OutboundMessageWithTarget {
+        target_addr: SocketAddr,
+        msg: NetMessage,
+    },
     NodeAction(NodeEvent),
     ClosedChannel(ChannelCloseReason),
     TransportClosed {
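
Note on the p2p_protoc.rs hunks above: the new OutboundMessageWithTarget variant keys the
connection lookup on the transport-level address supplied by the bridge instead of the
address embedded in the message. When the lookup misses, the handler logs the known
connection keys and marks the transaction completed instead of sending. Below is a minimal,
self-contained sketch of the lookup idea; Conn, route_outbound, and the addresses are
illustrative stand-ins, not the crate's real types:

    use std::collections::HashMap;
    use std::net::SocketAddr;

    // Stand-in for the per-connection sender handle kept in the connection table.
    struct Conn {
        label: &'static str,
    }

    // Route on the explicit transport address handed down by the bridge; the address
    // embedded in the message may be the peer's self-reported (pre-NAT) address, which
    // is not the key the connection was stored under.
    fn route_outbound<'a>(
        connections: &'a HashMap<SocketAddr, Conn>,
        explicit_target: SocketAddr,
    ) -> Option<&'a Conn> {
        connections.get(&explicit_target)
    }

    fn main() {
        let observed: SocketAddr = "203.0.113.7:4100".parse().unwrap(); // address seen by the transport
        let self_reported: SocketAddr = "192.168.1.20:4100".parse().unwrap(); // address inside the message

        let mut connections = HashMap::new();
        connections.insert(observed, Conn { label: "peer-A" });

        // Looking up by the message's own address misses the entry behind a NAT...
        assert!(connections.get(&self_reported).is_none());
        // ...while the explicit target address from the bridge finds it.
        if let Some(conn) = route_outbound(&connections, observed) {
            println!("routed to {} via explicit transport address {observed}", conn.label);
        }
    }
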
diff --git a/crates/core/src/operations/connect.rs b/crates/core/src/operations/connect.rs
index 48e52c286..db55887f1 100644
--- a/crates/core/src/operations/connect.rs
+++ b/crates/core/src/operations/connect.rs
@@ -320,8 +320,8 @@ impl RelayState {
             // Use the joiner with updated observed address for response routing
             actions.response_target = Some(self.request.joiner.clone());
             tracing::info!(
-                acceptor_pub_key = %acceptor.pub_key(),
-                joiner_pub_key = %self.request.joiner.pub_key(),
+                acceptor_key = %acceptor.pub_key(),
+                joiner_key = %self.request.joiner.pub_key(),
                 acceptor_loc = ?acceptor.location,
                 joiner_loc = ?self.request.joiner.location,
                 ring_distance = ?dist,
@@ -690,7 +690,7 @@ impl ConnectOp {
         match self.state.as_mut() {
             Some(ConnectState::WaitingForResponses(state)) => {
                 tracing::info!(
-                    acceptor = %response.acceptor.peer(),
+                    acceptor_key = %response.acceptor.pub_key(),
                     acceptor_loc = ?response.acceptor.location,
                     "connect: joiner received ConnectResponse"
                 );
@@ -830,11 +830,9 @@ impl Operation for ConnectOp {
                     };
                     // Route through upstream (where the request came from) since we may
                     // not have a direct connection to the target
-                    if let Some(upstream) = source_addr {
-                        network_bridge
-                            .send(upstream, NetMessage::V1(NetMessageV1::Connect(msg)))
-                            .await?;
-                    }
+                    network_bridge
+                        .send(upstream_addr, NetMessage::V1(NetMessageV1::Connect(msg)))
+                        .await?;
                 }

                 if let Some(peer) = actions.expect_connection_from {
@@ -876,14 +874,12 @@ impl Operation for ConnectOp {
                     };
                     // Route the response through upstream (where the request came from)
                     // since we may not have a direct connection to the joiner
-                    if let Some(upstream) = source_addr {
-                        network_bridge
-                            .send(
-                                upstream,
-                                NetMessage::V1(NetMessageV1::Connect(response_msg)),
-                            )
-                            .await?;
-                    }
+                    network_bridge
+                        .send(
+                            upstream_addr,
+                            NetMessage::V1(NetMessageV1::Connect(response_msg)),
+                        )
+                        .await?;

                     return Ok(store_operation_state(&mut self));
                 }
@@ -977,7 +973,7 @@ impl Operation for ConnectOp {
                         updated_payload
                     } else {
                         tracing::warn!(
-                            acceptor = %payload.acceptor.peer(),
+                            acceptor_key = %payload.acceptor.pub_key(),
                             "connect: response received without source_addr, cannot fill acceptor address"
                         );
                         payload.clone()
@@ -988,7 +984,7 @@ impl Operation for ConnectOp {

                     tracing::debug!(
                         upstream_addr = %upstream_addr,
-                        acceptor = %forward_payload.acceptor.peer(),
+                        acceptor_key = %forward_payload.acceptor.pub_key(),
                         "connect: forwarding response towards joiner"
                     );
                     // Forward response toward the joiner via upstream
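
Note on the connect.rs hunks above, and on the subscribe.rs hunks later in this diff that
switch to build_op_result: replies are now always routed to upstream_addr, the transport
address the request arrived from, rather than being gated on a possibly missing or
unreachable address for the original requester. A small sketch of that preference under
simplified assumptions; ReturnMsg, OperationResult, and this build_op_result signature are
illustrative stand-ins, and the fallback behavior shown here is an assumption, not the
crate's actual helper:

    use std::net::SocketAddr;

    // Simplified stand-ins for the crate's message and result types.
    #[derive(Debug)]
    struct ReturnMsg {
        embedded_target: Option<SocketAddr>, // address the message itself names (may be a private/NAT view)
    }

    #[derive(Debug)]
    struct OperationResult {
        target_addr: Option<SocketAddr>,
        return_msg: Option<ReturnMsg>,
    }

    // Prefer the upstream address captured when the request arrived, so the reply travels
    // back over the transport path that already traverses the NAT; fall back to whatever
    // address the message carries only when no upstream hop is recorded.
    fn build_op_result(
        return_msg: Option<ReturnMsg>,
        upstream_addr: Option<SocketAddr>,
    ) -> OperationResult {
        let target_addr =
            upstream_addr.or_else(|| return_msg.as_ref().and_then(|m| m.embedded_target));
        OperationResult { target_addr, return_msg }
    }

    fn main() {
        let upstream: SocketAddr = "203.0.113.9:4300".parse().unwrap();
        let msg = ReturnMsg { embedded_target: "10.0.0.5:4300".parse().ok() };

        let result = build_op_result(Some(msg), Some(upstream));
        // The reply is addressed to the upstream transport address, not the private one.
        assert_eq!(result.target_addr, Some(upstream));
        println!("{result:?}");
    }
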
diff --git a/crates/core/src/operations/get.rs b/crates/core/src/operations/get.rs
index 4da895306..039ec92f4 100644
--- a/crates/core/src/operations/get.rs
+++ b/crates/core/src/operations/get.rs
@@ -854,16 +854,23 @@ impl Operation for GetOp {
                 let id = *id;
                 let key = *key;

-                // Use sender_from_addr for logging
-                let sender = sender_from_addr.clone().expect(
-                    "ReturnGet requires sender lookup from connection - source_addr should resolve to known peer",
-                );
+                // Handle case where sender lookup failed (e.g., peer disconnected)
+                let Some(sender) = sender_from_addr.clone() else {
+                    tracing::warn!(
+                        tx = %id,
+                        %key,
+                        source = ?source_addr,
+                        "GET: ReturnGet (empty) received but sender lookup failed - cannot process"
+                    );
+                    return Err(OpError::invalid_transition(self.id));
+                };

+                // Use pub_key for logging to avoid panics on Unknown addresses
                 tracing::info!(
                     tx = %id,
                     %key,
-                    from = %sender.peer(),
-                    to = %target.peer(),
+                    from = %sender.pub_key(),
+                    to = %target.pub_key(),
                     skip = ?skip_list,
                     "GET: ReturnGet received with empty value"
                 );
@@ -875,7 +882,7 @@ impl Operation for GetOp {
                     %this_peer,
                     "Neither contract or contract value for contract found at peer {}, \
                      retrying with other peers",
-                    sender.peer()
+                    sender.pub_key()
                 );

                 match self.state {
@@ -894,8 +901,10 @@ impl Operation for GetOp {
                     }) => {
                         // todo: register in the stats for the outcome of the op that failed to get a response from this peer

-                        // Add the failed peer to tried list
-                        tried_peers.insert(sender.peer().clone());
+                        // Add the failed peer to tried list (only if address is known)
+                        if let Some(addr) = sender.socket_addr() {
+                            tried_peers.insert(PeerId::new(addr, sender.pub_key().clone()));
+                        }

                         // First, check if we have alternatives at this hop level
                         if !alternatives.is_empty() && attempts_at_hop < DEFAULT_MAX_BREADTH {
@@ -905,7 +914,7 @@ impl Operation for GetOp {
                             tracing::info!(
                                 tx = %id,
                                 %key,
-                                next_peer = %next_target.peer(),
+                                next_peer = %next_target.pub_key(),
                                 fetch_contract,
                                 attempts_at_hop = attempts_at_hop + 1,
                                 max_attempts = DEFAULT_MAX_BREADTH,
@@ -923,8 +932,11 @@ impl Operation for GetOp {
                                 skip_list: tried_peers.clone(),
                             });

-                            // Update state with the new alternative being tried
-                            tried_peers.insert(next_target.peer().clone());
+                            // Update state with the new alternative being tried (only if address is known)
+                            if let Some(addr) = next_target.socket_addr() {
+                                tried_peers
+                                    .insert(PeerId::new(addr, next_target.pub_key().clone()));
+                            }
                             let updated_tried_peers = tried_peers.clone();
                             new_state = Some(GetState::AwaitingResponse {
                                 retries,
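
Note on the get.rs hunks above: the .expect(...) on the sender lookup is replaced with a
let-else early return, and failed peers are only added to the tried list when their socket
address is actually known. A small sketch of both patterns; Sender, OpError, and
handle_empty_return are hypothetical stand-ins for this illustration, not the crate's
definitions:

    use std::collections::HashSet;
    use std::net::SocketAddr;

    // Hypothetical stand-ins for the peer handle and error type used in get.rs.
    struct Sender {
        pub_key: String,
        socket_addr: Option<SocketAddr>, // may be unknown for peers we only know indirectly
    }

    #[derive(Debug)]
    enum OpError {
        InvalidTransition,
    }

    fn handle_empty_return(
        sender_from_addr: Option<Sender>,
        tried_peers: &mut HashSet<SocketAddr>,
    ) -> Result<(), OpError> {
        // let-else: bail out with a recoverable error instead of panicking the event loop
        // when the sender cannot be resolved (e.g. the peer already disconnected).
        let Some(sender) = sender_from_addr else {
            eprintln!("ReturnGet received but sender lookup failed - cannot process");
            return Err(OpError::InvalidTransition);
        };
        // Only record the failed peer when its transport address is known.
        if let Some(addr) = sender.socket_addr {
            tried_peers.insert(addr);
        }
        println!("retrying after empty response from {}", sender.pub_key);
        Ok(())
    }

    fn main() {
        let mut tried = HashSet::new();
        assert!(handle_empty_return(None, &mut tried).is_err());

        let peer = Sender {
            pub_key: "peer-pub-key".into(),
            socket_addr: "198.51.100.7:4500".parse().ok(),
        };
        assert!(handle_empty_return(Some(peer), &mut tried).is_ok());
        assert_eq!(tried.len(), 1);
    }
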
diff --git a/crates/core/src/operations/subscribe.rs b/crates/core/src/operations/subscribe.rs
index 4deeea336..2f5d798df 100644
--- a/crates/core/src/operations/subscribe.rs
+++ b/crates/core/src/operations/subscribe.rs
@@ -469,11 +469,14 @@ impl Operation for SubscribeOp {
                         target: subscriber.clone(),
                         subscribed: false,
                     };
-                    return Ok(OperationResult {
-                        target_addr: return_msg.target_addr(),
-                        return_msg: Some(NetMessage::from(return_msg)),
-                        state: None,
-                    });
+                    // Use build_op_result to ensure upstream_addr is used for routing
+                    // (important for peers behind NAT)
+                    return build_op_result(
+                        self.id,
+                        None,
+                        Some(return_msg),
+                        self.upstream_addr,
+                    );
                 }

                 let after_direct = subscribers_snapshot(op_manager, key);
@@ -581,18 +584,18 @@ impl Operation for SubscribeOp {
                 let ring_max_htl = op_manager.ring.max_hops_to_live.max(1);
                 let htl = (*htl).min(ring_max_htl);
                 let this_peer = op_manager.ring.connection_manager.own_location();
-                let return_not_subbed = || -> OperationResult {
+                // Capture upstream_addr for NAT-friendly routing in error responses
+                let upstream_addr = self.upstream_addr;
+                let return_not_subbed = || -> Result<OperationResult, OpError> {
                     let return_msg = SubscribeMsg::ReturnSub {
                         key: *key,
                         id: *id,
                         subscribed: false,
                         target: subscriber.clone(),
                     };
-                    OperationResult {
-                        target_addr: return_msg.target_addr(),
-                        return_msg: Some(NetMessage::from(return_msg)),
-                        state: None,
-                    }
+                    // Use build_op_result to ensure upstream_addr is used for routing
+                    // (important for peers behind NAT)
+                    build_op_result(*id, None, Some(return_msg), upstream_addr)
                 };

                 if htl == 0 {
@@ -602,7 +605,7 @@ impl Operation for SubscribeOp {
                         subscriber = %subscriber.peer(),
                         "Dropping Subscribe SeekNode with zero HTL"
                     );
-                    return Ok(return_not_subbed());
+                    return return_not_subbed();
                 }

                 if !super::has_contract(op_manager, *key).await? {
@@ -638,7 +641,7 @@ impl Operation for SubscribeOp {
                             error = %fetch_err,
                             "Failed to fetch contract locally while handling subscribe"
                         );
-                        return Ok(return_not_subbed());
+                        return return_not_subbed();
                     }

                     if wait_for_local_contract(op_manager, *key).await? {
@@ -653,18 +656,18 @@ impl Operation for SubscribeOp {
                             %key,
                             "Contract still unavailable locally after fetch attempt"
                         );
-                        return Ok(return_not_subbed());
+                        return return_not_subbed();
                     }
                 } else {
                     let Some(new_target) = candidates.first() else {
-                        return Ok(return_not_subbed());
+                        return return_not_subbed();
                     };
                     let new_target = new_target.clone();
                     let new_htl = htl.saturating_sub(1);

                     if new_htl == 0 {
                         tracing::debug!(tx = %id, %key, "Max number of hops reached while trying to get contract");
-                        return Ok(return_not_subbed());
+                        return return_not_subbed();
                     }

                     let mut new_skip_list = skip_list.clone();
@@ -735,7 +738,7 @@ impl Operation for SubscribeOp {
                         "subscribe: direct registration failed (max subscribers reached)"
                     );
                     // max number of subscribers for this contract reached
-                    return Ok(return_not_subbed());
+                    return return_not_subbed();
                 }
                 let after_direct = subscribers_snapshot(op_manager, key);
                 tracing::info!(