Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions verl/experimental/agent_loop/agent_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ async def generate_sequences(self, batch: DataProto) -> DataProto:
tasks = []
agent_names = batch.non_tensor_batch["agent_name"]
raw_prompts = batch.non_tensor_batch["raw_prompt"]
tools_kwargs_batch = batch.non_tensor_batch["tools_kwargs"]
if "index" in batch.non_tensor_batch:
index = batch.non_tensor_batch["index"]
else:
Expand All @@ -283,9 +284,13 @@ async def generate_sequences(self, batch: DataProto) -> DataProto:
batch.meta_info.get("global_steps", -1), index, batch.meta_info.get("validate", False)
)

for agent_name, messages, trajectory in zip(agent_names, raw_prompts, trajectory_info, strict=True):
for agent_name, messages, trajectory, tools_kwargs in zip(
agent_names, raw_prompts, trajectory_info, tools_kwargs_batch, strict=True
):
tasks.append(
asyncio.create_task(self._run_agent_loop(agent_name, messages.tolist(), sampling_params, trajectory))
asyncio.create_task(
self._run_agent_loop(agent_name, messages.tolist(), sampling_params, tools_kwargs, trajectory)
)
)
outputs = await asyncio.gather(*tasks)

Expand All @@ -297,6 +302,7 @@ async def _run_agent_loop(
agent_name: str,
messages: list[dict[str, Any]],
sampling_params: dict[str, Any],
tools_kwargs: dict[str, Any],
trajectory: dict[str, Any],
) -> AgentLoopOutput:
with rollout_trace_attr(
Expand All @@ -317,7 +323,7 @@ async def _run_agent_loop(
server_manager=self.server_manager,
tokenizer=self.tokenizer,
)
output = await agent_loop.run(messages, sampling_params)
output = await agent_loop.run(messages, sampling_params, tools_kwargs)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will break all subclasses of AgentLoopBase, we should make tools_kwargs an optional argument.
https://github.com/volcengine/verl/blob/main/verl/experimental/agent_loop/agent_loop.py#L171

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps I should add **kwargs to the run method of the AgentLoopBase class?

return output

def _postprocess(self, inputs: list[AgentLoopOutput]) -> DataProto:
Expand Down
12 changes: 7 additions & 5 deletions verl/experimental/agent_loop/tool_agent_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ def init_class(cls, config, tokenizer, **kwargs):
cls.system_prompt = tokenizer.apply_chat_template([{}], add_generation_prompt=False, tokenize=True)

@rollout_trace_op
async def run(self, messages: list[dict[str, Any]], sampling_params: dict[str, Any]) -> AgentLoopOutput:
async def run(
self, messages: list[dict[str, Any]], sampling_params: dict[str, Any], tools_kwargs: dict[str, Any]
) -> AgentLoopOutput:
metrics = {}
request_id = uuid4().hex
prompt_ids = await self.loop.run_in_executor(
Expand Down Expand Up @@ -97,7 +99,7 @@ async def run(self, messages: list[dict[str, Any]], sampling_params: dict[str, A
# call tools
tasks = []
for tool_call in tool_calls[: self.max_parallel_calls]:
tasks.append(self._call_tool(tool_call))
tasks.append(self._call_tool(tool_call, tools_kwargs))
with simple_timer("tool_calls", metrics):
tool_responses = await asyncio.gather(*tasks)
if any(isinstance(item, Exception) for item in tool_responses):
Expand Down Expand Up @@ -133,16 +135,16 @@ async def run(self, messages: list[dict[str, Any]], sampling_params: dict[str, A
)
return output

async def _call_tool(self, tool_call: FunctionCall) -> dict[str, str]:
async def _call_tool(self, tool_call: FunctionCall, tools_kwargs: dict[str, Any]) -> dict[str, str]:
"""Call tool and return tool response."""
tool, instance_id = None, None
try:
# TODO: append malformed tool_call to the prompt: invalid function name or arguments
tool_name = tool_call.name
tool_args = json.loads(tool_call.arguments)
tool = self.tools[tool_name]

instance_id = await tool.create()
kwargs = tools_kwargs.get(tool_name, {})
instance_id = await tool.create(create_kwargs=kwargs.get("create_kwargs", {}))
tool_response, _, _ = await tool.execute(instance_id, tool_args)
except Exception as e:
logger.exception(f"Error when executing tool: {e}")
Expand Down
2 changes: 2 additions & 0 deletions verl/tools/gsm8k_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ def get_openai_tool_schema(self) -> OpenAIFunctionToolSchema:
async def create(self, instance_id: Optional[str] = None, ground_truth: Optional[str] = None, **kwargs) -> str:
if instance_id is None:
instance_id = str(uuid4())
if ground_truth is None:
ground_truth = kwargs.get("create_kwargs", {}).get("ground_truth", None)
self._instance_dict[instance_id] = {
"response": "",
"ground_truth": ground_truth,
Expand Down