Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
303 changes: 50 additions & 253 deletions backend/app/utils/workforce.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,14 @@

import asyncio
import logging
from collections.abc import Generator

from camel.agents import ChatAgent
from camel.societies.workforce.base import BaseNode
from camel.societies.workforce.events import (
TaskAssignedEvent,
TaskCompletedEvent,
TaskCreatedEvent,
TaskFailedEvent,
WorkerCreatedEvent,
)
from camel.societies.workforce.prompts import TASK_DECOMPOSE_PROMPT
from camel.societies.workforce.task_channel import TaskChannel
from camel.societies.workforce.utils import (
FailureHandlingConfig,
TaskAnalysisResult,
Expand All @@ -35,6 +30,7 @@
from camel.societies.workforce.workforce import (
DEFAULT_WORKER_POOL_SIZE,
Workforce as BaseWorkforce,
WorkforcePlan,
WorkforceState,
)
from camel.societies.workforce.workforce_metrics import WorkforceMetrics
Expand All @@ -54,6 +50,7 @@
)
from app.utils.single_agent_worker import SingleAgentWorker
from app.utils.telemetry.workforce_metrics import WorkforceMetricsCallback
from camel.agents import ChatAgent

logger = logging.getLogger("workforce")

Expand Down Expand Up @@ -101,6 +98,7 @@ def __init__(
)
self.task_agent.stream_accumulate = True
self.task_agent._stream_accumulate_explicit = True
self._last_plan: WorkforcePlan | None = None
logger.info(
f"[WF-LIFECYCLE] ✅ Workforce.__init__ COMPLETED, id={id(self)}"
)
Expand Down Expand Up @@ -177,19 +175,23 @@ def eigent_make_sub_tasks(
coordinator_context: str = "",
on_stream_batch=None,
on_stream_text=None,
):
"""Split process_task method to eigent_make_sub_tasks
and eigent_start method.
) -> list[Task]:
"""Plan task decomposition without starting execution.

Delegates to CAMEL's plan_task_async. The resulting plan is
stored internally as ``_last_plan`` and the subtask list is
returned for backward-compatible callers. Pass the subtask list
(or the plan itself) to ``eigent_start`` to execute.

Args:
task: The main task to decompose
task: The main task to decompose.
coordinator_context: Optional context ONLY for coordinator
agent during decomposition. This context will NOT
be passed to subtasks or worker agents.
on_stream_batch: Optional callback for streaming
batches signature (List[Task], bool)
on_stream_text: Optional callback for raw
streaming text chunks
agent during decomposition. Will NOT be passed to workers.
on_stream_batch: Optional callback (List[Task], bool).
on_stream_text: Optional callback for raw streaming text.

