Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions comps/agent/src/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
MessageObject,
ThreadObject,
)
from comps.cores.telemetry.opea_telemetry import opea_telemetry, tracer

logger = CustomLogger("comps-react-agent")
logflag = os.getenv("LOGFLAG", False)
Expand Down Expand Up @@ -58,6 +59,7 @@ class AgentCompletionRequest(ChatCompletionRequest):
host="0.0.0.0",
port=args.port,
)
@opea_telemetry
async def llm_generate(input: AgentCompletionRequest):
if logflag:
logger.info(input)
Expand Down Expand Up @@ -148,6 +150,7 @@ class CreateAssistant(CreateAssistantsRequest):
host="0.0.0.0",
port=args.port,
)
@opea_telemetry
def create_assistants(input: CreateAssistant):
# 1. initialize the agent
print("@@@ Initializing agent with config: ", input.agent_config)
Expand Down Expand Up @@ -184,6 +187,7 @@ def create_assistants(input: CreateAssistant):
host="0.0.0.0",
port=args.port,
)
@opea_telemetry
def create_threads(input: CreateThreadsRequest):
# create a memory KV for the thread
thread_inst, thread_id = instantiate_thread_memory()
Expand All @@ -205,6 +209,7 @@ def create_threads(input: CreateThreadsRequest):
host="0.0.0.0",
port=args.port,
)
@opea_telemetry
def create_messages(thread_id, input: CreateMessagesRequest):
with threads_global_kv as g_threads:
thread_inst, _, _ = g_threads[thread_id]
Expand Down Expand Up @@ -249,6 +254,7 @@ def create_messages(thread_id, input: CreateMessagesRequest):
host="0.0.0.0",
port=args.port,
)
@opea_telemetry
def create_run(thread_id, input: CreateRunResponse):
with threads_global_kv as g_threads:
thread_inst, _, status = g_threads[thread_id]
Expand Down Expand Up @@ -296,6 +302,7 @@ def create_run(thread_id, input: CreateRunResponse):
host="0.0.0.0",
port=args.port,
)
@opea_telemetry
def cancel_run(thread_id):
with threads_global_kv as g_threads:
thread_inst, created_at, status = g_threads[thread_id]
Expand Down
5 changes: 5 additions & 0 deletions comps/agent/src/integrations/strategy/base_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@

from langgraph.checkpoint.memory import MemorySaver

from comps.cores.telemetry.opea_telemetry import opea_telemetry, tracer

from ..storage.persistence_redis import RedisPersistence
from ..tools import get_tools_descriptions
from ..utils import adapt_custom_prompt, setup_chat_model


class BaseAgent:
@opea_telemetry
def __init__(self, args, local_vars=None, **kwargs) -> None:
self.llm = setup_chat_model(args)
self.tools_descriptions = get_tools_descriptions(args.tools)
Expand Down Expand Up @@ -56,6 +59,7 @@ def execute(self, state: dict):
def prepare_initial_state(self, query):
raise NotImplementedError

@opea_telemetry
async def stream_generator(self, query, config):
initial_state = self.prepare_initial_state(query)
try:
Expand All @@ -71,6 +75,7 @@ async def stream_generator(self, query, config):
except Exception as e:
yield str(e)

@opea_telemetry
async def non_streaming_run(self, query, config):
initial_state = self.prepare_initial_state(query)
print("@@@ Initial State: ", initial_state)
Expand Down
52 changes: 47 additions & 5 deletions comps/agent/src/integrations/strategy/planexec/planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
from langgraph.graph.message import add_messages
from pydantic import BaseModel, Field

from comps.cores.telemetry.opea_telemetry import opea_telemetry, tracer

