@@ -6,11 +6,13 @@ use freenet_stdlib::prelude::ContractKey;
66use futures:: Future ;
77use tokio:: sync:: mpsc:: error:: SendError ;
88
9+ use std:: net:: SocketAddr ;
10+
911use crate :: {
1012 client_events:: HostResult ,
1113 contract:: { ContractError , ExecutorError } ,
1214 message:: { InnerMessage , MessageStats , NetMessage , NetMessageV1 , Transaction , TransactionType } ,
13- node:: { ConnectionError , NetworkBridge , OpManager , OpNotAvailable , PeerId } ,
15+ node:: { ConnectionError , NetworkBridge , OpManager , OpNotAvailable } ,
1416 ring:: { Location , PeerKeyLocation , RingError } ,
1517} ;
1618
3133 fn load_or_init < ' a > (
3234 op_manager : & ' a OpManager ,
3335 msg : & ' a Self :: Message ,
36+ source_addr : Option < SocketAddr > ,
3437 ) -> impl Future < Output = Result < OpInitialization < Self > , OpError > > + ' a ;
3538
3639 fn id ( & self ) -> & Transaction ;
@@ -41,40 +44,48 @@ where
4144 conn_manager : & ' a mut CB ,
4245 op_manager : & ' a OpManager ,
4346 input : & ' a Self :: Message ,
44- // client_id : Option<ClientId >,
47+ source_addr : Option < SocketAddr > ,
4548 ) -> Pin < Box < dyn Future < Output = Result < OperationResult , OpError > > + Send + ' a > > ;
4649}
4750
4851pub ( crate ) struct OperationResult {
4952 /// Inhabited if there is a message to return to the other peer.
5053 pub return_msg : Option < NetMessage > ,
54+ /// Where to send the return message. Required if return_msg is Some.
55+ /// This replaces the old pattern of embedding target in the message itself.
56+ pub target_addr : Option < SocketAddr > ,
5157 /// None if the operation has been completed.
5258 pub state : Option < OpEnum > ,
5359}
5460
5561pub ( crate ) struct OpInitialization < Op > {
56- sender : Option < PeerId > ,
57- op : Op ,
62+ /// The source address of the peer that sent this message.
63+ /// Used for sending error responses (Aborted) and as upstream_addr.
64+ /// Note: Currently unused but prepared for Phase 4 of #2164.
65+ #[ allow( dead_code) ]
66+ pub source_addr : Option < SocketAddr > ,
67+ pub op : Op ,
5868}
5969
6070pub ( crate ) async fn handle_op_request < Op , NB > (
6171 op_manager : & OpManager ,
6272 network_bridge : & mut NB ,
6373 msg : & Op :: Message ,
74+ source_addr : Option < SocketAddr > ,
6475) -> Result < Option < OpEnum > , OpError >
6576where
6677 Op : Operation ,
6778 NB : NetworkBridge ,
6879{
69- let sender;
7080 let tx = * msg. id ( ) ;
7181 let result = {
72- let OpInitialization { sender : s, op } = Op :: load_or_init ( op_manager, msg) . await ?;
73- sender = s;
74- op. process_message ( network_bridge, op_manager, msg) . await
82+ let OpInitialization { source_addr : _, op } =
83+ Op :: load_or_init ( op_manager, msg, source_addr) . await ?;
84+ op. process_message ( network_bridge, op_manager, msg, source_addr)
85+ . await
7586 } ;
7687
77- handle_op_result ( op_manager, network_bridge, result, tx, sender ) . await
88+ handle_op_result ( op_manager, network_bridge, result, tx, source_addr ) . await
7889}
7990
8091#[ inline( always) ]
@@ -83,7 +94,7 @@ async fn handle_op_result<CB>(
8394 network_bridge : & mut CB ,
8495 result : Result < OperationResult , OpError > ,
8596 tx_id : Transaction ,
86- sender : Option < PeerId > ,
97+ source_addr : Option < SocketAddr > ,
8798) -> Result < Option < OpEnum > , OpError >
8899where
89100 CB : NetworkBridge ,
@@ -95,15 +106,16 @@ where
95106 return Ok ( None ) ;
96107 }
97108 Err ( err) => {
98- if let Some ( sender ) = sender {
109+ if let Some ( addr ) = source_addr {
99110 network_bridge
100- . send ( & sender , NetMessage :: V1 ( NetMessageV1 :: Aborted ( tx_id) ) )
111+ . send ( addr , NetMessage :: V1 ( NetMessageV1 :: Aborted ( tx_id) ) )
101112 . await ?;
102113 }
103114 return Err ( err) ;
104115 }
105116 Ok ( OperationResult {
106117 return_msg : None ,
118+ target_addr : _,
107119 state : Some ( final_state) ,
108120 } ) if final_state. finalized ( ) => {
109121 if op_manager. failed_parents ( ) . remove ( & tx_id) . is_some ( ) {
@@ -137,23 +149,24 @@ where
137149 }
138150 Ok ( OperationResult {
139151 return_msg : Some ( msg) ,
152+ target_addr,
140153 state : Some ( updated_state) ,
141154 } ) => {
142155 if updated_state. finalized ( ) {
143156 let id = * msg. id ( ) ;
144157 tracing:: debug!( %id, "operation finalized with outgoing message" ) ;
145158 op_manager. completed ( id) ;
146- if let Some ( target) = msg . target ( ) {
147- tracing:: debug!( %id, % target, "sending final message to target" ) ;
148- network_bridge. send ( & target. peer ( ) , msg) . await ?;
159+ if let Some ( target) = target_addr {
160+ tracing:: debug!( %id, ? target, "sending final message to target" ) ;
161+ network_bridge. send ( target, msg) . await ?;
149162 }
150163 return Ok ( Some ( updated_state) ) ;
151164 } else {
152165 let id = * msg. id ( ) ;
153166 tracing:: debug!( %id, "operation in progress" ) ;
154- if let Some ( target) = msg . target ( ) {
155- tracing:: debug!( %id, % target, "sending updated op state" ) ;
156- network_bridge. send ( & target. peer ( ) , msg) . await ?;
167+ if let Some ( target) = target_addr {
168+ tracing:: debug!( %id, ? target, "sending updated op state" ) ;
169+ network_bridge. send ( target, msg) . await ?;
157170 op_manager. push ( id, updated_state) . await ?;
158171 } else {
159172 tracing:: debug!( %id, "queueing op state for local processing" ) ;
@@ -174,24 +187,27 @@ where
174187
175188 Ok ( OperationResult {
176189 return_msg : None ,
190+ target_addr : _,
177191 state : Some ( updated_state) ,
178192 } ) => {
179193 let id = * updated_state. id ( ) ;
180194 op_manager. push ( id, updated_state) . await ?;
181195 }
182196 Ok ( OperationResult {
183197 return_msg : Some ( msg) ,
198+ target_addr,
184199 state : None ,
185200 } ) => {
186201 op_manager. completed ( tx_id) ;
187202
188- if let Some ( target) = msg . target ( ) {
189- tracing:: debug!( %tx_id, target=%target . peer ( ) , "sending back message to target" ) ;
190- network_bridge. send ( & target. peer ( ) , msg) . await ?;
203+ if let Some ( target) = target_addr {
204+ tracing:: debug!( %tx_id, ? target, "sending back message to target" ) ;
205+ network_bridge. send ( target, msg) . await ?;
191206 }
192207 }
193208 Ok ( OperationResult {
194209 return_msg : None ,
210+ target_addr : _,
195211 state : None ,
196212 } ) => {
197213 op_manager. completed ( tx_id) ;
0 commit comments