Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion pkg/docker/cog.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,10 @@ def __init__(
host=self.redis_host, port=self.redis_port, db=self.redis_db
)
self.should_exit = False
self.setup_time_queue = input_queue + "-setup-time"
self.infer_time_queue = input_queue + "-run-time"
self.stats_queue_length = 100

sys.stderr.write(
f"Connected to Redis: {self.redis_host}:{self.redis_port} (db {self.redis_db})\n"
)
Expand Down Expand Up @@ -281,7 +285,17 @@ def receive_message(self):

def start(self):
signal.signal(signal.SIGTERM, self.signal_exit)
start_time = time.time()
self.model.setup()

setup_time = time.time() - start_time
self.redis.xadd(
self.setup_time_queue,
fields={"duration": setup_time},
maxlen=self.stats_queue_length,
)
sys.stderr.write(f"Setup time: {setup_time:.2f}\n")

sys.stderr.write(f"Waiting for message on {self.input_queue}\n")
while not self.should_exit:
try:
Expand All @@ -296,9 +310,19 @@ def start(self):
sys.stderr.write(f"Received message {infer_id} on {self.input_queue}\n")
cleanup_functions = []
try:
start_time = time.time()
self.handle_message(response_queue, message, cleanup_functions)
self.redis.xack(self.input_queue, self.input_queue, message_id)
self.redis.xdel(self.input_queue, message_id) # xdel to be able to get stream size
self.redis.xdel(
self.input_queue, message_id
) # xdel to be able to get stream size
run_time = time.time() - start_time
self.redis.xadd(
self.infer_time_queue,
fields={"duration": run_time},
maxlen=self.stats_queue_length,
)
sys.stderr.write(f"Run time: {run_time:.2f}\n")
except Exception as e:
tb = traceback.format_exc()
sys.stderr.write(f"Failed to handle message: {tb}\n")
Expand Down