from ...global_var import threads_global_kv
from ...utils import has_multi_tool_inputs, tool_renderer
from ..base_agent import BaseAgent
Expand Down Expand Up @@ -54,6 +56,7 @@ class PlanStepChecker:
str: A decision for whether we should use this plan or not
"""

@opea_telemetry
def __init__(self, llm, is_vllm=False):
class grade(BaseModel):
binary_score: str = Field(description="executable score 'yes' or 'no'")
Expand All @@ -66,9 +69,15 @@ class grade(BaseModel):
output_parser = PydanticToolsParser(tools=[grade], first_tool_only=True)
self.chain = plan_check_prompt | llm | output_parser

@opea_telemetry
def __llm_invoke__(self, state):
    """Run the plan-check grading chain on *state*.

    Isolated into its own method so the telemetry decorator records the
    LLM call as a distinct span. Returns the parsed grade object produced
    by the chain (presumably has a ``binary_score`` field — the caller
    reads it; confirm against the chain's output parser).
    """
    return self.chain.invoke(state)

@opea_telemetry
def __call__(self, state):
# print("---CALL PlanStepChecker---")
scored_result = self.chain.invoke(state)
scored_result = self.__llm_invoke__(state)
score = scored_result.binary_score
print(f"Task is {state['context']}, Score is {score}")
if score.startswith("yes"):
Expand All @@ -79,6 +88,7 @@ def __call__(self, state):

# Define workflow Node
class Planner:
@opea_telemetry
def __init__(self, llm, plan_checker=None, is_vllm=False):
if is_vllm:
llm = llm.bind_tools([Plan], tool_choice={"function": {"name": Plan.__name__}})
Expand All @@ -88,6 +98,12 @@ def __init__(self, llm, plan_checker=None, is_vllm=False):
self.llm = planner_prompt | llm | output_parser
self.plan_checker = plan_checker

@opea_telemetry
def __llm_invoke__(self, messages):
    """Invoke the planner chain on *messages* and return the generated plan.

    Wrapped separately so the telemetry decorator captures the LLM
    invocation as its own traced span.
    """
    return self.llm.invoke(messages)

@opea_telemetry
def __call__(self, state):
print("---CALL Planner---")
input = state["messages"][-1].content
Expand All @@ -96,7 +112,7 @@ def __call__(self, state):
while not success:
while not success:
try:
plan = self.llm.invoke({"messages": [("user", state["messages"][-1].content)]})
plan = self.__llm_invoke__({"messages": [("user", state["messages"][-1].content)]})
print("Generated plan: ", plan)
success = True
except OutputParserException as e:
Expand All @@ -116,6 +132,7 @@ def __call__(self, state):


class Executor:
@opea_telemetry
def __init__(self, llm, tools=[]):
prompt = hwchase17_react_prompt
if has_multi_tool_inputs(tools):
Expand All @@ -126,6 +143,7 @@ def __init__(self, llm, tools=[]):
agent=agent_chain, tools=tools, handle_parsing_errors=True, max_iterations=50
)

@opea_telemetry
def __call__(self, state):
print("---CALL Executor---")
plan = state["plan"]
Expand All @@ -151,6 +169,7 @@ def __call__(self, state):


class AnswerMaker:
@opea_telemetry
def __init__(self, llm, is_vllm=False):
if is_vllm:
llm = llm.bind_tools([Response], tool_choice={"function": {"name": Response.__name__}})
Expand All @@ -159,13 +178,19 @@ def __init__(self, llm, is_vllm=False):
output_parser = PydanticToolsParser(tools=[Response], first_tool_only=True)
self.llm = answer_make_prompt | llm | output_parser

@opea_telemetry
def __llm_invoke__(self, state):
    """Run the answer-making chain on *state* and return its parsed output.

    Split out of ``__call__`` so the telemetry decorator traces the LLM
    call independently of the surrounding retry loop.
    """
    return self.llm.invoke(state)

@opea_telemetry
def __call__(self, state):
print("---CALL AnswerMaker---")
success = False
# sometime, LLM will not provide accurate steps per ask, try more than one time until success
while not success:
try:
output = self.llm.invoke(state)
output = self.__llm_invoke__(state)
print("Generated response: ", output.response)
success = True
except OutputParserException as e:
Expand All @@ -183,6 +208,7 @@ class FinalAnswerChecker:
str: A decision for whether we should use this plan or not
"""

@opea_telemetry
def __init__(self, llm, is_vllm=False):
class grade(BaseModel):
binary_score: str = Field(description="executable score 'yes' or 'no'")
Expand All @@ -194,9 +220,15 @@ class grade(BaseModel):
output_parser = PydanticToolsParser(tools=[grade], first_tool_only=True)
self.chain = answer_check_prompt | llm | output_parser

@opea_telemetry
def __llm_invoke__(self, state):
    """Run the final-answer grading chain on *state*.

    Separate method so the telemetry decorator records the LLM call as a
    dedicated span; returns the chain's parsed grading result.
    """
    return self.chain.invoke(state)

@opea_telemetry
def __call__(self, state):
print("---CALL FinalAnswerChecker---")
scored_result = self.chain.invoke(state)
scored_result = self.__llm_invoke__(state)
score = scored_result.binary_score
print(f"Answer is {state['response']}, Grade of good response is {score}")
if score.startswith("yes"):
Expand All @@ -206,19 +238,26 @@ def __call__(self, state):


class Replanner:
@opea_telemetry
def __init__(self, llm, answer_checker=None):
    """Build the replanning chain: prompt -> Plan-tool-bound LLM -> parser.

    Parameters
    ----------
    llm : the chat model to bind the ``Plan`` tool onto.
    answer_checker : optional checker used downstream to grade answers.
    """
    tool_llm = llm.bind_tools([Plan])
    parser = PydanticToolsParser(tools=[Plan], first_tool_only=True)
    self.llm = replanner_prompt | tool_llm | parser
    self.answer_checker = answer_checker

@opea_telemetry
def __llm_invoke__(self, state):
    """Invoke the replanner chain on *state* and return the new plan.

    Kept as a separate method so the telemetry decorator traces the LLM
    invocation on its own.
    """
    return self.llm.invoke(state)

@opea_telemetry
def __call__(self, state):
print("---CALL Replanner---")
success = False
# sometime, LLM will not provide accurate steps per ask, try more than one time until success
while not success:
try:
output = self.llm.invoke(state)
output = self.__llm_invoke__(state)
success = True
print("Replan: ", output)
except OutputParserException as e:
Expand All @@ -230,6 +269,7 @@ def __call__(self, state):


class PlanExecuteAgentWithLangGraph(BaseAgent):
@opea_telemetry
def __init__(self, args, with_memory=False, **kwargs):
super().__init__(args, local_vars=globals(), **kwargs)

Expand Down Expand Up @@ -260,6 +300,7 @@ def __init__(self, args, with_memory=False, **kwargs):
def prepare_initial_state(self, query):
    """Build the initial graph state for a run: one user message.

    Returns a dict with a ``messages`` list containing a single
    ``("user", query)`` tuple, matching the state schema the LangGraph
    workflow consumes.
    """
    user_turn = ("user", query)
    return {"messages": [user_turn]}

@opea_telemetry
async def stream_generator(self, query, config, thread_id=None):
initial_state = self.prepare_initial_state(query)
if thread_id is not None:
Expand All @@ -282,6 +323,7 @@ async def stream_generator(self, query, config, thread_id=None):
yield f"data: {repr(event)}\n\n"
yield "data: [DONE]\n\n"

@opea_telemetry
async def non_streaming_run(self, query, config):
initial_state = self.prepare_initial_state(query)
try:
Expand Down
Loading