diff --git a/.gitignore b/.gitignore
index 00e2f6a9ed..29335979a8 100644
--- a/.gitignore
+++ b/.gitignore
@@ -55,3 +55,10 @@ crates/core/tmp_work/
 .claude-flow/
 .swarm/
 test-results/
+
+# Git worktrees
+worktrees/
+.worktrees/
+
+# Development logs
+logs/*
diff --git a/.worktrees/pr1859 b/.worktrees/pr1859
deleted file mode 160000
index 362724e312..0000000000
--- a/.worktrees/pr1859
+++ /dev/null
@@ -1 +0,0 @@
-Subproject commit 362724e312d179e5cbf8db04e1b4791ee152b6fb
diff --git a/.worktrees/pr1861 b/.worktrees/pr1861
deleted file mode 160000
index a9e27602e0..0000000000
--- a/.worktrees/pr1861
+++ /dev/null
@@ -1 +0,0 @@
-Subproject commit a9e27602e0542874c74c2041aac429443a5b3663
diff --git a/crates/core/src/client_events/mod.rs b/crates/core/src/client_events/mod.rs
index 2b95e8490b..9475358f1e 100644
--- a/crates/core/src/client_events/mod.rs
+++ b/crates/core/src/client_events/mod.rs
@@ -398,99 +398,151 @@ async fn process_open_request(
             let contract_key = contract.key();
 
-            // Use RequestRouter for deduplication if in actor mode, otherwise direct operation
-            if let Some(router) = &request_router {
+            // Check if this will be a local-only PUT (no network peers available)
+            // This prevents race condition where PUT completes instantly and TX is removed
+            // before a second client can reuse it (issue #1886)
+            let own_location = op_manager.ring.connection_manager.own_location();
+            let has_remote_peers = op_manager
+                .ring
+                .closest_potentially_caching(&contract_key, &[own_location.peer][..])
+                .is_some();
+
+            if !has_remote_peers {
+                // Local-only PUT - bypass router to avoid race condition
                 tracing::debug!(
                     peer_id = %peer_id,
                     key = %contract_key,
-                    "Routing PUT request through deduplication layer (actor mode)",
+                    "PUT will complete locally (no remote peers), starting direct local PUT operation"
                 );
-                let request = crate::node::DeduplicatedRequest::Put {
-                    key: contract_key,
-                    contract: contract.clone(),
-                    related_contracts: related_contracts.clone(),
-                    state: state.clone(),
+                // Start a local PUT operation without going through the router
+                // This avoids the race condition while still providing proper result delivery
+                let op = put::start_op(
+                    contract.clone(),
+                    related_contracts.clone(),
+                    state.clone(),
+                    op_manager.ring.max_hops_to_live,
                     subscribe,
-                    client_id,
-                    request_id,
-                };
-
-                let (transaction_id, should_start_operation) =
-                    router.route_request(request).await.map_err(|e| {
-                        Error::Node(format!("Request routing failed: {}", e))
-                    })?;
+                );
+                let op_id = op.id;
 
-                // Always register this client for the result
+                // Register client for transaction result
                 op_manager
                     .ch_outbound
-                    .waiting_for_transaction_result(
-                        transaction_id,
-                        client_id,
-                        request_id,
-                    )
+                    .waiting_for_transaction_result(op_id, client_id, request_id)
                     .await
                     .inspect_err(|err| {
-                        tracing::error!(
-                            "Error waiting for transaction result: {}",
-                            err
-                        );
+                        tracing::error!("Error waiting for transaction result: {}", err)
                     })?;
 
-                // Only start new network operation if this is a new operation
-                if should_start_operation {
+                // Execute the PUT operation
+                // Since there are no remote peers, this will complete locally
+                if let Err(err) = put::request_put(&op_manager, op).await {
+                    tracing::error!("Local PUT request error: {}", err);
+                }
+
+                // Note: We bypass the router for local-only PUTs to avoid the race
+                // condition where the transaction completes instantly and is removed
+                // before other clients can join. The operation will complete locally
+                // and deliver results through the normal transaction mechanism.
+            } else {
+                // Has remote peers - use RequestRouter for deduplication if in actor mode, otherwise direct operation
+                if let Some(router) = &request_router {
-                    tracing::debug!(
-                        peer_id = %peer_id,
-                        key = %contract_key,
-                        "Starting new PUT network operation via RequestRouter",
-                    );
-
-                    let op = put::start_op_with_id(
-                        contract.clone(),
-                        related_contracts.clone(),
-                        state.clone(),
-                        op_manager.ring.max_hops_to_live,
-                        subscribe,
-                        transaction_id,
-                    );
-
-                    if let Err(err) = put::request_put(&op_manager, op).await {
-                        tracing::error!("Put request error: {}", err);
-                    }
-                } else {
-                    tracing::debug!(
-                        peer_id = %peer_id,
-                        key = %contract_key,
-                        "Reusing existing PUT operation via RequestRouter - client registered for result",
-                    );
-                }
-            } else {
-                tracing::debug!(
-                    peer_id = %peer_id,
-                    key = %contract_key,
-                    "Starting direct PUT operation (legacy mode)",
-                );
-                // Legacy mode: direct operation without deduplication
-                let op = put::start_op(
-                    contract.clone(),
-                    related_contracts.clone(),
-                    state.clone(),
-                    op_manager.ring.max_hops_to_live,
-                    subscribe,
-                );
-                let op_id = op.id;
-
-                op_manager
-                    .ch_outbound
-                    .waiting_for_transaction_result(op_id, client_id, request_id)
-                    .await
-                    .inspect_err(|err| {
-                        tracing::error!("Error waiting for transaction result: {}", err)
-                    })?;
-
-                if let Err(err) = put::request_put(&op_manager, op).await {
-                    tracing::error!("Put request error: {}", err);
-                }
-            }
+                    tracing::debug!(
+                        peer_id = %peer_id,
+                        key = %contract_key,
+                        "Routing PUT request through deduplication layer (actor mode)",
+                    );
+                    let request = crate::node::DeduplicatedRequest::Put {
+                        key: contract_key,
+                        contract: contract.clone(),
+                        related_contracts: related_contracts.clone(),
+                        state: state.clone(),
+                        subscribe,
+                        client_id,
+                        request_id,
+                    };
+
+                    let (transaction_id, should_start_operation) =
+                        router.route_request(request).await.map_err(|e| {
+                            Error::Node(format!("Request routing failed: {}", e))
+                        })?;
+
+                    // Always register this client for the result
+                    op_manager
+                        .ch_outbound
+                        .waiting_for_transaction_result(
+                            transaction_id,
+                            client_id,
+                            request_id,
+                        )
+                        .await
+                        .inspect_err(|err| {
+                            tracing::error!(
+                                "Error waiting for transaction result: {}",
+                                err
+                            );
+                        })?;
+
+                    // Only start new network operation if this is a new operation
+                    if should_start_operation {
+                        tracing::debug!(
+                            peer_id = %peer_id,
+                            key = %contract_key,
+                            "Starting new PUT network operation via RequestRouter",
+                        );
+
+                        let op = put::start_op_with_id(
+                            contract.clone(),
+                            related_contracts.clone(),
+                            state.clone(),
+                            op_manager.ring.max_hops_to_live,
+                            subscribe,
+                            transaction_id,
+                        );
+
+                        if let Err(err) = put::request_put(&op_manager, op).await {
+                            tracing::error!("Put request error: {}", err);
+                        }
+                    } else {
+                        tracing::debug!(
+                            peer_id = %peer_id,
+                            key = %contract_key,
+                            "Reusing existing PUT operation via RequestRouter - client registered for result",
+                        );
+                    }
+                } else {
+                    tracing::debug!(
+                        peer_id = %peer_id,
+                        key = %contract_key,
+                        "Starting direct PUT operation (legacy mode)",
+                    );
+                    // Legacy mode: direct operation without deduplication
+                    let op = put::start_op(
+                        contract.clone(),
+                        related_contracts.clone(),
+                        state.clone(),
+                        op_manager.ring.max_hops_to_live,
+                        subscribe,
+                    );
+                    let op_id = op.id;
+
+                    op_manager
+                        .ch_outbound
+                        .waiting_for_transaction_result(op_id, client_id, request_id)
+                        .await
+                        .inspect_err(|err| {
+                            tracing::error!(
+                                "Error waiting for transaction result: {}",
+                                err
+                            )
+                        })?;
+
+                    if let Err(err) = put::request_put(&op_manager, op).await {
+                        tracing::error!("Put request error: {}", err);
+                    }
+                }
+            }
diff --git a/crates/core/tests/isolated_node_regression.rs b/crates/core/tests/isolated_node_regression.rs
index 74712cbe4d..c575394804 100644
--- a/crates/core/tests/isolated_node_regression.rs
+++ b/crates/core/tests/isolated_node_regression.rs
@@ -235,6 +235,191 @@ async fn test_isolated_node_put_get_workflow() -> anyhow::Result<()> {
     Ok(())
 }
 
+/// Test concurrent GET operations to reproduce deduplication race condition (issue #1886)
+///
+/// This test attempts to reproduce the race condition where:
+/// 1. Client 1 sends GET request → Router creates operation with TX
+/// 2. Operation completes instantly (contract cached locally)
+/// 3. Result delivered to Client 1, TX removed from tracking
+/// 4. Client 2 sends identical GET request → Router tries to reuse removed TX
+/// 5. Bug: Client 2 never receives response
+#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+async fn test_concurrent_get_deduplication_race() -> anyhow::Result<()> {
+    freenet::config::set_logger(Some(tracing::level_filters::LevelFilter::INFO), None);
+
+    // Start a single isolated node (no peers) - ensures instant completion
+    let ws_port = 50900;
+    let network_port = 50901;
+    let (config, _temp_dir) = create_test_node_config(true, ws_port, Some(network_port)).await?;
+
+    // Load a small test contract
+    const TEST_CONTRACT: &str = "test-contract-integration";
+    let contract = load_contract(TEST_CONTRACT, vec![].into())?;
+    let contract_key = contract.key();
+    let initial_state = freenet::test_utils::create_empty_todo_list();
+    let wrapped_state = WrappedState::from(initial_state);
+
+    // Start the node
+    let node_handle = {
+        let config = config.clone();
+        async move {
+            let built_config = config.build().await?;
+            let node = NodeConfig::new(built_config.clone())
+                .await?
+                .build(serve_gateway(built_config.ws_api).await)
+                .await?;
+            node.run().await
+        }
+        .boxed_local()
+    };
+
+    // Run the test with timeout
+    let test_result = timeout(Duration::from_secs(60), async {
+        // Give node time to start
+        println!("Waiting for node to start up...");
+        tokio::time::sleep(Duration::from_secs(10)).await;
+        println!("Node should be ready, proceeding with test...");
+
+        let url = format!(
+            "ws://localhost:{}/v1/contract/command?encodingProtocol=native",
+            ws_port
+        );
+
+        // Connect multiple clients
+        let (ws_stream1, _) = connect_async(&url).await?;
+        let mut client1 = WebApi::start(ws_stream1);
+
+        let (ws_stream2, _) = connect_async(&url).await?;
+        let mut client2 = WebApi::start(ws_stream2);
+
+        let (ws_stream3, _) = connect_async(&url).await?;
+        let mut client3 = WebApi::start(ws_stream3);
+
+        println!("Step 1: PUT contract to cache it locally");
+
+        // Cache the contract locally using client1
+        make_put(&mut client1, wrapped_state.clone(), contract.clone(), false).await?;
+        let put_result = timeout(Duration::from_secs(30), client1.recv()).await;
+
+        match put_result {
+            Ok(Ok(HostResponse::ContractResponse(ContractResponse::PutResponse { key }))) => {
+                assert_eq!(key, contract_key);
+                println!("Contract cached successfully");
+            }
+            other => {
+                panic!("PUT failed: {:?}", other);
+            }
+        }
+
+        println!("Step 2: Concurrent GET requests from multiple clients");
+        println!("This tests the deduplication race condition from issue #1886");
+
+        // Send GET requests concurrently from all clients
+        // The contract is cached, so these will complete instantly
+        // This creates the race condition: TX may be removed before all clients register
+        let get1 = async {
+            make_get(&mut client1, contract_key, true, false).await?;
+            let result = timeout(Duration::from_secs(5), client1.recv()).await;
+            Ok::<_, anyhow::Error>((1, result))
+        };
+
+        let get2 = async {
+            make_get(&mut client2, contract_key, true, false).await?;
+            let result = timeout(Duration::from_secs(5), client2.recv()).await;
+            Ok::<_, anyhow::Error>((2, result))
+        };
+
+        let get3 = async {
+            make_get(&mut client3, contract_key, true, false).await?;
+            let result = timeout(Duration::from_secs(5), client3.recv()).await;
+            Ok::<_, anyhow::Error>((3, result))
+        };
+
+        // Execute all GETs concurrently
+        let (result1, result2, result3) = tokio::join!(get1, get2, get3);
+
+        // Verify all clients received responses
+        let check_result =
+            |client_num: i32, result: anyhow::Result<(i32, Result<Result<HostResponse, _>, _>)>| {
+                match result {
+                    Ok((
+                        _,
+                        Ok(Ok(HostResponse::ContractResponse(ContractResponse::GetResponse {
+                            key,
+                            state,
+                            ..
+                        }))),
+                    )) => {
+                        assert_eq!(key, contract_key);
+                        assert_eq!(state, wrapped_state);
+                        println!("Client {}: ✅ Received GET response", client_num);
+                        true
+                    }
+                    Ok((_, Ok(Ok(other)))) => {
+                        println!("Client {}: ❌ Unexpected response: {:?}", client_num, other);
+                        false
+                    }
+                    Ok((_, Ok(Err(e)))) => {
+                        println!("Client {}: ❌ Error: {}", client_num, e);
+                        false
+                    }
+                    Ok((_, Err(_))) => {
+                        println!(
+                            "Client {}: ❌ TIMEOUT - This is the bug from issue #1886!",
+                            client_num
+                        );
+                        false
+                    }
+                    Err(e) => {
+                        println!("Client {}: ❌ Failed to send request: {}", client_num, e);
+                        false
+                    }
+                }
+            };
+
+        let success1 = check_result(1, result1);
+        let success2 = check_result(2, result2);
+        let success3 = check_result(3, result3);
+
+        // REGRESSION TEST: All clients should receive responses
+        // If any client times out, it indicates the deduplication race condition
+        assert!(
+            success1 && success2 && success3,
+            "All clients should receive GET responses. Failures indicate issue #1886 race condition."
+        );
+
+        println!("✅ All clients received responses - no race condition detected");
+
+        // Cleanup
+        client1
+            .send(ClientRequest::Disconnect { cause: None })
+            .await?;
+        client2
+            .send(ClientRequest::Disconnect { cause: None })
+            .await?;
+        client3
+            .send(ClientRequest::Disconnect { cause: None })
+            .await?;
+        tokio::time::sleep(Duration::from_millis(100)).await;
+
+        Ok::<(), anyhow::Error>(())
+    });
+
+    // Run node and test concurrently
+    select! {
+        _ = node_handle => {
+            error!("Node exited unexpectedly");
+            panic!("Node should not exit during test");
+        }
+        result = test_result => {
+            result??;
+            tokio::time::sleep(Duration::from_secs(1)).await;
+        }
+    }
+
+    Ok(())
+}
+
 /// Test subscription operations on isolated node with local contracts
 ///
 /// This regression test verifies that Subscribe operations complete successfully