Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/freenet-ping/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ members = ["contracts/ping", "app", "types"]

[workspace.dependencies]
# freenet-stdlib = { path = "./../../stdlib/rust", features = ["contract"] }
freenet-stdlib = { version = "0.1.24" }
freenet-stdlib = { version = "0.1.14" }
freenet-ping-types = { path = "types", default-features = false }
chrono = { version = "0.4", default-features = false }
testresult = "0.4"
Expand Down
2 changes: 1 addition & 1 deletion apps/freenet-ping/app/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ testing = ["freenet-stdlib/testing", "freenet/testing"]
anyhow = "1.0"
chrono = { workspace = true, features = ["default"] }
clap = { version = "4.5", features = ["derive"] }
freenet-stdlib = { version = "0.1.24", features = ["net"] }
freenet-stdlib = { version = "0.1.22", features = ["net"] }
freenet-ping-types = { path = "../types", features = ["std", "clap"] }
futures = "0.3.31"
rand = "0.9.2"
Expand Down
130 changes: 107 additions & 23 deletions crates/core/src/node/network_bridge/p2p_protoc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::{
sync::Arc,
};
use tokio::net::UdpSocket;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::sync::mpsc::{self, error::TryRecvError, Receiver, Sender};
use tokio::sync::oneshot::{self};
use tokio::time::timeout;
use tracing::Instrument;
Expand Down Expand Up @@ -928,6 +928,12 @@ impl P2pConnManager {
op_manager: &Arc<OpManager>,
state: &mut EventListenerState,
) -> anyhow::Result<()> {
let tx = *msg.id();
tracing::debug!(
%tx,
tx_type = ?tx.transaction_type(),
"Handling inbound NetMessage at event loop"
);
match msg {
NetMessage::V1(NetMessageV1::Aborted(tx)) => {
handle_aborted_op(tx, op_manager, &self.gateways).await?;
Expand Down Expand Up @@ -1277,6 +1283,7 @@ impl P2pConnManager {
Some(Ok(peer_conn)) => {
// Get the remote address from the connection
let remote_addr = peer_conn.conn.remote_addr();
let tx = *peer_conn.msg.id();

// Check if we need to establish a connection back to the sender
let should_connect = !self.connections.keys().any(|peer| peer.addr == remote_addr)
Expand Down Expand Up @@ -1307,6 +1314,12 @@ impl P2pConnManager {
}
}

tracing::debug!(
peer_addr = %remote_addr,
%tx,
tx_type = ?tx.transaction_type(),
"Queueing inbound NetMessage from peer connection"
);
let task = peer_connection_listener(peer_conn.rx, peer_conn.conn).boxed();
select_stream.push_peer_connection(task);
Ok(EventResult::Event(
Expand Down Expand Up @@ -1587,45 +1600,116 @@ pub(super) struct PeerConnectionInbound {
pub msg: NetMessage,
}

async fn handle_peer_channel_message(
conn: &mut PeerConnection,
msg: Either<NetMessage, ConnEvent>,
) -> Result<(), TransportError> {
match msg {
Left(msg) => {
tracing::debug!(to=%conn.remote_addr() ,"Sending message to peer. Msg: {msg}");
if let Err(error) = conn.send(msg).await {
tracing::error!(
to = %conn.remote_addr(),
?error,
"[CONN_LIFECYCLE] Failed to send message to peer"
);
return Err(error);
}
tracing::debug!(
to = %conn.remote_addr(),
"[CONN_LIFECYCLE] Message enqueued on transport socket"
);
}
Right(action) => {
tracing::debug!(to=%conn.remote_addr(), "Received action from channel");
match action {
ConnEvent::NodeAction(NodeEvent::DropConnection(peer)) => {
tracing::info!(
to = %conn.remote_addr(),
peer = %peer,
"[CONN_LIFECYCLE] Closing connection per DropConnection action"
);
return Err(TransportError::ConnectionClosed(conn.remote_addr()));
}
ConnEvent::ClosedChannel(reason) => {
tracing::info!(
to = %conn.remote_addr(),
reason = ?reason,
"[CONN_LIFECYCLE] Closing connection due to ClosedChannel action"
);
return Err(TransportError::ConnectionClosed(conn.remote_addr()));
}
other => {
unreachable!(
"Unexpected action from peer_connection_listener channel: {:?}",
other
);
}
}
}
}
Ok(())
}

async fn peer_connection_listener(
mut rx: PeerConnChannelRecv,
mut conn: PeerConnection,
) -> Result<PeerConnectionInbound, TransportError> {
const MAX_IMMEDIATE_SENDS: usize = 32;
loop {
tokio::select! {
msg = rx.recv() => {
let Some(msg) = msg else { break Err(TransportError::ConnectionClosed(conn.remote_addr())); };
match msg {
Left(msg) => {
tracing::debug!(to=%conn.remote_addr() ,"Sending message to peer. Msg: {msg}");
conn
.send(msg)
.await?;
}
Right(action) => {
tracing::debug!(to=%conn.remote_addr(), "Received action from channel");
match action {
ConnEvent::NodeAction(NodeEvent::DropConnection(_))
| ConnEvent::ClosedChannel(_) => {
break Err(TransportError::ConnectionClosed(conn.remote_addr()));
}
other => {
unreachable!("Unexpected action from peer_connection_listener channel: {:?}", other);
}
}
let mut drained = 0;
loop {
match rx.try_recv() {
Ok(msg) => {
handle_peer_channel_message(&mut conn, msg).await?;
drained += 1;
if drained >= MAX_IMMEDIATE_SENDS {
break;
}
}
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => {
tracing::warn!(
to = %conn.remote_addr(),
"[CONN_LIFECYCLE] peer_connection_listener channel closed without explicit DropConnection"
);
return Err(TransportError::ConnectionClosed(conn.remote_addr()));
}
}
}

tokio::select! {
msg = rx.recv() => {
let Some(msg) = msg else {
tracing::warn!(
to = %conn.remote_addr(),
"[CONN_LIFECYCLE] peer_connection_listener channel closed without explicit DropConnection"
);
break Err(TransportError::ConnectionClosed(conn.remote_addr()));
};
handle_peer_channel_message(&mut conn, msg).await?;
}
msg = conn.recv() => {
let Ok(msg) = msg
.inspect_err(|error| {
tracing::error!(from=%conn.remote_addr(), "Error while receiving message: {error}");
})
else {
tracing::debug!(
from = %conn.remote_addr(),
"[CONN_LIFECYCLE] peer_connection_listener terminating after recv error"
);
break Err(TransportError::ConnectionClosed(conn.remote_addr()));
};
let net_message = decode_msg(&msg).unwrap();
tracing::debug!(from=%conn.remote_addr() ,"Received message from peer. Msg: {net_message}");
let tx = *net_message.id();
tracing::debug!(
from = %conn.remote_addr(),
%tx,
tx_type = ?tx.transaction_type(),
msg_type = %net_message,
"[CONN_LIFECYCLE] Received inbound NetMessage from peer"
);
break Ok(PeerConnectionInbound { conn, rx, msg: net_message });
}
}
Expand Down
Loading
Loading