Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
05bd395
refactor: add PeerAddr enum for explicit address state
sanity Nov 27, 2025
18ccae1
refactor: add accessor methods to PeerKeyLocation
sanity Nov 27, 2025
7412431
refactor: migrate PeerKeyLocation field accesses to use new methods
sanity Nov 27, 2025
075e75d
refactor(ring): restructure PeerKeyLocation to separate identity from…
sanity Nov 27, 2025
316998d
refactor(ring): remove dead_code allows from PeerAddr
sanity Nov 27, 2025
36cad43
refactor(ring): change LiveTransactionTracker key from PeerId to Sock…
sanity Nov 27, 2025
3f1faba
refactor(network): change NetworkBridge trait to use SocketAddr inste…
sanity Nov 28, 2025
ff5db9e
fix(network): rewrite sender addresses for NAT traversal in all opera…
sanity Nov 28, 2025
f99bec2
refactor(network): change connections HashMap key from PeerId to Sock…
sanity Nov 28, 2025
2bfb7cc
refactor(routing): thread source_addr through call chain, remove rewr…
sanity Nov 28, 2025
f8cfabe
refactor(operations): add source_addr to Operation trait and handle_o…
sanity Nov 28, 2025
ca24455
refactor(get): add upstream_addr to GetOp for connection-based routing
sanity Nov 28, 2025
054cc7b
refactor(subscribe): add upstream_addr to SubscribeOp for connection-…
sanity Nov 28, 2025
74f095f
refactor(operations): add upstream_addr to PutOp and UpdateOp
sanity Nov 28, 2025
784309a
fix: preserve target address through bridge event for NAT routing
sanity Nov 29, 2025
15eb451
refactor: use ObservedAddr newtype for source_addr throughout
sanity Nov 29, 2025
02a3b5b
ci: trigger workflow
sanity Nov 29, 2025
28c4f5a
fix: resolve post-rebase compilation errors
sanity Nov 29, 2025
7bd31cc
fix: use socket_addr() and set_addr() for proper address mutation
sanity Nov 29, 2025
3a9254e
fix: route connect messages through upstream for NAT relay
sanity Nov 29, 2025
f9b81b1
fix: allow Get::ReturnGet messages to be requeued locally
sanity Nov 30, 2025
0131c7b
fix: run connection monitoring concurrently with event listener
sanity Nov 30, 2025
58dd1a3
fix: only return connected peers from k_closest_potentially_caching
sanity Nov 30, 2025
ded8e4c
fix: use pub_key() and socket_addr() in connect operation
sanity Nov 30, 2025
d91ca3a
fix: remove reverted update_peer_address call and add missing is_gate…
sanity Nov 30, 2025
85001cc
ci: retrigger CI
sanity Nov 30, 2025
766e53d
fix: use unique temp dir in test_serde_config_args
sanity Nov 30, 2025
8e1b4b7
perf(test): cache keypair in PeerKeyLocation::random() to speed up tests
sanity Dec 1, 2025
848656f
ci: retry CI [AI-assisted - Claude]
sanity Dec 1, 2025
37bee61
fix: eliminate port reservation race condition in test infrastructure
sanity Dec 1, 2025
aaf3e3b
refactor: tighten address typing for connect/network bridge
sanity Dec 1, 2025
b4f626d
refactor: move ObservedAddr work to correct PR in stack
sanity Dec 1, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/core/src/client_events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ async fn process_open_request(
let own_location = op_manager.ring.connection_manager.own_location();
let has_remote_peers = op_manager
.ring
.closest_potentially_caching(&contract_key, &[own_location.peer][..])
.closest_potentially_caching(&contract_key, &[own_location.peer()][..])
.is_some();

if !has_remote_peers {
Expand Down
3 changes: 3 additions & 0 deletions crates/core/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1257,8 +1257,11 @@ mod tests {

#[tokio::test]
async fn test_serde_config_args() {
// Use a unique ID to avoid conflicts with other tests or stale /tmp/freenet directories
let unique_id = format!("test-serde-{}", std::process::id());
let args = ConfigArgs {
mode: Some(OperationMode::Local),
id: Some(unique_id),
..Default::default()
};
let cfg = args.build().await.unwrap();
Expand Down
53 changes: 51 additions & 2 deletions crates/core/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use std::{
borrow::{Borrow, Cow},
fmt::Display,
net::SocketAddr,
time::{Duration, SystemTime},
};

Expand Down Expand Up @@ -267,6 +268,54 @@ pub(crate) trait MessageStats {
fn requested_location(&self) -> Option<Location>;
}

/// Wrapper for inbound messages that carries the source address from the transport layer.
/// This separates routing concerns from message content - the source address is determined by
/// the network layer (from the packet), not embedded in the serialized message.
///
/// Generic over the message type so it can wrap:
/// - `NetMessage` at the network layer (p2p_protoc.rs)
/// - Specific operation messages (GetMsg, PutMsg, etc.) at the operation layer
///
/// Note: Currently unused but prepared for Phase 4 of #2164.
/// Will be used to thread source addresses to operations for routing.
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct InboundMessage<M> {
/// The message content
pub msg: M,
/// The socket address this message was received from (from UDP packet source)
pub source_addr: SocketAddr,
}

#[allow(dead_code)]
impl<M> InboundMessage<M> {
/// Create a new inbound message wrapper
pub fn new(msg: M, source_addr: SocketAddr) -> Self {
Self { msg, source_addr }
}

/// Transform the inner message while preserving source_addr
pub fn map<N>(self, f: impl FnOnce(M) -> N) -> InboundMessage<N> {
InboundMessage {
msg: f(self.msg),
source_addr: self.source_addr,
}
}

/// Get a reference to the inner message
pub fn inner(&self) -> &M {
&self.msg
}
}

#[allow(dead_code)]
impl InboundMessage<NetMessage> {
/// Get the transaction ID from the wrapped network message
pub fn id(&self) -> &Transaction {
self.msg.id()
}
}

#[derive(Debug, Serialize, Deserialize, Clone)]
pub(crate) enum NetMessage {
V1(NetMessageV1),
Expand Down Expand Up @@ -333,8 +382,8 @@ type ConnectResult = Result<(PeerId, RemainingChecks), ()>;
/// Internal node events emitted to the event loop.
#[derive(Debug, Clone)]
pub(crate) enum NodeEvent {
/// Drop the given peer connection.
DropConnection(PeerId),
/// Drop the given peer connection by socket address.
DropConnection(std::net::SocketAddr),
// Try connecting to the given peer.
ConnectPeer {
peer: PeerId,
Expand Down
83 changes: 61 additions & 22 deletions crates/core/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ use crate::operations::handle_op_request;
pub(crate) use network_bridge::{ConnectionError, EventLoopNotificationsSender, NetworkBridge};

use crate::topology::rate::Rate;
use crate::transport::{TransportKeypair, TransportPublicKey};
use crate::transport::{ObservedAddr, TransportKeypair, TransportPublicKey};
pub(crate) use op_state_manager::{OpManager, OpNotAvailable};

mod message_processor;
Expand Down Expand Up @@ -369,9 +369,12 @@ impl NodeConfig {
let gateways: Vec<PeerKeyLocation> = self
.gateways
.iter()
.map(|node| PeerKeyLocation {
peer: node.peer_id.clone(),
location: Some(node.location),
.map(|node| {
PeerKeyLocation::with_location(
node.peer_id.pub_key.clone(),
node.peer_id.addr,
node.location,
)
})
.collect();

Expand Down Expand Up @@ -617,6 +620,7 @@ pub(super) async fn process_message<CB>(
#[allow(clippy::too_many_arguments)]
pub(crate) async fn process_message_decoupled<CB>(
msg: NetMessage,
source_addr: Option<ObservedAddr>,
op_manager: Arc<OpManager>,
conn_manager: CB,
mut event_listener: Box<dyn NetEventRegister>,
Expand All @@ -631,6 +635,7 @@ pub(crate) async fn process_message_decoupled<CB>(
// Pure network message processing - no client types involved
let op_result = handle_pure_network_message(
msg,
source_addr,
op_manager.clone(),
conn_manager,
event_listener.as_mut(),
Expand Down Expand Up @@ -676,6 +681,7 @@ pub(crate) async fn process_message_decoupled<CB>(
#[allow(clippy::too_many_arguments)]
async fn handle_pure_network_message<CB>(
msg: NetMessage,
source_addr: Option<ObservedAddr>,
op_manager: Arc<OpManager>,
conn_manager: CB,
event_listener: &mut dyn NetEventRegister,
Expand All @@ -688,6 +694,7 @@ where
NetMessage::V1(msg_v1) => {
handle_pure_network_message_v1(
msg_v1,
source_addr,
op_manager,
conn_manager,
event_listener,
Expand Down Expand Up @@ -725,8 +732,9 @@ async fn process_message_v1<CB>(
transaction = %msg.id(),
tx_type = %msg.id().transaction_type()
);
// Legacy path - no source_addr available
let op_result =
handle_op_request::<ConnectOp, _>(&op_manager, &mut conn_manager, op)
handle_op_request::<ConnectOp, _>(&op_manager, &mut conn_manager, op, None)
.instrument(span)
.await;

Expand All @@ -741,8 +749,10 @@ async fn process_message_v1<CB>(
.await;
}
NetMessageV1::Put(ref op) => {
// Legacy path - no source_addr available
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this happen? Should we log it? Should we error out?

let op_result =
handle_op_request::<put::PutOp, _>(&op_manager, &mut conn_manager, op).await;
handle_op_request::<put::PutOp, _>(&op_manager, &mut conn_manager, op, None)
.await;

if is_operation_completed(&op_result) {
if let Some(ref op_execution_callback) = pending_op_result {
Expand All @@ -767,8 +777,10 @@ async fn process_message_v1<CB>(
.await;
}
NetMessageV1::Get(ref op) => {
// Legacy path - no source_addr available
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this happen? Should we log it? Should we error out?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this can happen - it's the current code path used by all operations (Connect/Put/Get/Subscribe/Update) in PR #2167. The NetMessageV1::* handlers all pass None for source_addr as a transitional state.

This legacy path will be removed in PR #2169 when we complete the wire protocol cleanup and switch to connection-based routing everywhere.

For now in #2167, I don't think we need to log it - it's expected behavior during this transition. Once #2169 lands and we switch to ObservedAddr everywhere, these legacy paths will be deleted.

[AI-assisted - Claude]

let op_result =
handle_op_request::<get::GetOp, _>(&op_manager, &mut conn_manager, op).await;
handle_op_request::<get::GetOp, _>(&op_manager, &mut conn_manager, op, None)
.await;
if is_operation_completed(&op_result) {
if let Some(ref op_execution_callback) = pending_op_result {
let tx_id = *op.id();
Expand All @@ -789,10 +801,12 @@ async fn process_message_v1<CB>(
.await;
}
NetMessageV1::Subscribe(ref op) => {
// Legacy path - no source_addr available
let op_result = handle_op_request::<subscribe::SubscribeOp, _>(
&op_manager,
&mut conn_manager,
op,
None,
)
.await;
if is_operation_completed(&op_result) {
Expand All @@ -815,9 +829,14 @@ async fn process_message_v1<CB>(
.await;
}
NetMessageV1::Update(ref op) => {
let op_result =
handle_op_request::<update::UpdateOp, _>(&op_manager, &mut conn_manager, op)
.await;
// Legacy path - no source_addr available
let op_result = handle_op_request::<update::UpdateOp, _>(
&op_manager,
&mut conn_manager,
op,
None,
)
.await;
if is_operation_completed(&op_result) {
if let Some(ref op_execution_callback) = pending_op_result {
let tx_id = *op.id();
Expand Down Expand Up @@ -857,6 +876,7 @@ async fn process_message_v1<CB>(
#[allow(clippy::too_many_arguments)]
async fn handle_pure_network_message_v1<CB>(
msg: NetMessageV1,
source_addr: Option<ObservedAddr>,
op_manager: Arc<OpManager>,
mut conn_manager: CB,
event_listener: &mut dyn NetEventRegister,
Expand Down Expand Up @@ -884,10 +904,14 @@ where
transaction = %msg.id(),
tx_type = %msg.id().transaction_type()
);
let op_result =
handle_op_request::<ConnectOp, _>(&op_manager, &mut conn_manager, op)
.instrument(span)
.await;
let op_result = handle_op_request::<ConnectOp, _>(
&op_manager,
&mut conn_manager,
op,
source_addr,
)
.instrument(span)
.await;

if let Err(OpError::OpNotAvailable(state)) = &op_result {
match state {
Expand Down Expand Up @@ -916,8 +940,13 @@ where
tx = %op.id(),
"handle_pure_network_message_v1: Processing PUT message"
);
let op_result =
handle_op_request::<put::PutOp, _>(&op_manager, &mut conn_manager, op).await;
let op_result = handle_op_request::<put::PutOp, _>(
&op_manager,
&mut conn_manager,
op,
source_addr,
)
.await;
tracing::debug!(
tx = %op.id(),
op_result_ok = op_result.is_ok(),
Expand Down Expand Up @@ -958,8 +987,13 @@ where
.await;
}
NetMessageV1::Get(ref op) => {
let op_result =
handle_op_request::<get::GetOp, _>(&op_manager, &mut conn_manager, op).await;
let op_result = handle_op_request::<get::GetOp, _>(
&op_manager,
&mut conn_manager,
op,
source_addr,
)
.await;

// Handle pending operation results (network concern)
if is_operation_completed(&op_result) {
Expand Down Expand Up @@ -995,9 +1029,13 @@ where
.await;
}
NetMessageV1::Update(ref op) => {
let op_result =
handle_op_request::<update::UpdateOp, _>(&op_manager, &mut conn_manager, op)
.await;
let op_result = handle_op_request::<update::UpdateOp, _>(
&op_manager,
&mut conn_manager,
op,
source_addr,
)
.await;

if let Err(OpError::OpNotAvailable(state)) = &op_result {
match state {
Expand Down Expand Up @@ -1026,6 +1064,7 @@ where
&op_manager,
&mut conn_manager,
op,
source_addr,
)
.await;

Expand Down Expand Up @@ -1174,7 +1213,7 @@ async fn handle_aborted_op(
{
let gateway = op.gateway().cloned();
if let Some(gateway) = gateway {
tracing::warn!("Retry connecting to gateway {}", gateway.peer);
tracing::warn!("Retry connecting to gateway {}", gateway.peer());
connect::join_ring_request(None, &gateway, op_manager).await?;
}
}
Expand Down
26 changes: 18 additions & 8 deletions crates/core/src/node/network_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@
//! See [`../../architecture.md`](../../architecture.md) for its interactions with event loops and other components.

use std::future::Future;
use std::net::SocketAddr;

use either::Either;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc::{self, Receiver, Sender};

use super::PeerId;
use crate::message::{NetMessage, NodeEvent};

mod handshake;
Expand All @@ -25,19 +25,29 @@ pub(crate) type ConnResult<T> = std::result::Result<T, ConnectionError>;

/// Allows handling of connections to the network as well as sending messages
/// to other peers in the network with whom connection has been established.
///
/// Connections are keyed by socket address since that's what identifies
/// a network connection. The cryptographic identity is handled separately
/// at the transport layer.
pub(crate) trait NetworkBridge: Send + Sync {
fn drop_connection(&mut self, peer: &PeerId) -> impl Future<Output = ConnResult<()>> + Send;

fn send(&self, target: &PeerId, msg: NetMessage)
-> impl Future<Output = ConnResult<()>> + Send;
fn drop_connection(
&mut self,
peer_addr: SocketAddr,
) -> impl Future<Output = ConnResult<()>> + Send;

fn send(
&self,
target_addr: SocketAddr,
msg: NetMessage,
) -> impl Future<Output = ConnResult<()>> + Send;
}

#[derive(Debug, thiserror::Error, Serialize, Deserialize)]
pub(crate) enum ConnectionError {
#[error("location unknown for this node")]
LocationUnknown,
#[error("unable to send message")]
SendNotCompleted(PeerId),
#[error("unable to send message to {0}")]
SendNotCompleted(SocketAddr),
#[error("Unexpected connection req")]
UnexpectedReq,
#[error("error while de/serializing message")]
Expand Down Expand Up @@ -76,7 +86,7 @@ impl Clone for ConnectionError {
match self {
Self::LocationUnknown => Self::LocationUnknown,
Self::Serialization(_) => Self::Serialization(None),
Self::SendNotCompleted(peer) => Self::SendNotCompleted(peer.clone()),
Self::SendNotCompleted(addr) => Self::SendNotCompleted(*addr),
Self::IOError(err) => Self::IOError(err.clone()),
Self::Timeout => Self::Timeout,
Self::UnexpectedReq => Self::UnexpectedReq,
Expand Down
Loading
Loading