Skip to content

Commit 11b1969

Browse files
authored
Enable Telemetry Tracing in Agent Comp and also add class name along with func name as span name for tracing (opea-project#1503)
* Trace Agent functions and get class name for opea_telemetry tracing Signed-off-by: Louie, Tsai <[email protected]> Signed-off-by: Tsai, Louie <[email protected]> * separate llm invoke into a standalone function for tracing Signed-off-by: Tsai, Louie <[email protected]> --------- Signed-off-by: Louie, Tsai <[email protected]> Signed-off-by: Tsai, Louie <[email protected]>
1 parent a8f300f commit 11b1969

7 files changed

Lines changed: 156 additions & 22 deletions

File tree

comps/agent/src/agent.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
MessageObject,
3131
ThreadObject,
3232
)
33+
from comps.cores.telemetry.opea_telemetry import opea_telemetry, tracer
3334

3435
logger = CustomLogger("comps-react-agent")
3536
logflag = os.getenv("LOGFLAG", False)
@@ -58,6 +59,7 @@ class AgentCompletionRequest(ChatCompletionRequest):
5859
host="0.0.0.0",
5960
port=args.port,
6061
)
62+
@opea_telemetry
6163
async def llm_generate(input: AgentCompletionRequest):
6264
if logflag:
6365
logger.info(input)
@@ -148,6 +150,7 @@ class CreateAssistant(CreateAssistantsRequest):
148150
host="0.0.0.0",
149151
port=args.port,
150152
)
153+
@opea_telemetry
151154
def create_assistants(input: CreateAssistant):
152155
# 1. initialize the agent
153156
print("@@@ Initializing agent with config: ", input.agent_config)
@@ -184,6 +187,7 @@ def create_assistants(input: CreateAssistant):
184187
host="0.0.0.0",
185188
port=args.port,
186189
)
190+
@opea_telemetry
187191
def create_threads(input: CreateThreadsRequest):
188192
# create a memory KV for the thread
189193
thread_inst, thread_id = instantiate_thread_memory()
@@ -205,6 +209,7 @@ def create_threads(input: CreateThreadsRequest):
205209
host="0.0.0.0",
206210
port=args.port,
207211
)
212+
@opea_telemetry
208213
def create_messages(thread_id, input: CreateMessagesRequest):
209214
with threads_global_kv as g_threads:
210215
thread_inst, _, _ = g_threads[thread_id]
@@ -249,6 +254,7 @@ def create_messages(thread_id, input: CreateMessagesRequest):
249254
host="0.0.0.0",
250255
port=args.port,
251256
)
257+
@opea_telemetry
252258
def create_run(thread_id, input: CreateRunResponse):
253259
with threads_global_kv as g_threads:
254260
thread_inst, _, status = g_threads[thread_id]
@@ -296,6 +302,7 @@ def create_run(thread_id, input: CreateRunResponse):
296302
host="0.0.0.0",
297303
port=args.port,
298304
)
305+
@opea_telemetry
299306
def cancel_run(thread_id):
300307
with threads_global_kv as g_threads:
301308
thread_inst, created_at, status = g_threads[thread_id]

comps/agent/src/integrations/strategy/base_agent.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,15 @@
55

66
from langgraph.checkpoint.memory import MemorySaver
77

8+
from comps.cores.telemetry.opea_telemetry import opea_telemetry, tracer
9+
810
from ..storage.persistence_redis import RedisPersistence
911
from ..tools import get_tools_descriptions
1012
from ..utils import adapt_custom_prompt, setup_chat_model
1113

1214

1315
class BaseAgent:
16+
@opea_telemetry
1417
def __init__(self, args, local_vars=None, **kwargs) -> None:
1518
self.llm = setup_chat_model(args)
1619
self.tools_descriptions = get_tools_descriptions(args.tools)
@@ -56,6 +59,7 @@ def execute(self, state: dict):
5659
def prepare_initial_state(self, query):
5760
raise NotImplementedError
5861

