Skip to content
Merged
Show file tree
Hide file tree
Changes from 84 commits
Commits
Show all changes
86 commits
Select commit Hold shift + click to select a range
23e7c94
ci: trigger workflow
sanity Nov 29, 2025
6db7a1c
ci: trigger workflow
sanity Nov 29, 2025
665c15d
ci: trigger workflow
sanity Nov 29, 2025
9e0c1c6
ci: trigger workflow
sanity Nov 29, 2025
0cb7aa4
ci: trigger workflow
sanity Nov 29, 2025
184f5dc
ci: trigger workflow
sanity Nov 29, 2025
b160fe7
ci: trigger workflow
sanity Nov 29, 2025
3210f83
ci: trigger workflow
sanity Nov 29, 2025
0d145f7
ci: trigger workflow
sanity Nov 29, 2025
953a94e
ci: trigger workflow
sanity Nov 29, 2025
492f3bd
ci: trigger workflow
sanity Nov 29, 2025
105bc12
ci: trigger workflow
sanity Nov 29, 2025
844f997
ci: trigger workflow
sanity Nov 29, 2025
ea6bd39
ci: trigger workflow
sanity Nov 29, 2025
4a8c31d
ci: trigger workflow
sanity Nov 29, 2025
9a2bf62
ci: trigger workflow
sanity Nov 29, 2025
a8660f3
ci: trigger workflow
sanity Nov 29, 2025
46bcb8d
ci: retrigger CI
sanity Nov 30, 2025
01cd810
ci: retrigger CI
sanity Nov 30, 2025
8704c31
ci: trigger workflow
sanity Nov 29, 2025
dd9d9e8
ci: trigger workflow
sanity Nov 29, 2025
05290cd
ci: trigger workflow
sanity Nov 29, 2025
46dbd73
ci: trigger workflow
sanity Nov 29, 2025
128a012
ci: trigger workflow
sanity Nov 29, 2025
08dfa32
ci: trigger workflow
sanity Nov 29, 2025
c09018c
ci: trigger workflow
sanity Nov 29, 2025
4055765
ci: trigger workflow
sanity Nov 29, 2025
37d81ac
ci: trigger workflow
sanity Nov 29, 2025
9a1fc72
ci: trigger workflow
sanity Nov 29, 2025
39aa8c6
ci: trigger workflow
sanity Nov 29, 2025
fdf245e
ci: trigger workflow
sanity Nov 29, 2025
4d44419
ci: retrigger CI
sanity Nov 30, 2025
442f79b
ci: retrigger CI
sanity Nov 30, 2025
3d50a69
ci: trigger workflow
sanity Nov 29, 2025
6e8edbd
ci: trigger workflow
sanity Nov 29, 2025
11df228
ci: trigger workflow
sanity Nov 29, 2025
f12734d
ci: trigger workflow
sanity Nov 29, 2025
6ca5c58
ci: trigger workflow
sanity Nov 29, 2025
1ec15bc
refactor(ring): restructure PeerKeyLocation to separate identity from…
sanity Nov 27, 2025
3bcb68e
ci: trigger workflow
sanity Nov 29, 2025
1969da5
refactor: wire protocol cleanup - remove sender fields from messages
sanity Nov 29, 2025
97c0e06
ci: trigger workflow
sanity Nov 29, 2025
8a35f1f
ci: trigger workflow
sanity Nov 29, 2025
ea42bed
ci: retrigger CI
sanity Nov 30, 2025
a67f088
ci: retrigger CI
sanity Nov 30, 2025
31a5f0a
ci: trigger workflow
sanity Nov 29, 2025
845a348
ci: trigger workflow
sanity Nov 29, 2025
90861db
ci: trigger workflow
sanity Nov 29, 2025
6d7add6
ci: trigger workflow
sanity Nov 29, 2025
bbe456c
ci: trigger workflow
sanity Nov 29, 2025
5d1e6c3
ci: trigger workflow
sanity Nov 29, 2025
319bd7c
ci: trigger workflow
sanity Nov 29, 2025
0f82d71
ci: trigger workflow
sanity Nov 29, 2025
9e0bc5e
ci: retrigger CI
sanity Nov 30, 2025
3238051
ci: retrigger CI
sanity Nov 30, 2025
8f104b7
ci: trigger workflow
sanity Nov 29, 2025
77d5eb7
ci: trigger workflow
sanity Nov 29, 2025
87ab2ab
ci: trigger workflow
sanity Nov 29, 2025
40b0c07
ci: trigger workflow
sanity Nov 29, 2025
9172101
refactor: wire protocol cleanup - remove sender fields from messages
sanity Nov 29, 2025
6d46ac6
refactor: migrate PeerKeyLocation field accesses to use new methods
sanity Nov 27, 2025
3cfc103
ci: trigger workflow
sanity Nov 29, 2025
1deecca
refactor: wire protocol cleanup - remove sender fields from messages
sanity Nov 29, 2025
46771ec
ci: retrigger CI
sanity Nov 30, 2025
e7b5a54
ci: retrigger CI
sanity Nov 30, 2025
24cabd9
refactor(ring): restructure PeerKeyLocation to separate identity from…
sanity Nov 27, 2025
223f45d
ci: trigger workflow
sanity Nov 29, 2025
773a11a
refactor: wire protocol cleanup - remove sender fields from messages
sanity Nov 29, 2025
bba9a02
ci: trigger workflow
sanity Nov 29, 2025
a67aaab
refactor: migrate PeerKeyLocation field accesses to use new methods
sanity Nov 27, 2025
e7a8b5e
ci: trigger workflow
sanity Nov 29, 2025
a730092
refactor: wire protocol cleanup - remove sender fields from messages
sanity Nov 29, 2025
bd3775a
ci: retrigger CI
sanity Nov 30, 2025
d88332e
ci: retrigger CI
sanity Nov 30, 2025
e282c13
refactor: migrate PeerKeyLocation field accesses to use new methods
sanity Nov 27, 2025
2f1c538
refactor(ring): restructure PeerKeyLocation to separate identity from…
sanity Nov 27, 2025
3c3e3d7
refactor: use ObservedAddr newtype for source_addr throughout
sanity Nov 29, 2025
7bbc163
ci: trigger workflow
sanity Nov 29, 2025
d5bf4ff
refactor: wire protocol cleanup - remove sender fields from messages
sanity Nov 29, 2025
942a45d
fix: allow parallel connection attempts (max 3 concurrent)
sanity Nov 30, 2025
2dccc75
test: add unit tests for active_transaction_count
sanity Nov 30, 2025
41d73f2
ci: retrigger CI
sanity Nov 30, 2025
ab1ee8d
fix: resolve API mismatches after rebase
sanity Dec 1, 2025
bf5356f
fix: use pub_key() instead of peer() in connect tracing
sanity Dec 1, 2025
f9a1f7a
fix: incorporate PR #2172 fixes into PR #2174
sanity Dec 1, 2025
c57e118
fix: fill subscriber address before calling .peer() to avoid panic
sanity Dec 1, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 23 additions & 50 deletions crates/core/src/operations/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use crate::{
message::{InnerMessage, NetMessage, Transaction},
node::{NetworkBridge, OpManager, PeerId},
ring::{Location, PeerKeyLocation, RingError},
transport::ObservedAddr,
};
use freenet_stdlib::{
client_api::{ContractResponse, ErrorKind, HostResponse},
Expand Down Expand Up @@ -275,7 +274,7 @@ async fn complete_local_subscription(
key: ContractKey,
) -> Result<(), OpError> {
let subscriber = op_manager.ring.connection_manager.own_location();
// Local subscription - no upstream_addr needed since it's our own peer
// Local subscription - no upstream NAT address
if let Err(err) = op_manager
.ring
.add_subscriber(&key, subscriber.clone(), None)
Expand Down Expand Up @@ -310,7 +309,7 @@ pub(crate) struct SubscribeOp {
state: Option<SubscribeState>,
/// The address we received this operation's message from.
/// Used for connection-based routing: responses are sent back to this address.
upstream_addr: Option<ObservedAddr>,
upstream_addr: Option<std::net::SocketAddr>,
}

impl SubscribeOp {
Expand Down Expand Up @@ -364,16 +363,11 @@ impl Operation for SubscribeOp {
}
Ok(None) => {
// new request to subscribe to a contract, initialize the machine
tracing::debug!(
tx = %id,
?source_addr,
"subscribe: load_or_init creating new op with source_addr as upstream_addr"
);
Ok(OpInitialization {
op: Self {
state: Some(SubscribeState::ReceivedRequest),
id,
upstream_addr: source_addr.map(ObservedAddr::new), // Connection-based routing: store who sent us this request
upstream_addr: source_addr, // Connection-based routing: store who sent us this request
},
source_addr,
})
Expand Down Expand Up @@ -413,19 +407,20 @@ impl Operation for SubscribeOp {
target: _,
subscriber,
} => {
// ALWAYS use the transport-level source address when available.
// This is critical for NAT peers: they may embed a "known" but wrong address
// (e.g., 127.0.0.1:31337 for loopback). The transport address is the only
// reliable way to route responses back through the NAT.
// Fill in subscriber's external address from transport layer if unknown.
// This is the key step where the first recipient (gateway) determines the
// subscriber's external address from the actual packet source address.
let mut subscriber = subscriber.clone();
if let Some(addr) = source_addr {
subscriber.set_addr(addr);
tracing::debug!(
tx = %id,
%key,
subscriber_addr = %addr,
"subscribe: using transport source_addr for subscriber"
);
if subscriber.peer_addr.is_unknown() {
if let Some(addr) = source_addr {
subscriber.set_addr(addr);
tracing::debug!(
tx = %id,
%key,
subscriber_addr = %addr,
"subscribe: filled subscriber address from source_addr"
);
}
}
Copy link

Copilot AI Dec 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This behavioral change from the previous implementation removes protection against known-but-incorrect NAT addresses. The old code (visible in removed lines elsewhere in this diff) would always use the transport source address when available, specifically to handle cases where NAT peers embed known-but-wrong addresses (e.g., 127.0.0.1:31337 or internal network addresses like 192.168.x.x).

The new conditional logic only fills in addresses marked as PeerAddr::Unknown, meaning if a NAT peer mistakenly sends PeerAddr::Known(127.0.0.1:31337) instead of PeerAddr::Unknown, the loopback address will not be corrected.

While this change aligns subscribe.rs with the pattern used in connect.rs and put.rs, it removes a safety mechanism that was explicitly documented as "critical for NAT peers" in the old code comments. This could break existing clients behind NAT that haven't been updated to send PeerAddr::Unknown when they don't know their external address.

Consider either:

  1. Reverting to unconditional address correction for backward compatibility, or
  2. Adding validation to reject obviously incorrect addresses (loopback, private ranges) when received from remote peers, or
  3. Documenting this as a breaking change requiring client updates

Copilot uses AI. Check for mistakes.

tracing::debug!(
Expand Down Expand Up @@ -460,10 +455,10 @@ impl Operation for SubscribeOp {
"subscribe: handling RequestSub locally (contract available)"
);

// Use upstream_addr for NAT routing - subscriber may embed wrong address
// Local registration - no upstream NAT address
if op_manager
.ring
.add_subscriber(key, subscriber.clone(), self.upstream_addr)
.add_subscriber(key, subscriber.clone(), None)
.is_err()
{
tracing::warn!(
Expand Down Expand Up @@ -530,13 +525,6 @@ impl Operation for SubscribeOp {
subscribed: true,
};

tracing::debug!(
tx = %id,
%key,
upstream_addr = ?self.upstream_addr,
"subscribe: creating ReturnSub with upstream_addr"
);

return build_op_result(
self.id,
None,
Expand Down Expand Up @@ -739,10 +727,7 @@ impl Operation for SubscribeOp {
subscribers_before = ?before_direct,
"subscribe: attempting to register direct subscriber"
);
// Pass None: subscriber address was already corrected by Gateway at the
// start of the subscribe flow. Using self.upstream_addr here would
// incorrectly overwrite with the forwarder's address instead of the
// original subscriber's Gateway-corrected address.
// Local registration - no upstream NAT address
if op_manager
.ring
.add_subscriber(key, subscriber.clone(), None)
Expand Down Expand Up @@ -893,7 +878,7 @@ impl Operation for SubscribeOp {
subscribers_before = ?before_upstream,
"subscribe: attempting to register upstream link"
);
// upstream_subscriber was stored in op state, no transport address available
// Local registration - no upstream NAT address
if op_manager
.ring
.add_subscriber(key, upstream_subscriber.clone(), None)
Expand Down Expand Up @@ -926,10 +911,7 @@ impl Operation for SubscribeOp {
subscribers_before = ?before_provider,
"subscribe: registering provider/subscription source"
);
// Pass None: sender was already looked up from source_addr (line ~866),
// so it has the correct transport address. Using self.upstream_addr
// would incorrectly use the original requester's address instead of
// the provider's address.
// Local registration - no upstream NAT address
if op_manager
.ring
.add_subscriber(key, sender.clone(), None)
Expand Down Expand Up @@ -991,26 +973,17 @@ fn build_op_result(
id: Transaction,
state: Option<SubscribeState>,
msg: Option<SubscribeMsg>,
upstream_addr: Option<ObservedAddr>,
upstream_addr: Option<std::net::SocketAddr>,
) -> Result<OperationResult, OpError> {
// For response messages (ReturnSub), use upstream_addr directly for routing.
// This is more reliable than extracting from the message's target field, which
// may have been looked up from connection_manager (subject to race conditions).
// For forward messages (SeekNode, RequestSub, FetchRouting), use the message's target.
let target_addr = match &msg {
// Convert ObservedAddr to SocketAddr at the transport boundary
Some(SubscribeMsg::ReturnSub { .. }) => upstream_addr.map(|a| a.socket_addr()),
Some(SubscribeMsg::ReturnSub { .. }) => upstream_addr,
_ => msg.as_ref().and_then(|m| m.target_addr()),
};

tracing::debug!(
tx = %id,
msg_type = ?msg.as_ref().map(|m| std::any::type_name_of_val(m)),
?upstream_addr,
?target_addr,
"build_op_result: computed target_addr"
);

let output_op = state.map(|state| SubscribeOp {
id,
state: Some(state),
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/operations/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1005,7 +1005,7 @@ pub(crate) async fn request_update(
.closest_potentially_caching(&key, [sender.peer().clone()].as_slice());

if let Some(target) = remote_target {
// Subscribe to the contract - sender is ourselves, no upstream_addr needed
// Subscribe to the contract (local subscription, no upstream NAT addr)
op_manager
.ring
.add_subscriber(&key, sender.clone(), None)
Expand Down
68 changes: 64 additions & 4 deletions crates/core/src/ring/live_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,71 @@ impl LiveTransactionTracker {
self.tx_per_peer.contains_key(&peer_addr)
}

/// Returns `true` while `tx` is still tracked against at least one peer.
///
/// Scans every peer's transaction set; a transaction is "alive" as long as
/// any peer entry still contains it.
pub(crate) fn still_alive(&self, tx: &Transaction) -> bool {
    for peer_entry in self.tx_per_peer.iter() {
        if peer_entry.value().contains(tx) {
            return true;
        }
    }
    false
}

/// Number of peers that currently have a tracked-transaction entry.
///
/// NOTE(review): this counts map entries (peers), not individual
/// transactions — for the transaction total see `active_transaction_count`.
pub(crate) fn len(&self) -> usize {
    let peer_entries = &self.tx_per_peer;
    peer_entries.len()
}

/// Returns the total number of active transactions across all peers.
///
/// Sums the per-peer transaction-set sizes rather than counting peers,
/// so one peer with several in-flight transactions contributes each of them.
pub(crate) fn active_transaction_count(&self) -> usize {
    self.tx_per_peer
        .iter()
        .fold(0, |total, peer_entry| total + peer_entry.value().len())
}
Comment on lines +54 to +60
Copy link

Copilot AI Nov 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new active_transaction_count() method lacks test coverage. Consider adding unit tests to verify:

  • Returns 0 when no transactions are tracked
  • Correctly counts a single transaction for one peer
  • Correctly sums transactions across multiple peers
  • Updates correctly after transactions are removed via remove_finished_transaction()

Example test:

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn active_transaction_count_empty() {
        let tracker = LiveTransactionTracker::new();
        assert_eq!(tracker.active_transaction_count(), 0);
    }

    #[test]
    fn active_transaction_count_multiple_peers() {
        let tracker = LiveTransactionTracker::new();
        let addr1 = "127.0.0.1:8080".parse().unwrap();
        let addr2 = "127.0.0.1:8081".parse().unwrap();
        
        tracker.add_transaction(addr1, Transaction::new::<()>());
        tracker.add_transaction(addr1, Transaction::new::<()>());
        tracker.add_transaction(addr2, Transaction::new::<()>());
        
        assert_eq!(tracker.active_transaction_count(), 3);
    }
}

Copilot uses AI. Check for mistakes.
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::operations::connect::ConnectMsg;

    /// Convenience: build a fresh transaction of the connect message type.
    fn new_tx() -> Transaction {
        Transaction::new::<ConnectMsg>()
    }

    #[test]
    fn active_transaction_count_empty() {
        // A freshly created tracker holds no transactions at all.
        assert_eq!(LiveTransactionTracker::new().active_transaction_count(), 0);
    }

    #[test]
    fn active_transaction_count_single_peer() {
        let tracker = LiveTransactionTracker::new();
        let peer: SocketAddr = "127.0.0.1:8080".parse().unwrap();

        // Each transaction added for the same peer bumps the total by one.
        for expected in 1usize..=2 {
            tracker.add_transaction(peer, new_tx());
            assert_eq!(tracker.active_transaction_count(), expected);
        }
    }

    #[test]
    fn active_transaction_count_multiple_peers() {
        let tracker = LiveTransactionTracker::new();
        let first_peer: SocketAddr = "127.0.0.1:8080".parse().unwrap();
        let second_peer: SocketAddr = "127.0.0.1:8081".parse().unwrap();

        // Two transactions on one peer plus one on another sum to three.
        tracker.add_transaction(first_peer, new_tx());
        tracker.add_transaction(first_peer, new_tx());
        tracker.add_transaction(second_peer, new_tx());

        assert_eq!(tracker.active_transaction_count(), 3);
    }

    #[test]
    fn active_transaction_count_after_removal() {
        let tracker = LiveTransactionTracker::new();
        let peer: SocketAddr = "127.0.0.1:8080".parse().unwrap();

        let first_tx = new_tx();
        let second_tx = new_tx();
        tracker.add_transaction(peer, first_tx);
        tracker.add_transaction(peer, second_tx);
        assert_eq!(tracker.active_transaction_count(), 2);

        // Finishing transactions one at a time drains the count back to zero.
        tracker.remove_finished_transaction(first_tx);
        assert_eq!(tracker.active_transaction_count(), 1);

        tracker.remove_finished_transaction(second_tx);
        assert_eq!(tracker.active_transaction_count(), 0);
    }
}
35 changes: 21 additions & 14 deletions crates/core/src/ring/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,8 @@ impl Ring {
///
/// The `upstream_addr` parameter is the transport-level address from which the subscribe
/// message was received. This is used instead of the address embedded in `subscriber`
/// because NAT peers may embed incorrect addresses in their messages.
/// because NAT peers may embed incorrect (e.g., loopback) addresses in their messages.
/// The transport address is the only reliable way to route back to them.
pub fn add_subscriber(
&self,
contract: &ContractKey,
Expand Down Expand Up @@ -389,6 +390,11 @@ impl Ring {

const REGENERATE_DENSITY_MAP_INTERVAL: Duration = Duration::from_secs(60);

/// Maximum number of concurrent connection acquisition attempts.
/// Allows parallel connection attempts to speed up network formation
/// instead of serial blocking on a single connection at a time.
const MAX_CONCURRENT_CONNECTIONS: usize = 3;

let mut check_interval = tokio::time::interval(CHECK_TICK_DURATION);
check_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
let mut refresh_density_map = tokio::time::interval(REGENERATE_DENSITY_MAP_INTERVAL);
Expand All @@ -399,7 +405,6 @@ impl Ring {
tokio::time::sleep(Duration::from_secs(2)).await;
tracing::info!("Connection maintenance task: initial sleep completed");

let mut live_tx = None;
let mut pending_conn_adds = BTreeSet::new();
let mut this_peer = None;
loop {
Expand All @@ -422,20 +427,17 @@ impl Ring {
let mut skip_list = HashSet::new();
skip_list.insert(this_peer);

// if there are no open connections, we need to acquire more
if let Some(tx) = &live_tx {
if !live_tx_tracker.still_alive(tx) {
let _ = live_tx.take();
}
}

// Acquire new connections up to MAX_CONCURRENT_CONNECTIONS limit
let active_count = live_tx_tracker.active_transaction_count();
if let Some(ideal_location) = pending_conn_adds.pop_first() {
if live_tx.is_none() {
if active_count < MAX_CONCURRENT_CONNECTIONS {
tracing::info!(
active_connections = active_count,
max_concurrent = MAX_CONCURRENT_CONNECTIONS,
"Attempting to acquire new connection for location: {:?}",
ideal_location
);
live_tx = self
let tx = self
.acquire_new(
ideal_location,
&skip_list,
Expand All @@ -451,18 +453,23 @@ impl Ring {
);
error
})?;
if live_tx.is_none() {
if tx.is_none() {
let conns = self.connection_manager.connection_count();
tracing::warn!(
"acquire_new returned None - likely no peers to query through (connections: {})",
conns
);
} else {
tracing::info!("Successfully initiated connection acquisition");
tracing::info!(
active_connections = active_count + 1,
"Successfully initiated connection acquisition"
);
}
} else {
tracing::debug!(
"Skipping connection attempt - live transaction still active, re-queuing location {}",
active_connections = active_count,
max_concurrent = MAX_CONCURRENT_CONNECTIONS,
"At max concurrent connections, re-queuing location {}",
ideal_location
);
pending_conn_adds.insert(ideal_location);
Expand Down
24 changes: 9 additions & 15 deletions crates/core/src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,7 @@ type MessagePayload = Vec<u8>;

type PacketId = u32;

pub use self::crypto::{TransportKeypair, TransportPublicKey};
pub(crate) use self::{
connection_handler::{
create_connection_handler, InboundConnectionHandler, OutboundConnectionHandler,
},
peer_connection::PeerConnection,
};

/// Address observed at the transport layer (from UDP packet source).
///
/// A wrapper around SocketAddr that represents an address observed at the transport layer.
/// This is the "ground truth" for NAT scenarios - it's the actual address we see
/// at the network layer, not what the peer claims in protocol messages.
///
Expand All @@ -44,11 +35,6 @@ pub(crate) use self::{
pub struct ObservedAddr(SocketAddr);

impl ObservedAddr {
/// Create a new observed address from a socket address.
pub fn new(addr: SocketAddr) -> Self {
Self(addr)
}

/// Get the underlying socket address.
pub fn socket_addr(&self) -> SocketAddr {
self.0
Expand All @@ -67,6 +53,14 @@ impl From<SocketAddr> for ObservedAddr {
}
}

pub use self::crypto::{TransportKeypair, TransportPublicKey};
pub(crate) use self::{
connection_handler::{
create_connection_handler, InboundConnectionHandler, OutboundConnectionHandler,
},
peer_connection::PeerConnection,
};

#[derive(Debug, thiserror::Error)]
pub(crate) enum TransportError {
#[error("transport handler channel closed, socket likely closed")]
Expand Down
Loading