Skip to content

Commit ea79962

Browse files
sanityclaude
andauthored
fix: handle NAT address in subscribe and seeding operations (#2193)
Co-authored-by: Claude <[email protected]>
1 parent 55e978e commit ea79962

File tree

12 files changed

+283
-172
lines changed

12 files changed

+283
-172
lines changed

crates/core/src/node/network_bridge/p2p_protoc.rs

Lines changed: 2 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -603,53 +603,6 @@ impl P2pConnManager {
603603
}
604604
}
605605
}
606-
ConnEvent::OutboundMessageWithTarget { target_addr, msg } => {
607-
// This variant uses an explicit target address from OperationResult.target_addr,
608-
// which is critical for NAT scenarios where the address in the message
609-
// differs from the actual transport address we should send to.
610-
tracing::info!(
611-
tx = %msg.id(),
612-
msg_type = %msg,
613-
target_addr = %target_addr,
614-
msg_target = ?msg.target().map(|t| t.addr()),
615-
"Sending outbound message with explicit target address (NAT routing)"
616-
);
617-
618-
// Look up the connection using the explicit target address
619-
let peer_connection = ctx.connections.get(&target_addr);
620-
621-
match peer_connection {
622-
Some(peer_connection) => {
623-
if let Err(e) =
624-
peer_connection.sender.send(Left(msg.clone())).await
625-
{
626-
tracing::error!(
627-
tx = %msg.id(),
628-
target_addr = %target_addr,
629-
"Failed to send message to peer: {}", e
630-
);
631-
} else {
632-
tracing::info!(
633-
tx = %msg.id(),
634-
target_addr = %target_addr,
635-
"Message successfully sent to peer connection via explicit address"
636-
);
637-
}
638-
}
639-
None => {
640-
// No existing connection - this is unexpected for NAT scenarios
641-
// since we should have the connection from the original request
642-
tracing::error!(
643-
tx = %msg.id(),
644-
target_addr = %target_addr,
645-
msg_target = ?msg.target().map(|t| t.addr()),
646-
connections = ?ctx.connections.keys().collect::<Vec<_>>(),
647-
"No connection found for explicit target address - NAT routing failed"
648-
);
649-
ctx.bridge.op_manager.completed(*msg.id());
650-
}
651-
}
652-
}
653606
ConnEvent::TransportClosed { remote_addr, error } => {
654607
tracing::debug!(
655608
remote = %remote_addr,
@@ -2313,19 +2266,8 @@ impl P2pConnManager {
23132266

23142267
fn handle_bridge_msg(&self, msg: Option<P2pBridgeEvent>) -> EventResult {
23152268
match msg {
2316-
Some(Left((target, msg))) => {
2317-
// Use OutboundMessageWithTarget to preserve the target address from
2318-
// OperationResult.target_addr. This is critical for NAT scenarios where
2319-
// the address in the message differs from the actual transport address.
2320-
// The PeerId.addr contains the address that was used to look up the peer
2321-
// in P2pBridge::send(), which is the correct transport address.
2322-
EventResult::Event(
2323-
ConnEvent::OutboundMessageWithTarget {
2324-
target_addr: target.addr,
2325-
msg: *msg,
2326-
}
2327-
.into(),
2328-
)
2269+
Some(Left((_target, msg))) => {
2270+
EventResult::Event(ConnEvent::OutboundMessage(*msg).into())
23292271
}
23302272
Some(Right(action)) => EventResult::Event(ConnEvent::NodeAction(action).into()),
23312273
None => EventResult::Event(ConnEvent::ClosedChannel(ChannelCloseReason::Bridge).into()),
@@ -2456,12 +2398,6 @@ enum EventResult {
24562398
pub(super) enum ConnEvent {
24572399
InboundMessage(IncomingMessage),
24582400
OutboundMessage(NetMessage),
2459-
/// Outbound message with explicit target address from OperationResult.target_addr.
2460-
/// Used when the target address differs from what's in the message (NAT scenarios).
2461-
OutboundMessageWithTarget {
2462-
target_addr: SocketAddr,
2463-
msg: NetMessage,
2464-
},
24652401
NodeAction(NodeEvent),
24662402
ClosedChannel(ChannelCloseReason),
24672403
TransportClosed {

crates/core/src/node/testing_impl/in_memory.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,12 +125,12 @@ where
125125
self.op_manager.ring.seed_contract(key);
126126
}
127127
if let Some(subscribers) = contract_subscribers.get(&key) {
128-
// add contract subscribers
128+
// add contract subscribers (test setup - no upstream_addr)
129129
for subscriber in subscribers {
130130
if self
131131
.op_manager
132132
.ring
133-
.add_subscriber(&key, subscriber.clone())
133+
.add_subscriber(&key, subscriber.clone(), None)
134134
.is_err()
135135
{
136136
tracing::warn!("Max subscribers for contract {} reached", key);

crates/core/src/operations/connect.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -320,8 +320,8 @@ impl RelayState {
320320
// Use the joiner with updated observed address for response routing
321321
actions.response_target = Some(self.request.joiner.clone());
322322
tracing::info!(
323-
acceptor_key = %acceptor.pub_key(),
324-
joiner_key = %self.request.joiner.pub_key(),
323+
acceptor_pub_key = %acceptor.pub_key(),
324+
joiner_pub_key = %self.request.joiner.pub_key(),
325325
acceptor_loc = ?acceptor.location,
326326
joiner_loc = ?self.request.joiner.location,
327327
ring_distance = ?dist,
@@ -690,7 +690,7 @@ impl ConnectOp {
690690
match self.state.as_mut() {
691691
Some(ConnectState::WaitingForResponses(state)) => {
692692
tracing::info!(
693-
acceptor_key = %response.acceptor.pub_key(),
693+
acceptor_pub_key = %response.acceptor.pub_key(),
694694
acceptor_loc = ?response.acceptor.location,
695695
"connect: joiner received ConnectResponse"
696696
);
@@ -829,7 +829,8 @@ impl Operation for ConnectOp {
829829
address,
830830
};
831831
// Route through upstream (where the request came from) since we may
832-
// not have a direct connection to the target
832+
// not have a direct connection to the target.
833+
// Note: upstream_addr is already validated from source_addr at the start of this match arm.
833834
network_bridge
834835
.send(upstream_addr, NetMessage::V1(NetMessageV1::Connect(msg)))
835836
.await?;
@@ -873,7 +874,8 @@ impl Operation for ConnectOp {
873874
payload: response,
874875
};
875876
// Route the response through upstream (where the request came from)
876-
// since we may not have a direct connection to the joiner
877+
// since we may not have a direct connection to the joiner.
878+
// Note: upstream_addr is already validated from source_addr at the start of this match arm.
877879
network_bridge
878880
.send(
879881
upstream_addr,
@@ -966,14 +968,14 @@ impl Operation for ConnectOp {
966968
let mut updated_payload = payload.clone();
967969
updated_payload.acceptor.peer_addr = PeerAddr::Known(acceptor_addr);
968970
tracing::debug!(
969-
acceptor = %updated_payload.acceptor.peer(),
971+
acceptor_pub_key = %updated_payload.acceptor.pub_key(),
970972
acceptor_addr = %acceptor_addr,
971973
"connect: filled acceptor address from source_addr"
972974
);
973975
updated_payload
974976
} else {
975977
tracing::warn!(
976-
acceptor_key = %payload.acceptor.pub_key(),
978+
acceptor_pub_key = %payload.acceptor.pub_key(),
977979
"connect: response received without source_addr, cannot fill acceptor address"
978980
);
979981
payload.clone()
@@ -984,7 +986,7 @@ impl Operation for ConnectOp {
984986

985987
tracing::debug!(
986988
upstream_addr = %upstream_addr,
987-
acceptor_key = %forward_payload.acceptor.pub_key(),
989+
acceptor_pub_key = %forward_payload.acceptor.pub_key(),
988990
"connect: forwarding response towards joiner"
989991
);
990992
// Forward response toward the joiner via upstream

crates/core/src/operations/get.rs

Lines changed: 34 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -549,9 +549,14 @@ impl Operation for GetOp {
549549
);
550550

551551
// Use sender_from_addr (looked up from source_addr) instead of message field
552-
let sender = sender_from_addr.clone().expect(
553-
"RequestGet requires sender lookup from connection - source_addr should resolve to known peer",
554-
);
552+
let Some(sender) = sender_from_addr.clone() else {
553+
tracing::warn!(
554+
tx = %id,
555+
%key,
556+
"GET: RequestGet without sender lookup - cannot process"
557+
);
558+
return Err(OpError::invalid_transition(self.id));
559+
};
555560

556561
// Check if operation is already completed
557562
if matches!(self.state, Some(GetState::Finished { .. })) {
@@ -702,9 +707,14 @@ impl Operation for GetOp {
702707
let this_peer = target.clone();
703708

704709
// Use sender_from_addr (looked up from source_addr) instead of message field
705-
let sender = sender_from_addr.clone().expect(
706-
"SeekNode requires sender lookup from connection - source_addr should resolve to known peer",
707-
);
710+
let Some(sender) = sender_from_addr.clone() else {
711+
tracing::warn!(
712+
tx = %id,
713+
%key,
714+
"GET: SeekNode without sender lookup - cannot process"
715+
);
716+
return Err(OpError::invalid_transition(self.id));
717+
};
708718

709719
if htl == 0 {
710720
let sender_display = sender.peer().to_string();
@@ -854,23 +864,21 @@ impl Operation for GetOp {
854864
let id = *id;
855865
let key = *key;
856866

857-
// Handle case where sender lookup failed (e.g., peer disconnected)
867+
// Use sender_from_addr for logging
858868
let Some(sender) = sender_from_addr.clone() else {
859869
tracing::warn!(
860870
tx = %id,
861871
%key,
862-
source = ?source_addr,
863-
"GET: ReturnGet (empty) received but sender lookup failed - cannot process"
872+
"GET: ReturnGet without sender lookup - cannot process"
864873
);
865874
return Err(OpError::invalid_transition(self.id));
866875
};
867876

868-
// Use pub_key for logging to avoid panics on Unknown addresses
869877
tracing::info!(
870878
tx = %id,
871879
%key,
872-
from = %sender.pub_key(),
873-
to = %target.pub_key(),
880+
from = %sender.peer(),
881+
to = %target.peer(),
874882
skip = ?skip_list,
875883
"GET: ReturnGet received with empty value"
876884
);
@@ -882,7 +890,7 @@ impl Operation for GetOp {
882890
%this_peer,
883891
"Neither contract or contract value for contract found at peer {}, \
884892
retrying with other peers",
885-
sender.pub_key()
893+
sender.peer()
886894
);
887895

888896
match self.state {
@@ -901,10 +909,8 @@ impl Operation for GetOp {
901909
}) => {
902910
// todo: register in the stats for the outcome of the op that failed to get a response from this peer
903911

904-
// Add the failed peer to tried list (only if address is known)
905-
if let Some(addr) = sender.socket_addr() {
906-
tried_peers.insert(PeerId::new(addr, sender.pub_key().clone()));
907-
}
912+
// Add the failed peer to tried list
913+
tried_peers.insert(sender.peer().clone());
908914

909915
// First, check if we have alternatives at this hop level
910916
if !alternatives.is_empty() && attempts_at_hop < DEFAULT_MAX_BREADTH {
@@ -914,7 +920,7 @@ impl Operation for GetOp {
914920
tracing::info!(
915921
tx = %id,
916922
%key,
917-
next_peer = %next_target.pub_key(),
923+
next_peer = %next_target.peer(),
918924
fetch_contract,
919925
attempts_at_hop = attempts_at_hop + 1,
920926
max_attempts = DEFAULT_MAX_BREADTH,
@@ -932,11 +938,8 @@ impl Operation for GetOp {
932938
skip_list: tried_peers.clone(),
933939
});
934940

935-
// Update state with the new alternative being tried (only if address is known)
936-
if let Some(addr) = next_target.socket_addr() {
937-
tried_peers
938-
.insert(PeerId::new(addr, next_target.pub_key().clone()));
939-
}
941+
// Update state with the new alternative being tried
942+
tried_peers.insert(next_target.peer().clone());
940943
let updated_tried_peers = tried_peers.clone();
941944
new_state = Some(GetState::AwaitingResponse {
942945
retries,
@@ -1125,9 +1128,14 @@ impl Operation for GetOp {
11251128
let key = *key;
11261129

11271130
// Use sender_from_addr for logging
1128-
let sender = sender_from_addr.clone().expect(
1129-
"ReturnGet requires sender lookup from connection - source_addr should resolve to known peer",
1130-
);
1131+
let Some(sender) = sender_from_addr.clone() else {
1132+
tracing::warn!(
1133+
tx = %id,
1134+
%key,
1135+
"GET: ReturnGet without sender lookup - cannot process"
1136+
);
1137+
return Err(OpError::invalid_transition(self.id));
1138+
};
11311139

11321140
tracing::info!(tx = %id, %key, "Received get response with state: {:?}", self.state.as_ref().unwrap());
11331141

0 commit comments

Comments
 (0)