Skip to content
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 10 additions & 28 deletions crates/core/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1225,18 +1225,20 @@ pub struct PeerId {

impl Hash for PeerId {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.addr.hash(state);
self.pub_key.hash(state);
}
}

impl PartialEq<PeerId> for PeerId {
fn eq(&self, other: &PeerId) -> bool {
self.addr == other.addr
self.pub_key == other.pub_key
}
}

impl Ord for PeerId {
// NOTE(review): `Ord` requires consistency with `Eq`: `a.cmp(&b) == Equal`
// must hold exactly when `a == b`. If `PartialEq` compares `pub_key` while
// this compares only `addr`, two distinct peers sharing an address would be
// `Equal` here but unequal via `==`, and BTreeMap/BTreeSet would silently
// collapse them into one entry — confirm addresses are guaranteed unique.
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
// Compare by address since TransportPublicKey doesn't impl Ord.
// This is only used for ordering in BTreeMap/BTreeSet, not for equality.
self.addr.cmp(&other.addr)
}
}
Expand All @@ -1253,26 +1255,14 @@ impl PeerId {
}
}

thread_local! {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason this exists is that for many tests where we don't care about the PeerId (e.g. no integration or e2e tests), generating a large number of peers is computationally expensive. For the cases where we do care, we use the explicit PeerId::random.

Arbitrary is used for the most part for unit tests. Be careful with the impact of this change.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I investigated this and found that the only problematic usage was in aof.rs tests, which generate 10k-100k log entries, each previously getting a unique PeerId. Those tests validate log serialization/deserialization—they don't actually need unique peer identities.

I've pushed a fix that reuses a single PeerId per test in the three affected functions (test_aof_read_write_complex, test_aof_complex_reconstruction, test_aof_sequential_ids_reconstruction). All other usages in the codebase are small counts (5-45 peers per test), where the overhead is negligible.

With this fix, the cache is no longer needed.

[AI-assisted - Claude]

static PEER_ID: std::cell::RefCell<Option<TransportPublicKey>> = const { std::cell::RefCell::new(None) };
}

