From 159c54a4c62fc95c01f885b01313c302594bee05 Mon Sep 17 00:00:00 2001 From: andreasjansson Date: Thu, 29 Apr 2021 10:22:31 -0700 Subject: [PATCH] Record timing information in worker Write setup and run time to stderr and to a fixed-length Redis stream Signed-off-by: andreasjansson --- pkg/docker/cog.py | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/pkg/docker/cog.py b/pkg/docker/cog.py index 51f9e5d809..0104f00b07 100644 --- a/pkg/docker/cog.py +++ b/pkg/docker/cog.py @@ -237,6 +237,10 @@ def __init__( host=self.redis_host, port=self.redis_port, db=self.redis_db ) self.should_exit = False + self.setup_time_queue = input_queue + "-setup-time" + self.infer_time_queue = input_queue + "-run-time" + self.stats_queue_length = 100 + sys.stderr.write( f"Connected to Redis: {self.redis_host}:{self.redis_port} (db {self.redis_db})\n" ) @@ -281,7 +285,17 @@ def receive_message(self): def start(self): signal.signal(signal.SIGTERM, self.signal_exit) + start_time = time.time() self.model.setup() + + setup_time = time.time() - start_time + self.redis.xadd( + self.setup_time_queue, + fields={"duration": setup_time}, + maxlen=self.stats_queue_length, + ) + sys.stderr.write(f"Setup time: {setup_time:.2f}\n") + sys.stderr.write(f"Waiting for message on {self.input_queue}\n") while not self.should_exit: try: @@ -296,9 +310,19 @@ def start(self): sys.stderr.write(f"Received message {infer_id} on {self.input_queue}\n") cleanup_functions = [] try: + start_time = time.time() self.handle_message(response_queue, message, cleanup_functions) self.redis.xack(self.input_queue, self.input_queue, message_id) - self.redis.xdel(self.input_queue, message_id) # xdel to be able to get stream size + self.redis.xdel( + self.input_queue, message_id + ) # xdel to be able to get stream size + run_time = time.time() - start_time + self.redis.xadd( + self.infer_time_queue, + fields={"duration": run_time}, + maxlen=self.stats_queue_length, + ) + sys.stderr.write(f"Run time: {run_time:.2f}\n") except Exception as e: tb = traceback.format_exc() sys.stderr.write(f"Failed to handle message: {tb}\n")