diff --git a/comps/__init__.py b/comps/__init__.py index a35e703675..501f5ceb58 100644 --- a/comps/__init__.py +++ b/comps/__init__.py @@ -51,8 +51,7 @@ from comps.cores.mega.micro_service import MicroService, register_microservice, opea_microservices # Telemetry -if os.getenv("ENABLE_OPEA_TELEMETRY", "false").lower() == "true": - from comps.cores.telemetry.opea_telemetry import opea_telemetry +from comps.cores.telemetry.opea_telemetry import opea_telemetry # Common from comps.cores.common.component import OpeaComponent, OpeaComponentRegistry, OpeaComponentLoader diff --git a/comps/cores/mega/orchestrator.py b/comps/cores/mega/orchestrator.py index 2d1957b1b7..4bda652eff 100644 --- a/comps/cores/mega/orchestrator.py +++ b/comps/cores/mega/orchestrator.py @@ -25,6 +25,7 @@ logger = CustomLogger("comps-core-orchestrator") LOGFLAG = os.getenv("LOGFLAG", False) +ENABLE_OPEA_TELEMETRY = os.getenv("ENABLE_OPEA_TELEMETRY", "false").lower() == "true" class OrchestratorMetrics: @@ -208,11 +209,11 @@ def process_outputs(self, prev_nodes: List, result_dict: Dict) -> Dict: def wrap_iterable(self, iterable, is_first=True): - with tracer.start_as_current_span("llm_generate_stream"): + with tracer.start_as_current_span("llm_generate_stream") if ENABLE_OPEA_TELEMETRY else contextlib.nullcontext(): while True: with ( tracer.start_as_current_span("llm_generate_stream_first_token") - if is_first + if is_first and ENABLE_OPEA_TELEMETRY else contextlib.nullcontext() ): # else tracer.start_as_current_span(f"llm_generate_stream_next_token") try: @@ -253,7 +254,11 @@ async def execute( # Still leave to sync requests.post for StreamingResponse if LOGFLAG: logger.info(inputs) - with tracer.start_as_current_span(f"{cur_node}_asyn_generate"): + with ( + tracer.start_as_current_span(f"{cur_node}_asyn_generate") + if ENABLE_OPEA_TELEMETRY + else contextlib.nullcontext() + ): response = requests.post( url=endpoint, data=json.dumps(inputs), @@ -320,8 +325,14 @@ def generate(): input_data = {k: v for k, v in input_data.items() if v is not None} else: input_data = inputs - with tracer.start_as_current_span(f"{cur_node}_generate"): + + with ( + tracer.start_as_current_span(f"{cur_node}_generate") + if ENABLE_OPEA_TELEMETRY + else contextlib.nullcontext() + ): response = await session.post(endpoint, json=input_data) + if response.content_type == "audio/wav": audio_data = await response.read() data = self.align_outputs(audio_data, cur_node, inputs, runtime_graph, llm_parameters_dict, **kwargs) diff --git a/comps/cores/telemetry/opea_telemetry.py b/comps/cores/telemetry/opea_telemetry.py index 5f08d4bd4d..965a041831 100644 --- a/comps/cores/telemetry/opea_telemetry.py +++ b/comps/cores/telemetry/opea_telemetry.py @@ -1,6 +1,7 @@ # Copyright (C) 2024 Intel Corporation # SPDX-License-Identifier: Apache-2.0 +import contextlib import inspect import os from functools import wraps @@ -13,6 +14,8 @@ from opentelemetry.sdk.trace.export import BatchSpanProcessor from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter +ENABLE_OPEA_TELEMETRY = os.getenv("ENABLE_OPEA_TELEMETRY", "false").lower() == "true" + def detach_ignore_err(self, token: object) -> None: """Resets Context to a previous value. @@ -47,7 +50,7 @@ def opea_telemetry(func): @wraps(func) async def wrapper(*args, **kwargs): - with tracer.start_as_current_span(func.__name__): + with tracer.start_as_current_span(func.__name__) if ENABLE_OPEA_TELEMETRY else contextlib.nullcontext(): res = await func(*args, **kwargs) return res @@ -55,7 +58,7 @@ async def wrapper(*args, **kwargs): @wraps(func) def wrapper(*args, **kwargs): - with tracer.start_as_current_span(func.__name__): + with tracer.start_as_current_span(func.__name__) if ENABLE_OPEA_TELEMETRY else contextlib.nullcontext(): res = func(*args, **kwargs) return res diff --git a/tests/cores/telemetry/test_telemetry.py b/tests/cores/telemetry/test_telemetry.py index 3589ac232d..6e2086bb73 100644 --- a/tests/cores/telemetry/test_telemetry.py +++ b/tests/cores/telemetry/test_telemetry.py @@ -2,9 +2,11 @@ # SPDX-License-Identifier: Apache-2.0 import asyncio +import os import time import unittest +os.environ["ENABLE_OPEA_TELEMETRY"] = "True" from comps.cores.telemetry.opea_telemetry import in_memory_exporter, opea_telemetry @@ -32,6 +34,9 @@ async def dummy_async_func(): class TestTelemetry(unittest.TestCase): + def tearDown(self): + os.environ["ENABLE_OPEA_TELEMETRY"] = "False" + def test_time_tracing(self): dummy_func() asyncio.run(dummy_async_func())