62+
@opea_telemetry
5963
async def stream_generator(self, query, config):
6064
initial_state = self.prepare_initial_state(query)
6165
try:
@@ -71,6 +75,7 @@ async def stream_generator(self, query, config):
7175
except Exception as e:
7276
yield str(e)
7377

78+
@opea_telemetry
7479
async def non_streaming_run(self, query, config):
7580
initial_state = self.prepare_initial_state(query)
7681
print("@@@ Initial State: ", initial_state)

comps/agent/src/integrations/strategy/planexec/planner.py

Lines changed: 47 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
from langgraph.graph.message import add_messages
1414
from pydantic import BaseModel, Field
1515

16+
from comps.cores.telemetry.opea_telemetry import opea_telemetry, tracer
17+
1618
from ...global_var import threads_global_kv
1719
from ...utils import has_multi_tool_inputs, tool_renderer
1820
from ..base_agent import BaseAgent
@@ -54,6 +56,7 @@ class PlanStepChecker:
5456
str: A decision for whether we should use this plan or not
5557
"""
5658

59+
@opea_telemetry
5760
def __init__(self, llm, is_vllm=False):
5861
class grade(BaseModel):
5962
binary_score: str = Field(description="executable score 'yes' or 'no'")
@@ -66,9 +69,15 @@ class grade(BaseModel):
6669
output_parser = PydanticToolsParser(tools=[grade], first_tool_only=True)
6770
self.chain = plan_check_prompt | llm | output_parser
6871

72+
@opea_telemetry
73+
def __llm_invoke__(self, state):
74+
scored_result = self.chain.invoke(state)
75+
return scored_result
76+
77+
@opea_telemetry
6978
def __call__(self, state):
7079
# print("---CALL PlanStepChecker---")
71-
scored_result = self.chain.invoke(state)
80+
scored_result = self.__llm_invoke__(state)
7281
score = scored_result.binary_score
7382
print(f"Task is {state['context']}, Score is {score}")
7483
if score.startswith("yes"):
@@ -79,6 +88,7 @@ def __call__(self, state):
7988

8089
# Define workflow Node
8190
class Planner:
91+
@opea_telemetry
8292
def __init__(self, llm, plan_checker=None, is_vllm=False):
8393
if is_vllm:
8494
llm = llm.bind_tools([Plan], tool_choice={"function": {"name": Plan.__name__}})
@@ -88,6 +98,12 @@ def __init__(self, llm, plan_checker=None, is_vllm=False):
8898
self.llm = planner_prompt | llm | output_parser
8999
self.plan_checker = plan_checker
90100

101+
@opea_telemetry
102+
def __llm_invoke__(self, messages):
103+
plan = self.llm.invoke(messages)
104+
return plan
105+
106+
@opea_telemetry
91107
def __call__(self, state):
92108
print("---CALL Planner---")
93109
input = state["messages"][-1].content
@@ -96,7 +112,7 @@ def __call__(self, state):
96112
while not success:
97113
while not success:
98114
try:
99-
plan = self.llm.invoke({"messages": [("user", state["messages"][-1].content)]})
115+
plan = self.__llm_invoke__({"messages": [("user", state["messages"][-1].content)]})
100116
print("Generated plan: ", plan)
101117
success = True
102118
except OutputParserException as e:
@@ -116,6 +132,7 @@ def __call__(self, state):
116132

117133

118134
class Executor:
135+
@opea_telemetry
119136
def __init__(self, llm, tools=[]):
120137
prompt = hwchase17_react_prompt
121138
if has_multi_tool_inputs(tools):
@@ -126,6 +143,7 @@ def __init__(self, llm, tools=[]):
126143
agent=agent_chain, tools=tools, handle_parsing_errors=True, max_iterations=50
127144
)
128145

146+
@opea_telemetry
129147
def __call__(self, state):
130148
print("---CALL Executor---")
131149
plan = state["plan"]
@@ -151,6 +169,7 @@ def __call__(self, state):
151169

152170

153171
class AnswerMaker:
172+
@opea_telemetry
154173
def __init__(self, llm, is_vllm=False):
155174
if is_vllm:
156175
llm = llm.bind_tools([Response], tool_choice={"function": {"name": Response.__name__}})
@@ -159,13 +178,19 @@ def __init__(self, llm, is_vllm=False):
159178
output_parser = PydanticToolsParser(tools=[Response], first_tool_only=True)
160179
self.llm = answer_make_prompt | llm | output_parser
161180

181+
@opea_telemetry
182+
def __llm_invoke__(self, state):
183+
output = self.llm.invoke(state)
184+
return output
185+
186+
@opea_telemetry
162187
def __call__(self, state):
163188
print("---CALL AnswerMaker---")
164189
success = False
165190
# sometime, LLM will not provide accurate steps per ask, try more than one time until success
166191
while not success:
167192
try:
168-
output = self.llm.invoke(state)
193+
output = self.__llm_invoke__(state)
169194
print("Generated response: ", output.response)
170195
success = True
171196
except OutputParserException as e:
@@ -183,6 +208,7 @@ class FinalAnswerChecker:
183208
str: A decision for whether we should use this plan or not
184209
"""
185210