Returns:
list[Task]: The decomposed subtasks.
"""
logger.debug(
"[DECOMPOSE] eigent_make_sub_tasks called",
Expand All @@ -210,231 +212,58 @@ def eigent_make_sub_tasks(
)
raise UserException(code.error, task.result)

self.reset()
self._task = task
self.set_channel(TaskChannel())
self._state = WorkforceState.RUNNING
task.state = TaskState.OPEN
subtasks = asyncio.run(
self.handle_decompose_append_task(
plan = asyncio.run(
self.plan_task_async(
task,
reset=False,
coordinator_context=coordinator_context,
on_stream_batch=on_stream_batch,
on_stream_text=on_stream_text,
planner_context=coordinator_context or None,
raw_text_callback=on_stream_text,
subtask_batch_callback=on_stream_batch,
)
)
self._last_plan = plan

logger.info(
"[DECOMPOSE] Task decomposition completed",
extra={
"api_task_id": self.api_task_id,
"task_id": task.id,
"subtasks_count": len(subtasks),
"subtasks_count": len(plan.subtasks),
},
)
return subtasks

async def eigent_start(self, subtasks: list[Task]):
"""start the workforce"""
logger.debug(
(
f"[WF-LIFECYCLE] eigent_start called with "
f"{len(subtasks)} subtasks"
),
extra={"api_task_id": self.api_task_id},
)
# Clear existing pending tasks to use the user-edited task list
# (tasks may have been added during decomposition before user edits)
self._pending_tasks.clear()

self._pending_tasks.extendleft(reversed(subtasks))
self.save_snapshot("Initial task decomposition")

try:
await self.start()
except Exception as e:
logger.error(
f"[WF-LIFECYCLE] Error in workforce execution: {e}",
extra={"api_task_id": self.api_task_id, "error": str(e)},
exc_info=True,
)
self._state = WorkforceState.STOPPED
raise
finally:
if self._state != WorkforceState.STOPPED:
self._state = WorkforceState.IDLE

def _decompose_task(self, task: Task, stream_callback=None):
"""Decompose task with optional streaming text callback."""
decompose_prompt = str(
TASK_DECOMPOSE_PROMPT.format(
content=task.content,
child_nodes_info=self._get_child_nodes_info(),
additional_info=task.additional_info,
)
)

self.task_agent.reset()
result = task.decompose(
self.task_agent, decompose_prompt, stream_callback=stream_callback
)

if isinstance(result, Generator):

def streaming_with_dependencies():
all_subtasks = []
for new_tasks in result:
all_subtasks.extend(new_tasks)
if new_tasks:
self._update_dependencies_for_decomposition(
task, all_subtasks
)
yield new_tasks
return plan.subtasks

return streaming_with_dependencies()
else:
subtasks = result
if subtasks:
self._update_dependencies_for_decomposition(task, subtasks)
return subtasks

async def handle_decompose_append_task(
self,
task: Task,
reset: bool = True,
coordinator_context: str = "",
on_stream_batch=None,
on_stream_text=None,
) -> list[Task]:
"""Override to support coordinator_context parameter.
Handle task decomposition and validation,
then append to pending tasks.
async def eigent_start(self, plan_or_subtasks: WorkforcePlan | list[Task]):
"""Execute a plan or a list of (possibly edited) subtasks.

Args:
task: The task to be processed
reset: Should trigger workforce reset
(Workforce must not be running)
coordinator_context: Optional context ONLY for
coordinator during decomposition
on_stream_batch: Optional callback for streaming
batches signature (List[Task], bool)
on_stream_text: Optional callback for raw streaming text chunks

Returns:
List[Task]: The decomposed subtasks or the original task
plan_or_subtasks: Either a WorkforcePlan, or a subtask list
(backward-compatible). When a list is passed, the plan
from the last eigent_make_sub_tasks call is reused with
the edited subtask list.
"""
logger.debug(
f"[DECOMPOSE] handle_decompose_append_task called, "
f"task_id={task.id}, reset={reset}"
)

if not validate_task_content(task.content, task.id):
task.state = TaskState.FAILED
task.result = "Task failed: Invalid or empty content provided"
logger.warning(
f"[DECOMPOSE] Task {task.id} rejected: "
f"Invalid or empty content. "
f"Content preview: '{task.content}'"
)
return [task]

if reset and self._state != WorkforceState.RUNNING:
self.reset()

self._task = task
task.state = TaskState.FAILED

if coordinator_context:
original_content = task.content
task_with_context = (
coordinator_context
+ "\n=== CURRENT TASK ===\n"
+ original_content
)
task.content = task_with_context
subtasks_result = self._decompose_task(
task, stream_callback=on_stream_text
)
task.content = original_content
if isinstance(plan_or_subtasks, WorkforcePlan):
plan = plan_or_subtasks
else:
subtasks_result = self._decompose_task(
task, stream_callback=on_stream_text
)

if isinstance(subtasks_result, Generator):
subtasks = []
for new_tasks in subtasks_result:
subtasks.extend(new_tasks)
if on_stream_batch:
try:
on_stream_batch(new_tasks, False)
except Exception as e:
logger.warning(f"Streaming callback failed: {e}")

