Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 79 additions & 30 deletions cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -1706,24 +1706,27 @@ def _resolve_attachment_path(raw_path: str) -> Path | None:


def _format_process_notification(evt: dict) -> "str | None":
"""Format a process notification event into a [IMPORTANT: ...] message.
"""Format a process notification for display only.

Handles both completion events (notify_on_complete) and watch pattern
match events from the unified completion_queue.
The returned text is a notification, not a prompt. It must never be
enqueued into ``_pending_input`` because that queue represents real user
turns (plus explicit command-generated follow-up turns). Re-entering
background process status through the normal user-input path can contaminate
the next turn after context compaction or interrupts.
"""
evt_type = evt.get("type", "completion")
_sid = evt.get("session_id", "unknown")
_cmd = evt.get("command", "unknown")

if evt_type == "watch_disabled":
return f"[IMPORTANT: {evt.get('message', '')}]"
if evt_type in ("watch_disabled", "watch_overflow_tripped", "watch_overflow_released"):
return f"[Background process notice: {evt.get('message', '')}]"

if evt_type == "watch_match":
_pat = evt.get("pattern", "?")
_out = evt.get("output", "")
_sup = evt.get("suppressed", 0)
text = (
f"[IMPORTANT: Background process {_sid} matched "
f"[Background process {_sid} matched "
f"watch pattern \"{_pat}\".\n"
f"Command: {_cmd}\n"
f"Matched output:\n{_out}"
Expand All @@ -1737,13 +1740,76 @@ def _format_process_notification(evt: dict) -> "str | None":
_exit = evt.get("exit_code", "?")
_out = evt.get("output", "")
return (
f"[IMPORTANT: Background process {_sid} completed "
f"[Background process {_sid} completed "
f"(exit code {_exit}).\n"
f"Command: {_cmd}\n"
f"Output:\n{_out}]"
)


def _load_cli_background_notifications_mode(config: dict | None = None) -> str:
"""Load CLI background-process notification mode from env/config."""
mode = os.getenv("HERMES_BACKGROUND_NOTIFICATIONS", "")
if not mode:
cfg = config if isinstance(config, dict) else CLI_CONFIG
display_cfg = cfg.get("display", {}) if isinstance(cfg, dict) else {}
raw = display_cfg.get("background_process_notifications") if isinstance(display_cfg, dict) else None
if raw is False:
mode = "off"
elif raw not in (None, ""):
mode = str(raw)
mode = (mode or "all").strip().lower()
valid = {"all", "result", "error", "off"}
if mode not in valid:
logger.warning(
"Unknown background_process_notifications '%s', defaulting to 'all'",
mode,
)
return "all"
return mode


def _should_display_process_notification(evt: dict, mode: str) -> bool:
"""Return whether a process notification should be displayed in the CLI."""
mode = (mode or "all").strip().lower()
if mode == "off":
return False
evt_type = evt.get("type", "completion")
if evt_type == "completion":
return mode in ("all", "result") or (
mode == "error" and evt.get("exit_code") not in (0, None)
)
# Running/watch/status events are intentionally chatty and only belong in all mode.
return mode == "all"


def _drain_process_notifications_for_cli(emit=None, config: dict | None = None) -> int:
"""Drain background process notifications without injecting user input.

Returns the number of notifications displayed. ``emit`` is a display
side-channel (defaults to ``print``), not the chat input queue.
"""
from tools.process_registry import process_registry

mode = _load_cli_background_notifications_mode(config)
displayed = 0
while not process_registry.completion_queue.empty():
evt = process_registry.completion_queue.get_nowait()
_evt_sid = evt.get("session_id", "")
if evt.get("type", "completion") == "completion" and process_registry.is_completion_consumed(_evt_sid):
continue
if not _should_display_process_notification(evt, mode):
continue
_synth = _format_process_notification(evt)
if _synth:
if emit is None:
print(_synth)
else:
emit(_synth)
displayed += 1
return displayed


def _detect_file_drop(user_input: str) -> "dict | None":
"""Detect if *user_input* starts with a real local file path.

