Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 94 additions & 0 deletions examples/slackbot/src/slackbot/_internal/thread_status.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
"""Thread status tracking for Slack events (cross-process safe).
This module encapsulates a tiny SQLite-backed status mechanism to avoid duplicate
processing of the same Slack thread when users edit their original post or when
Slack delivers multiple events. It keeps the rest of the app simple and avoids
sprawling status logic across files.
"""

from __future__ import annotations

from typing import Literal

from slackbot.core import Database

Status = Literal["in_progress", "completed"]


async def ensure_schema(db: Database) -> None:
"""Ensure the status table exists.
Using CREATE TABLE IF NOT EXISTS is idempotent and cheap enough to call
whenever we touch the status table, avoiding module-level state or locks.
"""

def _create() -> None:
db.con.execute(
"""
CREATE TABLE IF NOT EXISTS slack_thread_status (
thread_ts TEXT PRIMARY KEY,
status TEXT NOT NULL CHECK (status IN ('in_progress','completed')),
updated_at REAL NOT NULL
);
"""
)
db.con.commit()

await db.loop.run_in_executor(db.executor, _create)


async def try_acquire(db: Database, thread_ts: str) -> bool:
"""Attempt to mark a thread as in_progress; returns True if acquired.
Uses an atomic INSERT OR IGNORE on a PRIMARY KEY to prevent duplicates across
concurrent processes or tasks.
"""
await ensure_schema(db)

def _insert() -> int:
cur = db.con.cursor()
cur.execute(
"""
INSERT OR IGNORE INTO slack_thread_status (thread_ts, status, updated_at)
VALUES (?, 'in_progress', strftime('%s','now'))
""",
(thread_ts,),
)
db.con.commit()
return cur.rowcount

rowcount: int = await db.loop.run_in_executor(db.executor, _insert)
return rowcount == 1


async def get_status(db: Database, thread_ts: str) -> Status | None:
await ensure_schema(db)

def _query() -> Status | None:
cur = db.con.cursor()
cur.execute(
"SELECT status FROM slack_thread_status WHERE thread_ts = ?",
(thread_ts,),
)
row = cur.fetchone()
return row[0] if row else None # type: ignore[return-value]
Copy link

Copilot AI Sep 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The type ignore comment is unnecessary. The return type annotation already handles the None case correctly, and mypy should understand that row[0] will be a Status when row is not None.

Suggested change
return row[0] if row else None # type: ignore[return-value]
return row[0] if row else None

Copilot uses AI. Check for mistakes.

return await db.loop.run_in_executor(db.executor, _query)


async def mark_completed(db: Database, thread_ts: str) -> None:
await ensure_schema(db)

def _update() -> None:
cur = db.con.cursor()
cur.execute(
"""
UPDATE slack_thread_status
SET status = 'completed', updated_at = strftime('%s','now')
WHERE thread_ts = ?
""",
(thread_ts,),
)
db.con.commit()

await db.loop.run_in_executor(db.executor, _update)
55 changes: 54 additions & 1 deletion examples/slackbot/src/slackbot/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,15 @@

from slackbot._internal.constants import WORKSPACE_TO_CHANNEL_ID
from slackbot._internal.templates import CHANNEL_REDIRECT_MESSAGE, WELCOME_MESSAGE
from slackbot._internal.thread_status import (
get_status as get_thread_status,
)
from slackbot._internal.thread_status import (
mark_completed as mark_thread_completed,
)
from slackbot._internal.thread_status import (
try_acquire as try_acquire_thread,
)
from slackbot.assets import summarize_thread
from slackbot.core import (
Database,
Expand All @@ -40,6 +49,9 @@

logger = get_logger(__name__)

# Duplicate handling is coordinated via a small SQLite table in
# _internal.thread_status to work across processes.


def get_designated_channel_for_workspace(team_id: str) -> str | None:
"""Get the designated channel ID for a given workspace team ID."""
Expand Down Expand Up @@ -137,6 +149,37 @@ async def handle_message(payload: SlackPayload, db: Database):
return Completed(message="Message too long", name="SKIPPED")

if re.search(BOT_MENTION, user_message) and payload.authorizations:
# Use the root thread timestamp as our idempotency key
root_ts = thread_ts

# Cross-process acquire; only one handler should proceed
acquired = await try_acquire_thread(db, root_ts)
if not acquired:
status = await get_thread_status(db, root_ts)
if status == "in_progress":
assert event.channel is not None, (
"Event channel is None when posting edit-ignored notice"
)
Comment on lines +160 to +162
Copy link

Copilot AI Sep 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The assertion message should be more descriptive and actionable. Consider changing to 'Event channel cannot be None when posting notification message' or use proper error handling instead of assertion.

Suggested change
assert event.channel is not None, (
"Event channel is None when posting edit-ignored notice"
)
if event.channel is None:
logger.error("Event channel is None when posting edit-ignored notice")
return Completed(
message="Cannot post edit-ignored notice: event channel is None",
name="SKIPPED_NO_CHANNEL",
data=dict(thread_ts=root_ts),
)

Copilot uses AI. Check for mistakes.
await post_slack_message(
message=(
"✋ I noticed you edited your original message. "
"I'm already working on your first version — please add any "
"clarifications as new messages in this thread so I don't lose track."
),
channel_id=event.channel,
thread_ts=root_ts,
)
return Completed(
message="Ignored edit while in progress",
name="IGNORED_EDIT",
data=dict(thread_ts=root_ts),
)
return Completed(
message="Duplicate event after completion",
name="SKIPPED_DUPLICATE",
data=dict(thread_ts=root_ts),
)

# Check if this is the designated channel
team_id = payload.team_id or ""
is_designated = check_if_designated_channel(event.channel, team_id)
Expand Down Expand Up @@ -188,7 +231,11 @@ async def handle_message(payload: SlackPayload, db: Database):

try:
result = await run_agent(
cleaned_message, conversation, user_context, event.channel, thread_ts
cleaned_message,
conversation,
user_context,
event.channel,
thread_ts,
) # type: ignore

await db.add_thread_messages(thread_ts, result.new_messages())
Expand All @@ -213,6 +260,12 @@ async def handle_message(payload: SlackPayload, db: Database):
name="ERROR_HANDLED",
data=dict(error=str(e), user_context=user_context),
)
finally:
try:
await mark_thread_completed(db, root_ts)
except Exception:
logger.warning("Failed to mark thread as completed")
Copy link

Copilot AI Sep 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The warning message should include the specific error details for better debugging. Consider logging the exception: logger.warning('Failed to mark thread as completed', exc_info=True) or logger.warning('Failed to mark thread as completed: %s', e).

Suggested change
logger.warning("Failed to mark thread as completed")
logger.warning("Failed to mark thread as completed", exc_info=True)

Copilot uses AI. Check for mistakes.

return Completed(
message="Responded to mention",
data=dict(user_context=user_context, conversation=conversation),
Expand Down
3 changes: 3 additions & 0 deletions examples/slackbot/src/slackbot/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ def _insert():

await self.loop.run_in_executor(self.executor, _insert)

# Note: Thread status tracking is handled in-process within api.py to keep
# persistence minimal for the examples package.


@task(task_run_name="build user context for {user_id}")
def build_user_context(
Expand Down
1 change: 1 addition & 0 deletions examples/slackbot/src/slackbot/slack.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class EventBlock(BaseModel):
class SlackEvent(BaseModel):
client_msg_id: str | None = None
type: str
subtype: str | None = None
text: str | None = None
user: str | dict[str, Any] | None = None
ts: str | None = None
Expand Down
Loading