Skip to content
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 51 additions & 6 deletions crates/core/src/operations/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,39 @@ pub(crate) async fn request_subscribe(
let target = match op_manager.ring.closest_potentially_caching(key, EMPTY) {
Some(peer) => peer,
None => {
// No remote peers available
tracing::debug!(%key, "No remote peers available for subscription");
return Err(RingError::NoCachingPeers(*key).into());
// No remote peers available - check if we have the contract locally
tracing::debug!(%key, "No remote peers available for subscription, checking locally");

if super::has_contract(op_manager, *key).await? {
// We have the contract locally, subscribe to ourselves
let own_location = op_manager.ring.connection_manager.own_location();
if op_manager
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure why we need to do this even... we may as well complete the operation. As with all other ops, having self as target for network ops leads to problems more often than not.

I think that this should already be handled by the ws API and all the client request routing and the Session Actor machinery

.ring
.add_subscriber(key, own_location.clone())
.is_err()
{
tracing::debug!(%key, "Max number of subscribers reached for local contract");
return Err(OpError::UnexpectedOpState);
}

tracing::info!(%key, "Successfully subscribed to local contract");

// Complete the operation immediately with a successful local subscription
let completed_op = SubscribeOp {
id: *id,
state: Some(SubscribeState::Completed { key: *key }),
};

// Push the completed operation back to the manager so it gets reported
op_manager
.push(*id, OpEnum::Subscribe(completed_op))
.await?;

return Ok(());
} else {
tracing::debug!(%key, "Contract not available locally and no remote peers");
return Err(RingError::NoCachingPeers(*key).into());
}
}
};

Expand Down Expand Up @@ -403,11 +433,15 @@ impl Operation for SubscribeOp {
subscribed: true,
});
} else {
tracing::debug!(
tracing::info!(
tx = %id,
%key,
"No upstream subscriber, subscription completed"
"Subscribe operation completed at originating node, should notify client"
);
// No upstream subscriber, this is the originating node
// The operation should complete and be reported to the client
// This will create OperationResult { return_msg: None, state: Some(Completed) }
// which should trigger the finalized pattern in handle_op_result
return_msg = None;
}
}
Expand All @@ -418,7 +452,18 @@ impl Operation for SubscribeOp {
_ => return Err(OpError::UnexpectedOpState),
}

build_op_result(self.id, new_state, return_msg)
let result = build_op_result(self.id, new_state, return_msg);
if let Ok(ref op_result) = result {
if let Some(ref state) = op_result.state {
if state.finalized() {
tracing::debug!(
tx = %self.id,
"Subscribe operation completing with finalized state, should be reported to client"
);
}
}
}
result
})
}
}
Expand Down
Loading