Expand Down Expand Up @@ -13072,19 +13138,10 @@ def process_loop():
if not self._agent_running:
self._check_config_mcp_changes()
# Check for background process notifications (completions
# and watch pattern matches) while agent is idle.
# and watch pattern matches) while agent is idle. Display
# them out-of-band; never enqueue them as user input.
try:
from tools.process_registry import process_registry
if not process_registry.completion_queue.empty():
evt = process_registry.completion_queue.get_nowait()
# Skip if the agent already consumed this via wait/poll/log
_evt_sid = evt.get("session_id", "")
if evt.get("type") == "completion" and process_registry.is_completion_consumed(_evt_sid):
pass # already delivered via tool result
else:
_synth = _format_process_notification(evt)
if _synth:
self._pending_input.put(_synth)
_drain_process_notifications_for_cli()
except Exception:
pass
continue
Expand Down Expand Up @@ -13185,18 +13242,10 @@ def _restart_recording():
threading.Thread(target=_restart_recording, daemon=True).start()

# Drain process notifications (completions + watch matches)
# that arrived while the agent was running.
# that arrived while the agent was running. Display them
# out-of-band; never enqueue them as synthetic user turns.
try:
from tools.process_registry import process_registry
while not process_registry.completion_queue.empty():
evt = process_registry.completion_queue.get_nowait()
# Skip if the agent already consumed this via wait/poll/log
_evt_sid = evt.get("session_id", "")
if evt.get("type") == "completion" and process_registry.is_completion_consumed(_evt_sid):
continue # already delivered via tool result
_synth = _format_process_notification(evt)
if _synth:
self._pending_input.put(_synth)
_drain_process_notifications_for_cli()
except Exception:
pass # Non-fatal — don't break the main loop

Expand Down
121 changes: 52 additions & 69 deletions gateway/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -1040,20 +1040,20 @@ def _parse_session_key(session_key: str) -> "dict | None":


def _format_gateway_process_notification(evt: dict) -> "str | None":
"""Format a watch pattern event from completion_queue into a [IMPORTANT:] message."""
"""Format a process notification for direct user-visible delivery."""
evt_type = evt.get("type", "completion")
_sid = evt.get("session_id", "unknown")
_cmd = evt.get("command", "unknown")

if evt_type == "watch_disabled":
return f"[IMPORTANT: {evt.get('message', '')}]"
if evt_type in ("watch_disabled", "watch_overflow_tripped", "watch_overflow_released"):
return f"[Background process notice: {evt.get('message', '')}]"

