@@ -237,6 +237,10 @@ def __init__(
237237 host = self .redis_host , port = self .redis_port , db = self .redis_db
238238 )
239239 self .should_exit = False
240+ self .setup_time_queue = input_queue + "-setup-time"
241+ self .infer_time_queue = input_queue + "-run-time"
242+ self .stats_queue_length = 100
243+
240244 sys .stderr .write (
241245 f"Connected to Redis: { self .redis_host } :{ self .redis_port } (db { self .redis_db } )\n "
242246 )
@@ -281,7 +285,17 @@ def receive_message(self):
281285
282286 def start (self ):
283287 signal .signal (signal .SIGTERM , self .signal_exit )
288+ start_time = time .time ()
284289 self .model .setup ()
290+
291+ setup_time = time .time () - start_time
292+ self .redis .xadd (
293+ self .setup_time_queue ,
294+ fields = {"duration" : setup_time },
295+ maxlen = self .stats_queue_length ,
296+ )
297+ sys .stderr .write (f"Setup time: { setup_time :.2f} \n " )
298+
285299 sys .stderr .write (f"Waiting for message on { self .input_queue } \n " )
286300 while not self .should_exit :
287301 try :
@@ -296,9 +310,19 @@ def start(self):
296310 sys .stderr .write (f"Received message { infer_id } on { self .input_queue } \n " )
297311 cleanup_functions = []
298312 try :
313+ start_time = time .time ()
299314 self .handle_message (response_queue , message , cleanup_functions )
300315 self .redis .xack (self .input_queue , self .input_queue , message_id )
301- self .redis .xdel (self .input_queue , message_id ) # xdel to be able to get stream size
316+ self .redis .xdel (
317+ self .input_queue , message_id
318+ ) # xdel to be able to get stream size
319+ run_time = time .time () - start_time
320+ self .redis .xadd (
321+ self .infer_time_queue ,
322+ fields = {"duration" : run_time },
323+ maxlen = self .stats_queue_length ,
324+ )
325+ sys .stderr .write (f"Run time: { run_time :.2f} \n " )
302326 except Exception as e :
303327 tb = traceback .format_exc ()
304328 sys .stderr .write (f"Failed to handle message: { tb } \n " )
0 commit comments