diff --git a/crates/core/src/node/mod.rs b/crates/core/src/node/mod.rs index 9fd18e66a..7ed797659 100644 --- a/crates/core/src/node/mod.rs +++ b/crates/core/src/node/mod.rs @@ -1225,18 +1225,20 @@ pub struct PeerId { impl Hash for PeerId { fn hash(&self, state: &mut H) { - self.addr.hash(state); + self.pub_key.hash(state); } } impl PartialEq for PeerId { fn eq(&self, other: &PeerId) -> bool { - self.addr == other.addr + self.pub_key == other.pub_key } } impl Ord for PeerId { fn cmp(&self, other: &Self) -> std::cmp::Ordering { + // Compare by address since TransportPublicKey doesn't impl Ord. + // This is only used for ordering in BTreeMap/BTreeSet, not for equality. self.addr.cmp(&other.addr) } } @@ -1253,26 +1255,14 @@ impl PeerId { } } -thread_local! { - static PEER_ID: std::cell::RefCell> = const { std::cell::RefCell::new(None) }; -} - #[cfg(test)] impl<'a> arbitrary::Arbitrary<'a> for PeerId { fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result { let addr: ([u8; 4], u16) = u.arbitrary()?; - let pub_key = PEER_ID.with(|peer_id| { - let mut peer_id = peer_id.borrow_mut(); - match &*peer_id { - Some(k) => k.clone(), - None => { - let key = TransportKeypair::new().public().clone(); - peer_id.replace(key.clone()); - key - } - } - }); + // Always generate a unique keypair. PeerId equality is based on pub_key, + // so distinct peers must have distinct keys. + let pub_key = TransportKeypair::new().public().clone(); Ok(Self { addr: addr.into(), @@ -1288,17 +1278,9 @@ impl PeerId { rand::rng().fill(&mut addr[..]); let port = crate::util::get_free_port().unwrap(); - let pub_key = PEER_ID.with(|peer_id| { - let mut peer_id = peer_id.borrow_mut(); - match &*peer_id { - Some(k) => k.clone(), - None => { - let key = TransportKeypair::new().public().clone(); - peer_id.replace(key.clone()); - key - } - } - }); + // Always generate a unique keypair. PeerId equality is based on pub_key, + // so distinct peers must have distinct keys. + let pub_key = TransportKeypair::new().public().clone(); Self { addr: (addr, port).into(), diff --git a/crates/core/src/node/op_state_manager.rs b/crates/core/src/node/op_state_manager.rs index b3464d370..e38148ae2 100644 --- a/crates/core/src/node/op_state_manager.rs +++ b/crates/core/src/node/op_state_manager.rs @@ -552,18 +552,6 @@ impl OpManager { .expect_and_register_sub_operation(parent, child); } - /// Returns true if all child operations of the parent have completed. - pub fn all_sub_operations_completed(&self, parent: Transaction) -> bool { - self.sub_op_tracker - .all_sub_operations_completed(parent, &self.ops.completed) - } - - /// Returns the number of pending child operations for the parent. - pub fn count_pending_sub_operations(&self, parent: Transaction) -> usize { - self.sub_op_tracker - .count_pending_sub_operations(parent, &self.ops.completed) - } - /// Handle sub-operation failure - propagate error to parent. pub async fn sub_operation_failed( &self, @@ -627,11 +615,6 @@ impl OpManager { self.sub_op_tracker.is_sub_operation(tx) } - /// Exposes root operations awaiting sub-operation completion. - pub(crate) fn root_ops_awaiting_sub_ops(&self) -> &Arc> { - &self.sub_op_tracker.root_ops_awaiting_sub_ops - } - /// Exposes failed parent operations. pub(crate) fn failed_parents(&self) -> &Arc> { &self.sub_op_tracker.failed_parents diff --git a/crates/core/src/operations/connect.rs b/crates/core/src/operations/connect.rs index b99d7013c..630da0c61 100644 --- a/crates/core/src/operations/connect.rs +++ b/crates/core/src/operations/connect.rs @@ -941,6 +941,14 @@ impl Operation for ConnectOp { } ConnectMsg::ObservedAddress { address, .. } => { self.handle_observed_address(*address, Instant::now()); + // Update the node's own PeerId with the externally-observed address. + // This is critical for NAT peers - without this, they embed their + // local loopback address (127.0.0.1) in messages, causing remote + // peers to fail when trying to connect back. + op_manager + .ring + .connection_manager + .update_own_address(*address); Ok(store_operation_state(&mut self)) } } diff --git a/crates/core/src/operations/mod.rs b/crates/core/src/operations/mod.rs index 34e342597..1bd7ac5e4 100644 --- a/crates/core/src/operations/mod.rs +++ b/crates/core/src/operations/mod.rs @@ -114,26 +114,13 @@ where op_manager.completed(tx_id); return Ok(None); } - if op_manager.all_sub_operations_completed(tx_id) { - tracing::debug!(%tx_id, "operation complete"); - op_manager.completed(tx_id); - return Ok(Some(final_state)); - } else { - let pending_count = op_manager.count_pending_sub_operations(tx_id); - tracing::debug!( - %tx_id, - pending_count, - "root operation awaiting child completion" - ); - - // Track the root op so child completions can finish it later. - op_manager - .root_ops_awaiting_sub_ops() - .insert(tx_id, final_state); - tracing::info!(%tx_id, "root operation registered as awaiting sub-ops"); - - return Ok(None); - } + // Return finalized state immediately to the client, regardless of pending + // sub-operations. Sub-operations like subscriptions are "fire and forget" + // from the client's perspective - they want to know their PUT/GET succeeded, + // not wait for the subscription to complete. + tracing::debug!(%tx_id, "operation complete"); + op_manager.completed(tx_id); + return Ok(Some(final_state)); } Ok(OperationResult { return_msg: Some(msg), diff --git a/crates/core/src/operations/put.rs b/crates/core/src/operations/put.rs index 27df6ebeb..bd67a1c66 100644 --- a/crates/core/src/operations/put.rs +++ b/crates/core/src/operations/put.rs @@ -1055,6 +1055,24 @@ async fn try_to_broadcast( }); } } + // Handle forwarded PUTs that have no prior state - this happens when a peer + // receives PutForward and is the final destination (e.g., has no ring location yet). + // We still need to send SuccessfulPut back to the sender. + None if last_hop && broadcast_to.is_empty() => { + tracing::debug!( + tx = %id, + %key, + "Sending SuccessfulPut for forwarded PUT with no prior state (likely peer has no ring location)" + ); + new_state = None; + return_msg = Some(PutMsg::SuccessfulPut { + id, + target: upstream, + key, + sender: op_manager.ring.connection_manager.own_location(), + origin, + }); + } _ => return Err(OpError::invalid_transition(id)), }; diff --git a/crates/core/src/ring/connection_manager.rs b/crates/core/src/ring/connection_manager.rs index 2ffadb5e2..11463297d 100644 --- a/crates/core/src/ring/connection_manager.rs +++ b/crates/core/src/ring/connection_manager.rs @@ -395,6 +395,25 @@ impl ConnectionManager { } } + /// Updates the own peer's address to the externally-observed address. + /// This is called when a NAT peer receives its observed address from a gateway. + /// Returns true if the address was updated, false if unchanged. + pub fn update_own_address(&self, observed_addr: SocketAddr) -> bool { + let mut this_peer = self.peer_key.lock(); + if let Some(ref mut peer_id) = *this_peer { + if peer_id.addr != observed_addr { + tracing::info!( + old_addr = %peer_id.addr, + new_addr = %observed_addr, + "Updating own peer address to externally-observed address" + ); + peer_id.addr = observed_addr; + return true; + } + } + false + } + pub fn prune_alive_connection(&self, peer: &PeerId) -> Option { self.prune_connection(peer, true) } diff --git a/crates/core/src/tracing/aof.rs b/crates/core/src/tracing/aof.rs index 52da9f03c..52482d2f3 100644 --- a/crates/core/src/tracing/aof.rs +++ b/crates/core/src/tracing/aof.rs @@ -485,26 +485,26 @@ mod tests { let bytes = crate::util::test::random_bytes_2mb(); let mut gen = arbitrary::Unstructured::new(&bytes); let mut transactions = vec![]; - let mut peers = vec![]; let mut events = vec![]; + // Reuse a single PeerId - these tests validate log I/O, not peer identity + let peer = PeerId::random(); + for _ in 0..TEST_LOGS { let tx: Transaction = gen.arbitrary()?; transactions.push(tx); - let peer: PeerId = PeerId::random(); - peers.push(peer); } let mut total_route_events: usize = 0; - for i in 0..TEST_LOGS { + for tx in &transactions { let kind: EventKind = gen.arbitrary()?; // The route events in first REMOVE_RECS will be dropped if matches!(kind, EventKind::Route(_)) { total_route_events += 1; } events.push(NetEventLog { - tx: &transactions[i], - peer_id: peers[i].clone(), + tx, + peer_id: peer.clone(), kind, }); } @@ -531,26 +531,26 @@ mod tests { let bytes = crate::util::test::random_bytes_2mb(); let mut gen = arbitrary::Unstructured::new(&bytes); let mut transactions = vec![]; - let mut peers = vec![]; let mut events = vec![]; + // Reuse a single PeerId - these tests validate log I/O, not peer identity + let peer = PeerId::random(); + for _ in 0..TEST_LOGS { let tx: Transaction = gen.arbitrary()?; transactions.push(tx); - let peer: PeerId = PeerId::random(); - peers.push(peer); } let mut total_route_events: usize = 0; - for i in 0..TEST_LOGS { + for tx in &transactions { let kind: EventKind = gen.arbitrary()?; // The route events in first REMOVE_RECS will be dropped if matches!(kind, EventKind::Route(_)) { total_route_events += 1; } events.push(NetEventLog { - tx: &transactions[i], - peer_id: peers[i].clone(), + tx, + peer_id: peer.clone(), kind, }); } @@ -577,26 +577,26 @@ mod tests { let bytes = crate::util::test::random_bytes_2mb(); let mut gen = arbitrary::Unstructured::new(&bytes); let mut transactions = vec![]; - let mut peers = vec![]; let mut events = vec![]; + // Reuse a single PeerId - these tests validate log I/O, not peer identity + let peer = PeerId::random(); + for _ in 0..TEST_LOGS { let tx: Transaction = gen.arbitrary()?; transactions.push(tx); - let peer: PeerId = PeerId::random(); - peers.push(peer); } let mut total_route_events: usize = 0; - for i in 0..TEST_LOGS { + for (i, tx) in transactions.iter().enumerate() { let kind: EventKind = gen.arbitrary()?; // The route events in first REMOVE_RECS will be dropped if matches!(kind, EventKind::Route(_)) && i >= REMOVE_RECS { total_route_events += 1; } events.push(NetEventLog { - tx: &transactions[i], - peer_id: peers[i].clone(), + tx, + peer_id: peer.clone(), kind, }); }