Skip to content

Commit f9a1f7a

Browse files
sanityclaude
andcommitted
fix: incorporate PR #2172 fixes into PR #2174
Merges branch fix/seeding-subscriber-nat-2164 to bring in Nacho's address validation fixes: - Better tracing for subscribe address updates - Clearer comments for subscription registration 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
2 parents bf5356f + 8bd39fc commit f9a1f7a

File tree

4 files changed

+74
-38
lines changed

4 files changed

+74
-38
lines changed

crates/core/src/operations/connect.rs

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -830,11 +830,21 @@ impl Operation for ConnectOp {
830830
};
831831
// Route through upstream (where the request came from) since we may
832832
// not have a direct connection to the target
833-
if let Some(upstream) = &source_addr {
834-
network_bridge
835-
.send(*upstream, NetMessage::V1(NetMessageV1::Connect(msg)))
836-
.await?;
837-
}
833+
let Some(upstream) = source_addr else {
834+
tracing::warn!(
835+
tx = %self.id,
836+
"ObservedAddress message has no upstream - was this locally initiated?"
837+
);
838+
// No upstream to route through - this shouldn't happen for relayed connections
839+
return Ok(OperationResult {
840+
return_msg: None,
841+
target_addr: None,
842+
state: Some(OpEnum::Connect(Box::new(self))),
843+
});
844+
};
845+
network_bridge
846+
.send(upstream, NetMessage::V1(NetMessageV1::Connect(msg)))
847+
.await?;
838848
}
839849

840850
if let Some(peer) = actions.expect_connection_from {
@@ -876,14 +886,20 @@ impl Operation for ConnectOp {
876886
};
877887
// Route the response through upstream (where the request came from)
878888
// since we may not have a direct connection to the joiner
879-
if let Some(upstream) = &source_addr {
880-
network_bridge
881-
.send(
882-
*upstream,
883-
NetMessage::V1(NetMessageV1::Connect(response_msg)),
884-
)
885-
.await?;
886-
}
889+
let Some(upstream) = source_addr else {
890+
tracing::warn!(
891+
tx = %self.id,
892+
"ConnectResponse has no upstream - was this locally initiated?"
893+
);
894+
// No upstream to route through - this shouldn't happen for relayed connections
895+
return Ok(store_operation_state(&mut self));
896+
};
897+
network_bridge
898+
.send(
899+
upstream,
900+
NetMessage::V1(NetMessageV1::Connect(response_msg)),
901+
)
902+
.await?;
887903
return Ok(store_operation_state(&mut self));
888904
}
889905

crates/core/src/operations/get.rs

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -549,9 +549,14 @@ impl Operation for GetOp {
549549
);
550550

551551
// Use sender_from_addr (looked up from source_addr) instead of message field
552-
let sender = sender_from_addr.clone().expect(
553-
"RequestGet requires sender lookup from connection - source_addr should resolve to known peer",
554-
);
552+
let Some(sender) = sender_from_addr.clone() else {
553+
tracing::warn!(
554+
tx = %id,
555+
%key,
556+
"GET: RequestGet without sender lookup - cannot process"
557+
);
558+
return Err(OpError::invalid_transition(self.id));
559+
};
555560

556561
// Check if operation is already completed
557562
if matches!(self.state, Some(GetState::Finished { .. })) {
@@ -702,9 +707,14 @@ impl Operation for GetOp {
702707
let this_peer = target.clone();
703708

704709
// Use sender_from_addr (looked up from source_addr) instead of message field
705-
let sender = sender_from_addr.clone().expect(
706-
"SeekNode requires sender lookup from connection - source_addr should resolve to known peer",
707-
);
710+
let Some(sender) = sender_from_addr.clone() else {
711+
tracing::warn!(
712+
tx = %id,
713+
%key,
714+
"GET: SeekNode without sender lookup - cannot process"
715+
);
716+
return Err(OpError::invalid_transition(self.id));
717+
};
708718

709719
if htl == 0 {
710720
let sender_display = sender.peer().to_string();
@@ -855,9 +865,14 @@ impl Operation for GetOp {
855865
let key = *key;
856866

857867
// Use sender_from_addr for logging
858-
let sender = sender_from_addr.clone().expect(
859-
"ReturnGet requires sender lookup from connection - source_addr should resolve to known peer",
860-
);
868+
let Some(sender) = sender_from_addr.clone() else {
869+
tracing::warn!(
870+
tx = %id,
871+
%key,
872+
"GET: ReturnGet without sender lookup - cannot process"
873+
);
874+
return Err(OpError::invalid_transition(self.id));
875+
};
861876

862877
tracing::info!(
863878
tx = %id,
@@ -1113,9 +1128,14 @@ impl Operation for GetOp {
11131128
let key = *key;
11141129

11151130
// Use sender_from_addr for logging
1116-
let sender = sender_from_addr.clone().expect(
1117-
"ReturnGet requires sender lookup from connection - source_addr should resolve to known peer",
1118-
);
1131+
let Some(sender) = sender_from_addr.clone() else {
1132+
tracing::warn!(
1133+
tx = %id,
1134+
%key,
1135+
"GET: ReturnGet without sender lookup - cannot process"
1136+
);
1137+
return Err(OpError::invalid_transition(self.id));
1138+
};
11191139

11201140
tracing::info!(tx = %id, %key, "Received get response with state: {:?}", self.state.as_ref().unwrap());
11211141

crates/core/src/operations/subscribe.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -411,24 +411,24 @@ impl Operation for SubscribeOp {
411411
// This is the key step where the first recipient (gateway) determines the
412412
// subscriber's external address from the actual packet source address.
413413
let mut subscriber = subscriber.clone();
414-
if subscriber.peer_addr.is_unknown() {
415-
if let Some(addr) = source_addr {
416-
subscriber.set_addr(addr);
417-
tracing::debug!(
418-
tx = %id,
419-
%key,
420-
subscriber_addr = %addr,
421-
"subscribe: filled subscriber address from source_addr"
422-
);
423-
}
424-
}
425414

426415
tracing::debug!(
427416
tx = %id,
428417
%key,
429-
subscriber = %subscriber.peer(),
418+
subscriber_orig = %subscriber.peer(),
419+
source_addr = ?source_addr,
430420
"subscribe: processing RequestSub"
431421
);
422+
423+
if let Some(addr) = source_addr {
424+
subscriber.set_addr(addr);
425+
tracing::debug!(
426+
tx = %id,
427+
%key,
428+
subscriber_updated = %subscriber.peer(),
429+
"subscribe: updated subscriber address from transport source"
430+
);
431+
}
432432
let own_loc = op_manager.ring.connection_manager.own_location();
433433

434434
if !matches!(

crates/core/src/operations/update.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1005,7 +1005,7 @@ pub(crate) async fn request_update(
10051005
.closest_potentially_caching(&key, [sender.peer().clone()].as_slice());
10061006

10071007
if let Some(target) = remote_target {
1008-
// Subscribe to the contract (local subscription, no upstream NAT addr)
1008+
// Subscribe on behalf of the requesting peer (no upstream_addr - direct registration)
10091009
op_manager
10101010
.ring
10111011
.add_subscriber(&key, sender.clone(), None)

0 commit comments

Comments
 (0)