211+
@opea_telemetry
186212
def __init__(self, llm, is_vllm=False):
187213
class grade(BaseModel):
188214
binary_score: str = Field(description="executable score 'yes' or 'no'")
@@ -194,9 +220,15 @@ class grade(BaseModel):
194220
output_parser = PydanticToolsParser(tools=[grade], first_tool_only=True)
195221
self.chain = answer_check_prompt | llm | output_parser
196222

223+
@opea_telemetry
224+
def __llm_invoke__(self, state):
225+
output = self.chain.invoke(state)
226+
return output
227+
228+
@opea_telemetry
197229
def __call__(self, state):
198230
print("---CALL FinalAnswerChecker---")
199-
scored_result = self.chain.invoke(state)
231+
scored_result = self.__llm_invoke__(state)
200232
score = scored_result.binary_score
201233
print(f"Answer is {state['response']}, Grade of good response is {score}")
202234
if score.startswith("yes"):
@@ -206,19 +238,26 @@ def __call__(self, state):
206238

207239

208240
class Replanner:
241+
@opea_telemetry
209242
def __init__(self, llm, answer_checker=None):
210243
llm = llm.bind_tools([Plan])
211244
output_parser = PydanticToolsParser(tools=[Plan], first_tool_only=True)
212245
self.llm = replanner_prompt | llm | output_parser
213246
self.answer_checker = answer_checker
214247

248+
@opea_telemetry
249+
def __llm_invoke__(self, state):
250+
output = self.llm.invoke(state)
251+
return output
252+
253+
@opea_telemetry
215254
def __call__(self, state):
216255
print("---CALL Replanner---")
217256
success = False
218257
# sometime, LLM will not provide accurate steps per ask, try more than one time until success
219258
while not success:
220259
try:
221-
output = self.llm.invoke(state)
260+
output = self.__llm_invoke__(state)
222261
success = True
223262
print("Replan: ", output)
224263
except OutputParserException as e:
@@ -230,6 +269,7 @@ def __call__(self, state):
230269

231270

232271
class PlanExecuteAgentWithLangGraph(BaseAgent):
272+
@opea_telemetry
233273
def __init__(self, args, with_memory=False, **kwargs):
234274
super().__init__(args, local_vars=globals(), **kwargs)
235275

@@ -260,6 +300,7 @@ def __init__(self, args, with_memory=False, **kwargs):
260300
def prepare_initial_state(self, query):
261301
return {"messages": [("user", query)]}
262302

303+
@opea_telemetry
263304
async def stream_generator(self, query, config, thread_id=None):
264305
initial_state = self.prepare_initial_state(query)
265306
if thread_id is not None:
@@ -282,6 +323,7 @@ async def stream_generator(self, query, config, thread_id=None):
282323
yield f"data: {repr(event)}\n\n"
283324
yield "data: [DONE]\n\n"
284325

326+
@opea_telemetry
285327
async def non_streaming_run(self, query, config):
286328
initial_state = self.prepare_initial_state(query)
287329
try:

0 commit comments

Comments
 (0)