if evt_type == "watch_match":
_pat = evt.get("pattern", "?")
_out = evt.get("output", "")
_sup = evt.get("suppressed", 0)
text = (
f"[IMPORTANT: Background process {_sid} matched "
f"[Background process {_sid} matched "
f"watch pattern \"{_pat}\".\n"
f"Command: {_cmd}\n"
f"Matched output:\n{_out}"
Expand Down Expand Up @@ -7750,23 +7750,31 @@ async def _handle_message_with_agent(self, event, source, _quick_key: str, run_g
# Drain watch pattern notifications that arrived during the agent run.
# Watch events and completions share the same queue; completions are
# already handled by the per-process watcher task above, so we only
# inject watch-type events here.
# deliver watch-type events here. Delivery is notification-only and
# must not re-enter _handle_message as fake user/internal input.
try:
from tools.process_registry import process_registry as _pr
notify_mode = self._load_background_notifications_mode()
_watch_events = []
while not _pr.completion_queue.empty():
evt = _pr.completion_queue.get_nowait()
evt_type = evt.get("type", "completion")
if evt_type in {"watch_match", "watch_disabled"}:
if evt_type in (
"watch_match",
"watch_disabled",
"watch_overflow_tripped",
"watch_overflow_released",
) and notify_mode == "all":
_watch_events.append(evt)
# else: completion events are handled by the watcher task
# else: completion events are handled by the watcher task;
# filtered watch events are intentionally discarded.
for evt in _watch_events:
synth_text = _format_gateway_process_notification(evt)
if synth_text:
try:
await self._inject_watch_notification(synth_text, evt)
await self._send_process_notification(synth_text, evt)
except Exception as e2:
logger.error("Watch notification injection error: %s", e2)
logger.error("Watch notification delivery error: %s", e2)
except Exception as e:
logger.debug("Watch queue drain error: %s", e)

Expand Down Expand Up @@ -13325,43 +13333,41 @@ def _build_process_event_source(self, evt: dict):
user_name=str(evt.get("user_name") or "").strip() or None,
)

async def _inject_watch_notification(self, synth_text: str, evt: dict) -> None:
"""Inject a watch-pattern notification as a synthetic message event.
async def _send_process_notification(self, synth_text: str, evt: dict) -> None:
"""Send a process notification directly to the originating chat/thread.

Routing must come from the queued watch event itself, not from whatever
foreground message happened to be active when the queue was drained.
This is intentionally delivery-only. Do not call
``adapter.handle_message(MessageEvent(..., internal=True))`` here:
that routes the notification through the normal inbound-message path
and can turn process output into fake user input.
"""
source = self._build_process_event_source(evt)
if not source:
logger.warning(
"Dropping watch notification with no routing metadata for process %s",
"Dropping process notification with no routing metadata for process %s",
evt.get("session_id", "unknown"),
)
return
platform_name = source.platform.value if hasattr(source.platform, "value") else str(source.platform)
adapter = None
for p, a in self.adapters.items():
if p.value == platform_name:
if p == source.platform or p.value == platform_name:
adapter = a
break
if not adapter:
if not adapter or not source.chat_id:
return
try:
synth_event = MessageEvent(
text=synth_text,
message_type=MessageType.TEXT,
source=source,
internal=True,
)
logger.info(
"Watch pattern notification — injecting for %s chat=%s thread=%s",
platform_name,
source.chat_id,
source.thread_id,
)
await adapter.handle_message(synth_event)
except Exception as e:
logger.error("Watch notification injection error: %s", e)
send_meta = {"thread_id": source.thread_id} if source.thread_id else None
logger.info(
"Process notification — delivering for %s chat=%s thread=%s",
platform_name,
source.chat_id,
source.thread_id,
)
await adapter.send(source.chat_id, synth_text, metadata=send_meta)

async def _inject_watch_notification(self, synth_text: str, evt: dict) -> None:
"""Backward-compatible wrapper for direct watch notification delivery."""
await self._send_process_notification(synth_text, evt)

async def _run_process_watcher(self, watcher: dict) -> None:
"""
Expand Down Expand Up @@ -13392,9 +13398,10 @@ async def _run_process_watcher(self, watcher: dict) -> None:
logger.debug("Process watcher started: %s (every %ss, notify=%s, agent_notify=%s)",
session_id, interval, notify_mode, agent_notify)

if notify_mode == "off" and not agent_notify:
if notify_mode == "off":
# Still wait for the process to exit so we can log it, but don't
# push any messages to the user.
# push any messages to the user. notify_on_complete must not
# bypass an explicit off setting or re-enter as a fake user turn.
while True:
await asyncio.sleep(interval)
session = process_registry.get(session_id)
Expand All @@ -13416,19 +13423,25 @@ async def _run_process_watcher(self, watcher: dict) -> None:
last_output_len = current_output_len

if session.exited:
# --- Agent-triggered completion: inject synthetic message ---
# Skip if the agent already consumed the result via wait/poll/log
# --- Agent-triggered completion: direct notification only ---
# Skip if the agent already consumed the result via wait/poll/log.
# Do not route through adapter.handle_message(), which would
# re-enter the inbound user-message path.
from tools.process_registry import process_registry as _pr_check
if agent_notify and not _pr_check.is_completion_consumed(session_id):
agent_should_notify = agent_notify and (
notify_mode in ("all", "result")
or (notify_mode == "error" and session.exit_code not in (0, None))
)
if agent_should_notify and not _pr_check.is_completion_consumed(session_id):
from tools.ansi_strip import strip_ansi
_out = strip_ansi(session.output_buffer[-2000:]) if session.output_buffer else ""
synth_text = (
f"[IMPORTANT: Background process {session_id} completed "
f"[Background process {session_id} completed "
f"(exit code {session.exit_code}).\n"
f"Command: {session.command}\n"
f"Output:\n{_out}]"
)
source = self._build_process_event_source({
await self._send_process_notification(synth_text, {
"session_id": session_id,
"session_key": session_key,
"platform": platform_name,
Expand All @@ -13437,36 +13450,6 @@ async def _run_process_watcher(self, watcher: dict) -> None:
"user_id": user_id,
"user_name": user_name,
})
if not source:
logger.warning(
"Dropping completion notification with no routing metadata for process %s",
session_id,
)
break

adapter = None
for p, a in self.adapters.items():
if p == source.platform:
adapter = a
break
if adapter and source.chat_id:
try:
synth_event = MessageEvent(
text=synth_text,
message_type=MessageType.TEXT,
source=source,
internal=True,
)
logger.info(
"Process %s finished — injecting agent notification for session %s chat=%s thread=%s",
session_id,
session_key,
source.chat_id,
source.thread_id,
)
await adapter.handle_message(synth_event)
except Exception as e:
logger.error("Agent notify injection error: %s", e)
break

# --- Normal text-only notification ---
Expand Down
Loading