Merged
68 changes: 66 additions & 2 deletions crates/core/src/node/network_bridge/p2p_protoc.rs
@@ -603,6 +603,53 @@ impl P2pConnManager {
}
}
}
ConnEvent::OutboundMessageWithTarget { target_addr, msg } => {
// This variant uses an explicit target address from OperationResult.target_addr,
// which is critical for NAT scenarios where the address in the message
// differs from the actual transport address we should send to.

Collaborator: when would that happen?

tracing::info!(
tx = %msg.id(),
msg_type = %msg,
target_addr = %target_addr,
msg_target = ?msg.target().map(|t| t.addr()),
"Sending outbound message with explicit target address (NAT routing)"
);

// Look up the connection using the explicit target address
let peer_connection = ctx.connections.get(&target_addr);

match peer_connection {
Some(peer_connection) => {
if let Err(e) =
peer_connection.sender.send(Left(msg.clone())).await
{
tracing::error!(
tx = %msg.id(),
target_addr = %target_addr,
"Failed to send message to peer: {}", e
);
} else {
tracing::info!(
tx = %msg.id(),
target_addr = %target_addr,
"Message successfully sent to peer connection via explicit address"
);
}
}
None => {
// No existing connection - this is unexpected for NAT scenarios
// since we should have the connection from the original request
tracing::error!(
tx = %msg.id(),
target_addr = %target_addr,
msg_target = ?msg.target().map(|t| t.addr()),
connections = ?ctx.connections.keys().collect::<Vec<_>>(),
"No connection found for explicit target address - NAT routing failed"
);
ctx.bridge.op_manager.completed(*msg.id());
}
}
}
ConnEvent::TransportClosed { remote_addr, error } => {
tracing::debug!(
remote = %remote_addr,
@@ -2266,8 +2313,19 @@ impl P2pConnManager {

fn handle_bridge_msg(&self, msg: Option<P2pBridgeEvent>) -> EventResult {
match msg {
-Some(Left((_target, msg))) => {
-EventResult::Event(ConnEvent::OutboundMessage(*msg).into())
+Some(Left((target, msg))) => {
+// Use OutboundMessageWithTarget to preserve the target address from
+// OperationResult.target_addr. This is critical for NAT scenarios where
+// the address in the message differs from the actual transport address.
+// The PeerId.addr contains the address that was used to look up the peer
+// in P2pBridge::send(), which is the correct transport address.
+EventResult::Event(
+ConnEvent::OutboundMessageWithTarget {
+target_addr: target.addr,
+msg: *msg,
+}
+.into(),
+)
}
Some(Right(action)) => EventResult::Event(ConnEvent::NodeAction(action).into()),
None => EventResult::Event(ConnEvent::ClosedChannel(ChannelCloseReason::Bridge).into()),
@@ -2398,6 +2456,12 @@ enum EventResult {
pub(super) enum ConnEvent {
InboundMessage(IncomingMessage),
OutboundMessage(NetMessage),
/// Outbound message with explicit target address from OperationResult.target_addr.
/// Used when the target address differs from what's in the message (NAT scenarios).
OutboundMessageWithTarget {

Collaborator: If we were doing things correctly this variant wouldn't be needed cause we wouldn't be ever leaking out the internal address to other peers.

target_addr: SocketAddr,
msg: NetMessage,
},
NodeAction(NodeEvent),
ClosedChannel(ChannelCloseReason),
TransportClosed {
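For readers following the NAT discussion above, here is a minimal self-contained sketch of the dispatch difference the new variant introduces. All types are illustrative stand-ins, not the crate's real `NetMessage`/`ConnEvent`; the point is only that `OutboundMessageWithTarget` keys the connection lookup on the transport-observed address instead of the address embedded in the message.

```rust
use std::collections::HashMap;
use std::net::SocketAddr;

// Illustrative stand-ins only; the crate's real types differ.
struct NetMessage {
    // Address the message body claims as its destination; behind NAT this
    // can be a peer's private address and miss the live connection entry.
    claimed_target: Option<SocketAddr>,
}

struct ConnectionHandle;

impl ConnectionHandle {
    fn send(&self, _msg: NetMessage) -> Result<(), &'static str> {
        Ok(())
    }
}

enum ConnEvent {
    // Destination derived from the message itself (the old path).
    OutboundMessage(NetMessage),
    // Destination carried alongside the message, copied from
    // OperationResult.target_addr (the address the transport actually saw).
    OutboundMessageWithTarget { target_addr: SocketAddr, msg: NetMessage },
}

fn dispatch(
    connections: &HashMap<SocketAddr, ConnectionHandle>,
    event: ConnEvent,
) -> Result<(), &'static str> {
    match event {
        ConnEvent::OutboundMessage(msg) => {
            let addr = msg.claimed_target.ok_or("message has no target")?;
            connections.get(&addr).ok_or("no connection")?.send(msg)
        }
        ConnEvent::OutboundMessageWithTarget { target_addr, msg } => {
            // Lookup keyed by the explicit transport address, ignoring
            // whatever address the message body claims.
            connections
                .get(&target_addr)
                .ok_or("no connection for explicit target")?
                .send(msg)
        }
    }
}
```

The collaborator's objection holds in this sketch too: if messages never carried NAT-internal addresses in the first place, the second variant and its duplicated lookup path would be unnecessary.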
32 changes: 14 additions & 18 deletions crates/core/src/operations/connect.rs
@@ -320,8 +320,8 @@ impl RelayState {
// Use the joiner with updated observed address for response routing
actions.response_target = Some(self.request.joiner.clone());
tracing::info!(
-acceptor_pub_key = %acceptor.pub_key(),
-joiner_pub_key = %self.request.joiner.pub_key(),
+acceptor_key = %acceptor.pub_key(),
+joiner_key = %self.request.joiner.pub_key(),
acceptor_loc = ?acceptor.location,
joiner_loc = ?self.request.joiner.location,
ring_distance = ?dist,
@@ -690,7 +690,7 @@ impl ConnectOp {
match self.state.as_mut() {
Some(ConnectState::WaitingForResponses(state)) => {
tracing::info!(
-acceptor = %response.acceptor.peer(),
+acceptor_key = %response.acceptor.pub_key(),
acceptor_loc = ?response.acceptor.location,
"connect: joiner received ConnectResponse"
);
@@ -830,11 +830,9 @@ impl Operation for ConnectOp {
};
// Route through upstream (where the request came from) since we may
// not have a direct connection to the target
-if let Some(upstream) = source_addr {
-network_bridge
-.send(upstream, NetMessage::V1(NetMessageV1::Connect(msg)))
-.await?;
-}
+network_bridge
+.send(upstream_addr, NetMessage::V1(NetMessageV1::Connect(msg)))
+.await?;
}

if let Some(peer) = actions.expect_connection_from {
@@ -876,14 +874,12 @@ impl Operation for ConnectOp {
};
// Route the response through upstream (where the request came from)
// since we may not have a direct connection to the joiner
-if let Some(upstream) = source_addr {
-network_bridge
-.send(
-upstream,
-NetMessage::V1(NetMessageV1::Connect(response_msg)),
-)
-.await?;
-}
+network_bridge
+.send(
+upstream_addr,
+NetMessage::V1(NetMessageV1::Connect(response_msg)),
+)
+.await?;
return Ok(store_operation_state(&mut self));
}

@@ -977,7 +973,7 @@ impl Operation for ConnectOp {
updated_payload
} else {
tracing::warn!(
-acceptor = %payload.acceptor.peer(),
+acceptor_key = %payload.acceptor.pub_key(),
"connect: response received without source_addr, cannot fill acceptor address"
);
payload.clone()
@@ -988,7 +984,7 @@

tracing::debug!(
upstream_addr = %upstream_addr,
-acceptor = %forward_payload.acceptor.peer(),
+acceptor_key = %forward_payload.acceptor.pub_key(),
"connect: forwarding response towards joiner"
);
// Forward response toward the joiner via upstream
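The connect.rs hunks above replace the conditional `if let Some(upstream) = source_addr { ... }` send, which silently dropped the reply whenever no source address was known, with an unconditional send to `upstream_addr`. A hedged sketch of the before/after shape, with a hypothetical `NetworkBridge` stand-in and the async plumbing omitted:

```rust
use std::net::SocketAddr;

// Hypothetical stand-ins; the crate's real NetworkBridge is async and its
// NetMessage/OpError types are richer.
struct NetMessage;
#[derive(Debug)]
struct OpError;

trait NetworkBridge {
    fn send(&self, addr: SocketAddr, msg: NetMessage) -> Result<(), OpError>;
}

// Before: a missing upstream silently dropped the response, leaving the
// joiner waiting for a reply that never arrives.
fn route_response_old(
    bridge: &impl NetworkBridge,
    source_addr: Option<SocketAddr>,
    msg: NetMessage,
) -> Result<(), OpError> {
    if let Some(upstream) = source_addr {
        bridge.send(upstream, msg)?;
    }
    Ok(())
}

// After: the upstream hop is required, so the response always travels back
// along the path the request arrived on: the only hop guaranteed to have a
// live connection when there is no direct route to the target.
fn route_response_new(
    bridge: &impl NetworkBridge,
    upstream_addr: SocketAddr,
    msg: NetMessage,
) -> Result<(), OpError> {
    bridge.send(upstream_addr, msg)
}
```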
36 changes: 24 additions & 12 deletions crates/core/src/operations/get.rs
@@ -854,16 +854,23 @@ impl Operation for GetOp {
let id = *id;
let key = *key;

-// Use sender_from_addr for logging
-let sender = sender_from_addr.clone().expect(
-"ReturnGet requires sender lookup from connection - source_addr should resolve to known peer",
-);
+// Handle case where sender lookup failed (e.g., peer disconnected)
+let Some(sender) = sender_from_addr.clone() else {
+tracing::warn!(
+tx = %id,
+%key,
+source = ?source_addr,
+"GET: ReturnGet (empty) received but sender lookup failed - cannot process"
+);
+return Err(OpError::invalid_transition(self.id));
+};
+
+// Use pub_key for logging to avoid panics on Unknown addresses
tracing::info!(
tx = %id,
%key,
-from = %sender.peer(),
-to = %target.peer(),
+from = %sender.pub_key(),
+to = %target.pub_key(),
skip = ?skip_list,
"GET: ReturnGet received with empty value"
);
@@ -875,7 +882,7 @@
%this_peer,
"Neither contract or contract value for contract found at peer {}, \
retrying with other peers",
-sender.peer()
+sender.pub_key()
);

match self.state {
@@ -894,8 +901,10 @@
}) => {
// todo: register in the stats for the outcome of the op that failed to get a response from this peer

-// Add the failed peer to tried list
-tried_peers.insert(sender.peer().clone());
+// Add the failed peer to tried list (only if address is known)
+if let Some(addr) = sender.socket_addr() {
+tried_peers.insert(PeerId::new(addr, sender.pub_key().clone()));
+}

// First, check if we have alternatives at this hop level
if !alternatives.is_empty() && attempts_at_hop < DEFAULT_MAX_BREADTH {
@@ -905,7 +914,7 @@
tracing::info!(
tx = %id,
%key,
-next_peer = %next_target.peer(),
+next_peer = %next_target.pub_key(),
fetch_contract,
attempts_at_hop = attempts_at_hop + 1,
max_attempts = DEFAULT_MAX_BREADTH,
@@ -923,8 +932,11 @@
skip_list: tried_peers.clone(),
});

-// Update state with the new alternative being tried
-tried_peers.insert(next_target.peer().clone());
+// Update state with the new alternative being tried (only if address is known)
+if let Some(addr) = next_target.socket_addr() {
+tried_peers
+.insert(PeerId::new(addr, next_target.pub_key().clone()));
+}
let updated_tried_peers = tried_peers.clone();
new_state = Some(GetState::AwaitingResponse {
retries,
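Both get.rs fixes above apply the same defensive pattern: a `let ... else` early return instead of a panicking `.expect(...)`, plus an address guard before constructing a `PeerId`. A self-contained sketch with simplified stand-in types (the crate's real `PeerId` and peer-location types carry more):

```rust
use std::collections::HashSet;
use std::net::SocketAddr;

// Simplified stand-ins for the crate's peer types.
#[derive(Clone, PartialEq, Eq, Hash)]
struct PubKey(String);

#[derive(Clone, PartialEq, Eq, Hash)]
struct PeerId {
    addr: SocketAddr,
    pub_key: PubKey,
}

impl PeerId {
    fn new(addr: SocketAddr, pub_key: PubKey) -> Self {
        Self { addr, pub_key }
    }
}

struct PeerKeyLocation {
    addr: Option<SocketAddr>, // None when only the key is known (e.g. behind NAT)
    pub_key: PubKey,
}

impl PeerKeyLocation {
    fn socket_addr(&self) -> Option<SocketAddr> {
        self.addr
    }
    fn pub_key(&self) -> &PubKey {
        &self.pub_key
    }
}

#[derive(Debug)]
struct OpError;

fn record_failed_peer(
    sender_from_addr: Option<PeerKeyLocation>,
    tried_peers: &mut HashSet<PeerId>,
) -> Result<(), OpError> {
    // let-else instead of .expect(): a sender that disconnected between
    // receive and processing becomes a recoverable error, not a panic.
    let Some(sender) = sender_from_addr else {
        return Err(OpError);
    };
    // Guard before building a PeerId: skip recording when the address is
    // unknown rather than panicking on an Unknown address.
    if let Some(addr) = sender.socket_addr() {
        tried_peers.insert(PeerId::new(addr, sender.pub_key().clone()));
    }
    Ok(())
}
```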
37 changes: 20 additions & 17 deletions crates/core/src/operations/subscribe.rs
@@ -469,11 +469,14 @@ impl Operation for SubscribeOp {
target: subscriber.clone(),
subscribed: false,
};
-return Ok(OperationResult {
-target_addr: return_msg.target_addr(),
-return_msg: Some(NetMessage::from(return_msg)),
-state: None,
-});
+// Use build_op_result to ensure upstream_addr is used for routing
+// (important for peers behind NAT)
+return build_op_result(
+self.id,
+None,
+Some(return_msg),
+self.upstream_addr,
+);
}

let after_direct = subscribers_snapshot(op_manager, key);
@@ -581,18 +584,18 @@
let ring_max_htl = op_manager.ring.max_hops_to_live.max(1);
let htl = (*htl).min(ring_max_htl);
let this_peer = op_manager.ring.connection_manager.own_location();
-let return_not_subbed = || -> OperationResult {
+// Capture upstream_addr for NAT-friendly routing in error responses
+let upstream_addr = self.upstream_addr;
+let return_not_subbed = || -> Result<OperationResult, OpError> {
let return_msg = SubscribeMsg::ReturnSub {
key: *key,
id: *id,
subscribed: false,
target: subscriber.clone(),
};
-OperationResult {
-target_addr: return_msg.target_addr(),
-return_msg: Some(NetMessage::from(return_msg)),
-state: None,
-}
+// Use build_op_result to ensure upstream_addr is used for routing
+// (important for peers behind NAT)
+build_op_result(*id, None, Some(return_msg), upstream_addr)
};

if htl == 0 {
@@ -602,7 +605,7 @@
subscriber = %subscriber.peer(),
"Dropping Subscribe SeekNode with zero HTL"
);
-return Ok(return_not_subbed());
+return return_not_subbed();
}

if !super::has_contract(op_manager, *key).await? {
@@ -638,7 +641,7 @@
error = %fetch_err,
"Failed to fetch contract locally while handling subscribe"
);
-return Ok(return_not_subbed());
+return return_not_subbed();
}

if wait_for_local_contract(op_manager, *key).await? {
@@ -653,18 +656,18 @@
%key,
"Contract still unavailable locally after fetch attempt"
);
-return Ok(return_not_subbed());
+return return_not_subbed();
}
} else {
let Some(new_target) = candidates.first() else {
-return Ok(return_not_subbed());
+return return_not_subbed();
};
let new_target = new_target.clone();
let new_htl = htl.saturating_sub(1);

if new_htl == 0 {
tracing::debug!(tx = %id, %key, "Max number of hops reached while trying to get contract");
-return Ok(return_not_subbed());
+return return_not_subbed();
}

let mut new_skip_list = skip_list.clone();
@@ -735,7 +738,7 @@
"subscribe: direct registration failed (max subscribers reached)"
);
// max number of subscribers for this contract reached
-return Ok(return_not_subbed());
+return return_not_subbed();
}
let after_direct = subscribers_snapshot(op_manager, key);
tracing::info!(
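Every subscribe.rs early exit above now funnels through `build_op_result`, so the reply's routing address comes from the recorded `upstream_addr` rather than from the message body. The real helper is not shown in this diff; the sketch below is an assumed shape (hypothetical names, `state` plumbing dropped) illustrating only the address-preference logic:

```rust
use std::net::SocketAddr;

// Assumed shapes; the crate's real Transaction, SubscribeMsg and
// OperationResult differ, and build_op_result is not shown in this diff.
struct Transaction;
struct SubscribeMsg {
    // Address carried inside the message body (possibly NAT-internal).
    embedded_target: Option<SocketAddr>,
}

impl SubscribeMsg {
    fn target_addr(&self) -> Option<SocketAddr> {
        self.embedded_target
    }
}

struct OperationResult {
    target_addr: Option<SocketAddr>,
    return_msg: Option<SubscribeMsg>,
}

fn build_op_result(
    _id: Transaction,
    return_msg: Option<SubscribeMsg>,
    upstream_addr: Option<SocketAddr>,
) -> OperationResult {
    // Prefer the transport-observed upstream address; fall back to the
    // message's own claimed target only when no upstream was recorded.
    let target_addr =
        upstream_addr.or_else(|| return_msg.as_ref().and_then(|m| m.target_addr()));
    OperationResult {
        target_addr,
        return_msg,
    }
}
```

Centralizing the construction is also why `return_not_subbed` changed its return type to `Result<OperationResult, OpError>`: call sites now return the helper's result directly instead of wrapping a hand-built `OperationResult` in `Ok(...)`.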