Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
02c6765
Add entry point log for work pool update events (issue #16658)
F4RAN Dec 9, 2025
a073a67
feature: Emit events for work pool field updates
F4RAN Dec 9, 2025
bc8413b
Emit events for work pool and work queue field updates
F4RAN Dec 9, 2025
61df754
Emit events for work queue updates via work pool endpoint
F4RAN Dec 9, 2025
1b3edcf
closes #16658
F4RAN Dec 9, 2025
d416993
Add docstring to emit_work_queue_updated_event
F4RAN Dec 9, 2025
6ea3284
Add docstring to emit_work_queue_updated_event
F4RAN Dec 9, 2025
19f09d0
fix: remove additional logger
F4RAN Dec 9, 2025
e31c172
Update tests to expect field update events in addition to status events
F4RAN Dec 9, 2025
1ae0586
Merge branch 'main' into 16658-work-pool-queue-update-events
F4RAN Dec 9, 2025
379b754
Merge branch 'main' into 16658-work-pool-queue-update-events
F4RAN Dec 9, 2025
e4e7b2f
Merge branch 'main' into 16658-work-pool-queue-update-events
F4RAN Dec 9, 2025
c53e528
Merge branch 'main' into 16658-work-pool-queue-update-events
F4RAN Dec 9, 2025
aadff69
Merge branch 'main' into 16658-work-pool-queue-update-events
F4RAN Dec 9, 2025
3df4829
Merge branch 'main' into 16658-work-pool-queue-update-events
F4RAN Dec 9, 2025
793e720
Merge branch 'main' into 16658-work-pool-queue-update-events
F4RAN Dec 10, 2025
46a86fa
Update src/prefect/server/models/events.py
F4RAN Dec 10, 2025
ebf9e44
Update src/prefect/server/models/events.py
F4RAN Dec 10, 2025
040308e
Update src/prefect/server/models/workers.py
F4RAN Dec 10, 2025
edd12b1
Update src/prefect/server/models/work_queues.py
F4RAN Dec 10, 2025
645b3e8
Update src/prefect/server/models/workers.py
F4RAN Dec 10, 2025
45e87f6
Merge branch 'main' into 16658-work-pool-queue-update-events
F4RAN Dec 10, 2025
5209d62
fix: remove duplicate work queue reads and update tests
F4RAN Dec 10, 2025
b76a927
Merge branch 'main' into 16658-work-pool-queue-update-events
F4RAN Dec 10, 2025
3397b74
Merge branch 'main' into 16658-work-pool-queue-update-events
F4RAN Dec 10, 2025
e16ced8
Merge branch 'main' into 16658-work-pool-queue-update-events
F4RAN Dec 10, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions src/prefect/server/models/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,73 @@ async def work_pool_status_event(
)


async def work_pool_updated_event(
session: AsyncSession,
work_pool: "ORMWorkPool",
changed_fields: Dict[
str, Dict[str, Any]
], # {"field_name": {"old": value, "new": value}}
occurred: DateTime,
) -> Event:
"""Create an event for work pool field updates (non-status)."""
return Event(
occurred=occurred,
event="prefect.work-pool.updated",
resource={
"prefect.resource.id": f"prefect.work-pool.{work_pool.id}",
"prefect.resource.name": work_pool.name,
"prefect.work-pool.type": work_pool.type,
"prefect.resource.role": "work-pool",
},
payload={
"updated_fields": list(changed_fields.keys()),
"updates": changed_fields,
},
id=uuid7(),
)


async def work_queue_updated_event(
session: AsyncSession,
work_queue: "ORMWorkQueue",
changed_fields: Dict[str, Dict[str, Any]],
occurred: DateTime,
) -> Event:
"""Create an event for work queue field updates (non-status)."""
related_work_pool_info: List[Dict[str, Any]] = []

if work_queue.work_pool_id:
work_pool = await models.workers.read_work_pool(
session=session,
work_pool_id=work_queue.work_pool_id,
)
if work_pool and work_pool.id and work_pool.name:
related_work_pool_info.append(
{
"prefect.resource.id": f"prefect.work-pool.{work_pool.id}",
"prefect.resource.name": work_pool.name,
"prefect.work-pool.type": work_pool.type,
"prefect.resource.role": "work-pool",
}
)

return Event(
occurred=occurred,
event="prefect.work-queue.updated",
resource={
"prefect.resource.id": f"prefect.work-queue.{work_queue.id}",
"prefect.resource.name": work_queue.name,
"prefect.resource.role": "work-queue",
},
related=related_work_pool_info,
payload={
"updated_fields": list(changed_fields.keys()),
"updates": changed_fields,
},
id=uuid7(),
)


def _get_recent_preceding_work_pool_event_id(
work_pool: Optional["ORMWorkPool"],
) -> Optional[UUID]:
Expand Down
85 changes: 74 additions & 11 deletions src/prefect/server/models/work_queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@

import datetime
from typing import (
Any,
Awaitable,
Callable,
Dict,
Iterable,
Optional,
Sequence,
Expand All @@ -32,7 +34,10 @@
)
from prefect.server.events.clients import PrefectServerEventsClient
from prefect.server.exceptions import ObjectNotFoundError
from prefect.server.models.events import work_queue_status_event
from prefect.server.models.events import (
work_queue_status_event,
work_queue_updated_event,
)
from prefect.server.models.workers import (
DEFAULT_AGENT_WORK_POOL_NAME,
bulk_update_work_queue_priorities,
Expand Down Expand Up @@ -242,22 +247,28 @@ async def update_work_queue(
# exclude_unset=True allows us to only update values provided by
# the user, ignoring any defaults on the model
update_data = work_queue.model_dump_for_orm(exclude_unset=True)
current_work_queue = await read_work_queue(
session=session, work_queue_id=work_queue_id
)
if current_work_queue is None:
return False

if "is_paused" in update_data:
wq = await read_work_queue(session=session, work_queue_id=work_queue_id)
if wq is None:
return False
session.expunge(current_work_queue)

if "is_paused" in update_data:
# Only update the status to paused if it's not already paused. This ensures a work queue that is already
# paused will not get a status update if it's paused again
if update_data.get("is_paused") and wq.status != WorkQueueStatus.PAUSED:
if (
update_data.get("is_paused")
and current_work_queue.status != WorkQueueStatus.PAUSED
):
update_data["status"] = WorkQueueStatus.PAUSED

# If unpausing, only update status if it's currently paused. This ensures a work queue that is already
# unpaused will not get a status update if it's unpaused again
if (
update_data.get("is_paused") is False
and wq.status == WorkQueueStatus.PAUSED
and current_work_queue.status == WorkQueueStatus.PAUSED
):
# Default status if unpaused
update_data["status"] = WorkQueueStatus.NOT_READY
Expand All @@ -267,7 +278,7 @@ async def update_work_queue(
if "last_polled" in update_data:
last_polled = cast(DateTime, update_data["last_polled"])
else:
last_polled = wq.last_polled
last_polled = current_work_queue.last_polled

# Check if last polled is recent and set status to READY if so
if is_last_polled_recent(last_polled):
Expand All @@ -282,9 +293,42 @@ async def update_work_queue(
updated = result.rowcount > 0

if updated:
wq = await read_work_queue(session=session, work_queue_id=work_queue_id)
assert wq is not None
assert current_work_queue is not wq
WORK_QUEUE_EVENT_FIELDS = {
"name",
"description",
"concurrency_limit",
"priority",
# Exclude "is_paused" - handled with status
# Exclude "last_polled" - usually auto-updated
# Exclude "filter" - deprecated
}
# Detect which fields actually changed
changed_fields = {}
for field in update_data.keys():
if field not in WORK_QUEUE_EVENT_FIELDS or field == "status":
continue

old_value = getattr(current_work_queue, field, None)
new_value = getattr(wq, field, None)

if old_value != new_value:
changed_fields[field] = {
"from": old_value,
"to": new_value,
}

# Emit event for non-status field changes
if changed_fields:
await emit_work_queue_updated_event(
session=session,
work_queue=wq,
changed_fields=changed_fields,
)

if "status" in update_data and emit_status_change:
wq = await read_work_queue(session=session, work_queue_id=work_queue_id)
assert wq
await emit_status_change(wq)

return updated
Expand Down Expand Up @@ -621,12 +665,31 @@ async def emit_work_queue_status_event(
db: PrefectDBInterface,
work_queue: orm_models.WorkQueue,
) -> None:
"""Emit an event when work queue fields are updated."""
async with db.session_context() as session:
event = await work_queue_status_event(
session=session,
work_queue=work_queue,
occurred=now("UTC"),
)

async with PrefectServerEventsClient() as events_client:
await events_client.emit(event)


async def emit_work_queue_updated_event(
session: AsyncSession,
work_queue: orm_models.WorkQueue,
changed_fields: Dict[str, Dict[str, Any]],
) -> None:
if not changed_fields:
return

async with PrefectServerEventsClient() as events_client:
await events_client.emit(
await work_queue_updated_event(
session=session,
work_queue=work_queue,
changed_fields=changed_fields,
occurred=now("UTC"),
)
)
117 changes: 107 additions & 10 deletions src/prefect/server/models/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import datetime
from typing import (
Any,
Awaitable,
Callable,
Dict,
Expand All @@ -24,7 +25,10 @@
from prefect.server.database import PrefectDBInterface, db_injector, orm_models
from prefect.server.events.clients import PrefectServerEventsClient
from prefect.server.exceptions import ObjectNotFoundError
from prefect.server.models.events import work_pool_status_event
from prefect.server.models.events import (
work_pool_status_event,
work_pool_updated_event, # Add this
)
from prefect.server.schemas.statuses import WorkQueueStatus
from prefect.server.utilities.database import UUID as PrefectUUID
from prefect.types._datetime import DateTime, now
Expand Down Expand Up @@ -257,6 +261,39 @@ async def update_work_pool(
assert wp is not None
assert current_work_pool is not wp

# Detect which fields actually changed (excluding status and internal fields)
# Fields that should trigger update events (user-updatable fields)
WORK_POOL_EVENT_FIELDS = {
# "is_paused", # Handled with status
"description",
"base_job_template",
"concurrency_limit",
"storage_configuration",
}

changed_fields = {}
for field in update_data.keys():
if field not in WORK_POOL_EVENT_FIELDS or field == "status":
continue

old_value = getattr(current_work_pool, field, None)
new_value = getattr(wp, field, None)

# Compare values (handle different types)
if old_value != new_value:
changed_fields[field] = {
"from": old_value,
"to": new_value,
}

# Emit event for non-status field changes
if changed_fields:
await emit_work_pool_updated_event(
session=session,
work_pool=wp,
changed_fields=changed_fields,
)

if "status" in update_data and emit_status_change:
await emit_status_change(
event_id=update_data["last_status_event_id"], # type: ignore
Expand Down Expand Up @@ -588,20 +625,25 @@ async def update_work_queue(

update_values = work_queue.model_dump_for_orm(exclude_unset=True)

if "is_paused" in update_values:
if (wq := await session.get(db.WorkQueue, work_queue_id)) is None:
return False
current_work_queue = await session.get(db.WorkQueue, work_queue_id)
if current_work_queue is None:
return False
session.expunge(current_work_queue)

if "is_paused" in update_values:
# Only update the status to paused if it's not already paused. This ensures a work queue that is already
# paused will not get a status update if it's paused again
if update_values.get("is_paused") and wq.status != WorkQueueStatus.PAUSED:
if (
update_values.get("is_paused")
and current_work_queue.status != WorkQueueStatus.PAUSED
):
update_values["status"] = WorkQueueStatus.PAUSED

# If unpausing, only update status if it's currently paused. This ensures a work queue that is already
# unpaused will not get a status update if it's unpaused again
if (
update_values.get("is_paused") is False
and wq.status == WorkQueueStatus.PAUSED
and current_work_queue.status == WorkQueueStatus.PAUSED
):
# Default status if unpaused
update_values["status"] = default_status
Expand All @@ -610,7 +652,7 @@ async def update_work_queue(
if "last_polled" in update_values:
last_polled = update_values["last_polled"]
else:
last_polled = wq.last_polled
last_polled = current_work_queue.last_polled

# Check if last polled is recent and set status to READY if so
if is_last_polled_recent(last_polled):
Expand All @@ -626,10 +668,45 @@ async def update_work_queue(
updated = result.rowcount > 0

if updated:
if "priority" in update_values or "status" in update_values:
updated_work_queue = await session.get(db.WorkQueue, work_queue_id)
assert updated_work_queue
updated_work_queue = await session.get(db.WorkQueue, work_queue_id)
assert updated_work_queue is not None
assert current_work_queue is not updated_work_queue

# Fields that should trigger update events (user-updatable fields)
WORK_QUEUE_EVENT_FIELDS = {
"name",
"description",
"concurrency_limit",
"priority",
# Exclude "is_paused" - handled with status
# Exclude "last_polled" - usually auto-updated
# Exclude "filter" - deprecated
}

changed_fields = {}
for field in update_values.keys():
if field not in WORK_QUEUE_EVENT_FIELDS or field == "status":
continue

old_value = getattr(current_work_queue, field, None)
new_value = getattr(updated_work_queue, field, None)

if old_value != new_value:
changed_fields[field] = {
"from": old_value,
"to": new_value,
}

if changed_fields:
from prefect.server.models.work_queues import emit_work_queue_updated_event

await emit_work_queue_updated_event(
session=session,
work_queue=updated_work_queue,
changed_fields=changed_fields,
)

if "priority" in update_values or "status" in update_values:
if "priority" in update_values:
await bulk_update_work_queue_priorities(
session,
Expand Down Expand Up @@ -799,6 +876,26 @@ async def delete_worker(
return result.rowcount > 0


async def emit_work_pool_updated_event(
session: AsyncSession,
work_pool: orm_models.WorkPool,
changed_fields: Dict[str, Dict[str, Any]],
) -> None:
"""Emit an event when work pool fields are updated."""
if not changed_fields:
return

async with PrefectServerEventsClient() as events_client:
await events_client.emit(
await work_pool_updated_event(
session=session,
work_pool=work_pool,
changed_fields=changed_fields,
occurred=now("UTC"),
)
)


async def emit_work_pool_status_event(
event_id: UUID,
occurred: DateTime,
Expand Down
Loading