File tree Expand file tree Collapse file tree
Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -221,7 +221,8 @@ func (c *asyncClient) sendEvents(ref *msgRef, events []publisher.Event) error {
221221 window [i ] = & events [i ].Content
222222 }
223223 ref .count .Add (1 )
224- return client .Send (ref .callback , window )
224+
225+ return client .Send (ref .customizedCallback (), window )
225226}
226227
227228func (c * asyncClient ) getClient () * v2.AsyncClient {
@@ -231,7 +232,15 @@ func (c *asyncClient) getClient() *v2.AsyncClient {
231232 return client
232233}
233234
234- func (r * msgRef ) callback (n uint32 , err error ) {
235+ func (r * msgRef ) customizedCallback () func (uint32 , error ) {
236+ start := time .Now ()
237+
238+ return func (n uint32 , err error ) {
239+ r .callback (start , n , err )
240+ }
241+ }
242+
243+ func (r * msgRef ) callback (start time.Time , n uint32 , err error ) {
235244 r .client .observer .AckedEvents (int (n ))
236245 r .slice = r .slice [n :]
237246 r .deadlockListener .ack (int (n ))
@@ -246,6 +255,11 @@ func (r *msgRef) callback(n uint32, err error) {
246255 r .win .tryGrowWindow (r .batchSize )
247256 }
248257 }
258+
259+ // Report the latency for the batch of events
260+ duration := time .Since (start )
261+ r .client .observer .ReportLatency (duration )
262+
249263 r .dec ()
250264}
251265
You can’t perform that action at this time.
0 commit comments