Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docker/.env
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,7 @@ LOKI_VERSION=2.8.1
# They are kept here for reference and documentation purposes
RAY_VERSION=latest
API_VERSION=latest


# Setting to a positive integer value can be useful for testing purposes, but this is a convenient default for developers.
NDIF_MINIMUM_DEPLOYMENT_TIME_SECONDS=0
10 changes: 8 additions & 2 deletions src/services/api/src/queue/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,15 @@ def loop(self):
self.initialize()

# Step each processor to advance its state machine.
for processor in self.processors.values():
for processor in list(self.processors.values()):
# TODO catch exceptions and raise them only after all processors are done
processor.step()
try:
processor.step()
except LookupError as e:
self.remove(
processor.model_key,
message="Model deployment evicted. Please try again later. Sorry for the inconvenience.",
)

# Serve controller status snapshots to waiting Redis consumers.
self.fulfill_status()
Expand Down
48 changes: 20 additions & 28 deletions src/services/api/src/queue/handle.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,17 @@
`model_key`, checks readiness, and provides `execute` to submit requests.
"""

import os
import logging

from ray.serve._private.client import *
from ray.serve.api import _get_global_client
import ray
from ray.serve._private.client import RayServeException
from ray.util.client import ray as client_ray
from ray.util.client.common import return_refs
from slugify import slugify

from ..schema import BackendRequestModel

logger = logging.getLogger("ndif")


class Handle:
"""Encapsulates interaction with a single model Ray Serve deployment."""
Expand All @@ -27,28 +28,12 @@ def __init__(self, model_key: str):

self.model_key = model_key

app_name = self.app_name

# We reimplement `serve.get_app_handle` to manually look up the ingress name.
# That way we can add a timeout and avoid a deadlock.
client = _get_global_client()
deployment_name = ray.get(
client._controller.get_ingress_deployment_name.remote(app_name),
timeout=int(os.environ.get("COORDINATOR_HANDLE_TIMEOUT_S", "5")),
)

if deployment_name is None:
raise RayServeException(f"Application '{app_name}' does not exist.")

self.handle = DeploymentHandle(deployment_name, app_name)
# We need the namespace of the deployment as otherwise we can't lookup its model actor.
# We use completion of this call to determine the deployment is finished initializing.
self.namespace = self.handle.ready.remote()

@property
def app_name(self):
"""Canonical Ray Serve app name derived from `model_key`."""
return f"Model:{slugify(self.model_key)}"
try:
actor = ray.get_actor(self.actor_name, namespace="NDIF")
self.ready_future = actor.__ray_ready__.remote()
except Exception as e:
logger.info(f"Error getting actor '{self.actor_name}': {e}")
raise RayServeException(f"Actor '{self.actor_name}' does not exist.")

@property
def actor_name(self):
Expand All @@ -60,15 +45,22 @@ def ready(self):
"""Whether the Ray Serve deployment reports readiness."""
try:
# Poll the namespace future until it completes.
self.namespace = self.namespace.result(timeout_s=0)
ray.get(self.ready_future, timeout=0)
return True
except TimeoutError:
return False

def execute(self, request: BackendRequestModel):
"""Submit `request` to the model actor and return a Ray future."""

actor = ray.get_actor(self.actor_name, namespace=self.namespace)
try:
actor = ray.get_actor(self.actor_name, namespace="NDIF")
except ValueError as e:

if str(e).startswith("Failed to look up actor"):
raise LookupError("Model deployment evicted.")

raise

# We reimplement `handle.remote()` to avoid an issue about looking up the signature for the remote call.
# I imagine this has something to do with getting a named actor from a client not in the namespace.
Expand Down
21 changes: 14 additions & 7 deletions src/services/api/src/queue/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,17 +121,19 @@ def execute(self):
result,
)
# If there is an error submitting the request, respond to the user with an error.
except:
except Exception as e:

if isinstance(e, LookupError):
message = "Model deployment evicted. Please try again later. Sorry for the inconvenience."
else:
message = f"{traceback.format_exc()}\nIssue submitting job to model deployment."

request.create_response(
BackendResponseModel.JobStatus.ERROR,
logger,
f"{traceback.format_exc()}\nIssue submitting job to model deployment.",
message,
).respond()

# Re-raise the error to be handled by the coordinator to check for connection issues.
raise

else:

# The request was submitted successfully, so we can transition to BUSY.
Expand Down Expand Up @@ -181,17 +183,22 @@ def check_submission(self):
return

# If there is an error checking the status of the in-flight submission, respond to the user with an error.
except:
except Exception as e:

request = self.submission.request

self.status = ProcessorStatus.READY
self.submission = None

if isinstance(e, LookupError):
message = "Model deployment evicted. Please try again later. Sorry for the inconvenience."
else:
message = f"{traceback.format_exc()}\nIssue checking job status."

request.create_response(
BackendResponseModel.JobStatus.ERROR,
logger,
f"{traceback.format_exc()}\nIssue checking job status.",
message,
).respond()

# Re-raise the error to be handled by the coordinator to check for connection issues.
Expand Down
26 changes: 10 additions & 16 deletions src/services/ray/src/ray/deployments/controller/cluster/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ def __init__(
self.minimum_deployment_time_seconds = minimum_deployment_time_seconds
self.model_cache_percentage = model_cache_percentage

self.update_nodes()

@property
def state(self):
Expand Down Expand Up @@ -105,7 +104,7 @@ def update_nodes(self):
gpu_memory_bytes=gpu_memory_bytes,
cpu_memory_bytes=cpu_memory_bytes,
available_cpu_memory_bytes=cpu_memory_bytes,
available_gpus=total_gpus,
available_gpus=list(range(int(total_gpus))),
),
minimum_deployment_time_seconds=self.minimum_deployment_time_seconds,
)
Expand All @@ -114,12 +113,15 @@ def update_nodes(self):
f"=> Node {name} updated with resources: {self.nodes[id].resources}"
)

for node in self.nodes.keys():
if node not in current_nodes:
for node_id in self.nodes.keys():
if node_id not in current_nodes:

del self.nodes[node]
node = self.nodes.pop(node_id)
node.purge()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to what I commented regarding deployment.delete(), I think any exceptions that happen in purge should propagate up here, and at the very least be logged as an error before proceeding


logger.info(f"=> Node {node_id} removed from cluster")


logger.info(f"=> Node {node} removed from cluster")

def deploy(self, model_keys: List[MODEL_KEY], dedicated: Optional[bool] = False):
"""
Expand Down Expand Up @@ -161,8 +163,6 @@ def deploy(self, model_keys: List[MODEL_KEY], dedicated: Optional[bool] = False)