#[cfg(test)]
impl<'a> arbitrary::Arbitrary<'a> for PeerId {
fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
let addr: ([u8; 4], u16) = u.arbitrary()?;

let pub_key = PEER_ID.with(|peer_id| {
let mut peer_id = peer_id.borrow_mut();
match &*peer_id {
Some(k) => k.clone(),
None => {
let key = TransportKeypair::new().public().clone();
peer_id.replace(key.clone());
key
}
}
});
// Always generate a unique keypair. PeerId equality is based on pub_key,
// so distinct peers must have distinct keys.
let pub_key = TransportKeypair::new().public().clone();

Ok(Self {
addr: addr.into(),
Expand All @@ -1288,17 +1278,9 @@ impl PeerId {
rand::rng().fill(&mut addr[..]);
let port = crate::util::get_free_port().unwrap();

let pub_key = PEER_ID.with(|peer_id| {
let mut peer_id = peer_id.borrow_mut();
match &*peer_id {
Some(k) => k.clone(),
None => {
let key = TransportKeypair::new().public().clone();
peer_id.replace(key.clone());
key
}
}
});
// Always generate a unique keypair. PeerId equality is based on pub_key,
// so distinct peers must have distinct keys.
let pub_key = TransportKeypair::new().public().clone();

Self {
addr: (addr, port).into(),
Expand Down
17 changes: 0 additions & 17 deletions crates/core/src/node/op_state_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -552,18 +552,6 @@ impl OpManager {
.expect_and_register_sub_operation(parent, child);
}

/// Returns true if all child operations of the parent have completed.
// Thin delegation to the sub-operation tracker; passes the completed-ops
// set so the tracker can check children against it.
pub fn all_sub_operations_completed(&self, parent: Transaction) -> bool {
self.sub_op_tracker
.all_sub_operations_completed(parent, &self.ops.completed)
}

/// Returns the number of pending child operations for the parent.
// Same delegation pattern as above — presumably counts children not yet in
// `self.ops.completed`; confirm against the tracker implementation.
pub fn count_pending_sub_operations(&self, parent: Transaction) -> usize {
self.sub_op_tracker
.count_pending_sub_operations(parent, &self.ops.completed)
}

/// Handle sub-operation failure - propagate error to parent.
pub async fn sub_operation_failed(
&self,
Expand Down Expand Up @@ -627,11 +615,6 @@ impl OpManager {
self.sub_op_tracker.is_sub_operation(tx)
}

/// Exposes root operations awaiting sub-operation completion.
pub(crate) fn root_ops_awaiting_sub_ops(&self) -> &Arc<DashMap<Transaction, OpEnum>> {
&self.sub_op_tracker.root_ops_awaiting_sub_ops
}

/// Exposes failed parent operations.
pub(crate) fn failed_parents(&self) -> &Arc<DashSet<Transaction>> {
&self.sub_op_tracker.failed_parents
Expand Down
8 changes: 8 additions & 0 deletions crates/core/src/operations/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -941,6 +941,14 @@ impl Operation for ConnectOp {
}
ConnectMsg::ObservedAddress { address, .. } => {
self.handle_observed_address(*address, Instant::now());
// Update the node's own PeerId with the externally-observed address.
// This is critical for NAT peers - without this, they embed their
// local loopback address (127.0.0.1) in messages, causing remote
// peers to fail when trying to connect back.
op_manager
.ring
.connection_manager
.update_own_address(*address);
Ok(store_operation_state(&mut self))
}
}
Expand Down
27 changes: 7 additions & 20 deletions crates/core/src/operations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,26 +114,13 @@ where
op_manager.completed(tx_id);
return Ok(None);
}
if op_manager.all_sub_operations_completed(tx_id) {
tracing::debug!(%tx_id, "operation complete");
op_manager.completed(tx_id);
return Ok(Some(final_state));
} else {
let pending_count = op_manager.count_pending_sub_operations(tx_id);
tracing::debug!(
%tx_id,
pending_count,
"root operation awaiting child completion"
);

// Track the root op so child completions can finish it later.
op_manager
.root_ops_awaiting_sub_ops()
.insert(tx_id, final_state);
tracing::info!(%tx_id, "root operation registered as awaiting sub-ops");

return Ok(None);
}
// Return finalized state immediately to the client, regardless of pending
// sub-operations. Sub-operations like subscriptions are "fire and forget"
// from the client's perspective - they want to know their PUT/GET succeeded,
// not wait for the subscription to complete.
tracing::debug!(%tx_id, "operation complete");
op_manager.completed(tx_id);
return Ok(Some(final_state));
}
Ok(OperationResult {
return_msg: Some(msg),
Expand Down
18 changes: 18 additions & 0 deletions crates/core/src/operations/put.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1055,6 +1055,24 @@ async fn try_to_broadcast(
});
}
}
// Handle forwarded PUTs that have no prior state - this happens when a peer
// receives PutForward and is the final destination (e.g., has no ring location yet).
// We still need to send SuccessfulPut back to the sender.
None if last_hop && broadcast_to.is_empty() => {
tracing::debug!(
tx = %id,
%key,
"Sending SuccessfulPut for forwarded PUT with no prior state (likely peer has no ring location)"
);
new_state = None;
return_msg = Some(PutMsg::SuccessfulPut {
id,
target: upstream,
key,
sender: op_manager.ring.connection_manager.own_location(),
origin,
});
}
_ => return Err(OpError::invalid_transition(id)),
};

Expand Down
19 changes: 19 additions & 0 deletions crates/core/src/ring/connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,25 @@ impl ConnectionManager {
}
}

/// Updates the own peer's address to the externally-observed address.
/// This is called when a NAT peer receives its observed address from a gateway.
/// Returns true if the address was updated, false if unchanged.
pub fn update_own_address(&self, observed_addr: SocketAddr) -> bool {
let mut this_peer = self.peer_key.lock();
if let Some(ref mut peer_id) = *this_peer {
if peer_id.addr != observed_addr {
tracing::info!(
old_addr = %peer_id.addr,
new_addr = %observed_addr,
"Updating own peer address to externally-observed address"
);
peer_id.addr = observed_addr;
return true;
}
}
false
}

/// Removes `peer` from the tracked connections and returns its `Location`,
/// if one was recorded.
// NOTE(review): the `true` flag is assumed to mean "connection was alive"
// based on this method's name — confirm against `prune_connection`'s
// parameter definition, which is not visible in this hunk.
pub fn prune_alive_connection(&self, peer: &PeerId) -> Option<Location> {
self.prune_connection(peer, true)
}
Expand Down
Loading