diff --git a/examples/chat.rs b/examples/chat.rs index cd9551bb..7d6a4e1d 100644 --- a/examples/chat.rs +++ b/examples/chat.rs @@ -64,6 +64,7 @@ async fn main() -> Result<()> { // Create and start the node let node = P2PNode::new(config).await?; + node.start().await?; // Handle bootstrap peers let mut bootstrap_addrs: Vec = Vec::new(); diff --git a/examples/test_network.rs b/examples/test_network.rs index 2a1f1649..1a30db63 100644 --- a/examples/test_network.rs +++ b/examples/test_network.rs @@ -71,6 +71,7 @@ impl TestNode { .await .context("Failed to create P2P node")?, ); + node.start().await.context("Failed to start P2P node")?; // Get actual listen addresses after node creation let actual_addrs = node.listen_addrs().await; diff --git a/src/messaging/service.rs b/src/messaging/service.rs index fc85cb7a..594283be 100644 --- a/src/messaging/service.rs +++ b/src/messaging/service.rs @@ -293,6 +293,7 @@ impl MessagingService { } let node = crate::network::P2PNode::new(node_config).await?; + node.start().await?; Arc::new(node) }; let transport = Arc::new(MessageTransport::new(network, dht_client.clone()).await?); diff --git a/src/network.rs b/src/network.rs index 450d42db..88d6fc9c 100644 --- a/src/network.rs +++ b/src/network.rs @@ -26,7 +26,7 @@ use crate::identity::rejection::RejectionReason; use crate::security::GeoProvider; use crate::production::{ProductionConfig, ResourceManager, ResourceMetrics}; -use crate::transport::ant_quic_adapter::DualStackNetworkNode; +use crate::transport::ant_quic_adapter::{DualStackNetworkNode, ant_peer_id_to_string}; #[allow(unused_imports)] // Temporarily unused during migration use crate::transport::{TransportOptions, TransportType}; use crate::validation::RateLimitConfig; @@ -38,9 +38,12 @@ use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use std::time::Duration; use tokio::sync::{RwLock, broadcast}; -use tokio::time::Instant; +use tokio::time::{Instant, interval}; use tracing::{debug, error, info, trace, warn}; +/// Payload bytes used for keepalive messages to prevent connection timeouts. +const KEEPALIVE_PAYLOAD: &[u8] = b"keepalive"; + /// Configuration for a P2P node #[derive(Debug, Clone, Serialize, Deserialize)] pub struct NodeConfig { @@ -1034,17 +1037,7 @@ impl P2PNode { entangled_id: None, binary_hash, }; - info!("Created P2P node with peer ID: {}", node.peer_id); - - // Start the network listeners to populate listen addresses - node.start_network_listeners().await?; - - // Update the connection monitor with actual peers reference - node.start_connection_monitor().await; - - // Start message receiving system so messages work immediately after node creation - // This is critical for basic P2P messaging to work - node.start_message_receiving_system().await?; + info!("Created P2P node with peer ID: {} (call start() to begin networking)", node.peer_id); Ok(node) } @@ -1156,12 +1149,13 @@ impl P2PNode { // Start listening on configured addresses using transport layer self.start_network_listeners().await?; + // Update the connection monitor with actual peers reference + self.start_connection_monitor().await; + // Log current listen addresses let listen_addrs = self.listen_addrs.read().await; info!("P2P node started on addresses: {:?}", *listen_addrs); - // MCP removed - // Start message receiving system self.start_message_receiving_system().await?; @@ -1195,8 +1189,7 @@ impl P2PNode { loop { match dual.accept_any().await { Ok((ant_peer_id, remote_sock)) => { - let peer_id = - crate::transport::ant_quic_adapter::ant_peer_id_to_string(&ant_peer_id); + let peer_id = ant_peer_id_to_string(&ant_peer_id); let remote_addr = NetworkAddress::from(remote_sock); // Optional: basic IP rate limiting let _ = rate_limiter.check_ip(&remote_sock.ip()); @@ -1227,55 +1220,29 @@ impl P2PNode { loop { match dual.receive_any().await { Ok((peer_id, bytes)) => { - info!("Received {} bytes from peer {}", bytes.len(), peer_id); + // Convert transport-level PeerId to the hex string used in + // the peers map / active_connections so that consumers of + // P2PEvent::Message can call send_message() with the source. + let transport_peer_id = ant_peer_id_to_string(&peer_id); + info!( + "Received {} bytes from peer {}", + bytes.len(), + transport_peer_id + ); // Skip keepalive and other protocol control messages - if bytes == b"keepalive" { - trace!("Received keepalive from {}", peer_id); + if bytes == KEEPALIVE_PAYLOAD { + trace!("Received keepalive from {}", transport_peer_id); continue; } // Expect the JSON message wrapper from create_protocol_message - match serde_json::from_slice::(&bytes) { - Ok(value) => { - if let (Some(protocol), Some(data), Some(from)) = ( - value.get("protocol").and_then(|v| v.as_str()), - value.get("data").and_then(|v| v.as_array()), - value.get("from").and_then(|v| v.as_str()), - ) { - let payload: Vec = data - .iter() - .filter_map(|v| v.as_u64().map(|n| n as u8)) - .collect(); - debug!( - "Emitting P2PEvent::Message - topic: {}, from: {}, payload_len: {}", - protocol, - from, - payload.len() - ); - let _ = event_tx.send(P2PEvent::Message { - topic: protocol.to_string(), - source: from.to_string(), - data: payload, - }); - } else { - debug!( - "Message missing required fields. protocol={:?}, data={:?}, from={:?}", - value.get("protocol"), - value.get("data").is_some(), - value.get("from") - ); - } + match parse_protocol_message(&bytes, &transport_peer_id) { + Some(event) => { + let _ = event_tx.send(event); } - Err(e) => { - // Log first 100 bytes to help debug - let preview: Vec = bytes.iter().take(100).copied().collect(); - warn!( - "Failed to parse message JSON: {} (received {} bytes): {:?}", - e, - bytes.len(), - preview - ); + None => { + warn!("Failed to parse protocol message ({} bytes)", bytes.len()); } } } @@ -1303,50 +1270,6 @@ impl P2PNode { Ok(()) } - /// Handle a received message and generate appropriate events - #[allow(dead_code)] - async fn handle_received_message( - &self, - message_data: Vec, - peer_id: &PeerId, - _protocol: &str, - event_tx: &broadcast::Sender, - ) -> Result<()> { - // MCP removed: no special protocol handling - - // Parse the message format we created in create_protocol_message - match serde_json::from_slice::(&message_data) { - Ok(message) => { - if let (Some(protocol), Some(data), Some(from)) = ( - message.get("protocol").and_then(|v| v.as_str()), - message.get("data").and_then(|v| v.as_array()), - message.get("from").and_then(|v| v.as_str()), - ) { - // Convert data array back to bytes - let data_bytes: Vec = data - .iter() - .filter_map(|v| v.as_u64().map(|n| n as u8)) - .collect(); - - // Generate message event - let event = P2PEvent::Message { - topic: protocol.to_string(), - source: from.to_string(), - data: data_bytes, - }; - - let _ = event_tx.send(event); - debug!("Generated message event from peer: {}", peer_id); - } - } - Err(e) => { - warn!("Failed to parse received message from {}: {}", peer_id, e); - } - } - - Ok(()) - } - // MCP removed // MCP removed @@ -1561,8 +1484,7 @@ impl P2PNode { .await { Ok(Ok(peer)) => { - let connected_peer_id = - crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer); + let connected_peer_id = ant_peer_id_to_string(&peer); info!("Successfully connected to peer: {}", connected_peer_id); // Prevent self-connections by checking if remote peer_id matches our own @@ -1816,6 +1738,42 @@ fn create_protocol_message_static(protocol: &str, data: Vec) -> Result Option { + let value: serde_json::Value = serde_json::from_slice(bytes).ok()?; + let protocol = value.get("protocol")?.as_str()?; + let data = value.get("data")?.as_array()?; + // "from" is a required protocol field (messages without it are dropped), + // but we use the transport peer ID as the authoritative source. + let logical_from = value.get("from")?.as_str()?; + let payload: Vec = data + .iter() + .filter_map(|v| v.as_u64().map(|n| n as u8)) + .collect(); + debug!( + "Parsed P2PEvent::Message - topic: {}, source: {} (logical: {}), payload_len: {}", + protocol, + source, + logical_from, + payload.len() + ); + Some(P2PEvent::Message { + topic: protocol.to_string(), + source: source.to_string(), + data: payload, + }) +} + impl P2PNode { /// Subscribe to network events pub fn subscribe_events(&self) -> broadcast::Receiver { @@ -2059,8 +2017,7 @@ impl P2PNode { peer_id, remote_address, } => { - let peer_id_str = - crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id); + let peer_id_str = ant_peer_id_to_string(&peer_id); debug!( "Connection established: peer={}, addr={}", peer_id_str, remote_address @@ -2119,8 +2076,7 @@ impl P2PNode { let _ = event_tx.send(P2PEvent::PeerConnected(peer_id_str)); } ConnectionEvent::Lost { peer_id, reason } => { - let peer_id_str = - crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id); + let peer_id_str = ant_peer_id_to_string(&peer_id); debug!("Connection lost: peer={}, reason={}", peer_id_str, reason); // Remove from active connections @@ -2136,8 +2092,7 @@ impl P2PNode { let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id_str)); } ConnectionEvent::Failed { peer_id, reason } => { - let peer_id_str = - crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id); + let peer_id_str = ant_peer_id_to_string(&peer_id); debug!("Connection failed: peer={}, reason={}", peer_id_str, reason); // Remove from active connections @@ -2195,8 +2150,7 @@ impl P2PNode { peer_id, remote_address, } => { - let peer_id_str = - crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id); + let peer_id_str = ant_peer_id_to_string(&peer_id); debug!( "Connection established: peer={}, addr={}", peer_id_str, remote_address @@ -2298,8 +2252,7 @@ impl P2PNode { let _ = event_tx.send(P2PEvent::PeerConnected(peer_id_str)); } ConnectionEvent::Lost { peer_id, reason } => { - let peer_id_str = - crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id); + let peer_id_str = ant_peer_id_to_string(&peer_id); debug!("Connection lost: peer={}, reason={}", peer_id_str, reason); // Remove from active connections @@ -2315,8 +2268,7 @@ impl P2PNode { let _ = event_tx.send(P2PEvent::PeerDisconnected(peer_id_str)); } ConnectionEvent::Failed { peer_id, reason } => { - let peer_id_str = - crate::transport::ant_quic_adapter::ant_peer_id_to_string(&peer_id); + let peer_id_str = ant_peer_id_to_string(&peer_id); warn!("Connection failed: peer={}, reason={}", peer_id_str, reason); // Remove from active connections @@ -2367,10 +2319,7 @@ impl P2PNode { dual_node: Arc, shutdown: Arc, ) { - use tokio::time::{Duration, interval}; - const KEEPALIVE_INTERVAL_SECS: u64 = 15; // Half of 30-second timeout - const KEEPALIVE_PAYLOAD: &[u8] = b"keepalive"; // Small payload let mut interval = interval(Duration::from_secs(KEEPALIVE_INTERVAL_SECS)); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); @@ -2436,8 +2385,6 @@ impl P2PNode { stale_threshold: Duration, shutdown: Arc, ) { - use tokio::time::interval; - let cleanup_threshold = stale_threshold * 2; let mut interval = interval(Duration::from_millis(100)); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); @@ -4051,4 +3998,101 @@ mod tests { let normalized_v4 = normalize_wildcard_to_loopback(loopback_v4); assert_eq!(normalized_v4, loopback_v4); } + + // ---- parse_protocol_message regression tests ---- + + #[test] + fn test_parse_protocol_message_uses_transport_peer_id_as_source() { + // Regression: P2PEvent::Message.source must be the transport peer ID, + // NOT the "from" field from the JSON payload. This ensures consumers + // can pass source directly to send_message(). + let transport_id = "abcdef0123456789"; + let logical_id = "spoofed-logical-id"; + let msg = serde_json::json!({ + "protocol": "test/v1", + "data": [1, 2, 3], + "from": logical_id, + }); + let bytes = serde_json::to_vec(&msg).unwrap(); + + let event = + parse_protocol_message(&bytes, transport_id).expect("valid message should parse"); + + match event { + P2PEvent::Message { + topic, + source, + data, + } => { + assert_eq!(source, transport_id, "source must be the transport peer ID"); + assert_ne!( + source, logical_id, + "source must NOT be the logical 'from' field" + ); + assert_eq!(topic, "test/v1"); + assert_eq!(data, vec![1u8, 2, 3]); + } + other => panic!("expected P2PEvent::Message, got {:?}", other), + } + } + + #[test] + fn test_parse_protocol_message_rejects_missing_from() { + // "from" is a required protocol field; messages without it are dropped. + let msg = serde_json::json!({ + "protocol": "test/v1", + "data": [1, 2, 3], + }); + let bytes = serde_json::to_vec(&msg).unwrap(); + + assert!( + parse_protocol_message(&bytes, "peer-id").is_none(), + "message missing 'from' must be rejected" + ); + } + + #[test] + fn test_parse_protocol_message_rejects_missing_protocol() { + let msg = serde_json::json!({ + "data": [1, 2, 3], + "from": "sender", + }); + let bytes = serde_json::to_vec(&msg).unwrap(); + + assert!(parse_protocol_message(&bytes, "peer-id").is_none()); + } + + #[test] + fn test_parse_protocol_message_rejects_missing_data() { + let msg = serde_json::json!({ + "protocol": "test/v1", + "from": "sender", + }); + let bytes = serde_json::to_vec(&msg).unwrap(); + + assert!(parse_protocol_message(&bytes, "peer-id").is_none()); + } + + #[test] + fn test_parse_protocol_message_rejects_invalid_json() { + assert!(parse_protocol_message(b"not json", "peer-id").is_none()); + } + + #[test] + fn test_parse_protocol_message_empty_payload() { + let msg = serde_json::json!({ + "protocol": "ping", + "data": [], + "from": "sender", + }); + let bytes = serde_json::to_vec(&msg).unwrap(); + + let event = parse_protocol_message(&bytes, "transport-peer") + .expect("valid message with empty data should parse"); + + match event { + P2PEvent::Message { data, .. } => assert!(data.is_empty()), + other => panic!("expected P2PEvent::Message, got {:?}", other), + } + } } diff --git a/tests/connection_lifecycle_integration_test.rs b/tests/connection_lifecycle_integration_test.rs index 3323822c..022e2bd5 100644 --- a/tests/connection_lifecycle_integration_test.rs +++ b/tests/connection_lifecycle_integration_test.rs @@ -66,7 +66,9 @@ async fn test_connection_lifecycle_with_keepalive() { }; let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); // Get their addresses let addrs1 = node1.listen_addrs().await; @@ -182,7 +184,9 @@ async fn test_send_message_validates_connection_state() { }; let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); // Get addresses and connect let addrs2 = node2.listen_addrs().await; @@ -265,7 +269,9 @@ async fn test_multiple_message_exchanges() { }; let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); // Connect nodes let addrs2 = node2.listen_addrs().await; diff --git a/tests/connection_lifecycle_proof_test.rs b/tests/connection_lifecycle_proof_test.rs index 64780115..d212d0cd 100644 --- a/tests/connection_lifecycle_proof_test.rs +++ b/tests/connection_lifecycle_proof_test.rs @@ -37,6 +37,7 @@ async fn test_connection_lifecycle_infrastructure_exists() { }; let node = P2PNode::new(config).await.expect("Failed to create node"); + node.start().await.expect("Failed to start node"); info!("Node created successfully"); @@ -112,6 +113,7 @@ async fn test_keepalive_task_initialized() { }; let _node = P2PNode::new(config).await.expect("Failed to create node"); + _node.start().await.expect("Failed to start node"); // The keepalive task is spawned in P2PNode::new() and runs in the background // It sends keepalive messages every 15 seconds to prevent the 30-second ant-quic timeout diff --git a/tests/end_to_end_scenarios_test.rs b/tests/end_to_end_scenarios_test.rs index 4b62dfdb..228bfa4d 100644 --- a/tests/end_to_end_scenarios_test.rs +++ b/tests/end_to_end_scenarios_test.rs @@ -46,6 +46,7 @@ impl TestUser { let peer_id = format!("test_user_{}", username); let node = P2PNode::new(config).await?; + node.start().await?; Ok(Self { node: Arc::new(node), @@ -55,7 +56,7 @@ impl TestUser { } async fn start(&self) -> Result<()> { - // TODO: Implement when node.start() API is available + // Node is already started after P2PNode::new() + start() in TestUser::new() sleep(Duration::from_millis(100)).await; Ok(()) } diff --git a/tests/network_wiring_e2e_test.rs b/tests/network_wiring_e2e_test.rs index dd7f5128..f5b940de 100644 --- a/tests/network_wiring_e2e_test.rs +++ b/tests/network_wiring_e2e_test.rs @@ -113,7 +113,9 @@ async fn test_two_node_message_exchange() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); // Subscribe to events on node2 BEFORE connecting let mut events2 = node2.subscribe_events(); @@ -217,7 +219,9 @@ async fn test_message_topic_preservation() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let mut events2 = node2.subscribe_events(); @@ -282,7 +286,9 @@ async fn test_bidirectional_message_exchange() { let config2 = create_test_node_config(); let node1 = Arc::new(P2PNode::new(config1).await.expect("Failed to create node1")); + node1.start().await.expect("Failed to start node1"); let node2 = Arc::new(P2PNode::new(config2).await.expect("Failed to create node2")); + node2.start().await.expect("Failed to start node2"); let mut events1 = node1.subscribe_events(); let mut events2 = node2.subscribe_events(); @@ -369,7 +375,9 @@ async fn test_periodic_tasks_updates_last_seen() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); // Connect let addrs2 = node2.listen_addrs().await; @@ -423,7 +431,9 @@ async fn test_stale_peer_removal() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let mut events1 = node1.subscribe_events(); @@ -485,7 +495,9 @@ async fn test_heartbeat_keeps_connection_alive() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let addrs2 = node2.listen_addrs().await; let addr2 = addrs2.first().expect("Need address").to_string(); @@ -535,6 +547,7 @@ async fn test_dht_network_manager_integration() { let config = create_test_node_config(); let _node = P2PNode::new(config).await.expect("Failed to create node"); + _node.start().await.expect("Failed to start node"); // Check if DHT network manager is accessible // This would require adding a method to P2PNode like: @@ -575,12 +588,15 @@ async fn test_three_node_dht_routing() { let node_a = P2PNode::new(config_a) .await .expect("Failed to create node A"); + node_a.start().await.expect("Failed to start node A"); let node_b = P2PNode::new(config_b) .await .expect("Failed to create node B"); + node_b.start().await.expect("Failed to start node B"); let node_c = P2PNode::new(config_c) .await .expect("Failed to create node C"); + node_c.start().await.expect("Failed to start node C"); // Get addresses let addrs_a = node_a.listen_addrs().await; @@ -655,7 +671,9 @@ async fn test_dht_message_routing() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let mut events2 = node2.subscribe_events(); @@ -699,6 +717,7 @@ async fn test_dht_message_routing() { async fn test_node_creation_sanity() { let config = create_test_node_config(); let node = P2PNode::new(config).await.expect("Failed to create node"); + node.start().await.expect("Failed to start node"); let addrs = node.listen_addrs().await; assert!( @@ -716,7 +735,9 @@ async fn test_event_subscription_sanity() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let mut events1 = node1.subscribe_events(); @@ -760,7 +781,9 @@ async fn test_simple_ping_pong() { let config2 = create_test_node_config(); let node1 = Arc::new(P2PNode::new(config1).await.expect("Failed to create node1")); + node1.start().await.expect("Failed to start node1"); let node2 = Arc::new(P2PNode::new(config2).await.expect("Failed to create node2")); + node2.start().await.expect("Failed to start node2"); let mut events1 = node1.subscribe_events(); let mut events2 = node2.subscribe_events(); @@ -841,7 +864,9 @@ async fn test_multiple_sequential_messages() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let mut events2 = node2.subscribe_events(); @@ -923,7 +948,9 @@ async fn test_connection_stays_alive() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let mut events2 = node2.subscribe_events(); @@ -981,7 +1008,9 @@ async fn test_reconnection_works() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let addrs2 = node2.listen_addrs().await; let addr2 = addrs2.first().expect("Need address").to_string(); @@ -1044,7 +1073,9 @@ async fn test_peer_events_sequence() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let mut events1 = node1.subscribe_events(); @@ -1103,7 +1134,9 @@ async fn test_large_message_transfer() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let mut events2 = node2.subscribe_events(); @@ -1168,7 +1201,9 @@ async fn test_multiple_protocols() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let mut events2 = node2.subscribe_events(); @@ -1247,7 +1282,9 @@ async fn test_no_duplicate_disconnect_events() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let mut events1 = node1.subscribe_events(); @@ -1315,7 +1352,9 @@ async fn test_peer_cleanup_timing() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let addrs2 = node2.listen_addrs().await; let addr2 = addrs2.first().expect("Need address").to_string(); @@ -1384,7 +1423,9 @@ async fn test_empty_message_handling() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let mut events2 = node2.subscribe_events(); @@ -1465,7 +1506,9 @@ async fn test_rapid_reconnection_stress() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let addrs2 = node2.listen_addrs().await; let addr2 = addrs2.first().expect("Need address").to_string(); @@ -1515,7 +1558,9 @@ async fn test_concurrent_message_flood() { let config2 = create_test_node_config(); let node1 = Arc::new(P2PNode::new(config1).await.expect("Failed to create node1")); + node1.start().await.expect("Failed to start node1"); let node2 = Arc::new(P2PNode::new(config2).await.expect("Failed to create node2")); + node2.start().await.expect("Failed to start node2"); let mut events1 = node1.subscribe_events(); let mut events2 = node2.subscribe_events(); @@ -1639,7 +1684,9 @@ async fn test_send_to_disconnecting_peer() { let config2 = create_test_node_config(); let node1 = Arc::new(P2PNode::new(config1).await.expect("Failed to create node1")); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let addrs2 = node2.listen_addrs().await; let addr2 = addrs2.first().expect("Need address").to_string(); @@ -1679,6 +1726,7 @@ async fn test_send_to_disconnecting_peer() { // Verify node1 is still functional let config3 = create_test_node_config(); let node3 = P2PNode::new(config3).await.expect("Failed to create node3"); + node3.start().await.expect("Failed to start node3"); let addrs3 = node3.listen_addrs().await; let addr3 = addrs3.first().expect("Need address").to_string(); @@ -1707,7 +1755,9 @@ async fn test_late_event_subscription() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); // Connect BEFORE subscribing let addrs2 = node2.listen_addrs().await; @@ -1762,7 +1812,9 @@ async fn test_zero_stale_threshold() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let addrs2 = node2.listen_addrs().await; let addr2 = addrs2.first().expect("Need address").to_string(); @@ -1812,7 +1864,9 @@ async fn test_short_stale_threshold() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let mut events2 = node2.subscribe_events(); @@ -1858,6 +1912,7 @@ async fn test_many_peers_scaling() { let config1 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let peer_count = 10; let mut nodes: Vec = Vec::with_capacity(peer_count); @@ -1869,6 +1924,7 @@ async fn test_many_peers_scaling() { let node = P2PNode::new(config) .await .expect("Failed to create peer node"); + node.start().await.expect("Failed to start peer node"); let addrs = node.listen_addrs().await; let addr = addrs.first().expect("Need address").to_string(); @@ -1931,7 +1987,9 @@ async fn test_graceful_shutdown() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let addrs2 = node2.listen_addrs().await; let addr2 = addrs2.first().expect("Need address").to_string(); @@ -1969,7 +2027,9 @@ async fn test_event_subscriber_cleanup() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); // Create multiple subscribers let sub1 = node2.subscribe_events();