diff --git a/examples/chat.rs b/examples/chat.rs index cd9551bb..7d6a4e1d 100644 --- a/examples/chat.rs +++ b/examples/chat.rs @@ -64,6 +64,7 @@ async fn main() -> Result<()> { // Create and start the node let node = P2PNode::new(config).await?; + node.start().await?; // Handle bootstrap peers let mut bootstrap_addrs: Vec = Vec::new(); diff --git a/examples/test_network.rs b/examples/test_network.rs index 2a1f1649..1a30db63 100644 --- a/examples/test_network.rs +++ b/examples/test_network.rs @@ -71,6 +71,7 @@ impl TestNode { .await .context("Failed to create P2P node")?, ); + node.start().await.context("Failed to start P2P node")?; // Get actual listen addresses after node creation let actual_addrs = node.listen_addrs().await; diff --git a/src/messaging/service.rs b/src/messaging/service.rs index fc85cb7a..594283be 100644 --- a/src/messaging/service.rs +++ b/src/messaging/service.rs @@ -293,6 +293,7 @@ impl MessagingService { } let node = crate::network::P2PNode::new(node_config).await?; + node.start().await?; Arc::new(node) }; let transport = Arc::new(MessageTransport::new(network, dht_client.clone()).await?); diff --git a/src/network.rs b/src/network.rs index 45419342..88d6fc9c 100644 --- a/src/network.rs +++ b/src/network.rs @@ -1037,17 +1037,7 @@ impl P2PNode { entangled_id: None, binary_hash, }; - info!("Created P2P node with peer ID: {}", node.peer_id); - - // Start the network listeners to populate listen addresses - node.start_network_listeners().await?; - - // Update the connection monitor with actual peers reference - node.start_connection_monitor().await; - - // Start message receiving system so messages work immediately after node creation - // This is critical for basic P2P messaging to work - node.start_message_receiving_system().await?; + info!("Created P2P node with peer ID: {} (call start() to begin networking)", node.peer_id); Ok(node) } @@ -1159,12 +1149,13 @@ impl P2PNode { // Start listening on configured addresses using transport layer self.start_network_listeners().await?; + // Update the connection monitor with actual peers reference + self.start_connection_monitor().await; + // Log current listen addresses let listen_addrs = self.listen_addrs.read().await; info!("P2P node started on addresses: {:?}", *listen_addrs); - // MCP removed - // Start message receiving system self.start_message_receiving_system().await?; diff --git a/tests/connection_lifecycle_integration_test.rs b/tests/connection_lifecycle_integration_test.rs index 3323822c..022e2bd5 100644 --- a/tests/connection_lifecycle_integration_test.rs +++ b/tests/connection_lifecycle_integration_test.rs @@ -66,7 +66,9 @@ async fn test_connection_lifecycle_with_keepalive() { }; let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); // Get their addresses let addrs1 = node1.listen_addrs().await; @@ -182,7 +184,9 @@ async fn test_send_message_validates_connection_state() { }; let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); // Get addresses and connect let addrs2 = node2.listen_addrs().await; @@ -265,7 +269,9 @@ async fn test_multiple_message_exchanges() { }; let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); // Connect nodes let addrs2 = node2.listen_addrs().await; diff --git a/tests/connection_lifecycle_proof_test.rs b/tests/connection_lifecycle_proof_test.rs index 64780115..d212d0cd 100644 --- a/tests/connection_lifecycle_proof_test.rs +++ b/tests/connection_lifecycle_proof_test.rs @@ -37,6 +37,7 @@ async fn test_connection_lifecycle_infrastructure_exists() { }; let node = P2PNode::new(config).await.expect("Failed to create node"); + node.start().await.expect("Failed to start node"); info!("Node created successfully"); @@ -112,6 +113,7 @@ async fn test_keepalive_task_initialized() { }; let _node = P2PNode::new(config).await.expect("Failed to create node"); + _node.start().await.expect("Failed to start node"); // The keepalive task is spawned in P2PNode::new() and runs in the background // It sends keepalive messages every 15 seconds to prevent the 30-second ant-quic timeout diff --git a/tests/end_to_end_scenarios_test.rs b/tests/end_to_end_scenarios_test.rs index 4b62dfdb..228bfa4d 100644 --- a/tests/end_to_end_scenarios_test.rs +++ b/tests/end_to_end_scenarios_test.rs @@ -46,6 +46,7 @@ impl TestUser { let peer_id = format!("test_user_{}", username); let node = P2PNode::new(config).await?; + node.start().await?; Ok(Self { node: Arc::new(node), @@ -55,7 +56,7 @@ impl TestUser { } async fn start(&self) -> Result<()> { - // TODO: Implement when node.start() API is available + // Node is already started after P2PNode::new() + start() in TestUser::new() sleep(Duration::from_millis(100)).await; Ok(()) } diff --git a/tests/network_wiring_e2e_test.rs b/tests/network_wiring_e2e_test.rs index dd7f5128..f5b940de 100644 --- a/tests/network_wiring_e2e_test.rs +++ b/tests/network_wiring_e2e_test.rs @@ -113,7 +113,9 @@ async fn test_two_node_message_exchange() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); // Subscribe to events on node2 BEFORE connecting let mut events2 = node2.subscribe_events(); @@ -217,7 +219,9 @@ async fn test_message_topic_preservation() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let mut events2 = node2.subscribe_events(); @@ -282,7 +286,9 @@ async fn test_bidirectional_message_exchange() { let config2 = create_test_node_config(); let node1 = Arc::new(P2PNode::new(config1).await.expect("Failed to create node1")); + node1.start().await.expect("Failed to start node1"); let node2 = Arc::new(P2PNode::new(config2).await.expect("Failed to create node2")); + node2.start().await.expect("Failed to start node2"); let mut events1 = node1.subscribe_events(); let mut events2 = node2.subscribe_events(); @@ -369,7 +375,9 @@ async fn test_periodic_tasks_updates_last_seen() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); // Connect let addrs2 = node2.listen_addrs().await; @@ -423,7 +431,9 @@ async fn test_stale_peer_removal() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let mut events1 = node1.subscribe_events(); @@ -485,7 +495,9 @@ async fn test_heartbeat_keeps_connection_alive() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let addrs2 = node2.listen_addrs().await; let addr2 = addrs2.first().expect("Need address").to_string(); @@ -535,6 +547,7 @@ async fn test_dht_network_manager_integration() { let config = create_test_node_config(); let _node = P2PNode::new(config).await.expect("Failed to create node"); + _node.start().await.expect("Failed to start node"); // Check if DHT network manager is accessible // This would require adding a method to P2PNode like: @@ -575,12 +588,15 @@ async fn test_three_node_dht_routing() { let node_a = P2PNode::new(config_a) .await .expect("Failed to create node A"); + node_a.start().await.expect("Failed to start node A"); let node_b = P2PNode::new(config_b) .await .expect("Failed to create node B"); + node_b.start().await.expect("Failed to start node B"); let node_c = P2PNode::new(config_c) .await .expect("Failed to create node C"); + node_c.start().await.expect("Failed to start node C"); // Get addresses let addrs_a = node_a.listen_addrs().await; @@ -655,7 +671,9 @@ async fn test_dht_message_routing() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let mut events2 = node2.subscribe_events(); @@ -699,6 +717,7 @@ async fn test_dht_message_routing() { async fn test_node_creation_sanity() { let config = create_test_node_config(); let node = P2PNode::new(config).await.expect("Failed to create node"); + node.start().await.expect("Failed to start node"); let addrs = node.listen_addrs().await; assert!( @@ -716,7 +735,9 @@ async fn test_event_subscription_sanity() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let mut events1 = node1.subscribe_events(); @@ -760,7 +781,9 @@ async fn test_simple_ping_pong() { let config2 = create_test_node_config(); let node1 = Arc::new(P2PNode::new(config1).await.expect("Failed to create node1")); + node1.start().await.expect("Failed to start node1"); let node2 = Arc::new(P2PNode::new(config2).await.expect("Failed to create node2")); + node2.start().await.expect("Failed to start node2"); let mut events1 = node1.subscribe_events(); let mut events2 = node2.subscribe_events(); @@ -841,7 +864,9 @@ async fn test_multiple_sequential_messages() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let mut events2 = node2.subscribe_events(); @@ -923,7 +948,9 @@ async fn test_connection_stays_alive() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let mut events2 = node2.subscribe_events(); @@ -981,7 +1008,9 @@ async fn test_reconnection_works() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let addrs2 = node2.listen_addrs().await; let addr2 = addrs2.first().expect("Need address").to_string(); @@ -1044,7 +1073,9 @@ async fn test_peer_events_sequence() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let mut events1 = node1.subscribe_events(); @@ -1103,7 +1134,9 @@ async fn test_large_message_transfer() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let mut events2 = node2.subscribe_events(); @@ -1168,7 +1201,9 @@ async fn test_multiple_protocols() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let mut events2 = node2.subscribe_events(); @@ -1247,7 +1282,9 @@ async fn test_no_duplicate_disconnect_events() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let mut events1 = node1.subscribe_events(); @@ -1315,7 +1352,9 @@ async fn test_peer_cleanup_timing() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let addrs2 = node2.listen_addrs().await; let addr2 = addrs2.first().expect("Need address").to_string(); @@ -1384,7 +1423,9 @@ async fn test_empty_message_handling() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let mut events2 = node2.subscribe_events(); @@ -1465,7 +1506,9 @@ async fn test_rapid_reconnection_stress() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let addrs2 = node2.listen_addrs().await; let addr2 = addrs2.first().expect("Need address").to_string(); @@ -1515,7 +1558,9 @@ async fn test_concurrent_message_flood() { let config2 = create_test_node_config(); let node1 = Arc::new(P2PNode::new(config1).await.expect("Failed to create node1")); + node1.start().await.expect("Failed to start node1"); let node2 = Arc::new(P2PNode::new(config2).await.expect("Failed to create node2")); + node2.start().await.expect("Failed to start node2"); let mut events1 = node1.subscribe_events(); let mut events2 = node2.subscribe_events(); @@ -1639,7 +1684,9 @@ async fn test_send_to_disconnecting_peer() { let config2 = create_test_node_config(); let node1 = Arc::new(P2PNode::new(config1).await.expect("Failed to create node1")); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let addrs2 = node2.listen_addrs().await; let addr2 = addrs2.first().expect("Need address").to_string(); @@ -1679,6 +1726,7 @@ async fn test_send_to_disconnecting_peer() { // Verify node1 is still functional let config3 = create_test_node_config(); let node3 = P2PNode::new(config3).await.expect("Failed to create node3"); + node3.start().await.expect("Failed to start node3"); let addrs3 = node3.listen_addrs().await; let addr3 = addrs3.first().expect("Need address").to_string(); @@ -1707,7 +1755,9 @@ async fn test_late_event_subscription() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); // Connect BEFORE subscribing let addrs2 = node2.listen_addrs().await; @@ -1762,7 +1812,9 @@ async fn test_zero_stale_threshold() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let addrs2 = node2.listen_addrs().await; let addr2 = addrs2.first().expect("Need address").to_string(); @@ -1812,7 +1864,9 @@ async fn test_short_stale_threshold() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let mut events2 = node2.subscribe_events(); @@ -1858,6 +1912,7 @@ async fn test_many_peers_scaling() { let config1 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let peer_count = 10; let mut nodes: Vec = Vec::with_capacity(peer_count); @@ -1869,6 +1924,7 @@ async fn test_many_peers_scaling() { let node = P2PNode::new(config) .await .expect("Failed to create peer node"); + node.start().await.expect("Failed to start peer node"); let addrs = node.listen_addrs().await; let addr = addrs.first().expect("Need address").to_string(); @@ -1931,7 +1987,9 @@ async fn test_graceful_shutdown() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); let addrs2 = node2.listen_addrs().await; let addr2 = addrs2.first().expect("Need address").to_string(); @@ -1969,7 +2027,9 @@ async fn test_event_subscriber_cleanup() { let config2 = create_test_node_config(); let node1 = P2PNode::new(config1).await.expect("Failed to create node1"); + node1.start().await.expect("Failed to start node1"); let node2 = P2PNode::new(config2).await.expect("Failed to create node2"); + node2.start().await.expect("Failed to start node2"); // Create multiple subscribers let sub1 = node2.subscribe_events();