Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions console/src/api/types/cronjob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@ export interface CronJobRuntime {
misfire_grace_seconds?: number;
}

export interface CronJobExecutionSession {
mode?: "dispatch" | "new_per_run";
}

export interface CronJobExecution {
session?: CronJobExecutionSession;
}

export interface CronJobRequest {
input: unknown;
session_id?: string | null;
Expand All @@ -39,6 +47,7 @@ export interface CronJobSpecInput {
text?: string;
request?: CronJobRequest;
dispatch: CronJobDispatch;
execution?: CronJobExecution;
runtime?: CronJobRuntime;
meta?: Record<string, unknown>;
}
Expand Down
7 changes: 7 additions & 0 deletions console/src/locales/en.json
Original file line number Diff line number Diff line change
Expand Up @@ -466,9 +466,14 @@
"requestInput": "Request Content",
"requestSessionId": "Request Session ID",
"requestUserId": "Request User ID",
"executionSessionStrategy": "Execution Session",
"executionSessionReuseDispatch": "Reuse output session",
"executionSessionNewPerRun": "Create new session for each run",
"executionSessionNewPerRunHint": "Each run will execute in a fresh chat/session while results still go to the output session below.",
"dispatchChannel": "Target Channel",
"dispatchTargetUserId": "Target User ID",
"dispatchTargetSessionId": "Target Session ID",
"outputSession": "Output Session",
"dispatchMode": "Delivery Mode",
"runtimeMaxConcurrency": "Max Concurrency",
"runtimeTimeoutSeconds": "Timeout (seconds)",
Expand All @@ -486,9 +491,11 @@
"requestInputExample": "Format: [{\"role\":\"user\",\"content\":[{\"type\":\"text\",\"text\":\"Your message here\"}]}]",
"requestSessionIdTooltip": "Session ID for the request context. Use 'default' if unsure.",
"requestUserIdTooltip": "User ID that initiates the request. Use 'system' for automated tasks.",
"executionSessionStrategyTooltip": "Choose whether each run reuses the output session context or starts a fresh execution session.",
"dispatchChannelTooltip": "Target channel where the response will be sent (e.g., 'console', 'discord', 'imessage').",
"dispatchTargetUserIdTooltip": "User ID who will receive the response in the target channel.",
"dispatchTargetSessionIdTooltip": "Session ID where the response will be delivered in the target channel.",
"outputSessionTooltip": "The session that receives cron output and stays stable for channel delivery.",
"dispatchModeTooltip": "Choose 'stream' for real-time responses or 'final' for complete responses only.",
"maxConcurrencyTooltip": "Maximum number of this job that can run simultaneously. Default: 1",
"timeoutSecondsTooltip": "Maximum execution time in seconds. Job will be terminated if exceeded.",
Expand Down
7 changes: 7 additions & 0 deletions console/src/locales/ja.json
Original file line number Diff line number Diff line change
Expand Up @@ -408,9 +408,14 @@
"requestInput": "リクエスト内容",
"requestSessionId": "リクエストセッションID",
"requestUserId": "リクエストユーザーID",
"executionSessionStrategy": "実行セッション",
"executionSessionReuseDispatch": "出力セッションを再利用",
"executionSessionNewPerRun": "実行ごとに新しいセッションを作成",
"executionSessionNewPerRunHint": "各実行は新しいチャット/セッションで処理されますが、結果は下の出力セッションに配信されます。",
"dispatchChannel": "対象チャンネル",
"dispatchTargetUserId": "対象ユーザーID",
"dispatchTargetSessionId": "対象セッションID",
"outputSession": "出力セッション",
"dispatchMode": "配信モード",
"runtimeMaxConcurrency": "最大同時実行数",
"runtimeTimeoutSeconds": "タイムアウト(秒)",
Expand All @@ -428,9 +433,11 @@
"requestInputExample": "フォーマット: [{\"role\":\"user\",\"content\":[{\"type\":\"text\",\"text\":\"ここにメッセージを入力\"}]}]",
"requestSessionIdTooltip": "リクエストコンテキストのセッションID。不明な場合は「default」を使用してください。",
"requestUserIdTooltip": "リクエストを開始するユーザーID。自動タスクには「system」を使用してください。",
"executionSessionStrategyTooltip": "各実行で出力セッションの文脈を再利用するか、新しい実行セッションを開始するかを選択します。",
"dispatchChannelTooltip": "レスポンスを送信する対象チャンネル(例: 'console', 'discord', 'imessage')。",
"dispatchTargetUserIdTooltip": "対象チャンネルでレスポンスを受け取るユーザーID。",
"dispatchTargetSessionIdTooltip": "対象チャンネルでレスポンスを届けるセッションID。",
"outputSessionTooltip": "cron の出力を受け取るセッションです。チャンネル配信とコンソール表示のために安定したまま保たれます。",
"dispatchModeTooltip": "リアルタイム応答には「ストリーム」を、完了後の応答には「ファイナル」を選択してください。",
"maxConcurrencyTooltip": "このジョブの最大同時実行数。デフォルト: 1",
"timeoutSecondsTooltip": "最大実行時間(秒)。超過するとジョブが終了します。",
Expand Down
7 changes: 7 additions & 0 deletions console/src/locales/ru.json
Original file line number Diff line number Diff line change
Expand Up @@ -413,9 +413,14 @@
"requestInput": "Содержимое запроса",
"requestSessionId": "ID сессии запроса",
"requestUserId": "ID пользователя запроса",
"executionSessionStrategy": "Сессия выполнения",
"executionSessionReuseDispatch": "Переиспользовать сессию вывода",
"executionSessionNewPerRun": "Создавать новую сессию для каждого запуска",
"executionSessionNewPerRunHint": "Каждый запуск будет выполняться в новой беседе/сессии, а результат по-прежнему будет отправляться в указанную ниже сессию вывода.",
"dispatchChannel": "Целевой канал",
"dispatchTargetUserId": "ID целевого пользователя",
"dispatchTargetSessionId": "ID целевой сессии",
"outputSession": "Сессия вывода",
"dispatchMode": "Режим доставки",
"runtimeMaxConcurrency": "Макс. параллельность",
"runtimeTimeoutSeconds": "Таймаут (секунды)",
Expand All @@ -433,9 +438,11 @@
"requestInputExample": "Формат: [{\"role\":\"user\",\"content\":[{\"type\":\"text\",\"text\":\"Ваше сообщение\"}]}]",
"requestSessionIdTooltip": "ID сессии для контекста запроса. Если не уверены, используйте 'default'.",
"requestUserIdTooltip": "ID пользователя, который инициирует запрос. Для автоматических задач используйте 'system'.",
"executionSessionStrategyTooltip": "Выберите, будет ли каждый запуск переиспользовать контекст сессии вывода или начинаться в новой сессии выполнения.",
"dispatchChannelTooltip": "Целевой канал, куда будет отправлен ответ (например, 'console', 'discord', 'imessage').",
"dispatchTargetUserIdTooltip": "ID пользователя, который получит ответ в целевом канале.",
"dispatchTargetSessionIdTooltip": "ID сессии, куда будет доставлен ответ в целевом канале.",
"outputSessionTooltip": "Сессия, которая получает вывод cron-задачи и остаётся стабильной для доставки по каналу.",
"dispatchModeTooltip": "Выберите 'stream' для ответов в реальном времени или 'final' только для итоговых ответов.",
"maxConcurrencyTooltip": "Максимальное количество одновременно выполняемых экземпляров этой задачи. По умолчанию: 1",
"timeoutSecondsTooltip": "Максимальное время выполнения в секундах. При превышении задача будет остановлена.",
Expand Down
7 changes: 7 additions & 0 deletions console/src/locales/zh.json
Original file line number Diff line number Diff line change
Expand Up @@ -431,9 +431,14 @@
"requestInput": "请求内容",
"requestSessionId": "请求会话ID",
"requestUserId": "请求用户ID",
"executionSessionStrategy": "执行会话",
"executionSessionReuseDispatch": "复用输出会话",
"executionSessionNewPerRun": "每次执行创建新会话",
"executionSessionNewPerRunHint": "每次运行都会在新的聊天/会话中执行,但结果仍会投递到下面配置的输出会话。",
"dispatchChannel": "目标频道",
"dispatchTargetUserId": "目标用户ID",
"dispatchTargetSessionId": "目标会话ID",
"outputSession": "输出会话",
"dispatchMode": "分发模式",
"runtimeMaxConcurrency": "最大并发数",
"runtimeTimeoutSeconds": "超时时间(秒)",
Expand All @@ -451,9 +456,11 @@
"requestInputExample": "格式:[{\"role\":\"user\",\"content\":[{\"type\":\"text\",\"text\":\"您的消息内容\"}]}]",
"requestSessionIdTooltip": "请求上下文的会话ID。不确定时使用 'default'。",
"requestUserIdTooltip": "发起请求的用户ID。自动化任务使用 'system'。",
"executionSessionStrategyTooltip": "选择每次执行是复用输出会话上下文,还是启动一个全新的执行会话。",
"dispatchChannelTooltip": "响应将发送到的目标频道(例如:'console'、'discord'、'imessage')。",
"dispatchTargetUserIdTooltip": "在目标频道中接收响应的用户ID。",
"dispatchTargetSessionIdTooltip": "在目标频道中传递响应的会话ID。",
"outputSessionTooltip": "接收定时任务输出的会话,保持稳定以便频道投递与控制台查看。",
"dispatchModeTooltip": "选择 'stream' 获取实时响应,或选择 'final' 仅获取完整响应。",
"maxConcurrencyTooltip": "此任务可以同时运行的最大数量。默认:1",
"timeoutSecondsTooltip": "最大执行时间(秒)。超时将终止任务。",
Expand Down
35 changes: 23 additions & 12 deletions console/src/pages/Control/CronJobs/components/JobDrawer.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
Switch,
Button,
Checkbox,
Radio,
} from "@agentscope-ai/design";
import { TimePicker } from "antd";
import { useTranslation } from "react-i18next";
Expand Down Expand Up @@ -329,19 +330,29 @@ export function JobDrawer({
</Form.Item>

<Form.Item
name={["request", "session_id"]}
label={t("cronJobs.requestSessionId")}
tooltip={t("cronJobs.requestSessionIdTooltip")}
name="sessionStrategy"
label={t("cronJobs.executionSessionStrategy")}
tooltip={t("cronJobs.executionSessionStrategyTooltip")}
initialValue="dispatch"
>
<Input placeholder="default" />
<Radio.Group>
<Radio value="dispatch">
{t("cronJobs.executionSessionReuseDispatch")}
</Radio>
<Radio value="new_per_run">
{t("cronJobs.executionSessionNewPerRun")}
</Radio>
</Radio.Group>
</Form.Item>

<Form.Item
name={["request", "user_id"]}
label={t("cronJobs.requestUserId")}
tooltip={t("cronJobs.requestUserIdTooltip")}
>
<Input placeholder="system" />
<Form.Item noStyle shouldUpdate>
{({ getFieldValue }) =>
getFieldValue("sessionStrategy") === "new_per_run" ? (
<div className={styles.formExtraText} style={{ marginBottom: 16 }}>
{t("cronJobs.executionSessionNewPerRunHint")}
</div>
) : null
}
</Form.Item>

<Form.Item name={["dispatch", "type"]} label="DispatchType" hidden>
Expand Down Expand Up @@ -370,11 +381,11 @@ export function JobDrawer({

<Form.Item
name={["dispatch", "target", "session_id"]}
label={t("cronJobs.dispatchTargetSessionId")}
label={t("cronJobs.outputSession")}
rules={[
{ required: true, message: t("cronJobs.pleaseInputSessionId") },
]}
tooltip={t("cronJobs.dispatchTargetSessionIdTooltip")}
tooltip={t("cronJobs.outputSessionTooltip")}
>
<Input placeholder="default" />
</Form.Item>
Expand Down
20 changes: 9 additions & 11 deletions console/src/pages/Control/CronJobs/components/columns.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -236,16 +236,14 @@ export const createColumns = (
},
},
{
title: "RequestSessionID",
dataIndex: ["request", "session_id"],
key: "session_id",
width: 160,
},
{
title: "RequestUserID",
dataIndex: ["request", "user_id"],
key: "user_id",
width: 140,
title: handlers.t("cronJobs.executionSessionStrategy"),
dataIndex: ["execution", "session", "mode"],
key: "execution_session_mode",
width: 220,
render: (mode?: string) =>
mode === "new_per_run"
? handlers.t("cronJobs.executionSessionNewPerRun")
: handlers.t("cronJobs.executionSessionReuseDispatch"),
},
{
title: "DispatchType",
Expand All @@ -266,7 +264,7 @@ export const createColumns = (
width: 190,
},
{
title: "DispatchTargetSessionID",
title: handlers.t("cronJobs.outputSession"),
dataIndex: ["dispatch", "target", "session_id"],
key: "target_session_id",
width: 210,
Expand Down
6 changes: 6 additions & 0 deletions console/src/pages/Control/CronJobs/components/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ export const DEFAULT_FORM_VALUES = {
cronType: "daily",
cronTime: dayjs().hour(9).minute(0),
task_type: "agent" as const,
execution: {
session: {
mode: "dispatch" as const,
},
},
sessionStrategy: "dispatch" as const,
dispatch: {
type: "channel" as const,
channel: "console",
Expand Down
14 changes: 13 additions & 1 deletion console/src/pages/Control/CronJobs/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ function CronJobsPage() {
: "",
},
cronType: cronParts.type,
sessionStrategy: job.execution?.session?.mode || "dispatch",
};

// Set time picker value
Expand Down Expand Up @@ -157,15 +158,26 @@ function CronJobsPage() {
...values.schedule,
cron: cronExpression,
},
execution: {
session: {
mode: values.sessionStrategy || "dispatch",
},
},
};

delete (processedValues as any).sessionStrategy;
if (processedValues.request) {
delete (processedValues.request as any).session_id;
delete (processedValues.request as any).user_id;
}

// Parse request input JSON
if (values.request?.input && typeof values.request.input === "string") {
try {
processedValues = {
...processedValues,
request: {
...values.request,
...(processedValues.request || {}),
input: JSON.parse(values.request.input as any),
},
};
Expand Down
36 changes: 29 additions & 7 deletions src/copaw/app/crons/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,21 @@

import asyncio
import logging
import time
from typing import Any, Dict
from uuid import uuid4

from .models import CronJobSpec

logger = logging.getLogger(__name__)


def _generate_execution_session_id(job_id: str) -> str:
timestamp = int(time.time() * 1000)
uuid_short = str(uuid4())[:8]
return f"cron:{job_id}:{timestamp}:{uuid_short}"


class CronExecutor:
def __init__(self, *, runner: Any, channel_manager: Any):
self._runner = runner
Expand All @@ -23,16 +31,30 @@ async def execute(self, job: CronJobSpec) -> None:
stream_query + send_event)
"""
target_user_id = job.dispatch.target.user_id
target_session_id = job.dispatch.target.session_id
dispatch_meta: Dict[str, Any] = dict(job.dispatch.meta or {})
dispatch_session_id = job.dispatch.target.session_id
session_mode = job.execution.session.mode
execution_session_id = (
_generate_execution_session_id(job.id)
if session_mode == "new_per_run"
else (dispatch_session_id or f"cron:{job.id}")
)
dispatch_meta: Dict[str, Any] = {
**dict(job.dispatch.meta or {}),
"execution_session_id": execution_session_id,
"dispatch_session_id": dispatch_session_id,
"session_mode": session_mode,
}
logger.info(
"cron execute: job_id=%s channel=%s task_type=%s "
"target_user_id=%s target_session_id=%s",
"target_user_id=%s dispatch_session_id=%s "
"execution_session_id=%s session_mode=%s",
job.id,
job.dispatch.channel,
job.task_type,
target_user_id[:40] if target_user_id else "",
target_session_id[:40] if target_session_id else "",
dispatch_session_id[:40] if dispatch_session_id else "",
execution_session_id[:40] if execution_session_id else "",
session_mode,
)

if job.task_type == "text" and job.text:
Expand All @@ -45,7 +67,7 @@ async def execute(self, job: CronJobSpec) -> None:
await self._channel_manager.send_text(
channel=job.dispatch.channel,
user_id=target_user_id,
session_id=target_session_id,
session_id=dispatch_session_id,
text=job.text.strip(),
meta=dispatch_meta,
)
Expand All @@ -60,14 +82,14 @@ async def execute(self, job: CronJobSpec) -> None:
assert job.request is not None
req: Dict[str, Any] = job.request.model_dump(mode="json")
req["user_id"] = target_user_id or "cron"
req["session_id"] = target_session_id or f"cron:{job.id}"
req["session_id"] = execution_session_id

async def _run() -> None:
async for event in self._runner.stream_query(req):
await self._channel_manager.send_event(
channel=job.dispatch.channel,
user_id=target_user_id,
session_id=target_session_id,
session_id=dispatch_session_id,
event=event,
meta=dispatch_meta,
)
Expand Down
12 changes: 10 additions & 2 deletions src/copaw/app/crons/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,14 @@ class JobRuntimeSpec(BaseModel):
misfire_grace_seconds: int = Field(default=60, ge=0)


class ExecutionSessionSpec(BaseModel):
mode: Literal["dispatch", "new_per_run"] = "dispatch"


class ExecutionSpec(BaseModel):
session: ExecutionSessionSpec = Field(default_factory=ExecutionSessionSpec)


class CronJobRequest(BaseModel):
"""Passthrough payload to runner.stream_query(request=...).

Expand All @@ -133,6 +141,7 @@ class CronJobSpec(BaseModel):
text: Optional[str] = None
request: Optional[CronJobRequest] = None
dispatch: DispatchSpec
execution: ExecutionSpec = Field(default_factory=ExecutionSpec)

runtime: JobRuntimeSpec = Field(default_factory=JobRuntimeSpec)
meta: Dict[str, Any] = Field(default_factory=dict)
Expand All @@ -149,12 +158,11 @@ def _validate_task_type_fields(self) -> "CronJobSpec":
raise ConfigurationException(
message="task_type is agent but request is missing",
)
# Keep request.user_id and request.session_id in sync with target
# Keep request.user_id aligned with dispatch target.
target = self.dispatch.target
self.request = self.request.model_copy(
update={
"user_id": target.user_id,
"session_id": target.session_id,
},
)
return self
Expand Down
Loading
Loading