diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 67ba9645..a884b235 100755 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -15,7 +15,7 @@ services: shm_size: '15gb' volumes: - ~/.cache/huggingface/:/root/.cache/huggingface - - ray-data:/tmp/ray/ + - ray-data:/tmp/ray/ ports: - ${DEV_RAY_HEAD_PORT}:${RAY_HEAD_INTERNAL_PORT} @@ -32,7 +32,7 @@ services: capabilities: [ gpu ] environment: - LOKI_URL=http://${HOST_IP}:${DEV_LOKI_PORT}/loki/api/v1/push - - OBJECT_STORE_URL=${HOST_IP}:${DEV_MINIO_PORT} + - OBJECT_STORE_URL=http://${HOST_IP}:${DEV_MINIO_PORT} - API_URL=http://${HOST_IP}:${DEV_API_PORT} - INFLUXDB_ADDRESS=http://${HOST_IP}:${DEV_INFLUXDB_PORT} - INFLUXDB_ADMIN_TOKEN=${SECRET_INFLUXDB_ADMIN_TOKEN} @@ -50,7 +50,7 @@ services: ports: - ${DEV_API_PORT}:${API_INTERNAL_PORT} environment: - OBJECT_STORE_URL: ${HOST_IP}:${DEV_MINIO_PORT} + OBJECT_STORE_URL: http://${HOST_IP}:${DEV_MINIO_PORT} BROKER_URL: ${BROKER_PROTOCOL}@${HOST_IP}:${DEV_BROKER_PORT}/ WORKERS: ${API_WORKERS} RAY_ADDRESS: ray://${HOST_IP}:${DEV_RAY_CLIENT_PORT} diff --git a/src/common/providers/objectstore.py b/src/common/providers/objectstore.py index a8fc2d9d..2411434e 100755 --- a/src/common/providers/objectstore.py +++ b/src/common/providers/objectstore.py @@ -43,7 +43,7 @@ def connect(cls): logger.info(f"Connecting to object store at {cls.object_store_url}...") cls.object_store = boto3.client( cls.object_store_service, - endpoint_url=f"http://{cls.object_store_url}", + endpoint_url=cls.object_store_url, aws_access_key_id=cls.object_store_access_key, aws_secret_access_key=cls.object_store_secret_key, region_name=cls.object_store_region, diff --git a/src/common/providers/ray.py b/src/common/providers/ray.py index 719a1b6a..60f50a98 100755 --- a/src/common/providers/ray.py +++ b/src/common/providers/ray.py @@ -61,14 +61,24 @@ def connect(cls): host, port = cls.get_host_port() if not verify_connection(host, port): raise ConnectionError(f"Ray is not listening on {host}:{port}") - - logger.info(f"Connecting to Ray at {cls.ray_url}...") ray.init(logging_level="error", address=cls.ray_url) - logger.info("Connected to Ray") @classmethod def connected(cls) -> bool: - return ray.is_initialized() and cls.is_listening() + + connected = ray.is_initialized() and cls.is_listening() + + if connected: + + try: + ray.get_actor("Controller", namespace="NDIF") + except: + return False + else: + return True + + return False + @classmethod def reset(cls): diff --git a/src/common/providers/socketio.py b/src/common/providers/socketio.py index 13eaf4a4..0fa11bef 100755 --- a/src/common/providers/socketio.py +++ b/src/common/providers/socketio.py @@ -29,7 +29,6 @@ def to_env(cls) -> dict: @classmethod @retry def connect(cls): - logger.info(f"Connecting to API at {cls.api_url}...") if cls.sio is None: logger.debug("Creating new socketio client") cls.sio = socketio.SimpleClient(reconnection_attempts=10) @@ -42,17 +41,11 @@ def connect(cls): ) # Wait for connection to be fully established time.sleep(0.1) - logger.info("Connected to API") @classmethod def disconnect(cls): - logger.debug("SioProvider.disconnect() called") if cls.sio is not None: - logger.info("Disconnecting socketio client") cls.sio.disconnect() - logger.debug("Socketio client disconnected") - else: - logger.debug("No socketio client to disconnect") @classmethod def connected(cls) -> bool: diff --git a/src/common/schema/mixins.py b/src/common/schema/mixins.py index 366f5c5b..d4e8bd00 100644 --- a/src/common/schema/mixins.py +++ 
b/src/common/schema/mixins.py @@ -50,6 +50,9 @@ class ObjectStorageMixin(BaseModel): @classmethod def object_name(cls, id: str): return f"{id}.{cls._file_extension}" + + def _url(self, client: boto3.client) -> str: + return client.generate_presigned_url('get_object', Params={'Bucket': self._bucket_name, 'Key': self.object_name(self.id)}, ExpiresIn=3600 * 2) def _save(self, client: boto3.client, data: BytesIO, content_type: str, bucket_name: str = None) -> None: bucket_name = self._bucket_name if bucket_name is None else bucket_name diff --git a/src/common/schema/result.py b/src/common/schema/result.py index 740df305..63ec0b08 100644 --- a/src/common/schema/result.py +++ b/src/common/schema/result.py @@ -1,10 +1,13 @@ from typing import ClassVar -from pydantic import ConfigDict +from pydantic import ConfigDict from .mixins import ObjectStorageMixin - +from ..providers.objectstore import ObjectStoreProvider class BackendResultModel(ObjectStorageMixin): - - model_config = ConfigDict(extra='allow') + model_config = ConfigDict(extra='allow', validate_assignment=False, frozen=False, arbitrary_types_allowed=True, str_strip_whitespace=False, strict=False) _bucket_name: ClassVar[str] = "dev-ndif-results" _file_extension: ClassVar[str] = "pt" + + def url(self) -> str: + return self._url(ObjectStoreProvider.object_store) + diff --git a/src/services/api/src/app.py b/src/services/api/src/app.py index 2bab00f0..9aac934b 100755 --- a/src/services/api/src/app.py +++ b/src/services/api/src/app.py @@ -6,27 +6,22 @@ import redis import socketio import uvicorn -from fastapi import BackgroundTasks, Depends, FastAPI, Request +from fastapi import BackgroundTasks, Depends, FastAPI from fastapi.middleware.cors import CORSMiddleware -from fastapi.responses import StreamingResponse - -from fastapi_cache.decorator import cache from fastapi_socketio import SocketManager -from nnsight.schema.response import ResponseModel from prometheus_fastapi_instrumentator import Instrumentator -from .types import REQUEST_ID, SESSION_ID +from nnsight.schema.response import ResponseModel + from .logging import set_logger +from .types import REQUEST_ID, SESSION_ID logger = set_logger("API") from .dependencies import validate_request from .metrics import NetworkStatusMetric from .providers.objectstore import ObjectStoreProvider -from .schema import (BackendRequestModel, BackendResponseModel, - BackendResultModel) - - +from .schema import BackendRequestModel, BackendResponseModel # Init FastAPI app app = FastAPI() @@ -64,7 +59,7 @@ @app.post("/request") async def request( background_tasks: BackgroundTasks, - backend_request: BackendRequestModel = Depends(validate_request) + backend_request: BackendRequestModel = Depends(validate_request), ) -> BackendResponseModel: """Endpoint to submit request. See src/common/schema/request.py to see the headers and data that are validated and populated. 
@@ -108,36 +103,6 @@ async def request( return response -# @app.delete("/request/{request_id}") -# async def delete_request(request_id: str): -# """Delete a submitted request, provided it is either queued or running""" -# try: -# endpoint = f"http://{os.environ.get('QUEUE_URL')}/queue/{request_id}" -# async with httpx.AsyncClient() as client: -# response = await client.delete(endpoint) -# response.raise_for_status() -# return {"message": f"Request {request_id} successfully submitted for deletion!"} -# except httpx.HTTPStatusError as e: -# # Handle HTTP errors from the queue service -# if e.response is not None and e.response.status_code == 404: -# raise HTTPException(status_code=404, detail=f"Request {request_id} not found") -# elif e.response is not None and e.response.status_code == 500: -# # Try to extract the error message from the queue service -# try: -# error_detail = e.response.json().get('detail', str(e)) -# except: -# error_detail = str(e) -# raise HTTPException(status_code=500, detail=f"Failed to delete request: {error_detail}") -# else: -# status = e.response.status_code if e.response is not None else 500 -# raise HTTPException(status_code=status, detail=str(e)) -# except httpx.RequestError as e: -# # Handle connection errors, timeouts, etc. -# raise HTTPException(status_code=503, detail=f"Queue service unavailable: {e}") -# except Exception as e: -# raise HTTPException(status_code=500, detail=f"Unexpected error: {e}") - - @sm.on("connect") async def connect(session_id: SESSION_ID, environ: Dict): params = environ.get("QUERY_STRING") @@ -149,14 +114,18 @@ async def connect(session_id: SESSION_ID, environ: Dict): @sm.on("blocking_response") -async def blocking_response(session_id: SESSION_ID, client_session_id: SESSION_ID, data: Any): +async def blocking_response( + session_id: SESSION_ID, client_session_id: SESSION_ID, data: Any +): await sm.emit("blocking_response", data=data, to=client_session_id) @sm.on("stream") -async def stream(session_id: SESSION_ID, client_session_id: SESSION_ID, data: bytes, job_id: str): - +async def stream( + session_id: SESSION_ID, client_session_id: SESSION_ID, data: bytes, job_id: str +): + await sm.enter_room(session_id, job_id) await blocking_response(session_id, client_session_id, data) @@ -183,65 +152,20 @@ async def response(id: REQUEST_ID) -> BackendResponseModel: return BackendResponseModel.load(ObjectStoreProvider.object_store, id) -@app.get("/result/{id}") -async def result(id: REQUEST_ID) -> BackendResultModel: - """Endpoint to retrieve result for id. - - Args: - id: ID of request/response. - - Returns: - BackendResultModel: Result. - - Yields: - Iterator[BackendResultModel]: _description_ - """ - - # Get cursor to bytes stored in data backend. - object, content_length = BackendResultModel.load(ObjectStoreProvider.object_store, id, stream=True) - - # Inform client the total size of result in bytes. - headers = { - "Content-length": str(content_length), - } - - def stream(): - try: - while True: - data = object.read(8192) - if not data: - break - yield data - finally: - object.close() - - BackendResultModel.delete(ObjectStoreProvider.object_store, id) - BackendResponseModel.delete(ObjectStoreProvider.object_store, id) - BackendRequestModel.delete(ObjectStoreProvider.object_store, id) - - return StreamingResponse( - content=stream(), - media_type="application/octet-stream", - headers=headers, - ) - - @app.get("/ping", status_code=200) async def ping(): - """Endpoint to check if the server is online. 
- """ + """Endpoint to check if the server is online.""" return "pong" @app.get("/status", status_code=200) async def status(): - + id = str(os.getpid()) - + await redis_client.lpush("status", id) result = await redis_client.brpop(id) return pickle.loads(result[1]) - if __name__ == "__main__": diff --git a/src/services/api/src/dependencies.py b/src/services/api/src/dependencies.py index fb9badec..642e69f9 100755 --- a/src/services/api/src/dependencies.py +++ b/src/services/api/src/dependencies.py @@ -55,6 +55,10 @@ async def validate_python_version(python_version: str) -> str: Raises: HTTPException: If the Python version is missing or incompatible. """ + + if DEV_MODE: + return python_version + server_python_version = '.'.join(sys.version.split('.')[0:2]) # e.g. 3.12 user_python_version = '.'.join(python_version.split('.')[0:2]) @@ -85,6 +89,10 @@ async def validate_nnsight_version(nnsight_version: str) -> str: Raises: HTTPException: If the nnsight version is missing or incompatible. """ + + if DEV_MODE: + return nnsight_version + if nnsight_version == '': raise HTTPException( status_code=HTTP_400_BAD_REQUEST, @@ -143,7 +151,7 @@ async def validate_request(raw_request: Request) -> BackendRequestModel: nnsight_version = raw_request.headers.get("nnsight-version", "") python_version = raw_request.headers.get("python-version", "") - # Validate using existing dependency functions (call them directly, not as dependencies) + # # Validate using existing dependency functions (call them directly, not as dependencies) await authenticate_api_key(api_key) await validate_nnsight_version(nnsight_version) await validate_python_version(python_version) diff --git a/src/services/api/src/gunicorn.conf.py b/src/services/api/src/gunicorn.conf.py index ff333c40..558701c9 100644 --- a/src/services/api/src/gunicorn.conf.py +++ b/src/services/api/src/gunicorn.conf.py @@ -1,6 +1,6 @@ from multiprocessing import Process -from src.queue.coordinator import Coordinator +from src.queue.dispatcher import Dispatcher def on_starting(server): - Process(target=Coordinator.start, daemon=False).start() \ No newline at end of file + Process(target=Dispatcher.start, daemon=False).start() \ No newline at end of file diff --git a/src/services/api/src/queue/coordinator.py b/src/services/api/src/queue/coordinator.py deleted file mode 100755 index a77b66e9..00000000 --- a/src/services/api/src/queue/coordinator.py +++ /dev/null @@ -1,331 +0,0 @@ -"""Queue coordinator for routing API requests to model processors. 
- -The coordinator: -- Receives serialized `BackendRequestModel` items from a Redis list ("queue") -- Routes each request to the `Processor` for its `model_key` -- Requests deployments via the Ray Serve `Controller` when processors are missing -- Tracks deployment futures and advances processors through lifecycle states -- Serves status snapshots to consumers via a Redis list ("status") -""" - -import os -import pickle -import time -import traceback -from concurrent.futures import Future -from dataclasses import dataclass -from enum import Enum -from functools import lru_cache -from typing import Optional - -import ray -import redis -from ray.util.client import ray as client_ray -from ray.util.client.common import return_refs - -from ..logging import set_logger -from ..providers.objectstore import ObjectStoreProvider -from ..providers.ray import RayProvider -from ..schema import BackendRequestModel, BackendResponseModel -from ..types import MODEL_KEY -from .processor import Processor, ProcessorStatus -from .util import cache_maintainer, patch - - -class DeploymentStatus(Enum): - """Deployment states reported by the controller.""" - - DEPLOYED = "deployed" - CACHED_AND_FREE = "cached_and_free" - FREE = "free" - CACHED_AND_FULL = "cached_and_full" - FULL = "full" - CANT_ACCOMMODATE = "cant_accommodate" - - -@dataclass -class DeploymentSubmission: - - model_keys: list[MODEL_KEY] - deployment_future: ray.ObjectRef - - -class Coordinator: - """Orchestrates request routing and model deployment lifecycle.""" - - def __init__(self): - - self.redis_client = redis.Redis.from_url(os.environ.get("BROKER_URL")) - self.processors: dict[MODEL_KEY, Processor] = {} - - self.deployment_submissions: list[DeploymentSubmission] = [] - self.processors_to_deploy: list[Processor] = [] - - self.status_future: Future = None - self.status_cache = None - self.last_status_time = 0 - self.status_cache_freq_s = int( - os.environ.get("COORDINATOR_STATUS_CACHE_FREQ_S", "120") - ) - - self.logger = set_logger("coordinator") - - # We patch the _async_send method to avoid a nasty deadlock bug in Ray. - patch() - - ObjectStoreProvider.connect() - - # Connect to Ray initially. - self.connect() - - @classmethod - def start(cls): - """Start a coordinator and enter its main loop.""" - coordinator = cls() - coordinator.loop() - - @property - def controller_handle(self): - """Return Ray Serve handle to the `Controller` application.""" - return ray.get_actor("Controller", namespace="NDIF") - - def connect(self): - """Ensure connection to Ray, retrying until successful. - - On errors, purge processors to reset local state before reconnecting. - """ - self.logger.info(f"Connecting to Ray") - - while not RayProvider.connected(): - - self.purge() - - try: - - RayProvider.reset() - RayProvider.connect() - - except Exception as e: - self.logger.error(f"Error connecting to Ray: {e}") - - def loop(self): - """Main event loop for routing, deploying, and stepping processors.""" - while True: - - try: - # Get all requests currently in the queue and route them to the appropriate processors. - for _ in range(self.redis_client.llen("queue")): - request = self.get() - self.route(request) - - # If there are processors waiting to be deployed, deploy them. - if len(self.processors_to_deploy) > 0: - self.deploy() - - # If there are deployments in progress, check their status. - if len(self.deployment_submissions) > 0: - self.initialize() - - # Step each processor to advance its state machine. 
- for processor in list(self.processors.values()): - # TODO catch exceptions and raise them only after all processors are done - try: - processor.step() - except LookupError as e: - self.remove( - processor.model_key, - message="Model deployment evicted. Please try again later. Sorry for the inconvenience.", - ) - - # Serve controller status snapshots to waiting Redis consumers. - self.fulfill_status() - - # If there is an error in the coordinator loop, it might be due to a connection issue. - # So we reconnect to Ray and try again. - except Exception as e: - - self.logger.error( - f"Error in coordinator loop: {e}\n{traceback.format_exc()}" - ) - self.connect() - - def deploy(self): - """Batch request deployments for processors awaiting provisioning.""" - - handle = self.controller_handle - - model_keys = [] - - for processor in self.processors_to_deploy: - - model_keys.append(processor.model_key) - processor.status = ProcessorStatus.PROVISIONING - - self.deployment_submissions.append( - DeploymentSubmission(model_keys, return_refs(client_ray.call_remote(handle.deploy, model_keys))) - ) - self.processors_to_deploy.clear() - - def get(self) -> BackendRequestModel: - """Pop one serialized request from Redis and deserialize it.""" - return pickle.loads(self.redis_client.brpop("queue")[1]) - - def route(self, request: BackendRequestModel): - """Route a request to the per-model processor, creating it if missing.""" - - # If user does not have hotswapping access, check that the model is dedicated. - if not request.hotswapping: - if not self.is_dedicated_model(request.model_key): - request.create_response( - status=BackendResponseModel.JobStatus.ERROR, - description=f"Model {request.model_key} is not a scheduled model and hotswapping is not supported for this API key. See https://nnsight.net/status/ for a list of scheduled models.", - logger=self.logger, - ).respond() - return - - if request.model_key not in self.processors: - - self.processors[request.model_key] = Processor(request.model_key) - self.processors_to_deploy.append(self.processors[request.model_key]) - - self.processors[request.model_key].enqueue(request) - - def initialize(self): - """Advance deployment submissions and update processor states.""" - - ready = [] - not_ready = [] - - for deployment_submission in self.deployment_submissions: - - try: - - result = ray.get(deployment_submission.deployment_future, timeout=0) - - except TimeoutError: - not_ready.append(deployment_submission) - - except Exception as e: - for model_key in deployment_submission.model_keys: - self.remove( - model_key, - message=f"{e}\n\nThere was an error provisioning the model deployment. Please try again later. Sorry for the inconvenience.", - ) - - else: - ready.append(result) - - for result in ready: - - deployment_statuses = result["result"] - - evictions = result["evictions"] - - for model_key, status in deployment_statuses.items(): - - status_str = str(status).lower() - - try: - - deployment_status = DeploymentStatus(status_str) - - except ValueError: - - self.remove( - model_key, - message=f"{status_str}\n\nThere was an error provisioning the model deployment. Please try again later. Sorry for the inconvenience.", - ) - - continue - - if deployment_status == DeploymentStatus.CANT_ACCOMMODATE: - - self.remove( - model_key, - message="Model deployment cannot be accomodated at this time. Please try again later. 
Sorry for the inconvenience.", - ) - - continue - - else: - - self.processors[model_key].status = ProcessorStatus.DEPLOYING - - for eviction in evictions: - self.remove( - eviction, - message="Model deployment evicted. Please try again later. Sorry for the inconvenience.", - ) - - self.deployment_submissions = not_ready - - def purge(self): - """Remove all processors and purge their pending work.""" - for model_key in list(self.processors.keys()): - self.remove(model_key) - - def remove(self, model_key: MODEL_KEY, message: Optional[str] = None): - """Remove a processor and purge its outstanding work. - - Args: - model_key: Model identifier for the processor to remove. - message: Optional message sent to any queued requests. - """ - processor = self.processors.pop(model_key) - - processor.purge(message=message) - - def fulfill_status(self): - """Serve controller status snapshots to waiting Redis consumers.""" - if self.status_future is not None: - - try: - - result = ray.get(self.status_future, timeout=0) - - except TimeoutError: - return - - else: - - status = pickle.dumps(result) - - for _ in range(self.redis_client.llen("status")): - id = self.redis_client.brpop("status")[1] - self.redis_client.lpush(id, status) - - self.status_future = None - self.last_status_time = time.time() - self.status_cache = status - - elif self.redis_client.llen("status") > 0: - - if ( - self.status_cache is None - or time.time() - self.last_status_time > self.status_cache_freq_s - ): - - self.status_future = return_refs(client_ray.call_remote(self.controller_handle.status)) - - else: - - for _ in range(self.redis_client.llen("status")): - id = self.redis_client.brpop("status")[1] - self.redis_client.lpush(id, self.status_cache) - - @cache_maintainer(clear_time=6000) - @lru_cache(maxsize=1000) - def is_dedicated_model(self, model_key: MODEL_KEY) -> bool: - """Check if the model is dedicated.""" - try: - result = ray.get(return_refs(client_ray.call_remote(self.controller_handle.get_deployment, model_key)), timeout=int(os.environ.get("COORDINATOR_HANDLE_TIMEOUT_S", "5"))) - - # Dedicated models are deployed automatically on startup - the absence of a deployment means it's not dedicated. 
- if result is None: - return False - - return result.get("dedicated", False) - - except Exception as e: - self.logger.error(f"Error checking if model is dedicated: {e}") - return False diff --git a/src/services/api/src/queue/dispatcher.py b/src/services/api/src/queue/dispatcher.py new file mode 100755 index 00000000..2ea2e68f --- /dev/null +++ b/src/services/api/src/queue/dispatcher.py @@ -0,0 +1,160 @@ +import asyncio +import os +import pickle +import time +import redis + +from ..logging import set_logger +from ..providers.ray import RayProvider +from ..schema import BackendRequestModel +from .processor import Processor, ProcessorStatus +from .util import patch, controller_handle, submit + + +class Dispatcher: + + def __init__(self): + self.redis_client = redis.asyncio.Redis.from_url(os.environ.get("BROKER_URL")) + self.processors: dict[str, Processor] = {} + + self.error_queue = asyncio.Queue() + self.eviction_queue = asyncio.Queue() + + self.cached_status = None + self.last_status_time = 0 + self.status_cache_freq_s = int( + os.environ.get("COORDINATOR_STATUS_CACHE_FREQ_S", "120") + ) + + self.logger = set_logger("coordinator") + + patch() + + self.connect() + + @classmethod + def start(cls): + dispatcher = cls() + asyncio.run(dispatcher.dispatch_worker()) + + def connect(self): + + + self.logger.info(f"Connecting to Ray") + + while not RayProvider.connected(): + + try: + + RayProvider.reset() + RayProvider.connect() + + except Exception as e: + self.logger.error(f"Error connecting to Ray: {e}") + + time.sleep(1) + + self.logger.info(f"Connected to Ray") + + async def get(self): + + result = await self.redis_client.brpop("queue", timeout=1) + + if result is not None: + return pickle.loads(result[1]) + + def dispatch(self, request: BackendRequestModel): + """Route a request to the per-model processor, creating it if missing.""" + + if request.model_key not in self.processors: + + processor = Processor(request.model_key, self.eviction_queue, self.error_queue) + + self.processors[request.model_key] = processor + + asyncio.create_task(processor.processor_worker()) + + self.processors[request.model_key].enqueue(request) + + + def remove(self, model_key: str, message: str): + self.logger.error(f"Removing processor {model_key} with status {self.processors[model_key].status}") + processor = self.processors.pop(model_key) + processor.status = ProcessorStatus.CANCELLED + processor.purge(message) + + def purge(self, message: str): + for model_key in list(self.processors.keys()): + self.remove(model_key, message) + + def handle_evictions(self): + while not self.eviction_queue.empty(): + model_key, reason = self.eviction_queue.get_nowait() + self.remove(model_key, reason) + + def handle_errors(self): + + if not self.error_queue.empty(): + + if not RayProvider.connected(): + + self.purge("Critical server error occurred. Please try again later. Sorry for the inconvenience.") + + self.connect() + + while not self.error_queue.empty(): + model_key, error = self.error_queue.get_nowait() + self.logger.error(f"Error in model {model_key}: {error}") + + if model_key in self.processors: + processor = self.processors[model_key] + processor.status = ProcessorStatus.READY + + async def dispatch_worker(self): + """Main asyncio task for monitoring the dispatch queue and routing requests to the appropriate processors. + """ + + asyncio.create_task(self.status_worker()) + + while True: + + # Get the next request from the queue. 
+ request = await self.get() + if request is not None: + + # Dispatch the request to the appropriate processor. + self.dispatch(request) + + # Handle any evictions or errors that may have been added by the processors. + self.handle_evictions() + self.handle_errors() + + async def status_worker(self) -> None: + """Asyncio task for responding to requests for cluster status + """ + while True: + + id = (await self.redis_client.brpop("status"))[1] + + if time.time() - self.last_status_time > self.status_cache_freq_s: + + try: + + handle = controller_handle() + + self.cached_status = await submit(handle, "status") + + except Exception as e: + + self.logger.error(f"Error getting status: {e}") + + continue + + else: + + self.cached_status = pickle.dumps(self.cached_status) + + self.last_status_time = time.time() + + await self.redis_client.lpush(id, self.cached_status) + \ No newline at end of file diff --git a/src/services/api/src/queue/handle.py b/src/services/api/src/queue/handle.py deleted file mode 100755 index 90cad9d5..00000000 --- a/src/services/api/src/queue/handle.py +++ /dev/null @@ -1,68 +0,0 @@ -"""Ray Serve handle wrapper for model deployment interactions. - -`Handle` resolves the Ray Serve application and actor namespace for a given -`model_key`, checks readiness, and provides `execute` to submit requests. -""" - -import logging - -import ray -from ray.serve._private.client import RayServeException -from ray.util.client import ray as client_ray -from ray.util.client.common import return_refs - -from ..types import MODEL_KEY, RAY_APP_NAME -from ..schema import BackendRequestModel - -logger = logging.getLogger("ndif") - - -class Handle: - """Encapsulates interaction with a single model Ray Serve deployment.""" - - def __init__(self, model_key: MODEL_KEY): - """Look up the deployment handle and namespace for `model_key`. - - Raises: - RayServeException: If the application cannot be found yet. - """ - - self.model_key = model_key - - try: - actor = ray.get_actor(self.actor_name, namespace="NDIF") - self.ready_future = actor.__ray_ready__.remote() - except Exception as e: - logger.info(f"Error getting actor '{self.actor_name}': {e}") - raise RayServeException(f"Actor '{self.actor_name}' does not exist.") - - @property - def actor_name(self) -> RAY_APP_NAME: - """Canonical Ray actor name for the model deployment actor.""" - return f"ModelActor:{self.model_key}" - - @property - def ready(self): - """Whether the Ray Serve deployment reports readiness.""" - try: - # Poll the namespace future until it completes. - ray.get(self.ready_future, timeout=0) - return True - except TimeoutError: - return False - - def execute(self, request: BackendRequestModel): - """Submit `request` to the model actor and return a Ray future.""" - - try: - actor = ray.get_actor(self.actor_name, namespace="NDIF") - except ValueError as e: - - if str(e).startswith("Failed to look up actor"): - raise LookupError("Model deployment evicted.") - - raise - - # We reimplement `handle.remote()` to avoid an issue about looking up the signature for the remote call. - # I imagine this has something to do with getting a named actor from a client not in the namespace. - return return_refs(client_ray.call_remote(actor.__call__, request)) diff --git a/src/services/api/src/queue/processor.py b/src/services/api/src/queue/processor.py index 149512e9..ad0552e8 100755 --- a/src/services/api/src/queue/processor.py +++ b/src/services/api/src/queue/processor.py @@ -1,252 +1,372 @@ -"""Per-model request queue and execution processor. 
- -`Processor` instances are keyed by `model_key` and manage the lifecycle of -requests targeting that model: -- Queue incoming `BackendRequestModel` items -- Track deployment availability and transition through states -- Dispatch requests to the Ray Serve deployment via `Handle` -- Poll for completion and respond to clients -""" - +import asyncio import logging import os -import time -import traceback -from dataclasses import dataclass from enum import Enum from typing import Optional import ray -from ray import serve -from ..types import MODEL_KEY from ..schema import BackendRequestModel, BackendResponseModel -from .handle import Handle + +from .util import controller_handle, get_actor_handle, submit logger = logging.getLogger("ndif") class ProcessorStatus(Enum): - """States for the per-model processor lifecycle.""" - UNINITIALIZED = "uninitialized" PROVISIONING = "provisioning" DEPLOYING = "deploying" READY = "ready" BUSY = "busy" + CANCELLED = "cancelled" -@dataclass -class Submission: - """Tracks an in-flight submission and its response future.""" +class DeploymentStatus(Enum): + """Deployment states reported by the controller.""" - request: BackendRequestModel - execution_future: ray.ObjectRef + DEPLOYED = "deployed" + CACHED_AND_FREE = "cached_and_free" + FREE = "free" + CACHED_AND_FULL = "cached_and_full" + FULL = "full" + CANT_ACCOMMODATE = "cant_accommodate" class Processor: - """Manages a per-model queue and talks to the Ray backend via `Handle`.""" + """ + Orchestrates per-model request queues, deployment lifecycle, and communication with backend model actors. - def __init__(self, model_key: MODEL_KEY): + The Processor manages a queue of inference requests for a specific model, handles deployment + provisioning, monitors readiness and status, and relays requests to the model actor. It reports + errors and eviction events to the Dispatcher via shared queues. + """ + def __init__( + self, model_key: str, eviction_queue: asyncio.Queue, error_queue: asyncio.Queue + ): self.model_key = model_key + self.queue: asyncio.Queue[BackendRequestModel] = asyncio.Queue() + self.eviction_queue = eviction_queue + self.error_queue = error_queue + self.status = ProcessorStatus.UNINITIALIZED - self.queue: list[BackendRequestModel] = list() + self.dedicated = None - self.status = ProcessorStatus.UNINITIALIZED + @property + def handle(self) -> ray.actor.ActorHandle: + """Get the handle for the model deployment.""" + return get_actor_handle(f"ModelActor:{self.model_key}") - self.handle: Handle = None - self.submission: Submission = None + def enqueue(self, request: BackendRequestModel): + """Add a request to the queue and update the user with their position in the queue.""" - self.reply_freq_s = int( - os.environ.get("COORDINATOR_PROCESSOR_REPLY_FREQ_S", "3") - ) + if self.dedicated is False and not request.hotswapping: - self.last_reply_time: float = 0 + request.create_response( + BackendResponseModel.JobStatus.ERROR, + logger, + "Model is not dedicated and hotswapping is not supported for this API key. See https://nnsight.net/status/ for a list of scheduled models.", + ).respond() - def enqueue(self, request: BackendRequestModel): - """Add a request to the local queue and immediately update queue position for the user.""" - self.queue.append(request) + return + + self.queue.put_nowait(request) # Update the user with their new position in the queue. 
request.create_response( BackendResponseModel.JobStatus.QUEUED, logger, - f"Moved to position {len(self.queue)} in Queue.", + f"Moved to position {self.queue.qsize()} in Queue.", ).respond() - def step(self): - """Advance the processor state machine by one tick.""" + async def check_dedicated(self, handle: ray.actor.ActorHandle) -> bool: - # UNINITIALIZED: Before even being sent to deploy. - # This branch should never be hit. - if self.status == ProcessorStatus.UNINITIALIZED: - self.reply("Uninitialized...") + result = await submit(handle, "get_deployment", self.model_key) + + if result is None: + return False - # PROVISIONING: Waiting for the controller to respond with the deployment status. - # (I.e. checking to see whether the deployment can be created.) - elif self.status == ProcessorStatus.PROVISIONING: - self.reply("Model Deployment Provisioning...") + return result.get("dedicated", False) - # DEPLOYING: Waiting for the deployment to be ready. - # (I.e. checking to see whether the model is finished loading.) - elif self.status == ProcessorStatus.DEPLOYING: - self.check_deployment() + async def provision(self): + """Provision the model deployment by contacting the Controller to create the model deployment.""" - # READY: The model is ready to accept requests. - # (I.e. the model is loaded and the deployment is ready with no active requests running.) - elif self.status == ProcessorStatus.READY: + try: - # If there are requests in the queue, execute the next one. - if len(self.queue) > 0: - self.execute() + # Get the controller handle. + controller = controller_handle() - # BUSY: The model is busy executing a request. - elif self.status == ProcessorStatus.BUSY: - # Poll the in-flight submission and transition to READY when done. - self.check_submission() + self.dedicated = await self.check_dedicated(controller) - def execute(self): - """Submit the next request to the model deployment via `Handle`.""" + if not self.dedicated: + + hotswap = False - request = self.queue.pop(0) + valid_queue = list() - # Submit the request to the model deployment via `Handle`. - try: + while not self.queue.empty(): + request = self.queue.get_nowait() - result = self.handle.execute(request) + if request.hotswapping: + hotswap = True - self.submission = Submission( - request, - result, - ) - # If there is an error submitting the request, respond to the user with an error. - except Exception as e: + valid_queue.append(request) + else: + request.create_response( + BackendResponseModel.JobStatus.ERROR, + logger, + "Model is not dedicated and hotswapping is not supported for this API key. See https://nnsight.net/status/ for a list of scheduled models.", + ).respond() - if isinstance(e, LookupError): - message = "Model deployment evicted. Please try again later. Sorry for the inconvenience." - else: - message = f"{traceback.format_exc()}\nIssue submitting job to model deployment." + for request in valid_queue: + self.queue.put_nowait(request) - request.create_response( - BackendResponseModel.JobStatus.ERROR, - logger, - message, - ).respond() + if not hotswap: + self.eviction_queue.put_nowait( + ( + self.model_key, + "Model is not dedicated and hotswapping is not supported for this API key. See https://nnsight.net/status/ for a list of scheduled models.", + ) + ) + self.status = ProcessorStatus.CANCELLED + return - else: + # Submit the request to the controller to deploy the model deployment to the controller. + # Wait for the request to finish. 
+ result = await submit(controller, "deploy", [self.model_key]) - # If there is an error submitting the request, respond to the user with an error. + # If there was an error provisioning the model deployment, evict and cancel the processor. + # Add the error to the error queue to be handled by the dispatcher. except Exception as e: + self.eviction_queue.put_nowait( + ( + self.model_key, + "Error provisioning model deployment. Please try again later. Sorry for the inconvenience.", + ) + ) + self.status = ProcessorStatus.CANCELLED + self.error_queue.put_nowait((self.model_key, e)) - if isinstance(e, LookupError): - message = "Model deployment evicted. Please try again later. Sorry for the inconvenience." - else: - message = f"{traceback.format_exc()}\nIssue submitting job to model deployment." + return - request.create_response( - BackendResponseModel.JobStatus.ERROR, - logger, - message, - ).respond() + deployment_statuses = result["result"] + + evictions = result["evictions"] - else: + # Add any evictions to the eviction queue to be handled by the dispatcher. + for model_key in evictions: + self.eviction_queue.put_nowait( + ( + model_key, + "Model deployment evicted. Please try again later. Sorry for the inconvenience.", + ) + ) + + for model_key, status in deployment_statuses.items(): - # The request was submitted successfully, so we can transition to BUSY. - self.status = ProcessorStatus.BUSY + status_str = str(status).lower() - # Update the user with the request being dispatched to the model deployment. - request.create_response( - BackendResponseModel.JobStatus.DISPATCHED, - logger, - "Your job has been sent to the model deployment.", - ).respond() + try: + + deployment_status = DeploymentStatus(status_str) + + # If the status is not a valid deployment status, there was an issue evaluating the model on the Controller. + # Evict and cancel the processor. + except ValueError: + + self.eviction_queue.put_nowait( + ( + model_key, + f"{status_str}\n\nThere was an error provisioning the model deployment. Please try again later. Sorry for the inconvenience.", + ) + ) + self.status = ProcessorStatus.CANCELLED + return - # Update the other users in the queue with their new position in the queue. - self.reply(force=True) + # If the model deployment cannot be accommodated, evict and cancel the processor. + if deployment_status == DeploymentStatus.CANT_ACCOMMODATE: - def check_deployment(self): - """Poll deployment readiness and transition to READY when available.""" + self.eviction_queue.put_nowait( + ( + model_key, + "Model deployment cannot be accommodated at this time. Please try again later. Sorry for the inconvenience.", + ) + ) + self.status = ProcessorStatus.CANCELLED + + return + + async def initialize(self) -> None: + """Wait for the model deployment to complete initialization.""" + + # Loop until the model deployment is initialized. + while True: - # If the handle is not yet created, create it. - if self.handle is None: + try: + # Try to get the handle for the model deployment. It might not be created yet. + handle = self.handle + + # Wait for the model deployment to be initialized.
+ await submit(handle, "__ray_ready__") + + break + + except Exception as e: + # If the error is not because the model deployment hasn't been created yet, it's a critical error. + # Cancel the processor and let the dispatcher handle the error. + if not str(e).startswith("Failed to look up actor"): + self.eviction_queue.put_nowait( + ( + self.model_key, + "Error initializing model deployment. Please try again later. Sorry for the inconvenience.", + ) + ) + self.error_queue.put_nowait((self.model_key, e)) + self.status = ProcessorStatus.CANCELLED + + async def execute(self, request: BackendRequestModel) -> None: + """Submit a request to the model deployment and update the user with the status.""" try: - # Check the status of the in-flight submission. - ray.get(self.submission.execution_future, timeout=0) - except TimeoutError: - # If the submission is still in progress, continue polling. - return - # If there is an error checking the status of the in-flight submission, respond to the user with an error. - except Exception as e: + # Get the handle for the model deployment. + handle = self.handle - request = self.submission.request + # Submit the request to the model deployment. + result = submit(handle, "__call__", request) - self.status = ProcessorStatus.READY - self.submission = None - - if isinstance(e, LookupError): - message = "Model deployment evicted. Please try again later. Sorry for the inconvenience." - else: - message = f"{traceback.format_exc()}\nIssue checking job status." + # Update the user that their request has been dispatched. + request.create_response( + BackendResponseModel.JobStatus.DISPATCHED, + logger, + "Your job has been sent to the model deployment.", + ).respond() + + # Wait for the request to be completed. + result = await result + + # If there was an error submitting the request... + except Exception as e: + # Respond to the dispatched user with an error. request.create_response( BackendResponseModel.JobStatus.ERROR, logger, - message, + "Error submitting request to model deployment. Please try again later. Sorry for the inconvenience.", ).respond() - # Re-raise the error to be handled by the coordinator to check for connection issues. - raise + # If the error is because the model deployment was evicted / cached, evict and cancel the processor. + if str(e).startswith("Failed to look up actor"): + self.eviction_queue.put_nowait( + ( + self.model_key, + "Model deployment evicted. Please try again later. Sorry for the inconvenience.", + ) + ) + self.status = ProcessorStatus.CANCELLED + else: + # If there is another error, add it to the error queue to be handled by the dispatcher. Remain busy until the dispatcher has cleared the error. + self.error_queue.put_nowait((self.model_key, e)) + # Otherwise the processor is ready to accept new requests. else: - # The submission is complete, so we can transition to READY. + self.status = ProcessorStatus.READY - self.submission = None - self.step() + + async def processor_worker(self) -> None: + """Main asyncio task for creating, monitoring, and submitting requests to a model deployment.""" + + self.status = ProcessorStatus.PROVISIONING + + # Create a task to reply to users with the status of the model deployment. + asyncio.create_task(self.reply_worker()) + + # Provision and deploy the model deployment. + await self.provision() + + # If there was a problem provisioning the model deployment, return.
+ if self.status == ProcessorStatus.CANCELLED: + return + + self.status = ProcessorStatus.DEPLOYING + + # Wait for the model deployment to be initialized. + await self.initialize() + + # If there was a problem initializing the model deployment, return. + if self.status == ProcessorStatus.CANCELLED: + return + + self.status = ProcessorStatus.READY + + # Loop until the model deployment is cancelled. + while self.status != ProcessorStatus.CANCELLED: + + # If there was previously an error executing, wait for the dispatcher to check and clear the error. + if self.status == ProcessorStatus.BUSY: + + await asyncio.sleep(1) + + continue + + # Get the next request from the queue. + request = await self.queue.get() + + self.status = ProcessorStatus.BUSY + + # Update the other users in the queue with their new position in the queue. + self.reply() + + # Submit the request to the model deployment. + await self.execute(request) + + async def reply_worker(self) -> None: + """Asyncio task for replying to users with the status of the model deployment every N seconds.""" + + # Set the frequency to reply to users. + reply_freq_s = int(os.environ.get("COORDINATOR_PROCESSOR_REPLY_FREQ_S", "3")) + + while ( + self.status != ProcessorStatus.READY + and self.status != ProcessorStatus.CANCELLED + ): + + if self.status == ProcessorStatus.PROVISIONING: + self.reply("Model Provisioning...") + elif self.status == ProcessorStatus.DEPLOYING: + self.reply("Model Deploying...") + + await asyncio.sleep(reply_freq_s) def reply( self, description: str | None = None, - force: bool = False, status: BackendResponseModel.JobStatus = BackendResponseModel.JobStatus.QUEUED, - ): + ) -> None: """ Reply to all users with a message. Args: description (str | None, optional): The message to send to users. If None, a default queue position message is sent. - force (bool, optional): If True, force sending replies to all users - regardless of last reply time. Defaults to False. """ - if force or time.time() - self.last_reply_time > self.reply_freq_s: - - for i, request in enumerate(self.queue): - request.create_response( - status, - logger, - ( - description - if description is not None - else f"Moved to position {i+1} in Queue." - ), - ).respond() - - self.last_reply_time = time.time() + for i, request in enumerate(self.queue._queue): + request.create_response( + status, + logger, + ( + description + if description is not None + else f"Moved to position {i+1} in Queue." + ), + ).respond() def purge(self, message: Optional[str] = None): - """Flush the queue and respond to all queued requests with `message`.""" if message is None: message = "Critical server error occurred. Please try again later. Sorry for the inconvenience."
- self.reply(message, force=True, status=BackendResponseModel.JobStatus.ERROR) - - self.queue.clear() + self.reply(message, status=BackendResponseModel.JobStatus.ERROR) diff --git a/src/services/api/src/queue/util.py b/src/services/api/src/queue/util.py index 39415ea6..ee3e368a 100755 --- a/src/services/api/src/queue/util.py +++ b/src/services/api/src/queue/util.py @@ -1,22 +1,9 @@ import time +import ray -def cache_maintainer(clear_time: int): - """ - A function decorator that clears lru_cache clear_time seconds - :param clear_time: In seconds, how often to clear cache (only checks when called) - """ - def inner(func): - def wrapper(*args, **kwargs): - if hasattr(func, 'next_clear'): - if time.time() > func.next_clear: - func.cache_clear() - func.next_clear = time.time() + clear_time - else: - func.next_clear = time.time() + clear_time +from ray.util.client import ray as client_ray +from ray.util.client.common import return_refs - return func(*args, **kwargs) - return wrapper - return inner def patch(): # We patch the _async_send method to avoid a nasty deadlock bug in Ray. @@ -38,4 +25,18 @@ def _async_send(_self, req, callback = None): finally: common.ClientObjectRef.__del__ = original_ref_deletion - dataclient.DataClient._async_send = _async_send \ No newline at end of file + dataclient.DataClient._async_send = _async_send + + + +def submit(actor: ray.actor.ActorHandle, method: str, *args, **kwargs): + return return_refs(client_ray.call_remote(getattr(actor, method), *args, **kwargs)) + +def get_actor_handle(name: str) -> ray.actor.ActorHandle: + + return ray.get_actor(name, namespace="NDIF") + + +def controller_handle(): + + return get_actor_handle("Controller") \ No newline at end of file diff --git a/src/services/ray/src/ray/deployments/modeling/base.py b/src/services/ray/src/ray/deployments/modeling/base.py index bf69930a..c4fabc43 100755 --- a/src/services/ray/src/ray/deployments/modeling/base.py +++ b/src/services/ray/src/ray/deployments/modeling/base.py @@ -192,7 +192,7 @@ async def __call__(self, request: BackendRequestModel) -> None: if self.cached: - raise LookupError("This model is cached.") + raise LookupError("Failed to look up actor") self.request = request @@ -308,7 +308,7 @@ def post(self, result: Any) -> None: gpu_mem: int = result[1] execution_time_s: float = result[2] - result = BackendResultModel( + result_object = BackendResultModel( id=self.request.id, **saves, ).save(ObjectStoreProvider.object_store) @@ -316,9 +316,10 @@ def post(self, result: Any) -> None: self.respond( status=BackendResponseModel.JobStatus.COMPLETED, description="Your job has been completed.", + data=(result_object.url(), result_object._size), ) - RequestResponseSizeMetric.update(self.request, result._size) + RequestResponseSizeMetric.update(self.request, result_object._size) GPUMemMetric.update(self.request, gpu_mem) ExecutionTimeMetric.update(self.request, execution_time_s)
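
Note on the result-retrieval change: the streaming GET /result/{id} endpoint is removed, and `ModelDeployment.post` now attaches `(result_object.url(), result_object._size)` to the COMPLETED response, where `BackendResultModel.url()` returns a presigned object-store URL that expires after two hours (`ExpiresIn=3600 * 2`). Clients are therefore expected to download results directly from the object store. Below is a hypothetical client-side sketch under that assumption; the `fetch_result` name and the use of `requests`/`torch` are illustrative, not part of this patch.

```python
import io

import requests
import torch


def fetch_result(data: tuple[str, int]):
    """Download a completed result given the (presigned_url, size) pair
    attached to the COMPLETED response by ModelDeployment.post()."""
    url, size = data

    buffer = io.BytesIO()
    with requests.get(url, stream=True, timeout=60) as response:
        response.raise_for_status()
        for chunk in response.iter_content(chunk_size=8192):
            buffer.write(chunk)

    # _size is reported alongside the URL, so it can serve as a sanity check.
    assert buffer.getbuffer().nbytes == size

    buffer.seek(0)
    # Results are stored with the ".pt" extension (see BackendResultModel), so
    # torch deserialization is assumed; weights_only=False because saved results
    # may contain arbitrary Python objects.
    return torch.load(buffer, map_location="cpu", weights_only=False)
```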
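The new `queue/util.py` helpers replace the deleted `Handle` wrapper. `submit()` routes calls through `client_ray.call_remote` plus `return_refs` instead of the usual `actor.method.remote(...)`; per the comment in the removed `handle.py`, this sidesteps a signature-lookup issue when calling a named actor obtained from a Ray client outside its namespace, and the returned reference is awaited directly from asyncio code in `Dispatcher` and `Processor`. A rough usage sketch, assuming a reachable cluster with the `Controller` and a model actor registered in the `NDIF` namespace (the model key and the import root, taken from how `gunicorn.conf.py` imports the package, are assumptions):

```python
import asyncio

from src.queue.util import controller_handle, get_actor_handle, submit


async def main():
    # Named-actor lookups in the "NDIF" namespace.
    controller = controller_handle()
    model = get_actor_handle("ModelActor:example-org/example-model")  # hypothetical key

    # Roughly equivalent to controller.status.remote(), but routed through
    # client_ray.call_remote()/return_refs() as in the patched code paths.
    status = await submit(controller, "status")
    print(status)

    # Readiness probe, mirroring Processor.initialize().
    await submit(model, "__ray_ready__")


asyncio.run(main())
```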
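Architecturally, the `Coordinator`'s single synchronous `step()` loop becomes one asyncio task per model (`Processor.processor_worker`) plus a `Dispatcher` loop that routes incoming requests and drains two shared back-channels (`eviction_queue`, `error_queue`). The stripped-down sketch below shows only that queue wiring, with plain strings standing in for `BackendRequestModel` and no Ray or Redis involved; names and the "evict" sentinel are illustrative.

```python
import asyncio


async def processor_worker(name: str, requests: asyncio.Queue, evictions: asyncio.Queue) -> None:
    # Per-model worker: consume requests until told to stop (stands in for
    # Processor.processor_worker, minus deployment and status handling).
    while True:
        request = await requests.get()
        if request == "evict":
            # Report back to the dispatcher instead of raising.
            evictions.put_nowait((name, "deployment evicted"))
            return
        print(f"{name} handled {request}")


async def dispatch(incoming: list[tuple[str, str]]) -> None:
    processors: dict[str, asyncio.Queue] = {}
    evictions: asyncio.Queue = asyncio.Queue()

    for model_key, request in incoming:
        # Create the per-model worker lazily, like Dispatcher.dispatch().
        if model_key not in processors:
            processors[model_key] = asyncio.Queue()
            asyncio.create_task(
                processor_worker(model_key, processors[model_key], evictions)
            )
        processors[model_key].put_nowait(request)

    await asyncio.sleep(0.1)  # let the workers run

    # Drain the back-channel, as Dispatcher.handle_evictions() does each iteration.
    while not evictions.empty():
        model_key, reason = evictions.get_nowait()
        print(f"removing {model_key}: {reason}")
        processors.pop(model_key, None)


asyncio.run(dispatch([("gpt2", "req-1"), ("gpt2", "evict"), ("llama", "req-2")]))
```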
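The `/status` path also changes shape: instead of the coordinator polling a `status_future`, the API process and `Dispatcher.status_worker` now meet over a pair of Redis lists. The endpoint pushes its PID onto `status` and blocks on a list keyed by that PID; the dispatcher pops from `status`, refreshes the pickled controller snapshot at most once per `COORDINATOR_STATUS_CACHE_FREQ_S` seconds, and pushes the cached payload back. A synchronous sketch of that round trip, assuming a reachable `BROKER_URL` Redis; the services use the asyncio client, and the real payload is the pickled controller status rather than the placeholder argument below.

```python
import os
import pickle

import redis

r = redis.Redis.from_url(os.environ["BROKER_URL"])


def request_status():
    # API side (GET /status): announce a per-process reply list, then block
    # until the dispatcher pushes the snapshot onto it.
    reply_list = str(os.getpid())
    r.lpush("status", reply_list)
    _, payload = r.brpop(reply_list)
    return pickle.loads(payload)


def serve_status_once(snapshot):
    # Dispatcher side (status_worker): pop one waiting caller and reply.
    _, reply_list = r.brpop("status")
    r.lpush(reply_list, pickle.dumps(snapshot))
```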