logger.info("=> Checking to evict deprecated dedicated deployments...")

cache_futures = []

for node in self.nodes.values():

for model_key, deployment in list(node.deployments.items()):
Expand All @@ -175,16 +175,10 @@ def deploy(self, model_keys: List[MODEL_KEY], dedicated: Optional[bool] = False)

results['evictions'].add(model_key)

if node.evict(model_key, exclude=set(model_keys)):
cache_future = deployment.cache()
node.evict(model_key, exclude=set(model_keys))

if cache_future is not None:
cache_futures.append(cache_future)

change = True

ray.get(cache_futures)

# Sort models by size in descending order (deploy biggest ones first)
sorted_models = sorted(
model_sizes_in_bytes.items(), key=lambda x: x[1], reverse=True
Expand Down Expand Up @@ -262,7 +256,7 @@ def deploy(self, model_keys: List[MODEL_KEY], dedicated: Optional[bool] = False)
logger.info(
f"=> Deploying {model_key} with size {size_in_bytes} on {self.nodes[node_id].name} because {candidate_level.name}. Requiring evictions: {candidate.evictions}"
)

self.nodes[node_id].deploy(
model_key, candidate, size_in_bytes, dedicated=dedicated, exclude=set(model_keys)
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
import time
import logging
import time
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Dict

import ray

from .....providers.mailgun import MailgunProvider
from .....providers.objectstore import ObjectStoreProvider
from .....providers.socketio import SioProvider
from .....types import MODEL_KEY
from ...modeling.base import BaseModelDeploymentArgs, ModelActor

logger = logging.getLogger("ndif")


class DeploymentLevel(Enum):

HOT = "hot"
Expand All @@ -22,34 +28,38 @@ def __init__(
self,
model_key: MODEL_KEY,
deployment_level: DeploymentLevel,
gpus_required: int,
gpus: list[int],
size_bytes: int,
dedicated: bool = False,
cached: bool = False,
):

self.model_key = MODEL_KEY(model_key)
self.deployment_level = deployment_level
self.gpus_required = gpus_required
self.gpus = gpus
self.size_bytes = size_bytes
self.dedicated = dedicated
self.cached = cached
self.deployed = time.time()

@property
def name(self):
return f"ModelActor:{self.model_key}"

@property
def actor(self):
return ray.get_actor(self.name, namespace="NDIF")

def get_state(self) -> Dict[str, Any]:
"""Get the state of the deployment."""

return {
"model_key": self.model_key,
"deployment_level": self.deployment_level.value,
"gpus_required": self.gpus_required,
"gpus": self.gpus,
"size_bytes": self.size_bytes,
"dedicated": self.dedicated,
"cached": self.cached,
"deployed": self.deployed,
}


def end_time(self, minimim_deployment_time_seconds: int) -> datetime:

return datetime.fromtimestamp(
Expand All @@ -59,17 +69,54 @@ def end_time(self, minimim_deployment_time_seconds: int) -> datetime:
def delete(self):

try:
actor = ray.get_actor(f"ModelActor:{self.model_key}")
ray.kill(actor)
actor = self.actor
ray.kill(actor, no_restart=True)
except Exception:
logger.exception(f"Error removing actor {self.model_key} from cache.")
logger.exception(f"Error deleting actor {self.model_key}.")
pass


def restart(self):

try:
actor = self.actor
ray.kill(actor, no_restart=False)
except Exception:
logger.exception(f"Error restarting actor {self.model_key}.")
pass

def cache(self):

try:
actor = ray.get_actor(f"ModelActor:{self.model_key}")
actor = self.actor
return actor.to_cache.remote()
except Exception:
logger.exception(f"Error adding actor {self.model_key} to cache.")
pass
return None

def from_cache(self):

try:
actor = self.actor
return actor.from_cache.remote(",".join(str(gpu) for gpu in self.gpus))
except Exception:
logger.exception(f"Error removing actor {self.model_key} from cache.")
return None

def create(self, node_name: str, deployment_args: BaseModelDeploymentArgs):

try:

actor = ModelActor.options(
name=self.name,
resources={f"node:{node_name}": 0.01},
namespace="NDIF",
lifetime="detached",
runtime_env={
**SioProvider.to_env(),
**ObjectStoreProvider.to_env(),
**MailgunProvider.to_env(),
},
).remote(**deployment_args.model_dump())

except Exception:
logger.exception(f"Error creating actor {self.model_key}.")
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def __call__(self, model_key: MODEL_KEY) -> Union[float, Exception]:
model_size_bytes,
n_params,
meta_model._model.config,
getattr(meta_model._model, "revision", "main"),
meta_model.revision,
)

logger.info(f"=> New model evaluated: {model_key} size: {model_size_bytes}")
Expand Down
Loading