Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,10 @@ crates/core/tmp_work/
.claude-flow/
.swarm/
test-results/

# Git worktrees
worktrees/
.worktrees/

# Development logs
logs/*
1 change: 0 additions & 1 deletion .worktrees/pr1859
Submodule pr1859 deleted from 362724
1 change: 0 additions & 1 deletion .worktrees/pr1861
Submodule pr1861 deleted from a9e276
4 changes: 2 additions & 2 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ opentelemetry-jaeger = { features = ["collector_client", "isahc", "rt-tokio"], o
tracing = { version = "0.1" }
tracing-opentelemetry = { optional = true, version = "0.30.0" }
tracing-subscriber = { optional = true, version = "0.3" }
opentelemetry-otlp = { optional = true, version = "0.27.0" }
opentelemetry_sdk = { optional = true, version = "0.29", features = ["rt-tokio"] }
opentelemetry-otlp = { optional = true, version = "0.31.0" }
opentelemetry_sdk = { optional = true, version = "0.31", features = ["rt-tokio"] }

# internal deps
freenet-stdlib = { features = ["net"], workspace = true }
Expand Down
180 changes: 116 additions & 64 deletions crates/core/src/client_events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,99 +398,151 @@ async fn process_open_request(

let contract_key = contract.key();

// Use RequestRouter for deduplication if in actor mode, otherwise direct operation
if let Some(router) = &request_router {
// Check if this will be a local-only PUT (no network peers available)
// This prevents race condition where PUT completes instantly and TX is removed
// before a second client can reuse it (issue #1886)
let own_location = op_manager.ring.connection_manager.own_location();
let has_remote_peers = op_manager
.ring
.closest_potentially_caching(&contract_key, &[own_location.peer][..])
.is_some();

if !has_remote_peers {
// Local-only PUT - bypass router to avoid race condition
tracing::debug!(
peer_id = %peer_id,
key = %contract_key,
"Routing PUT request through deduplication layer (actor mode)",
"PUT will complete locally (no remote peers), starting direct local PUT operation"
);

let request = crate::node::DeduplicatedRequest::Put {
key: contract_key,
contract: contract.clone(),
related_contracts: related_contracts.clone(),
state: state.clone(),
// Start a local PUT operation without going through the router
// This avoids the race condition while still providing proper result delivery
let op = put::start_op(
contract.clone(),
related_contracts.clone(),
state.clone(),
op_manager.ring.max_hops_to_live,
subscribe,
client_id,
request_id,
};

let (transaction_id, should_start_operation) =
router.route_request(request).await.map_err(|e| {
Error::Node(format!("Request routing failed: {}", e))
})?;
);
let op_id = op.id;

// Always register this client for the result
// Register client for transaction result
op_manager
.ch_outbound
.waiting_for_transaction_result(
transaction_id,
client_id,
request_id,
)
.waiting_for_transaction_result(op_id, client_id, request_id)
.await
.inspect_err(|err| {
tracing::error!(
"Error waiting for transaction result: {}",
err
);
tracing::error!("Error waiting for transaction result: {}", err)
})?;

// Only start new network operation if this is a new operation
if should_start_operation {
// Execute the PUT operation
// Since there are no remote peers, this will complete locally
if let Err(err) = put::request_put(&op_manager, op).await {
tracing::error!("Local PUT request error: {}", err);
}

// Note: We bypass the router for local-only PUTs to avoid the race
// condition where the transaction completes instantly and is removed
// before other clients can join. The operation will complete locally
// and deliver results through the normal transaction mechanism.
} else {
// Has remote peers - use RequestRouter for deduplication if in actor mode, otherwise direct operation
if let Some(router) = &request_router {
tracing::debug!(
peer_id = %peer_id,
key = %contract_key,
"Starting new PUT network operation via RequestRouter",
"Routing PUT request through deduplication layer (actor mode)",
);

let op = put::start_op_with_id(
contract.clone(),
related_contracts.clone(),
state.clone(),
op_manager.ring.max_hops_to_live,
let request = crate::node::DeduplicatedRequest::Put {
key: contract_key,
contract: contract.clone(),
related_contracts: related_contracts.clone(),
state: state.clone(),
subscribe,
transaction_id,
);
client_id,
request_id,
};

if let Err(err) = put::request_put(&op_manager, op).await {
tracing::error!("Put request error: {}", err);
let (transaction_id, should_start_operation) =
router.route_request(request).await.map_err(|e| {
Error::Node(format!("Request routing failed: {}", e))
})?;

// Always register this client for the result
op_manager
.ch_outbound
.waiting_for_transaction_result(
transaction_id,
client_id,
request_id,
)
.await
.inspect_err(|err| {
tracing::error!(
"Error waiting for transaction result: {}",
err
);
})?;

// Only start new network operation if this is a new operation
if should_start_operation {
tracing::debug!(
peer_id = %peer_id,
key = %contract_key,
"Starting new PUT network operation via RequestRouter",
);

let op = put::start_op_with_id(
contract.clone(),
related_contracts.clone(),
state.clone(),
op_manager.ring.max_hops_to_live,
subscribe,
transaction_id,
);

if let Err(err) = put::request_put(&op_manager, op).await {
tracing::error!("Put request error: {}", err);
}
} else {
tracing::debug!(
peer_id = %peer_id,
key = %contract_key,
"Reusing existing PUT operation via RequestRouter - client registered for result",
);
}
} else {
tracing::debug!(
peer_id = %peer_id,
key = %contract_key,
"Reusing existing PUT operation via RequestRouter - client registered for result",
"Starting direct PUT operation (legacy mode)",
);
}
} else {
tracing::debug!(
peer_id = %peer_id,
key = %contract_key,
"Starting direct PUT operation (legacy mode)",
);

// Legacy mode: direct operation without deduplication
let op = put::start_op(
contract.clone(),
related_contracts.clone(),
state.clone(),
op_manager.ring.max_hops_to_live,
subscribe,
);
let op_id = op.id;
// Legacy mode: direct operation without deduplication
let op = put::start_op(
contract.clone(),
related_contracts.clone(),
state.clone(),
op_manager.ring.max_hops_to_live,
subscribe,
);
let op_id = op.id;

op_manager
.ch_outbound
.waiting_for_transaction_result(op_id, client_id, request_id)
.await
.inspect_err(|err| {
tracing::error!("Error waiting for transaction result: {}", err)
})?;
op_manager
.ch_outbound
.waiting_for_transaction_result(op_id, client_id, request_id)
.await
.inspect_err(|err| {
tracing::error!(
"Error waiting for transaction result: {}",
err
)
})?;

if let Err(err) = put::request_put(&op_manager, op).await {
tracing::error!("Put request error: {}", err);
if let Err(err) = put::request_put(&op_manager, op).await {
tracing::error!("Put request error: {}", err);
}
}
}

Expand Down
Loading