Skip to content

Commit abf8368

Browse files
sanity and claude committed
fix: ensure originating peer updates local cache on subsequent PUTs
When a contract is already seeded locally, subsequent PUT operations were skipping local state updates with "skipping duplicate caching". This caused the originating peer to serve stale cached state even after successfully PUTting updated state to the network.

Now the originating peer always calls put_contract() to upsert its local state, regardless of whether the contract is already seeded. The seed_contract() call is only skipped if already seeded.

Also adds integration test `test_put_merge_persists_state()` that:
- PUTs initial state
- PUTs updated state to same contract
- Verifies both originating peer and gateway serve updated state

This test would have caught both:
- Issue #1995 (target peer not persisting merged state)
- This issue (originating peer not updating local cache)

Related to #1995

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
1 parent 277cbd8 commit abf8368

File tree

2 files changed

+292
-25
lines changed

2 files changed

+292
-25
lines changed

crates/core/src/operations/put.rs

Lines changed: 19 additions & 25 deletions
Original file line number | Diff line number | Diff line change
@@ -547,40 +547,34 @@ impl Operation for PutOp {
547547
// Check if already stored before any operations
548548
let is_seeding_contract = op_manager.ring.is_seeding_contract(&key);
549549

550-
// Only store the contract locally if not already seeded
551-
if !is_seeding_contract {
552-
tracing::debug!(
553-
tx = %id,
554-
%key,
555-
peer = %op_manager.ring.connection_manager.get_peer_key().unwrap(),
556-
"Storing contract locally after successful put"
557-
);
550+
// Always store/update the contract locally
551+
tracing::debug!(
552+
tx = %id,
553+
%key,
554+
peer = %op_manager.ring.connection_manager.get_peer_key().unwrap(),
555+
is_seeding = is_seeding_contract,
556+
"Storing contract locally after successful put"
557+
);
558558

559-
// Store the contract locally
560-
put_contract(
561-
op_manager,
562-
key,
563-
state.clone(),
564-
RelatedContracts::default(),
565-
&contract.clone(),
566-
)
567-
.await?;
559+
// Store the contract locally (will upsert if already exists)
560+
put_contract(
561+
op_manager,
562+
key,
563+
state.clone(),
564+
RelatedContracts::default(),
565+
&contract.clone(),
566+
)
567+
.await?;
568568

569-
// Always seed the contract locally after a successful put
569+
// Mark as seeded if not already seeded
570+
if !is_seeding_contract {
570571
tracing::debug!(
571572
tx = %id,
572573
%key,
573574
peer = %op_manager.ring.connection_manager.get_peer_key().unwrap(),
574575
"Adding contract to local seed list"
575576
);
576577
op_manager.ring.seed_contract(key);
577-
} else {
578-
tracing::debug!(
579-
tx = %id,
580-
%key,
581-
peer = %op_manager.ring.connection_manager.get_peer_key().unwrap(),
582-
"Contract already seeded locally, skipping duplicate caching"
583-
);
584578
}
585579

586580
// Start subscription if the contract is already seeded and the user requested it

crates/core/tests/operations.rs

Lines changed: 273 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -537,6 +537,279 @@ async fn test_update_contract() -> TestResult {
537537
Ok(())
538538
}
539539

540+
/// Test that a second PUT to an already cached contract persists the merged state.
541+
/// This is a regression test for issue #1995.
542+
#[test_log::test(tokio::test(flavor = "multi_thread", worker_threads = 4))]
543+
async fn test_put_merge_persists_state() -> TestResult {
544+
// Load test contract
545+
const TEST_CONTRACT: &str = "test-contract-integration";
546+
let contract = test_utils::load_contract(TEST_CONTRACT, vec![].into())?;
547+
let contract_key = contract.key();
548+
549+
// Create initial state with empty todo list
550+
let initial_state = test_utils::create_empty_todo_list();
551+
let initial_wrapped_state = WrappedState::from(initial_state);
552+
553+
// Create network sockets
554+
let network_socket_b = TcpListener::bind("127.0.0.1:0")?;
555+
let ws_api_port_socket_a = TcpListener::bind("127.0.0.1:0")?;
556+
let ws_api_port_socket_b = TcpListener::bind("127.0.0.1:0")?;
557+
558+
// Configure gateway node B
559+
let (config_b, preset_cfg_b, config_b_gw) = {
560+
let (cfg, preset) = base_node_test_config(
561+
true,
562+
vec![],
563+
Some(network_socket_b.local_addr()?.port()),
564+
ws_api_port_socket_b.local_addr()?.port(),
565+
)
566+
.await?;
567+
let public_port = cfg.network_api.public_port.unwrap();
568+
let path = preset.temp_dir.path().to_path_buf();
569+
(cfg, preset, gw_config(public_port, &path)?)
570+
};
571+
let ws_api_port_peer_b = config_b.ws_api.ws_api_port.unwrap();
572+
573+
// Configure peer node A
574+
let (config_a, preset_cfg_a) = base_node_test_config(
575+
false,
576+
vec![serde_json::to_string(&config_b_gw)?],
577+
None,
578+
ws_api_port_socket_a.local_addr()?.port(),
579+
)
580+
.await?;
581+
let ws_api_port_peer_a = config_a.ws_api.ws_api_port.unwrap();
582+
583+
tracing::info!("Node A data dir: {:?}", preset_cfg_a.temp_dir.path());
584+
tracing::info!("Node B (gw) data dir: {:?}", preset_cfg_b.temp_dir.path());
585+
586+
// Start node A (peer)
587+
std::mem::drop(ws_api_port_socket_a);
588+
let node_a = async move {
589+
let _span = with_peer_id("peer-a");
590+
tracing::info!("Starting peer A node");
591+
let config = config_a.build().await?;
592+
let node = NodeConfig::new(config.clone())
593+
.await?
594+
.build(serve_gateway(config.ws_api).await)
595+
.await?;
596+
tracing::info!("Peer A node running");
597+
node.run().await
598+
}
599+
.boxed_local();
600+
601+
// Start node B (gateway)
602+
std::mem::drop(network_socket_b);
603+
std::mem::drop(ws_api_port_socket_b);
604+
let node_b = async {
605+
let _span = with_peer_id("gateway");
606+
tracing::info!("Starting gateway node");
607+
let config = config_b.build().await?;
608+
let node = NodeConfig::new(config.clone())
609+
.await?
610+
.build(serve_gateway(config.ws_api).await)
611+
.await?;
612+
tracing::info!("Gateway node running");
613+
node.run().await
614+
}
615+
.boxed_local();
616+
617+
let test = tokio::time::timeout(Duration::from_secs(180), async {
618+
// Wait for nodes to start up
619+
tracing::info!("Waiting for nodes to start up...");
620+
tokio::time::sleep(Duration::from_secs(15)).await;
621+
tracing::info!("Nodes should be ready, proceeding with test...");
622+
623+
// Connect to node A's websocket API
624+
let uri = format!(
625+
"ws://127.0.0.1:{ws_api_port_peer_a}/v1/contract/command?encodingProtocol=native"
626+
);
627+
let (stream, _) = connect_async(&uri).await?;
628+
let mut client_api_a = WebApi::start(stream);
629+
630+
// First PUT: Store initial contract state
631+
tracing::info!("Sending first PUT with initial state...");
632+
make_put(
633+
&mut client_api_a,
634+
initial_wrapped_state.clone(),
635+
contract.clone(),
636+
false,
637+
)
638+
.await?;
639+
640+
// Wait for first put response
641+
let resp = tokio::time::timeout(Duration::from_secs(120), client_api_a.recv()).await;
642+
match resp {
643+
Ok(Ok(HostResponse::ContractResponse(ContractResponse::PutResponse { key }))) => {
644+
tracing::info!("First PUT successful for contract: {}", key);
645+
assert_eq!(key, contract_key);
646+
}
647+
Ok(Ok(other)) => {
648+
bail!("Unexpected response for first PUT: {:?}", other);
649+
}
650+
Ok(Err(e)) => {
651+
bail!("Error receiving first PUT response: {}", e);
652+
}
653+
Err(_) => {
654+
bail!("Timeout waiting for first PUT response");
655+
}
656+
}
657+
658+
// Wait a bit to ensure state is fully cached
659+
tokio::time::sleep(Duration::from_secs(2)).await;
660+
661+
// Create updated state with more data (simulating a state merge)
662+
let mut updated_todo_list: test_utils::TodoList =
663+
serde_json::from_slice(initial_wrapped_state.as_ref()).unwrap();
664+
665+
// Add multiple tasks to make the state larger
666+
for i in 1..=5 {
667+
updated_todo_list.tasks.push(test_utils::Task {
668+
id: i,
669+
title: format!("Task {}", i),
670+
description: format!("Description for task {}", i),
671+
completed: false,
672+
priority: i as u8,
673+
});
674+
}
675+
676+
let updated_bytes = serde_json::to_vec(&updated_todo_list).unwrap();
677+
let updated_wrapped_state = WrappedState::from(updated_bytes);
678+
679+
tracing::info!(
680+
"Initial state size: {} bytes, Updated state size: {} bytes",
681+
initial_wrapped_state.as_ref().len(),
682+
updated_wrapped_state.as_ref().len()
683+
);
684+
685+
// Second PUT: Update the already-cached contract with new state
686+
// This tests the bug fix - the merged state should be persisted
687+
tracing::info!("Sending second PUT with updated state...");
688+
make_put(
689+
&mut client_api_a,
690+
updated_wrapped_state.clone(),
691+
contract.clone(),
692+
false,
693+
)
694+
.await?;
695+
696+
// Wait for second put response
697+
let resp = tokio::time::timeout(Duration::from_secs(120), client_api_a.recv()).await;
698+
match resp {
699+
Ok(Ok(HostResponse::ContractResponse(ContractResponse::PutResponse { key }))) => {
700+
tracing::info!("Second PUT successful for contract: {}", key);
701+
assert_eq!(key, contract_key);
702+
}
703+
Ok(Ok(other)) => {
704+
bail!("Unexpected response for second PUT: {:?}", other);
705+
}
706+
Ok(Err(e)) => {
707+
bail!("Error receiving second PUT response: {}", e);
708+
}
709+
Err(_) => {
710+
bail!("Timeout waiting for second PUT response");
711+
}
712+
}
713+
714+
// Wait a bit to ensure the merge and persistence completes
715+
tokio::time::sleep(Duration::from_secs(2)).await;
716+
717+
// Now GET the contract from both nodes and verify they both return the updated state
718+
tracing::info!("Getting contract from peer A to verify updated state persisted...");
719+
let (response_contract_a, response_state_a) =
720+
get_contract(&mut client_api_a, contract_key, &preset_cfg_a.temp_dir).await?;
721+
722+
assert_eq!(response_contract_a.key(), contract_key);
723+
724+
// Deserialize and verify the state from peer A
725+
let response_todo_list_a: test_utils::TodoList =
726+
serde_json::from_slice(response_state_a.as_ref())
727+
.expect("Failed to deserialize state from peer A");
728+
729+
tracing::info!(
730+
"Peer A returned state with {} tasks, size {} bytes",
731+
response_todo_list_a.tasks.len(),
732+
response_state_a.as_ref().len()
733+
);
734+
735+
// This is the key assertion: the state should have the 5 tasks from the second PUT
736+
// If the bug exists, it would return the empty todo list from the first PUT
737+
assert_eq!(
738+
response_todo_list_a.tasks.len(),
739+
5,
740+
"Peer A should return updated state with 5 tasks, not cached stale state"
741+
);
742+
743+
// Verify the state size matches (another way to catch the bug)
744+
assert_eq!(
745+
response_state_a.as_ref().len(),
746+
updated_wrapped_state.as_ref().len(),
747+
"State size should match the updated state, not the initial cached state"
748+
);
749+
750+
// Also verify from gateway node B
751+
let uri = format!(
752+
"ws://127.0.0.1:{ws_api_port_peer_b}/v1/contract/command?encodingProtocol=native"
753+
);
754+
let (stream, _) = connect_async(&uri).await?;
755+
let mut client_api_b = WebApi::start(stream);
756+
757+
tracing::info!("Getting contract from gateway to verify state propagated...");
758+
let (response_contract_b, response_state_b) =
759+
get_contract(&mut client_api_b, contract_key, &preset_cfg_b.temp_dir).await?;
760+
761+
assert_eq!(response_contract_b.key(), contract_key);
762+
763+
let response_todo_list_b: test_utils::TodoList =
764+
serde_json::from_slice(response_state_b.as_ref())
765+
.expect("Failed to deserialize state from gateway");
766+
767+
tracing::info!(
768+
"Gateway returned state with {} tasks, size {} bytes",
769+
response_todo_list_b.tasks.len(),
770+
response_state_b.as_ref().len()
771+
);
772+
773+
assert_eq!(
774+
response_todo_list_b.tasks.len(),
775+
5,
776+
"Gateway should also return updated state with 5 tasks"
777+
);
778+
779+
tracing::info!(
780+
"✓ Test passed: Second PUT correctly persisted merged state (issue #1995 fixed)"
781+
);
782+
783+
// Cleanup
784+
client_api_a
785+
.send(ClientRequest::Disconnect { cause: None })
786+
.await?;
787+
client_api_b
788+
.send(ClientRequest::Disconnect { cause: None })
789+
.await?;
790+
tokio::time::sleep(Duration::from_millis(100)).await;
791+
792+
Ok::<_, anyhow::Error>(())
793+
});
794+
795+
select! {
796+
a = node_a => {
797+
let Err(a) = a;
798+
return Err(anyhow!(a).into());
799+
}
800+
b = node_b => {
801+
let Err(b) = b;
802+
return Err(anyhow!(b).into());
803+
}
804+
r = test => {
805+
r??;
806+
tokio::time::sleep(Duration::from_secs(3)).await;
807+
}
808+
}
809+
810+
Ok(())
811+
}
812+
540813
// This test is disabled due to race conditions in subscription propagation logic.
541814
// The test expects multiple clients across different nodes to receive subscription updates,
542815
// but the PUT caching refactor (commits 2cd337b5-0d432347) changed the subscription semantics.

0 commit comments

Comments
 (0)