# After consuming the generator, check task.subtasks
# for final result as fallback
if not subtasks and task.subtasks:
subtasks = task.subtasks
else:
subtasks = subtasks_result

if subtasks:
self._pending_tasks.extendleft(reversed(subtasks))
# Log task created events
metrics_callbacks = [
cb
for cb in self._callbacks
if isinstance(cb, WorkforceMetrics)
]
if metrics_callbacks:
for subtask in subtasks:
event = TaskCreatedEvent(
task_id=subtask.id,
description=subtask.content,
parent_task_id=task.id if task else None,
task_type=None,
)
metrics_callbacks[0].log_task_created(event)

if not subtasks:
logger.warning(
"[DECOMPOSE] No subtasks returned, creating fallback task"
)
fallback_task = Task(
content=task.content,
id=f"{task.id}.1",
parent=task,
)
task.subtasks = [fallback_task]
subtasks = [fallback_task]

# Log fallback task created event
metrics_callbacks = [
cb
for cb in self._callbacks
if isinstance(cb, WorkforceMetrics)
]
if metrics_callbacks:
event = TaskCreatedEvent(
task_id=fallback_task.id,
description=fallback_task.content,
parent_task_id=task.id if task else None,
task_type=None,
subtasks = plan_or_subtasks
# Reuse the plan from eigent_make_sub_tasks if available,
# updating its subtask list with the (possibly edited) one.
last_plan = getattr(self, "_last_plan", None)
if isinstance(last_plan, WorkforcePlan):
last_plan.subtasks = subtasks
plan = last_plan
else:
plan = WorkforcePlan(
task=self._task or Task(content="", id="fallback"),
subtasks=subtasks,
)
metrics_callbacks[0].log_task_created(event)

if on_stream_batch:
try:
on_stream_batch(subtasks, True)
except Exception as e:
logger.warning(f"Final streaming callback failed: {e}")

logger.debug(
f"[DECOMPOSE] handle_decompose_append_task completed, "
f"returned {len(subtasks)} subtasks"
f"[WF-LIFECYCLE] eigent_start called with "
f"{len(plan.subtasks)} subtasks",
extra={"api_task_id": self.api_task_id},
)
return subtasks

await self.run_plan_async(plan, interactive=True)

def _get_agent_id_from_node_id(self, node_id: str) -> str | None:
"""Map worker node_id to the actual agent_id for
Expand Down Expand Up @@ -684,35 +513,6 @@ def add_single_agent_worker(

return self

def _sync_subtask_to_parent(self, task: Task) -> None:
"""Sync completed subtask's :obj:`result` and :obj:`state`
back to its :obj:`parent.subtasks` list. CAMEL stores results
in :obj:`_completed_tasks` but doesn't update
:obj:`parent.subtasks`, causing :obj:`parent.subtasks[i].result`
to remain :obj:`None`. This ensures consistency.

Args:
task (Task): The completed subtask whose result/state should
be synced to :obj:`parent.subtasks`.
"""
parent: Task = task.parent
if not parent or not parent.subtasks:
return

for sub in parent.subtasks:
if sub.id == task.id:
sub.result = task.result
sub.state = task.state
logger.debug(
f"[SYNC] Synced subtask {task.id} "
f"result to parent.subtasks"
)
return

logger.warning(
f"[SYNC] Subtask {task.id} not found in parent.subtasks"
)

async def _notify_task_completion(self, task: Task) -> None:
"""Send task completion notification to frontend.

Expand Down Expand Up @@ -771,10 +571,7 @@ async def _handle_completed_task(self, task: Task) -> None:
task (Task): The completed task to process.
"""
logger.debug(f"[WF] DONE {task.id}")
# Sync and fix internal at first before sending task state
# TODO: CAMEL should handle this task sync or have a more
# efficient sync
self._sync_subtask_to_parent(task)
# _sync_task_to_parent is now handled by CAMEL base class
await self._notify_task_completion(task)
await super()._handle_completed_task(task)

Expand Down
Loading