diff --git a/comps/cores/mega/orchestrator.py b/comps/cores/mega/orchestrator.py index 4053988566..97ee2a76b3 100644 --- a/comps/cores/mega/orchestrator.py +++ b/comps/cores/mega/orchestrator.py @@ -7,6 +7,7 @@ import json import os import re +import threading import time from typing import Dict, List @@ -27,20 +28,58 @@ class OrchestratorMetrics: - # Because: + # Need an instance ID for metric prefix because: + # - Orchestror instances are not named # - CI creates several orchestrator instances - # - Prometheus requires metrics to be singletons - # - Oorchestror instances are not provided their own names - # Metrics are class members with "megaservice" name prefix - first_token_latency = Histogram("megaservice_first_token_latency", "First token latency (histogram)") - inter_token_latency = Histogram("megaservice_inter_token_latency", "Inter-token latency (histogram)") - request_latency = Histogram("megaservice_request_latency", "Whole request/reply latency (histogram)") - request_pending = Gauge("megaservice_request_pending", "Count of currently pending requests (gauge)") + # - Prometheus requires metrics (their names) to be unique + _instance_id = 0 def __init__(self) -> None: - pass + self._instance_id += 1 + if self._instance_id > 1: + self._prefix = f"megaservice{self._instance_id}" + else: + self._prefix = "megaservice" + + self.request_pending = Gauge(f"{self._prefix}_request_pending", "Count of currently pending requests (gauge)") + + # locking for latency metric creation / method change + self._lock = threading.Lock() + + # Metrics related to token processing are created on demand, + # to avoid bogus ones for services that never handle tokens + self.first_token_latency = None + self.inter_token_latency = None + self.request_latency = None + + # initial methods to create the metrics + self.token_update = self._token_update_create + self.request_update = self._request_update_create + + def _token_update_create(self, token_start: float, is_first: bool) -> float: + with self._lock: + # in case another thread already got here + if self.token_update == self._token_update_create: + self.first_token_latency = Histogram( + f"{self._prefix}_first_token_latency", "First token latency (histogram)" + ) + self.inter_token_latency = Histogram( + f"{self._prefix}_inter_token_latency", "Inter-token latency (histogram)" + ) + self.token_update = self._token_update_real + return self.token_update(token_start, is_first) + + def _request_update_create(self, req_start: float) -> None: + with self._lock: + # in case another thread already got here + if self.request_update == self._request_update_create: + self.request_latency = Histogram( + f"{self._prefix}_request_latency", "Whole LLM request/reply latency (histogram)" + ) + self.request_update = self._request_update_real + self.request_update(req_start) - def token_update(self, token_start: float, is_first: bool) -> float: + def _token_update_real(self, token_start: float, is_first: bool) -> float: now = time.time() if is_first: self.first_token_latency.observe(now - token_start) @@ -48,7 +87,7 @@ def token_update(self, token_start: float, is_first: bool) -> float: self.inter_token_latency.observe(now - token_start) return now - def request_update(self, req_start: float) -> None: + def _request_update_real(self, req_start: float) -> None: self.request_latency.observe(time.time() - req_start) def pending_update(self, increase: bool) -> None: