Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions comps/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@
from comps.cores.mega.micro_service import MicroService, register_microservice, opea_microservices

# Telemetry
if os.getenv("ENABLE_OPEA_TELEMETRY", "false").lower() == "true":
from comps.cores.telemetry.opea_telemetry import opea_telemetry
from comps.cores.telemetry.opea_telemetry import opea_telemetry

# Common
from comps.cores.common.component import OpeaComponent, OpeaComponentRegistry, OpeaComponentLoader
Expand Down
19 changes: 15 additions & 4 deletions comps/cores/mega/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

logger = CustomLogger("comps-core-orchestrator")
LOGFLAG = os.getenv("LOGFLAG", False)
ENABLE_OPEA_TELEMETRY = os.getenv("ENABLE_OPEA_TELEMETRY", "false").lower() == "true"


class OrchestratorMetrics:
Expand Down Expand Up @@ -208,11 +209,11 @@ def process_outputs(self, prev_nodes: List, result_dict: Dict) -> Dict:

def wrap_iterable(self, iterable, is_first=True):

with tracer.start_as_current_span("llm_generate_stream"):
with tracer.start_as_current_span("llm_generate_stream") if ENABLE_OPEA_TELEMETRY else contextlib.nullcontext():
while True:
with (
tracer.start_as_current_span("llm_generate_stream_first_token")
if is_first
if is_first and ENABLE_OPEA_TELEMETRY
else contextlib.nullcontext()
): # else tracer.start_as_current_span(f"llm_generate_stream_next_token")
try:
Expand Down Expand Up @@ -253,7 +254,11 @@ async def execute(
# Still leave to sync requests.post for StreamingResponse
if LOGFLAG:
logger.info(inputs)
with tracer.start_as_current_span(f"{cur_node}_asyn_generate"):
with (
tracer.start_as_current_span(f"{cur_node}_asyn_generate")
if ENABLE_OPEA_TELEMETRY
else contextlib.nullcontext()
):
response = requests.post(
url=endpoint,
data=json.dumps(inputs),
Expand Down Expand Up @@ -320,8 +325,14 @@ def generate():
input_data = {k: v for k, v in input_data.items() if v is not None}
else:
input_data = inputs
with tracer.start_as_current_span(f"{cur_node}_generate"):

with (
tracer.start_as_current_span(f"{cur_node}_generate")
if ENABLE_OPEA_TELEMETRY
else contextlib.nullcontext()
):
response = await session.post(endpoint, json=input_data)

if response.content_type == "audio/wav":
audio_data = await response.read()
data = self.align_outputs(audio_data, cur_node, inputs, runtime_graph, llm_parameters_dict, **kwargs)
Expand Down
7 changes: 5 additions & 2 deletions comps/cores/telemetry/opea_telemetry.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

import contextlib
import inspect
import os
from functools import wraps
Expand All @@ -13,6 +14,8 @@
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter

ENABLE_OPEA_TELEMETRY = os.getenv("ENABLE_OPEA_TELEMETRY", "false").lower() == "true"


def detach_ignore_err(self, token: object) -> None:
"""Resets Context to a previous value.
Expand Down Expand Up @@ -47,15 +50,15 @@ def opea_telemetry(func):

@wraps(func)
async def wrapper(*args, **kwargs):
with tracer.start_as_current_span(func.__name__):
with tracer.start_as_current_span(func.__name__) if ENABLE_OPEA_TELEMETRY else contextlib.nullcontext():
res = await func(*args, **kwargs)
return res

else:

@wraps(func)
def wrapper(*args, **kwargs):
with tracer.start_as_current_span(func.__name__):
with tracer.start_as_current_span(func.__name__) if ENABLE_OPEA_TELEMETRY else contextlib.nullcontext():
res = func(*args, **kwargs)
return res

Expand Down
5 changes: 5 additions & 0 deletions tests/cores/telemetry/test_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
# SPDX-License-Identifier: Apache-2.0

import asyncio
import os
import time
import unittest

os.environ["ENABLE_OPEA_TELEMETRY"] = "True"
from comps.cores.telemetry.opea_telemetry import in_memory_exporter, opea_telemetry


Expand Down Expand Up @@ -32,6 +34,9 @@ async def dummy_async_func():

class TestTelemetry(unittest.TestCase):

def tearDown(self):
os.environ["ENABLE_OPEA_TELEMETRY"] = "False"

def test_time_tracing(self):
dummy_func()
asyncio.run(dummy_async_func())
Expand Down