Skip to content

Commit b52ed01

Browse files
Merge pull request #190 from ndif-team/dev
Alert users and purge requests for processors that fail to deploy (in…
2 parents 374074d + 5be908f commit b52ed01

File tree

1 file changed

+40
-23
lines changed

1 file changed

+40
-23
lines changed

src/services/api/src/queue/coordinator.py

Lines changed: 40 additions & 23 deletions
Original file line number | Diff line number | Diff line change
@@ -8,25 +8,26 @@
88
- Serves status snapshots to consumers via a Redis list ("status")
99
"""
1010

11-
import json
1211
import os
1312
import pickle
1413
import time
1514
import traceback
1615
from concurrent.futures import Future
16+
from dataclasses import dataclass
1717
from enum import Enum
1818
from functools import lru_cache
1919
from typing import Optional
2020

2121
import redis
2222
from ray import serve
23+
from ray.serve.handle import DeploymentResponse
2324

2425
from ..logging import set_logger
25-
from ..providers.ray import RayProvider
2626
from ..providers.objectstore import ObjectStoreProvider
27+
from ..providers.ray import RayProvider
2728
from ..schema import BackendRequestModel, BackendResponseModel
2829
from .processor import Processor, ProcessorStatus
29-
from .util import patch, cache_maintainer
30+
from .util import cache_maintainer, patch
3031

3132

3233
class DeploymentStatus(Enum):
@@ -40,6 +41,13 @@ class DeploymentStatus(Enum):
4041
CANT_ACCOMMODATE = "cant_accommodate"
4142

4243

44+
@dataclass
45+
class DeploymentSubmission:
46+
47+
model_keys: list[str]
48+
deployment_future: DeploymentResponse
49+
50+
4351
class Coordinator:
4452
"""Orchestrates request routing and model deployment lifecycle."""
4553

@@ -48,19 +56,21 @@ def __init__(self):
4856
self.redis_client = redis.Redis.from_url(os.environ.get("BROKER_URL"))
4957
self.processors: dict[str, Processor] = {}
5058

51-
self.deployment_futures: list[Future] = []
59+
self.deployment_submissions: list[DeploymentSubmission] = []
5260
self.processors_to_deploy: list[Processor] = []
5361

5462
self.status_future: Future = None
5563
self.status_cache = None
5664
self.last_status_time = 0
57-
self.status_cache_freq_s = int(os.environ.get("COORDINATOR_STATUS_CACHE_FREQ_S", "120"))
65+
self.status_cache_freq_s = int(
66+
os.environ.get("COORDINATOR_STATUS_CACHE_FREQ_S", "120")
67+
)
5868

5969
self.logger = set_logger("coordinator")
6070

6171
# We patch the _async_send method to avoid a nasty deadlock bug in Ray.
6272
patch()
63-
73+
6474
ObjectStoreProvider.connect()
6575

6676
# Connect to Ray initially.
@@ -111,12 +121,12 @@ def loop(self):
111121
self.deploy()
112122

113123
# If there are deployments in progress, check their status.
114-
if len(self.deployment_futures) > 0:
124+
if len(self.deployment_submissions) > 0:
115125
self.initialize()
116126

117127
# Step each processor to advance its state machine.
118128
for processor in self.processors.values():
119-
#TODO catch exceptions and raise them only after all processors are done
129+
# TODO catch exceptions and raise them only after all processors are done
120130
processor.step()
121131

122132
# Serve controller status snapshots to waiting Redis consumers.
@@ -125,8 +135,10 @@ def loop(self):
125135
# If there is an error in the coordinator loop, it might be due to a connection issue.
126136
# So we reconnect to Ray and try again.
127137
except Exception as e:
128-
129-
self.logger.error(f"Error in coordinator loop: {e}\n{traceback.format_exc()}")
138+
139+
self.logger.error(
140+
f"Error in coordinator loop: {e}\n{traceback.format_exc()}"
141+
)
130142
self.connect()
131143

132144
def deploy(self):
@@ -141,7 +153,9 @@ def deploy(self):
141153
model_keys.append(processor.model_key)
142154
processor.status = ProcessorStatus.PROVISIONING
143155

144-
self.deployment_futures.append(handle.deploy.remote(model_keys))
156+
self.deployment_submissions.append(
157+
DeploymentSubmission(model_keys, handle.deploy.remote(model_keys))
158+
)
145159
self.processors_to_deploy.clear()
146160

147161
def get(self) -> BackendRequestModel:
@@ -160,7 +174,7 @@ def route(self, request: BackendRequestModel):
160174
logger=self.logger,
161175
).respond()
162176
return
163-
177+
164178
if request.model_key not in self.processors:
165179

166180
self.processors[request.model_key] = Processor(request.model_key)
@@ -169,23 +183,26 @@ def route(self, request: BackendRequestModel):
169183
self.processors[request.model_key].enqueue(request)
170184

171185
def initialize(self):
172-
"""Advance deployment futures and update processor states."""
186+
"""Advance deployment submissions and update processor states."""
173187

174188
ready = []
175189
not_ready = []
176190

177-
for i, deployment_future in enumerate(self.deployment_futures):
191+
for deployment_submission in self.deployment_submissions:
178192

179193
try:
180194

181-
result = deployment_future.result(timeout_s=0)
195+
result = deployment_submission.deployment_future.result(timeout_s=0)
182196

183197
except TimeoutError:
184-
not_ready.append(deployment_future)
185-
186-
# TODO inform those waiting on this deployment that it failed
198+
not_ready.append(deployment_submission)
199+
187200
except Exception as e:
188-
pass
201+
for model_key in deployment_submission.model_keys:
202+
self.remove(
203+
model_key,
204+
message=f"{e}\n\nThere was an error provisioning the model deployment. Please try again later. Sorry for the inconvenience.",
205+
)
189206

190207
else:
191208
ready.append(result)
@@ -208,7 +225,7 @@ def initialize(self):
208225

209226
self.remove(
210227
model_key,
211-
message=f"{status_str}\n\nThere was an error provisioning the model deployment. Sorry for the inconvenience.",
228+
message=f"{status_str}\n\nThere was an error provisioning the model deployment. Please try again later. Sorry for the inconvenience.",
212229
)
213230

214231
continue
@@ -232,7 +249,7 @@ def initialize(self):
232249
message="Model deployment evicted. Please try again later. Sorry for the inconvenience.",
233250
)
234251

235-
self.deployment_futures = not_ready
252+
self.deployment_submissions = not_ready
236253

237254
def purge(self):
238255
"""Remove all processors and purge their pending work."""
@@ -262,7 +279,7 @@ def fulfill_status(self):
262279
return
263280

264281
else:
265-
282+
266283
status = pickle.dumps(result)
267284

268285
for _ in range(self.redis_client.llen("status")):
@@ -288,7 +305,7 @@ def fulfill_status(self):
288305
id = self.redis_client.brpop("status")[1]
289306
self.redis_client.lpush(id, self.status_cache)
290307

291-
@cache_maintainer(clear_time=600)
308+
@cache_maintainer(clear_time=6000)
292309
@lru_cache(maxsize=1000)
293310
def is_dedicated_model(self, model_key: str) -> bool:
294311
"""Check if the model is dedicated."""

0 commit comments

Comments
 (0)