-
-
Notifications
You must be signed in to change notification settings - Fork 107
refactor(routing): add upstream_addr for connection-based routing #2167
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
05bd395
18ccae1
7412431
075e75d
316998d
36cad43
3f1faba
ff5db9e
f99bec2
2bfb7cc
f8cfabe
ca24455
054cc7b
74f095f
784309a
15eb451
02a3b5b
28c4f5a
7bd31cc
3a9254e
f9b81b1
0131c7b
58dd1a3
ded8e4c
d91ca3a
85001cc
766e53d
8e1b4b7
848656f
37bee61
aaf3e3b
b4f626d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -61,7 +61,7 @@ use crate::operations::handle_op_request; | |
| pub(crate) use network_bridge::{ConnectionError, EventLoopNotificationsSender, NetworkBridge}; | ||
|
|
||
| use crate::topology::rate::Rate; | ||
| use crate::transport::{TransportKeypair, TransportPublicKey}; | ||
| use crate::transport::{ObservedAddr, TransportKeypair, TransportPublicKey}; | ||
| pub(crate) use op_state_manager::{OpManager, OpNotAvailable}; | ||
|
|
||
| mod message_processor; | ||
|
|
@@ -369,9 +369,12 @@ impl NodeConfig { | |
| let gateways: Vec<PeerKeyLocation> = self | ||
| .gateways | ||
| .iter() | ||
| .map(|node| PeerKeyLocation { | ||
| peer: node.peer_id.clone(), | ||
| location: Some(node.location), | ||
| .map(|node| { | ||
| PeerKeyLocation::with_location( | ||
| node.peer_id.pub_key.clone(), | ||
| node.peer_id.addr, | ||
| node.location, | ||
| ) | ||
| }) | ||
| .collect(); | ||
|
|
||
|
|
@@ -617,6 +620,7 @@ pub(super) async fn process_message<CB>( | |
| #[allow(clippy::too_many_arguments)] | ||
| pub(crate) async fn process_message_decoupled<CB>( | ||
| msg: NetMessage, | ||
| source_addr: Option<ObservedAddr>, | ||
| op_manager: Arc<OpManager>, | ||
| conn_manager: CB, | ||
| mut event_listener: Box<dyn NetEventRegister>, | ||
|
|
@@ -631,6 +635,7 @@ pub(crate) async fn process_message_decoupled<CB>( | |
| // Pure network message processing - no client types involved | ||
| let op_result = handle_pure_network_message( | ||
| msg, | ||
| source_addr, | ||
| op_manager.clone(), | ||
| conn_manager, | ||
| event_listener.as_mut(), | ||
|
|
@@ -676,6 +681,7 @@ pub(crate) async fn process_message_decoupled<CB>( | |
| #[allow(clippy::too_many_arguments)] | ||
| async fn handle_pure_network_message<CB>( | ||
| msg: NetMessage, | ||
| source_addr: Option<ObservedAddr>, | ||
| op_manager: Arc<OpManager>, | ||
| conn_manager: CB, | ||
| event_listener: &mut dyn NetEventRegister, | ||
|
|
@@ -688,6 +694,7 @@ where | |
| NetMessage::V1(msg_v1) => { | ||
| handle_pure_network_message_v1( | ||
| msg_v1, | ||
| source_addr, | ||
| op_manager, | ||
| conn_manager, | ||
| event_listener, | ||
|
|
@@ -725,8 +732,9 @@ async fn process_message_v1<CB>( | |
| transaction = %msg.id(), | ||
| tx_type = %msg.id().transaction_type() | ||
| ); | ||
| // Legacy path - no source_addr available | ||
| let op_result = | ||
| handle_op_request::<ConnectOp, _>(&op_manager, &mut conn_manager, op) | ||
| handle_op_request::<ConnectOp, _>(&op_manager, &mut conn_manager, op, None) | ||
| .instrument(span) | ||
| .await; | ||
|
|
||
|
|
@@ -741,8 +749,10 @@ async fn process_message_v1<CB>( | |
| .await; | ||
| } | ||
| NetMessageV1::Put(ref op) => { | ||
| // Legacy path - no source_addr available | ||
| let op_result = | ||
| handle_op_request::<put::PutOp, _>(&op_manager, &mut conn_manager, op).await; | ||
| handle_op_request::<put::PutOp, _>(&op_manager, &mut conn_manager, op, None) | ||
| .await; | ||
|
|
||
| if is_operation_completed(&op_result) { | ||
| if let Some(ref op_execution_callback) = pending_op_result { | ||
|
|
@@ -767,8 +777,10 @@ async fn process_message_v1<CB>( | |
| .await; | ||
| } | ||
| NetMessageV1::Get(ref op) => { | ||
| // Legacy path - no source_addr available | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can this happen? Should we log it? Should we error out?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, this can happen - it's the current code path used by all operations (Connect/Put/Get/Subscribe/Update) in PR #2167. The This legacy path will be removed in PR #2169 when we complete the wire protocol cleanup and switch to connection-based routing everywhere. For now in #2167, I don't think we need to log it - it's expected behavior during this transition. Once #2169 lands and we switch to ObservedAddr everywhere, these legacy paths will be deleted. [AI-assisted - Claude] |
||
| let op_result = | ||
| handle_op_request::<get::GetOp, _>(&op_manager, &mut conn_manager, op).await; | ||
| handle_op_request::<get::GetOp, _>(&op_manager, &mut conn_manager, op, None) | ||
| .await; | ||
| if is_operation_completed(&op_result) { | ||
| if let Some(ref op_execution_callback) = pending_op_result { | ||
| let tx_id = *op.id(); | ||
|
|
@@ -789,10 +801,12 @@ async fn process_message_v1<CB>( | |
| .await; | ||
| } | ||
| NetMessageV1::Subscribe(ref op) => { | ||
| // Legacy path - no source_addr available | ||
| let op_result = handle_op_request::<subscribe::SubscribeOp, _>( | ||
| &op_manager, | ||
| &mut conn_manager, | ||
| op, | ||
| None, | ||
| ) | ||
| .await; | ||
| if is_operation_completed(&op_result) { | ||
|
|
@@ -815,9 +829,14 @@ async fn process_message_v1<CB>( | |
| .await; | ||
| } | ||
| NetMessageV1::Update(ref op) => { | ||
| let op_result = | ||
| handle_op_request::<update::UpdateOp, _>(&op_manager, &mut conn_manager, op) | ||
| .await; | ||
| // Legacy path - no source_addr available | ||
| let op_result = handle_op_request::<update::UpdateOp, _>( | ||
| &op_manager, | ||
| &mut conn_manager, | ||
| op, | ||
| None, | ||
| ) | ||
| .await; | ||
| if is_operation_completed(&op_result) { | ||
| if let Some(ref op_execution_callback) = pending_op_result { | ||
| let tx_id = *op.id(); | ||
|
|
@@ -857,6 +876,7 @@ async fn process_message_v1<CB>( | |
| #[allow(clippy::too_many_arguments)] | ||
| async fn handle_pure_network_message_v1<CB>( | ||
| msg: NetMessageV1, | ||
| source_addr: Option<ObservedAddr>, | ||
| op_manager: Arc<OpManager>, | ||
| mut conn_manager: CB, | ||
| event_listener: &mut dyn NetEventRegister, | ||
|
|
@@ -884,10 +904,14 @@ where | |
| transaction = %msg.id(), | ||
| tx_type = %msg.id().transaction_type() | ||
| ); | ||
| let op_result = | ||
| handle_op_request::<ConnectOp, _>(&op_manager, &mut conn_manager, op) | ||
| .instrument(span) | ||
| .await; | ||
| let op_result = handle_op_request::<ConnectOp, _>( | ||
| &op_manager, | ||
| &mut conn_manager, | ||
| op, | ||
| source_addr, | ||
| ) | ||
| .instrument(span) | ||
| .await; | ||
|
|
||
| if let Err(OpError::OpNotAvailable(state)) = &op_result { | ||
| match state { | ||
|
|
@@ -916,8 +940,13 @@ where | |
| tx = %op.id(), | ||
| "handle_pure_network_message_v1: Processing PUT message" | ||
| ); | ||
| let op_result = | ||
| handle_op_request::<put::PutOp, _>(&op_manager, &mut conn_manager, op).await; | ||
| let op_result = handle_op_request::<put::PutOp, _>( | ||
| &op_manager, | ||
| &mut conn_manager, | ||
| op, | ||
| source_addr, | ||
| ) | ||
| .await; | ||
| tracing::debug!( | ||
| tx = %op.id(), | ||
| op_result_ok = op_result.is_ok(), | ||
|
|
@@ -958,8 +987,13 @@ where | |
| .await; | ||
| } | ||
| NetMessageV1::Get(ref op) => { | ||
| let op_result = | ||
| handle_op_request::<get::GetOp, _>(&op_manager, &mut conn_manager, op).await; | ||
| let op_result = handle_op_request::<get::GetOp, _>( | ||
| &op_manager, | ||
| &mut conn_manager, | ||
| op, | ||
| source_addr, | ||
| ) | ||
| .await; | ||
|
|
||
| // Handle pending operation results (network concern) | ||
| if is_operation_completed(&op_result) { | ||
|
|
@@ -995,9 +1029,13 @@ where | |
| .await; | ||
| } | ||
| NetMessageV1::Update(ref op) => { | ||
| let op_result = | ||
| handle_op_request::<update::UpdateOp, _>(&op_manager, &mut conn_manager, op) | ||
| .await; | ||
| let op_result = handle_op_request::<update::UpdateOp, _>( | ||
| &op_manager, | ||
| &mut conn_manager, | ||
| op, | ||
| source_addr, | ||
| ) | ||
| .await; | ||
|
|
||
| if let Err(OpError::OpNotAvailable(state)) = &op_result { | ||
| match state { | ||
|
|
@@ -1026,6 +1064,7 @@ where | |
| &op_manager, | ||
| &mut conn_manager, | ||
| op, | ||
| source_addr, | ||
| ) | ||
| .await; | ||
|
|
||
|
|
@@ -1174,7 +1213,7 @@ async fn handle_aborted_op( | |
| { | ||
| let gateway = op.gateway().cloned(); | ||
| if let Some(gateway) = gateway { | ||
| tracing::warn!("Retry connecting to gateway {}", gateway.peer); | ||
| tracing::warn!("Retry connecting to gateway {}", gateway.peer()); | ||
| connect::join_ring_request(None, &gateway, op_manager).await?; | ||
| } | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this happen? Should we log it? Should we error out?