Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 144 additions & 0 deletions server/reflector/hatchet/dag_zulip.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
"""
Hatchet DAG Status -> Zulip Live Updates.

Posts/updates/deletes a Zulip message showing the Hatchet workflow DAG status.
All functions are fire-and-forget (catch + warning log on failure).

Note: Uses deferred imports throughout for fork-safety,
consistent with the pipeline pattern in daily_multitrack_pipeline.py.
"""

from reflector.logger import logger
from reflector.settings import settings


def _dag_zulip_enabled() -> bool:
return bool(
settings.ZULIP_REALM and settings.ZULIP_DAG_STREAM and settings.ZULIP_DAG_TOPIC
)


async def create_dag_zulip_message(transcript_id: str, workflow_run_id: str) -> None:
"""Post initial DAG status to Zulip. Called at dispatch time (normal DB context)."""
if not _dag_zulip_enabled():
return

try:
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
from reflector.hatchet.client import HatchetClientManager # noqa: PLC0415
from reflector.tools.render_hatchet_run import ( # noqa: PLC0415
render_run_detail,
)
from reflector.zulip import send_message_to_zulip # noqa: PLC0415

client = HatchetClientManager.get_client()
details = await client.runs.aio_get(workflow_run_id)
content = render_run_detail(details)

response = await send_message_to_zulip(
settings.ZULIP_DAG_STREAM, settings.ZULIP_DAG_TOPIC, content
)
message_id = response.get("id")

if message_id:
transcript = await transcripts_controller.get_by_id(transcript_id)
if transcript:
await transcripts_controller.update(
transcript, {"zulip_message_id": message_id}
)
except Exception:
logger.warning(
"[DAG Zulip] Failed to create DAG message",
transcript_id=transcript_id,
workflow_run_id=workflow_run_id,
exc_info=True,
)


async def update_dag_zulip_message(
transcript_id: str,
workflow_run_id: str,
error_message: str | None = None,
) -> None:
"""Update existing DAG status in Zulip. Called from Hatchet worker (forked).

Args:
error_message: If set, appended as an error banner to the rendered DAG.
"""
if not _dag_zulip_enabled():
return

try:
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
from reflector.hatchet.client import HatchetClientManager # noqa: PLC0415
from reflector.hatchet.workflows.daily_multitrack_pipeline import ( # noqa: PLC0415
fresh_db_connection,
)
from reflector.tools.render_hatchet_run import ( # noqa: PLC0415
render_run_detail,
)
from reflector.zulip import update_zulip_message # noqa: PLC0415

async with fresh_db_connection():
transcript = await transcripts_controller.get_by_id(transcript_id)
if not transcript or not transcript.zulip_message_id:
return

client = HatchetClientManager.get_client()
details = await client.runs.aio_get(workflow_run_id)
content = render_run_detail(details)

if error_message:
content += f"\n\n:cross_mark: **{error_message}**"

await update_zulip_message(
transcript.zulip_message_id,
settings.ZULIP_DAG_STREAM,
settings.ZULIP_DAG_TOPIC,
content,
)
except Exception:
logger.warning(
"[DAG Zulip] Failed to update DAG message",
transcript_id=transcript_id,
workflow_run_id=workflow_run_id,
exc_info=True,
)


async def delete_dag_zulip_message(transcript_id: str) -> None:
"""Delete DAG Zulip message and clear zulip_message_id.

Called from post_zulip task (already inside fresh_db_connection).
Swallows InvalidMessageError (message already deleted).
"""
if not _dag_zulip_enabled():
return

try:
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
from reflector.zulip import ( # noqa: PLC0415
InvalidMessageError,
delete_zulip_message,
)

transcript = await transcripts_controller.get_by_id(transcript_id)
if not transcript or not transcript.zulip_message_id:
return

try:
await delete_zulip_message(transcript.zulip_message_id)
except InvalidMessageError:
logger.warning(
"[DAG Zulip] Message already deleted",
transcript_id=transcript_id,
zulip_message_id=transcript.zulip_message_id,
)

await transcripts_controller.update(transcript, {"zulip_message_id": None})
except Exception:
logger.warning(
"[DAG Zulip] Failed to delete DAG message",
transcript_id=transcript_id,
exc_info=True,
)
23 changes: 22 additions & 1 deletion server/reflector/hatchet/workflows/daily_multitrack_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
TIMEOUT_SHORT,
TaskName,
)
from reflector.hatchet.dag_zulip import update_dag_zulip_message
from reflector.hatchet.workflows.models import (
ActionItemsResult,
ConsentResult,
Expand Down Expand Up @@ -238,14 +239,29 @@ def decorator(
@functools.wraps(func)
async def wrapper(input: PipelineInput, ctx: Context) -> R:
try:
return await func(input, ctx)
result = await func(input, ctx)
try:
await update_dag_zulip_message(
input.transcript_id, ctx.workflow_run_id
)
except Exception:
pass
return result
except Exception as e:
logger.error(
f"[Hatchet] {step_name} failed",
transcript_id=input.transcript_id,
error=str(e),
exc_info=True,
)
try:
await update_dag_zulip_message(
input.transcript_id,
ctx.workflow_run_id,
error_message=f"{step_name} failed: {e}",
)
except Exception:
pass
if set_error_status:
await set_workflow_error_status(input.transcript_id)
raise
Expand Down Expand Up @@ -1294,6 +1310,11 @@ async def post_zulip(input: PipelineInput, ctx: Context) -> ZulipResult:

async with fresh_db_connection():
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
from reflector.hatchet.dag_zulip import ( # noqa: PLC0415
delete_dag_zulip_message,
)

await delete_dag_zulip_message(input.transcript_id)

transcript = await transcripts_controller.get_by_id(input.transcript_id)
if transcript:
Expand Down
11 changes: 11 additions & 0 deletions server/reflector/services/transcript_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from reflector.db.recordings import recordings_controller
from reflector.db.transcripts import Transcript, transcripts_controller
from reflector.hatchet.client import HatchetClientManager
from reflector.hatchet.dag_zulip import create_dag_zulip_message
from reflector.logger import logger
from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
from reflector.utils.string import NonEmptyString
Expand Down Expand Up @@ -266,6 +267,16 @@ async def dispatch_transcript_processing(
transcript, {"workflow_run_id": workflow_id}
)

try:
await create_dag_zulip_message(config.transcript_id, workflow_id)
except Exception:
logger.warning(
"[DAG Zulip] Failed to create DAG message at dispatch",
transcript_id=config.transcript_id,
workflow_id=workflow_id,
exc_info=True,
)

logger.info("Hatchet workflow dispatched", workflow_id=workflow_id)
return None

Expand Down
3 changes: 3 additions & 0 deletions server/reflector/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ class Settings(BaseSettings):
ZULIP_REALM: str | None = None
ZULIP_API_KEY: str | None = None
ZULIP_BOT_EMAIL: str | None = None
ZULIP_DAG_STREAM: str | None = None
ZULIP_DAG_TOPIC: str | None = None
ZULIP_HOST_HEADER: str | None = None

# Hatchet workflow orchestration (always enabled for multitrack processing)
HATCHET_CLIENT_TOKEN: str | None = None
Expand Down
Loading
Loading