From 02c67656c99dd6e2de83597372eac01c12da033d Mon Sep 17 00:00:00 2001 From: F4RAN Date: Tue, 9 Dec 2025 15:15:50 +0330 Subject: [PATCH 01/15] Add entry point log for work pool update events (issue #16658) --- src/prefect/server/models/workers.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/prefect/server/models/workers.py b/src/prefect/server/models/workers.py index 18f952d143b0..661f766938ba 100644 --- a/src/prefect/server/models/workers.py +++ b/src/prefect/server/models/workers.py @@ -28,7 +28,8 @@ from prefect.server.schemas.statuses import WorkQueueStatus from prefect.server.utilities.database import UUID as PrefectUUID from prefect.types._datetime import DateTime, now - +from prefect.logging import get_logger +logger = get_logger("prefect.server.models.workers") DEFAULT_AGENT_WORK_POOL_NAME = "default-agent-pool" # ----------------------------------------------------- @@ -256,6 +257,8 @@ async def update_work_pool( assert wp is not None assert current_work_pool is not wp + + logger.error(f"[DEBUG] ISSUE ENTRY POINT - Work Pool {work_pool_id} updated. Fields: {list(update_data.keys())}") if "status" in update_data and emit_status_change: await emit_status_change( From a073a67dc40c4ac3b679dfde3f4e2e641df9fa3b Mon Sep 17 00:00:00 2001 From: F4RAN Date: Tue, 9 Dec 2025 15:52:29 +0330 Subject: [PATCH 02/15] feature: Emit events for work pool field updates --- src/prefect/server/models/events.py | 60 +++++++++++++++++++++++++++ src/prefect/server/models/workers.py | 62 +++++++++++++++++++++++++++- 2 files changed, 120 insertions(+), 2 deletions(-) diff --git a/src/prefect/server/models/events.py b/src/prefect/server/models/events.py index c524f1705be7..d038740e0687 100644 --- a/src/prefect/server/models/events.py +++ b/src/prefect/server/models/events.py @@ -418,6 +418,66 @@ async def work_pool_status_event( follows=_get_recent_preceding_work_pool_event_id(pre_update_work_pool), ) +async def work_pool_updated_event( + session: AsyncSession, + work_pool: "ORMWorkPool", + changed_fields: Dict[str, Dict[str, Any]], # {"field_name": {"old": value, "new": value}} + occurred: DateTime, +) -> Event: + """Create an event for work pool field updates (non-status).""" + return Event( + occurred=occurred, + event="prefect.work-pool.updated", + resource={ + "prefect.resource.id": f"prefect.work-pool.{work_pool.id}", + "prefect.resource.name": work_pool.name, + "prefect.work-pool.type": work_pool.type, + "prefect.resource.role": "work-pool", + }, + payload={ + "changed_fields": list(changed_fields.keys()), + "changes": changed_fields, + }, + id=uuid7(), + ) + +async def work_queue_updated_event( + session: AsyncSession, + work_queue: "ORMWorkQueue", + changed_fields: Dict[str, Dict[str, Any]], + occurred: DateTime, +) -> Event: + """Create an event for work queue field updates (non-status).""" + related_work_pool_info: List[Dict[str, Any]] = [] + + if work_queue.work_pool_id: + work_pool = await models.workers.read_work_pool( + session=session, + work_pool_id=work_queue.work_pool_id, + ) + if work_pool and work_pool.id and work_pool.name: + related_work_pool_info.append({ + "prefect.resource.id": f"prefect.work-pool.{work_pool.id}", + "prefect.resource.name": work_pool.name, + "prefect.work-pool.type": work_pool.type, + "prefect.resource.role": "work-pool", + }) + + return Event( + occurred=occurred, + event="prefect.work-queue.updated", + resource={ + "prefect.resource.id": f"prefect.work-queue.{work_queue.id}", + "prefect.resource.name": work_queue.name, + "prefect.resource.role": "work-queue", + }, + related=related_work_pool_info, + payload={ + "changed_fields": list(changed_fields.keys()), + "changes": changed_fields, + }, + id=uuid7(), + ) def _get_recent_preceding_work_pool_event_id( work_pool: Optional["ORMWorkPool"], diff --git a/src/prefect/server/models/workers.py b/src/prefect/server/models/workers.py index 661f766938ba..1cfc4c715b32 100644 --- a/src/prefect/server/models/workers.py +++ b/src/prefect/server/models/workers.py @@ -5,6 +5,7 @@ import datetime from typing import ( + Any, Awaitable, Callable, Dict, @@ -24,7 +25,10 @@ from prefect.server.database import PrefectDBInterface, db_injector, orm_models from prefect.server.events.clients import PrefectServerEventsClient from prefect.server.exceptions import ObjectNotFoundError -from prefect.server.models.events import work_pool_status_event +from prefect.server.models.events import ( + work_pool_status_event, + work_pool_updated_event, # Add this +) from prefect.server.schemas.statuses import WorkQueueStatus from prefect.server.utilities.database import UUID as PrefectUUID from prefect.types._datetime import DateTime, now @@ -258,7 +262,42 @@ async def update_work_pool( assert wp is not None assert current_work_pool is not wp - logger.error(f"[DEBUG] ISSUE ENTRY POINT - Work Pool {work_pool_id} updated. Fields: {list(update_data.keys())}") + + # Detect which fields actually changed (excluding status and internal fields) + # Fields that should trigger update events (user-updatable fields) + WORK_POOL_EVENT_FIELDS = { + # "is_paused", # Handled with status + "description", + "base_job_template", + "concurrency_limit", + "storage_configuration", + } + + # Detect which fields actually changed (only from our allowlist, excluding status) + changed_fields = {} + for field in update_data.keys(): + # Only track fields in our allowlist, and skip status (it has its own event) + if field not in WORK_POOL_EVENT_FIELDS or field == "status": + continue + + old_value = getattr(current_work_pool, field, None) + new_value = getattr(wp, field, None) + + # Compare values (handle different types) + if old_value != new_value: + changed_fields[field] = { + "old": old_value, + "new": new_value, + } + + # Emit event for non-status field changes + if changed_fields: + await emit_work_pool_updated_event( + session=session, + work_pool=wp, + changed_fields=changed_fields, + ) + if "status" in update_data and emit_status_change: await emit_status_change( @@ -801,6 +840,25 @@ async def delete_worker( return result.rowcount > 0 +async def emit_work_pool_updated_event( + session: AsyncSession, + work_pool: orm_models.WorkPool, + changed_fields: Dict[str, Dict[str, Any]], +) -> None: + """Emit an event when work pool fields are updated.""" + if not changed_fields: + return + + async with PrefectServerEventsClient() as events_client: + await events_client.emit( + await work_pool_updated_event( + session=session, + work_pool=work_pool, + changed_fields=changed_fields, + occurred=now("UTC"), + ) + ) + async def emit_work_pool_status_event( event_id: UUID, From bc8413b518c0fcc2c7aa81be2e10a1eed35bd654 Mon Sep 17 00:00:00 2001 From: F4RAN Date: Tue, 9 Dec 2025 16:13:32 +0330 Subject: [PATCH 03/15] Emit events for work pool and work queue field updates Add event emission for work pool and work queue field updates beyond status changes. Events are created when user-updatable fields change: - Work pools: description, base_job_template, concurrency_limit, storage_configuration - Work queues: name, description, concurrency_limit, priority --- src/prefect/server/models/work_queues.py | 72 +++++++++++++++++++++--- src/prefect/server/models/workers.py | 2 - 2 files changed, 64 insertions(+), 10 deletions(-) diff --git a/src/prefect/server/models/work_queues.py b/src/prefect/server/models/work_queues.py index 1eebb5cb09b5..a41f787fae8e 100644 --- a/src/prefect/server/models/work_queues.py +++ b/src/prefect/server/models/work_queues.py @@ -5,8 +5,10 @@ import datetime from typing import ( + Any, Awaitable, Callable, + Dict, Iterable, Optional, Sequence, @@ -32,7 +34,7 @@ ) from prefect.server.events.clients import PrefectServerEventsClient from prefect.server.exceptions import ObjectNotFoundError -from prefect.server.models.events import work_queue_status_event +from prefect.server.models.events import work_queue_status_event, work_queue_updated_event from prefect.server.models.workers import ( DEFAULT_AGENT_WORK_POOL_NAME, bulk_update_work_queue_priorities, @@ -242,12 +244,16 @@ async def update_work_queue( # exclude_unset=True allows us to only update values provided by # the user, ignoring any defaults on the model update_data = work_queue.model_dump_for_orm(exclude_unset=True) - + current_work_queue = await read_work_queue(session=session, work_queue_id=work_queue_id) + if current_work_queue is None: + return False + + session.expunge(current_work_queue) + if "is_paused" in update_data: wq = await read_work_queue(session=session, work_queue_id=work_queue_id) - if wq is None: - return False - + assert wq is not None + assert current_work_queue is not wq # Only update the status to paused if it's not already paused. This ensures a work queue that is already # paused will not get a status update if it's paused again if update_data.get("is_paused") and wq.status != WorkQueueStatus.PAUSED: @@ -282,9 +288,42 @@ async def update_work_queue( updated = result.rowcount > 0 if updated: + wq = await read_work_queue(session=session, work_queue_id=work_queue_id) + assert wq is not None + assert current_work_queue is not wq + WORK_QUEUE_EVENT_FIELDS = { + "name", + "description", + "concurrency_limit", + "priority", + # Exclude "is_paused" - handled with status + # Exclude "last_polled" - usually auto-updated + # Exclude "filter" - deprecated + } + # Detect which fields actually changed + changed_fields = {} + for field in update_data.keys(): + if field not in WORK_QUEUE_EVENT_FIELDS or field == "status": + continue + + old_value = getattr(current_work_queue, field, None) + new_value = getattr(wq, field, None) + + if old_value != new_value: + changed_fields[field] = { + "old": old_value, + "new": new_value, + } + + # Emit event for non-status field changes + if changed_fields: + await emit_work_queue_updated_event( + session=session, + work_queue=wq, + changed_fields=changed_fields, + ) + if "status" in update_data and emit_status_change: - wq = await read_work_queue(session=session, work_queue_id=work_queue_id) - assert wq await emit_status_change(wq) return updated @@ -627,6 +666,23 @@ async def emit_work_queue_status_event( work_queue=work_queue, occurred=now("UTC"), ) - async with PrefectServerEventsClient() as events_client: await events_client.emit(event) + +async def emit_work_queue_updated_event( + session: AsyncSession, + work_queue: orm_models.WorkQueue, + changed_fields: Dict[str, Dict[str, Any]], +) -> None: + if not changed_fields: + return + + async with PrefectServerEventsClient() as events_client: + await events_client.emit( + await work_queue_updated_event( + session=session, + work_queue=work_queue, + changed_fields=changed_fields, + occurred=now("UTC"), + ) + ) \ No newline at end of file diff --git a/src/prefect/server/models/workers.py b/src/prefect/server/models/workers.py index 1cfc4c715b32..cbb15dd0a0bd 100644 --- a/src/prefect/server/models/workers.py +++ b/src/prefect/server/models/workers.py @@ -273,10 +273,8 @@ async def update_work_pool( "storage_configuration", } - # Detect which fields actually changed (only from our allowlist, excluding status) changed_fields = {} for field in update_data.keys(): - # Only track fields in our allowlist, and skip status (it has its own event) if field not in WORK_POOL_EVENT_FIELDS or field == "status": continue From 61df754ddc5ec8e2daf798f460b1f6953ed15668 Mon Sep 17 00:00:00 2001 From: F4RAN Date: Tue, 9 Dec 2025 16:22:34 +0330 Subject: [PATCH 04/15] Emit events for work queue updates via work pool endpoint --- src/prefect/server/models/workers.py | 44 +++++++++++++++++++++++++++- 1 file changed, 43 insertions(+), 1 deletion(-) diff --git a/src/prefect/server/models/workers.py b/src/prefect/server/models/workers.py index cbb15dd0a0bd..c7b263ff0bff 100644 --- a/src/prefect/server/models/workers.py +++ b/src/prefect/server/models/workers.py @@ -628,6 +628,11 @@ async def update_work_queue( update_values = work_queue.model_dump_for_orm(exclude_unset=True) + current_work_queue = await session.get(db.WorkQueue, work_queue_id) + if current_work_queue is None: + return False + session.expunge(current_work_queue) + if "is_paused" in update_values: if (wq := await session.get(db.WorkQueue, work_queue_id)) is None: return False @@ -666,8 +671,45 @@ async def update_work_queue( updated = result.rowcount > 0 if updated: + updated_work_queue = await session.get(db.WorkQueue, work_queue_id) + assert updated_work_queue is not None + assert current_work_queue is not updated_work_queue + + # Fields that should trigger update events (user-updatable fields) + WORK_QUEUE_EVENT_FIELDS = { + "name", + "description", + "concurrency_limit", + "priority", + # Exclude "is_paused" - handled with status + # Exclude "last_polled" - usually auto-updated + # Exclude "filter" - deprecated + } + + changed_fields = {} + for field in update_values.keys(): + if field not in WORK_QUEUE_EVENT_FIELDS or field == "status": + continue + + old_value = getattr(current_work_queue, field, None) + new_value = getattr(updated_work_queue, field, None) + + if old_value != new_value: + changed_fields[field] = { + "old": old_value, + "new": new_value, + } + + if changed_fields: + from prefect.server.models.work_queues import emit_work_queue_updated_event + await emit_work_queue_updated_event( + session=session, + work_queue=updated_work_queue, + changed_fields=changed_fields, + ) + + if "priority" in update_values or "status" in update_values: - updated_work_queue = await session.get(db.WorkQueue, work_queue_id) assert updated_work_queue if "priority" in update_values: From 1b3edcf8e9872f7172caa559dccf8c5beb036408 Mon Sep 17 00:00:00 2001 From: F4RAN Date: Tue, 9 Dec 2025 17:39:16 +0330 Subject: [PATCH 05/15] closes #16658 Emit events when work pools and work queues are updated Previously, events were only created when status changed. Now we also emit events when other fields like description, concurrency_limit, or priority are updated. This enables automations based on configuration changes. Events include the old and new values in the payload. --- src/prefect/server/models/events.py | 25 ++- src/prefect/server/models/work_queues.py | 26 ++- src/prefect/server/models/workers.py | 32 ++- .../orchestration/api/test_work_queues.py | 194 +++++++++++++++++- .../server/orchestration/api/test_workers.py | 148 +++++++++++++ 5 files changed, 388 insertions(+), 37 deletions(-) diff --git a/src/prefect/server/models/events.py b/src/prefect/server/models/events.py index d038740e0687..25026fc6d66f 100644 --- a/src/prefect/server/models/events.py +++ b/src/prefect/server/models/events.py @@ -418,10 +418,13 @@ async def work_pool_status_event( follows=_get_recent_preceding_work_pool_event_id(pre_update_work_pool), ) + async def work_pool_updated_event( session: AsyncSession, work_pool: "ORMWorkPool", - changed_fields: Dict[str, Dict[str, Any]], # {"field_name": {"old": value, "new": value}} + changed_fields: Dict[ + str, Dict[str, Any] + ], # {"field_name": {"old": value, "new": value}} occurred: DateTime, ) -> Event: """Create an event for work pool field updates (non-status).""" @@ -441,6 +444,7 @@ async def work_pool_updated_event( id=uuid7(), ) + async def work_queue_updated_event( session: AsyncSession, work_queue: "ORMWorkQueue", @@ -449,20 +453,22 @@ async def work_queue_updated_event( ) -> Event: """Create an event for work queue field updates (non-status).""" related_work_pool_info: List[Dict[str, Any]] = [] - + if work_queue.work_pool_id: work_pool = await models.workers.read_work_pool( session=session, work_pool_id=work_queue.work_pool_id, ) if work_pool and work_pool.id and work_pool.name: - related_work_pool_info.append({ - "prefect.resource.id": f"prefect.work-pool.{work_pool.id}", - "prefect.resource.name": work_pool.name, - "prefect.work-pool.type": work_pool.type, - "prefect.resource.role": "work-pool", - }) - + related_work_pool_info.append( + { + "prefect.resource.id": f"prefect.work-pool.{work_pool.id}", + "prefect.resource.name": work_pool.name, + "prefect.work-pool.type": work_pool.type, + "prefect.resource.role": "work-pool", + } + ) + return Event( occurred=occurred, event="prefect.work-queue.updated", @@ -479,6 +485,7 @@ async def work_queue_updated_event( id=uuid7(), ) + def _get_recent_preceding_work_pool_event_id( work_pool: Optional["ORMWorkPool"], ) -> Optional[UUID]: diff --git a/src/prefect/server/models/work_queues.py b/src/prefect/server/models/work_queues.py index a41f787fae8e..698cf8654d9f 100644 --- a/src/prefect/server/models/work_queues.py +++ b/src/prefect/server/models/work_queues.py @@ -34,7 +34,10 @@ ) from prefect.server.events.clients import PrefectServerEventsClient from prefect.server.exceptions import ObjectNotFoundError -from prefect.server.models.events import work_queue_status_event, work_queue_updated_event +from prefect.server.models.events import ( + work_queue_status_event, + work_queue_updated_event, +) from prefect.server.models.workers import ( DEFAULT_AGENT_WORK_POOL_NAME, bulk_update_work_queue_priorities, @@ -244,12 +247,14 @@ async def update_work_queue( # exclude_unset=True allows us to only update values provided by # the user, ignoring any defaults on the model update_data = work_queue.model_dump_for_orm(exclude_unset=True) - current_work_queue = await read_work_queue(session=session, work_queue_id=work_queue_id) + current_work_queue = await read_work_queue( + session=session, work_queue_id=work_queue_id + ) if current_work_queue is None: return False - + session.expunge(current_work_queue) - + if "is_paused" in update_data: wq = await read_work_queue(session=session, work_queue_id=work_queue_id) assert wq is not None @@ -305,16 +310,16 @@ async def update_work_queue( for field in update_data.keys(): if field not in WORK_QUEUE_EVENT_FIELDS or field == "status": continue - + old_value = getattr(current_work_queue, field, None) new_value = getattr(wq, field, None) - + if old_value != new_value: changed_fields[field] = { "old": old_value, "new": new_value, } - + # Emit event for non-status field changes if changed_fields: await emit_work_queue_updated_event( @@ -322,7 +327,7 @@ async def update_work_queue( work_queue=wq, changed_fields=changed_fields, ) - + if "status" in update_data and emit_status_change: await emit_status_change(wq) @@ -669,6 +674,7 @@ async def emit_work_queue_status_event( async with PrefectServerEventsClient() as events_client: await events_client.emit(event) + async def emit_work_queue_updated_event( session: AsyncSession, work_queue: orm_models.WorkQueue, @@ -676,7 +682,7 @@ async def emit_work_queue_updated_event( ) -> None: if not changed_fields: return - + async with PrefectServerEventsClient() as events_client: await events_client.emit( await work_queue_updated_event( @@ -685,4 +691,4 @@ async def emit_work_queue_updated_event( changed_fields=changed_fields, occurred=now("UTC"), ) - ) \ No newline at end of file + ) diff --git a/src/prefect/server/models/workers.py b/src/prefect/server/models/workers.py index c7b263ff0bff..acae2f4e6fb5 100644 --- a/src/prefect/server/models/workers.py +++ b/src/prefect/server/models/workers.py @@ -22,6 +22,7 @@ import prefect.server.schemas as schemas from prefect._internal.uuid7 import uuid7 +from prefect.logging import get_logger from prefect.server.database import PrefectDBInterface, db_injector, orm_models from prefect.server.events.clients import PrefectServerEventsClient from prefect.server.exceptions import ObjectNotFoundError @@ -32,7 +33,7 @@ from prefect.server.schemas.statuses import WorkQueueStatus from prefect.server.utilities.database import UUID as PrefectUUID from prefect.types._datetime import DateTime, now -from prefect.logging import get_logger + logger = get_logger("prefect.server.models.workers") DEFAULT_AGENT_WORK_POOL_NAME = "default-agent-pool" @@ -261,8 +262,7 @@ async def update_work_pool( assert wp is not None assert current_work_pool is not wp - - + # Detect which fields actually changed (excluding status and internal fields) # Fields that should trigger update events (user-updatable fields) WORK_POOL_EVENT_FIELDS = { @@ -272,22 +272,22 @@ async def update_work_pool( "concurrency_limit", "storage_configuration", } - + changed_fields = {} for field in update_data.keys(): if field not in WORK_POOL_EVENT_FIELDS or field == "status": continue - + old_value = getattr(current_work_pool, field, None) new_value = getattr(wp, field, None) - + # Compare values (handle different types) if old_value != new_value: changed_fields[field] = { "old": old_value, "new": new_value, } - + # Emit event for non-status field changes if changed_fields: await emit_work_pool_updated_event( @@ -295,7 +295,6 @@ async def update_work_pool( work_pool=wp, changed_fields=changed_fields, ) - if "status" in update_data and emit_status_change: await emit_status_change( @@ -674,7 +673,7 @@ async def update_work_queue( updated_work_queue = await session.get(db.WorkQueue, work_queue_id) assert updated_work_queue is not None assert current_work_queue is not updated_work_queue - + # Fields that should trigger update events (user-updatable fields) WORK_QUEUE_EVENT_FIELDS = { "name", @@ -685,33 +684,31 @@ async def update_work_queue( # Exclude "last_polled" - usually auto-updated # Exclude "filter" - deprecated } - + changed_fields = {} for field in update_values.keys(): if field not in WORK_QUEUE_EVENT_FIELDS or field == "status": continue - + old_value = getattr(current_work_queue, field, None) new_value = getattr(updated_work_queue, field, None) - + if old_value != new_value: changed_fields[field] = { "old": old_value, "new": new_value, } - + if changed_fields: from prefect.server.models.work_queues import emit_work_queue_updated_event + await emit_work_queue_updated_event( session=session, work_queue=updated_work_queue, changed_fields=changed_fields, ) - if "priority" in update_values or "status" in update_values: - assert updated_work_queue - if "priority" in update_values: await bulk_update_work_queue_priorities( session, @@ -880,6 +877,7 @@ async def delete_worker( return result.rowcount > 0 + async def emit_work_pool_updated_event( session: AsyncSession, work_pool: orm_models.WorkPool, @@ -888,7 +886,7 @@ async def emit_work_pool_updated_event( """Emit an event when work pool fields are updated.""" if not changed_fields: return - + async with PrefectServerEventsClient() as events_client: await events_client.emit( await work_pool_updated_event( diff --git a/tests/server/orchestration/api/test_work_queues.py b/tests/server/orchestration/api/test_work_queues.py index 41d56dc19162..783429ae4a5b 100644 --- a/tests/server/orchestration/api/test_work_queues.py +++ b/tests/server/orchestration/api/test_work_queues.py @@ -557,6 +557,198 @@ async def test_update_work_queue_to_unpaused_with_recent_last_polled_sets_ready_ ], ) + async def test_update_work_queue_emits_updated_event_for_concurrency_limit( + self, + client, + work_queue, + ): + """Test that updating concurrency_limit emits prefect.work-queue.updated event.""" + assert work_queue.concurrency_limit is None + + new_data = schemas.actions.WorkQueueUpdate( + concurrency_limit=10 + ).model_dump(mode="json", exclude_unset=True) + + response = await client.patch( + f"/work_queues/{work_queue.id}", + json=new_data, + ) + + assert response.status_code == 204 + + AssertingEventsClient.assert_emitted_event_with( + event="prefect.work-queue.updated", + resource={ + "prefect.resource.id": f"prefect.work-queue.{work_queue.id}", + "prefect.resource.name": work_queue.name, + "prefect.resource.role": "work-queue", + }, + payload={ + "changed_fields": ["concurrency_limit"], + "changes": { + "concurrency_limit": { + "old": None, + "new": 10, + } + }, + }, + related=[ + { + "prefect.resource.id": f"prefect.work-pool.{work_queue.work_pool.id}", + "prefect.resource.name": work_queue.work_pool.name, + "prefect.work-pool.type": work_queue.work_pool.type, + "prefect.resource.role": "work-pool", + } + ], + ) + + async def test_update_work_queue_emits_updated_event_for_description( + self, + client, + work_queue, + ): + """Test that updating description emits prefect.work-queue.updated event.""" + original_description = work_queue.description or "" + + new_data = schemas.actions.WorkQueueUpdate( + description="Updated description" + ).model_dump(mode="json", exclude_unset=True) + + response = await client.patch( + f"/work_queues/{work_queue.id}", + json=new_data, + ) + + assert response.status_code == 204 + + AssertingEventsClient.assert_emitted_event_with( + event="prefect.work-queue.updated", + resource={ + "prefect.resource.id": f"prefect.work-queue.{work_queue.id}", + "prefect.resource.name": work_queue.name, + "prefect.resource.role": "work-queue", + }, + payload={ + "changed_fields": ["description"], + "changes": { + "description": { + "old": original_description, + "new": "Updated description", + } + }, + }, + ) + + async def test_update_work_queue_emits_updated_event_for_priority( + self, + client, + work_queue, + ): + """Test that updating priority emits prefect.work-queue.updated event.""" + original_priority = work_queue.priority + + new_data = schemas.actions.WorkQueueUpdate( + priority=5 + ).model_dump(mode="json", exclude_unset=True) + + response = await client.patch( + f"/work_queues/{work_queue.id}", + json=new_data, + ) + + assert response.status_code == 204 + + AssertingEventsClient.assert_emitted_event_with( + event="prefect.work-queue.updated", + resource={ + "prefect.resource.id": f"prefect.work-queue.{work_queue.id}", + "prefect.resource.name": work_queue.name, + "prefect.resource.role": "work-queue", + }, + payload={ + "changed_fields": ["priority"], + "changes": { + "priority": { + "old": original_priority, + "new": 5, + } + }, + }, + ) + + async def test_update_work_queue_emits_updated_event_for_multiple_fields( + self, + client, + work_queue, + ): + """Test that updating multiple fields emits single event with all changes.""" + original_description = work_queue.description or "" + original_concurrency_limit = work_queue.concurrency_limit + + new_data = schemas.actions.WorkQueueUpdate( + description="Multi update", + concurrency_limit=20, + ).model_dump(mode="json", exclude_unset=True) + + response = await client.patch( + f"/work_queues/{work_queue.id}", + json=new_data, + ) + + assert response.status_code == 204 + + AssertingEventsClient.assert_emitted_event_with( + event="prefect.work-queue.updated", + resource={ + "prefect.resource.id": f"prefect.work-queue.{work_queue.id}", + "prefect.resource.name": work_queue.name, + "prefect.resource.role": "work-queue", + }, + payload={ + "changed_fields": ["description", "concurrency_limit"], + "changes": { + "description": { + "old": original_description, + "new": "Multi update", + }, + "concurrency_limit": { + "old": original_concurrency_limit, + "new": 20, + }, + }, + }, + ) + + async def test_update_work_queue_no_event_for_noop_update( + self, + client, + work_queue, + ): + """Test that no event is emitted when updating to the same value.""" + # Set initial concurrency limit + await client.patch( + f"/work_queues/{work_queue.id}", + json={"concurrency_limit": 10}, + ) + AssertingEventsClient.reset() + + # Update to same value + response = await client.patch( + f"/work_queues/{work_queue.id}", + json={"concurrency_limit": 10}, + ) + + assert response.status_code == 204 + + # Should not emit updated event (only status event if status changed) + events = [ + event + for client in AssertingEventsClient.all + for event in client.events + if event.event == "prefect.work-queue.updated" + ] + assert len(events) == 0 + class TestReadWorkQueue: async def test_read_work_queue(self, client, work_queue): @@ -1170,4 +1362,4 @@ async def test_read_work_queue_returns_correct_status_when_work_queues_share_nam async def test_read_work_queue_status_returns_404_if_does_not_exist(self, client): response = await client.get(f"/work_queues/{uuid4()}/status") - assert response.status_code == status.HTTP_404_NOT_FOUND + assert response.status_code == status.HTTP_404_NOT_FOUND \ No newline at end of file diff --git a/tests/server/orchestration/api/test_workers.py b/tests/server/orchestration/api/test_workers.py index 93814c8c4e9b..fecb72f5ee8a 100644 --- a/tests/server/orchestration/api/test_workers.py +++ b/tests/server/orchestration/api/test_workers.py @@ -1503,7 +1503,155 @@ async def test_update_work_queue_to_unpaused_with_recent_last_polled_sets_ready_ } ], ) + async def test_update_work_queue_via_work_pool_emits_updated_event( + self, client, work_pool + ): + """Test that updating work queue via work pool endpoint emits updated event.""" + # Create work pool queue + create_response = await client.post( + f"/work_pools/{work_pool.name}/queues", + json=dict(name="test-queue"), + ) + assert create_response.status_code == status.HTTP_201_CREATED + create_result = parse_obj_as(WorkQueue, create_response.json()) + + # Update concurrency limit + update_response = await client.patch( + f"/work_pools/{work_pool.name}/queues/test-queue", + json=dict(concurrency_limit=15), + ) + assert update_response.status_code == status.HTTP_204_NO_CONTENT + + AssertingEventsClient.assert_emitted_event_with( + event="prefect.work-queue.updated", + resource={ + "prefect.resource.id": f"prefect.work-queue.{create_result.id}", + "prefect.resource.name": "test-queue", + "prefect.resource.role": "work-queue", + }, + payload={ + "changed_fields": ["concurrency_limit"], + "changes": { + "concurrency_limit": { + "old": None, + "new": 15, + } + }, + }, + related=[ + { + "prefect.resource.id": f"prefect.work-pool.{work_pool.id}", + "prefect.resource.name": work_pool.name, + "prefect.work-pool.type": work_pool.type, + "prefect.resource.role": "work-pool", + } + ], + ) + +class TestUpdateWorkPoolEvents: + async def test_update_work_pool_emits_updated_event_for_description( + self, client, work_pool + ): + """Test that updating work pool description emits prefect.work-pool.updated event.""" + original_description = work_pool.description + + response = await client.patch( + f"/work_pools/{work_pool.name}", + json={"description": "Updated description"}, + ) + + assert response.status_code == status.HTTP_204_NO_CONTENT + AssertingEventsClient.assert_emitted_event_with( + event="prefect.work-pool.updated", + resource={ + "prefect.resource.id": f"prefect.work-pool.{work_pool.id}", + "prefect.resource.name": work_pool.name, + "prefect.work-pool.type": work_pool.type, + "prefect.resource.role": "work-pool", + }, + payload={ + "changed_fields": ["description"], + "changes": { + "description": { + "old": original_description, + "new": "Updated description", + } + }, + }, + ) + + async def test_update_work_pool_emits_updated_event_for_concurrency_limit( + self, client, work_pool + ): + """Test that updating work pool concurrency_limit emits prefect.work-pool.updated event.""" + original_concurrency_limit = work_pool.concurrency_limit + + response = await client.patch( + f"/work_pools/{work_pool.name}", + json={"concurrency_limit": 25}, + ) + + assert response.status_code == status.HTTP_204_NO_CONTENT + + AssertingEventsClient.assert_emitted_event_with( + event="prefect.work-pool.updated", + resource={ + "prefect.resource.id": f"prefect.work-pool.{work_pool.id}", + "prefect.resource.name": work_pool.name, + "prefect.work-pool.type": work_pool.type, + "prefect.resource.role": "work-pool", + }, + payload={ + "changed_fields": ["concurrency_limit"], + "changes": { + "concurrency_limit": { + "old": original_concurrency_limit, + "new": 25, + } + }, + }, + ) + + async def test_update_work_pool_emits_updated_event_for_multiple_fields( + self, client, work_pool + ): + """Test that updating multiple work pool fields emits single event with all changes.""" + original_description = work_pool.description + original_concurrency_limit = work_pool.concurrency_limit + + response = await client.patch( + f"/work_pools/{work_pool.name}", + json={ + "description": "Multi update", + "concurrency_limit": 30, + }, + ) + + assert response.status_code == status.HTTP_204_NO_CONTENT + + AssertingEventsClient.assert_emitted_event_with( + event="prefect.work-pool.updated", + resource={ + "prefect.resource.id": f"prefect.work-pool.{work_pool.id}", + "prefect.resource.name": work_pool.name, + "prefect.work-pool.type": work_pool.type, + "prefect.resource.role": "work-pool", + }, + payload={ + "changed_fields": ["description", "concurrency_limit"], + "changes": { + "description": { + "old": original_description, + "new": "Multi update", + }, + "concurrency_limit": { + "old": original_concurrency_limit, + "new": 30, + }, + }, + }, + ) class TestWorkPoolStatus: async def test_work_pool_status_with_online_worker(self, client, work_pool): From d416993024f1e45e5ae28089a47f0a37a05bcf23 Mon Sep 17 00:00:00 2001 From: F4RAN Date: Tue, 9 Dec 2025 17:42:40 +0330 Subject: [PATCH 06/15] Add docstring to emit_work_queue_updated_event --- src/prefect/server/models/work_queues.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/prefect/server/models/work_queues.py b/src/prefect/server/models/work_queues.py index 698cf8654d9f..f6c735817cb0 100644 --- a/src/prefect/server/models/work_queues.py +++ b/src/prefect/server/models/work_queues.py @@ -665,6 +665,7 @@ async def emit_work_queue_status_event( db: PrefectDBInterface, work_queue: orm_models.WorkQueue, ) -> None: + """Emit an event when work queue fields are updated.""" async with db.session_context() as session: event = await work_queue_status_event( session=session, From 6ea32846c5f822b83f7d61e6197baf24f99675f2 Mon Sep 17 00:00:00 2001 From: F4RAN Date: Tue, 9 Dec 2025 17:46:04 +0330 Subject: [PATCH 07/15] Add docstring to emit_work_queue_updated_event --- tests/server/orchestration/api/test_work_queues.py | 14 +++++++------- tests/server/orchestration/api/test_workers.py | 3 +++ 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/tests/server/orchestration/api/test_work_queues.py b/tests/server/orchestration/api/test_work_queues.py index 783429ae4a5b..fbc21a547fef 100644 --- a/tests/server/orchestration/api/test_work_queues.py +++ b/tests/server/orchestration/api/test_work_queues.py @@ -565,9 +565,9 @@ async def test_update_work_queue_emits_updated_event_for_concurrency_limit( """Test that updating concurrency_limit emits prefect.work-queue.updated event.""" assert work_queue.concurrency_limit is None - new_data = schemas.actions.WorkQueueUpdate( - concurrency_limit=10 - ).model_dump(mode="json", exclude_unset=True) + new_data = schemas.actions.WorkQueueUpdate(concurrency_limit=10).model_dump( + mode="json", exclude_unset=True + ) response = await client.patch( f"/work_queues/{work_queue.id}", @@ -647,9 +647,9 @@ async def test_update_work_queue_emits_updated_event_for_priority( """Test that updating priority emits prefect.work-queue.updated event.""" original_priority = work_queue.priority - new_data = schemas.actions.WorkQueueUpdate( - priority=5 - ).model_dump(mode="json", exclude_unset=True) + new_data = schemas.actions.WorkQueueUpdate(priority=5).model_dump( + mode="json", exclude_unset=True + ) response = await client.patch( f"/work_queues/{work_queue.id}", @@ -1362,4 +1362,4 @@ async def test_read_work_queue_returns_correct_status_when_work_queues_share_nam async def test_read_work_queue_status_returns_404_if_does_not_exist(self, client): response = await client.get(f"/work_queues/{uuid4()}/status") - assert response.status_code == status.HTTP_404_NOT_FOUND \ No newline at end of file + assert response.status_code == status.HTTP_404_NOT_FOUND diff --git a/tests/server/orchestration/api/test_workers.py b/tests/server/orchestration/api/test_workers.py index fecb72f5ee8a..2de9cba71a70 100644 --- a/tests/server/orchestration/api/test_workers.py +++ b/tests/server/orchestration/api/test_workers.py @@ -1503,6 +1503,7 @@ async def test_update_work_queue_to_unpaused_with_recent_last_polled_sets_ready_ } ], ) + async def test_update_work_queue_via_work_pool_emits_updated_event( self, client, work_pool ): @@ -1548,6 +1549,7 @@ async def test_update_work_queue_via_work_pool_emits_updated_event( ], ) + class TestUpdateWorkPoolEvents: async def test_update_work_pool_emits_updated_event_for_description( self, client, work_pool @@ -1653,6 +1655,7 @@ async def test_update_work_pool_emits_updated_event_for_multiple_fields( }, ) + class TestWorkPoolStatus: async def test_work_pool_status_with_online_worker(self, client, work_pool): """Work pools with an online work should have a status of READY.""" From 19f09d03f556044a1c814e5cf09b665dfbba3adf Mon Sep 17 00:00:00 2001 From: F4RAN Date: Tue, 9 Dec 2025 18:12:16 +0330 Subject: [PATCH 08/15] fix: remove additional logger --- src/prefect/server/models/workers.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/prefect/server/models/workers.py b/src/prefect/server/models/workers.py index acae2f4e6fb5..8be176d08159 100644 --- a/src/prefect/server/models/workers.py +++ b/src/prefect/server/models/workers.py @@ -22,7 +22,6 @@ import prefect.server.schemas as schemas from prefect._internal.uuid7 import uuid7 -from prefect.logging import get_logger from prefect.server.database import PrefectDBInterface, db_injector, orm_models from prefect.server.events.clients import PrefectServerEventsClient from prefect.server.exceptions import ObjectNotFoundError @@ -34,7 +33,6 @@ from prefect.server.utilities.database import UUID as PrefectUUID from prefect.types._datetime import DateTime, now -logger = get_logger("prefect.server.models.workers") DEFAULT_AGENT_WORK_POOL_NAME = "default-agent-pool" # ----------------------------------------------------- From e31c172cce201eead691e365e03ab98db0cdd1fd Mon Sep 17 00:00:00 2001 From: F4RAN Date: Tue, 9 Dec 2025 18:37:22 +0330 Subject: [PATCH 09/15] Update tests to expect field update events in addition to status events --- .../orchestration/api/test_work_queues.py | 51 +++++++++++-- .../server/orchestration/api/test_workers.py | 71 +++++++++++++++++-- 2 files changed, 110 insertions(+), 12 deletions(-) diff --git a/tests/server/orchestration/api/test_work_queues.py b/tests/server/orchestration/api/test_work_queues.py index fbc21a547fef..85ef50520512 100644 --- a/tests/server/orchestration/api/test_work_queues.py +++ b/tests/server/orchestration/api/test_work_queues.py @@ -204,7 +204,12 @@ async def test_update_work_queue( events = [ event for client in AssertingEventsClient.all for event in client.events ] - assert len(events) == 1 + # Expect both status event and updated event for concurrency_limit + assert len(events) == 2 + # Verify we have both a status event and an updated event + event_types = {event.event for event in events} + assert "prefect.work-queue.paused" in event_types + assert "prefect.work-queue.updated" in event_types async def test_update_work_queue_to_paused( self, @@ -312,8 +317,26 @@ async def test_update_work_queue_to_paused_when_already_paused_does_not_emit_eve assert work_queue_response.status_code == 200 assert work_queue_response.json()["status"] == "PAUSED" - # ensure no events emitted for already paused work queue - AssertingEventsClient.assert_emitted_event_count(0) + # Since concurrency_limit changed, we should get an updated event + # (status didn't change, so no status event) + AssertingEventsClient.assert_emitted_event_count(1) + AssertingEventsClient.assert_emitted_event_with( + event="prefect.work-queue.updated", + resource={ + "prefect.resource.id": f"prefect.work-queue.{paused_work_queue.id}", + "prefect.resource.name": paused_work_queue.name, + "prefect.resource.role": "work-queue", + }, + payload={ + "changed_fields": ["concurrency_limit"], + "changes": { + "concurrency_limit": { + "old": None, + "new": 3, + } + }, + }, + ) async def test_update_work_queue_to_unpaused_when_already_unpaused_does_not_emit_event( self, @@ -339,8 +362,26 @@ async def test_update_work_queue_to_unpaused_when_already_unpaused_does_not_emit assert work_queue_response.status_code == 200 assert work_queue_response.json()["status"] == "READY" - # ensure no events emitted for already unpaused work queue - AssertingEventsClient.assert_emitted_event_count(0) + # Since concurrency_limit changed, we should get an updated event + # (status didn't change, so no status event) + AssertingEventsClient.assert_emitted_event_count(1) + AssertingEventsClient.assert_emitted_event_with( + event="prefect.work-queue.updated", + resource={ + "prefect.resource.id": f"prefect.work-queue.{ready_work_queue.id}", + "prefect.resource.name": ready_work_queue.name, + "prefect.resource.role": "work-queue", + }, + payload={ + "changed_fields": ["concurrency_limit"], + "changes": { + "concurrency_limit": { + "old": None, + "new": 3, + } + }, + }, + ) async def test_update_work_queue_to_unpaused_with_no_last_polled_sets_not_ready_status( self, diff --git a/tests/server/orchestration/api/test_workers.py b/tests/server/orchestration/api/test_workers.py index 2de9cba71a70..3e3b63a07f8b 100644 --- a/tests/server/orchestration/api/test_workers.py +++ b/tests/server/orchestration/api/test_workers.py @@ -452,7 +452,14 @@ async def test_update_work_pool(self, client, session, work_pool): assert result.is_paused is True assert result.concurrency_limit == 5 - assert_status_events(work_pool.name, ["prefect.work-pool.paused"]) + # Expect both status event and updated event for concurrency_limit + found_events = [ + event for item in AssertingEventsClient.all for event in item.events + ] + assert len(found_events) == 2 + event_types = {event.event for event in found_events} + assert "prefect.work-pool.paused" in event_types + assert "prefect.work-pool.updated" in event_types async def test_update_work_pool_storage_configuration(self, client, work_pool): bundle_upload_step = { @@ -1194,7 +1201,14 @@ async def test_update_work_queue_to_paused( assert response.json()["concurrency_limit"] == 3 assert response.json()["status"] == "PAUSED" - assert_status_events(work_queue_1.name, ["prefect.work-queue.paused"]) + # Expect both status event and updated event for concurrency_limit + found_events = [ + event for item in AssertingEventsClient.all for event in item.events + ] + assert len(found_events) == 2 + event_types = {event.event for event in found_events} + assert "prefect.work-queue.paused" in event_types + assert "prefect.work-queue.updated" in event_types async def test_update_work_queue_to_paused_sets_paused_status( self, @@ -1228,7 +1242,14 @@ async def test_update_work_queue_to_paused_sets_paused_status( assert work_queue_response.status_code == 200 assert work_queue_response.json()["status"] == "PAUSED" - assert_status_events(work_queue_1.name, ["prefect.work-queue.paused"]) + # Expect both status event and updated event for concurrency_limit + found_events = [ + event for item in AssertingEventsClient.all for event in item.events + ] + assert len(found_events) == 2 + event_types = {event.event for event in found_events} + assert "prefect.work-queue.paused" in event_types + assert "prefect.work-queue.updated" in event_types async def test_update_work_queue_to_paused_when_already_paused_does_not_emit_event( self, @@ -1255,8 +1276,26 @@ async def test_update_work_queue_to_paused_when_already_paused_does_not_emit_eve assert work_queue_response.status_code == 200 assert work_queue_response.json()["status"] == "PAUSED" - # ensure no events emitted for already paused work queue - AssertingEventsClient.assert_emitted_event_count(0) + # Since concurrency_limit changed, we should get an updated event + # (status didn't change, so no status event) + AssertingEventsClient.assert_emitted_event_count(1) + AssertingEventsClient.assert_emitted_event_with( + event="prefect.work-queue.updated", + resource={ + "prefect.resource.id": f"prefect.work-queue.{paused_work_queue.id}", + "prefect.resource.name": paused_work_queue.name, + "prefect.resource.role": "work-queue", + }, + payload={ + "changed_fields": ["concurrency_limit"], + "changes": { + "concurrency_limit": { + "old": None, + "new": 3, + } + }, + }, + ) async def test_update_work_queue_to_unpaused_when_already_unpaused_does_not_emit_event( self, @@ -1283,8 +1322,26 @@ async def test_update_work_queue_to_unpaused_when_already_unpaused_does_not_emit assert work_queue_response.status_code == 200 assert work_queue_response.json()["status"] == "READY" - # ensure no events emitted for already unpaused work queue - AssertingEventsClient.assert_emitted_event_count(0) + # Since concurrency_limit changed, we should get an updated event + # (status didn't change, so no status event) + AssertingEventsClient.assert_emitted_event_count(1) + AssertingEventsClient.assert_emitted_event_with( + event="prefect.work-queue.updated", + resource={ + "prefect.resource.id": f"prefect.work-queue.{ready_work_queue.id}", + "prefect.resource.name": ready_work_queue.name, + "prefect.resource.role": "work-queue", + }, + payload={ + "changed_fields": ["concurrency_limit"], + "changes": { + "concurrency_limit": { + "old": None, + "new": 3, + } + }, + }, + ) async def test_update_work_queue_to_unpaused_with_no_last_polled_sets_not_ready_status( self, From 46a86fa08f58635f165df497880eb329fa92abfc Mon Sep 17 00:00:00 2001 From: F4RAN Date: Wed, 10 Dec 2025 21:35:25 +0330 Subject: [PATCH 10/15] Update src/prefect/server/models/events.py Co-authored-by: Alex Streed --- src/prefect/server/models/events.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/prefect/server/models/events.py b/src/prefect/server/models/events.py index 25026fc6d66f..ce0d7d112fae 100644 --- a/src/prefect/server/models/events.py +++ b/src/prefect/server/models/events.py @@ -438,8 +438,8 @@ async def work_pool_updated_event( "prefect.resource.role": "work-pool", }, payload={ - "changed_fields": list(changed_fields.keys()), - "changes": changed_fields, + "updated_fields": list(changed_fields.keys()), + "updates": changed_fields, }, id=uuid7(), ) From ebf9e442a9d35912f4fafec7d0d8ca52995be011 Mon Sep 17 00:00:00 2001 From: F4RAN Date: Wed, 10 Dec 2025 21:35:41 +0330 Subject: [PATCH 11/15] Update src/prefect/server/models/events.py Co-authored-by: Alex Streed --- src/prefect/server/models/events.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/prefect/server/models/events.py b/src/prefect/server/models/events.py index ce0d7d112fae..bb3c62d22293 100644 --- a/src/prefect/server/models/events.py +++ b/src/prefect/server/models/events.py @@ -479,8 +479,8 @@ async def work_queue_updated_event( }, related=related_work_pool_info, payload={ - "changed_fields": list(changed_fields.keys()), - "changes": changed_fields, + "updated_fields": list(changed_fields.keys()), + "updates": changed_fields, }, id=uuid7(), ) From 040308e93b2b839de3d0a9b7f8b90a4d62df7950 Mon Sep 17 00:00:00 2001 From: F4RAN Date: Wed, 10 Dec 2025 21:35:56 +0330 Subject: [PATCH 12/15] Update src/prefect/server/models/workers.py Co-authored-by: Alex Streed --- src/prefect/server/models/workers.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/prefect/server/models/workers.py b/src/prefect/server/models/workers.py index 8be176d08159..10c3c751fd85 100644 --- a/src/prefect/server/models/workers.py +++ b/src/prefect/server/models/workers.py @@ -693,8 +693,8 @@ async def update_work_queue( if old_value != new_value: changed_fields[field] = { - "old": old_value, - "new": new_value, + "from": old_value, + "to": new_value, } if changed_fields: From edd12b1bb01786aa9dc560e1cde38225b2052834 Mon Sep 17 00:00:00 2001 From: F4RAN Date: Wed, 10 Dec 2025 21:36:33 +0330 Subject: [PATCH 13/15] Update src/prefect/server/models/work_queues.py Co-authored-by: Alex Streed --- src/prefect/server/models/work_queues.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/prefect/server/models/work_queues.py b/src/prefect/server/models/work_queues.py index f6c735817cb0..c6f8ba858a91 100644 --- a/src/prefect/server/models/work_queues.py +++ b/src/prefect/server/models/work_queues.py @@ -316,8 +316,8 @@ async def update_work_queue( if old_value != new_value: changed_fields[field] = { - "old": old_value, - "new": new_value, + "from": old_value, + "to": new_value, } # Emit event for non-status field changes From 645b3e8d027de01264e7a7660888b8c2bf1c788c Mon Sep 17 00:00:00 2001 From: F4RAN Date: Wed, 10 Dec 2025 21:36:43 +0330 Subject: [PATCH 14/15] Update src/prefect/server/models/workers.py Co-authored-by: Alex Streed --- src/prefect/server/models/workers.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/prefect/server/models/workers.py b/src/prefect/server/models/workers.py index 10c3c751fd85..4b5bcd146ba9 100644 --- a/src/prefect/server/models/workers.py +++ b/src/prefect/server/models/workers.py @@ -282,8 +282,8 @@ async def update_work_pool( # Compare values (handle different types) if old_value != new_value: changed_fields[field] = { - "old": old_value, - "new": new_value, + "from": old_value, + "to": new_value, } # Emit event for non-status field changes From 5209d6210eebbc955e011592220bb1af3330bd8a Mon Sep 17 00:00:00 2001 From: F4RAN Date: Wed, 10 Dec 2025 23:27:58 +0330 Subject: [PATCH 15/15] fix: remove duplicate work queue reads and update tests --- src/prefect/server/models/work_queues.py | 12 ++--- src/prefect/server/models/workers.py | 12 ++--- .../orchestration/api/test_work_queues.py | 52 +++++++++---------- .../server/orchestration/api/test_workers.py | 52 +++++++++---------- 4 files changed, 64 insertions(+), 64 deletions(-) diff --git a/src/prefect/server/models/work_queues.py b/src/prefect/server/models/work_queues.py index c6f8ba858a91..2ee68390b112 100644 --- a/src/prefect/server/models/work_queues.py +++ b/src/prefect/server/models/work_queues.py @@ -256,19 +256,19 @@ async def update_work_queue( session.expunge(current_work_queue) if "is_paused" in update_data: - wq = await read_work_queue(session=session, work_queue_id=work_queue_id) - assert wq is not None - assert current_work_queue is not wq # Only update the status to paused if it's not already paused. This ensures a work queue that is already # paused will not get a status update if it's paused again - if update_data.get("is_paused") and wq.status != WorkQueueStatus.PAUSED: + if ( + update_data.get("is_paused") + and current_work_queue.status != WorkQueueStatus.PAUSED + ): update_data["status"] = WorkQueueStatus.PAUSED # If unpausing, only update status if it's currently paused. This ensures a work queue that is already # unpaused will not get a status update if it's unpaused again if ( update_data.get("is_paused") is False - and wq.status == WorkQueueStatus.PAUSED + and current_work_queue.status == WorkQueueStatus.PAUSED ): # Default status if unpaused update_data["status"] = WorkQueueStatus.NOT_READY @@ -278,7 +278,7 @@ async def update_work_queue( if "last_polled" in update_data: last_polled = cast(DateTime, update_data["last_polled"]) else: - last_polled = wq.last_polled + last_polled = current_work_queue.last_polled # Check if last polled is recent and set status to READY if so if is_last_polled_recent(last_polled): diff --git a/src/prefect/server/models/workers.py b/src/prefect/server/models/workers.py index 4b5bcd146ba9..910017fe55d2 100644 --- a/src/prefect/server/models/workers.py +++ b/src/prefect/server/models/workers.py @@ -631,19 +631,19 @@ async def update_work_queue( session.expunge(current_work_queue) if "is_paused" in update_values: - if (wq := await session.get(db.WorkQueue, work_queue_id)) is None: - return False - # Only update the status to paused if it's not already paused. This ensures a work queue that is already # paused will not get a status update if it's paused again - if update_values.get("is_paused") and wq.status != WorkQueueStatus.PAUSED: + if ( + update_values.get("is_paused") + and current_work_queue.status != WorkQueueStatus.PAUSED + ): update_values["status"] = WorkQueueStatus.PAUSED # If unpausing, only update status if it's currently paused. This ensures a work queue that is already # unpaused will not get a status update if it's unpaused again if ( update_values.get("is_paused") is False - and wq.status == WorkQueueStatus.PAUSED + and current_work_queue.status == WorkQueueStatus.PAUSED ): # Default status if unpaused update_values["status"] = default_status @@ -652,7 +652,7 @@ async def update_work_queue( if "last_polled" in update_values: last_polled = update_values["last_polled"] else: - last_polled = wq.last_polled + last_polled = current_work_queue.last_polled # Check if last polled is recent and set status to READY if so if is_last_polled_recent(last_polled): diff --git a/tests/server/orchestration/api/test_work_queues.py b/tests/server/orchestration/api/test_work_queues.py index 85ef50520512..20d8b905f900 100644 --- a/tests/server/orchestration/api/test_work_queues.py +++ b/tests/server/orchestration/api/test_work_queues.py @@ -328,11 +328,11 @@ async def test_update_work_queue_to_paused_when_already_paused_does_not_emit_eve "prefect.resource.role": "work-queue", }, payload={ - "changed_fields": ["concurrency_limit"], - "changes": { + "updated_fields": ["concurrency_limit"], + "updates": { "concurrency_limit": { - "old": None, - "new": 3, + "from": None, + "to": 3, } }, }, @@ -373,11 +373,11 @@ async def test_update_work_queue_to_unpaused_when_already_unpaused_does_not_emit "prefect.resource.role": "work-queue", }, payload={ - "changed_fields": ["concurrency_limit"], - "changes": { + "updated_fields": ["concurrency_limit"], + "updates": { "concurrency_limit": { - "old": None, - "new": 3, + "from": None, + "to": 3, } }, }, @@ -625,11 +625,11 @@ async def test_update_work_queue_emits_updated_event_for_concurrency_limit( "prefect.resource.role": "work-queue", }, payload={ - "changed_fields": ["concurrency_limit"], - "changes": { + "updated_fields": ["concurrency_limit"], + "updates": { "concurrency_limit": { - "old": None, - "new": 10, + "from": None, + "to": 10, } }, }, @@ -670,11 +670,11 @@ async def test_update_work_queue_emits_updated_event_for_description( "prefect.resource.role": "work-queue", }, payload={ - "changed_fields": ["description"], - "changes": { + "updated_fields": ["description"], + "updates": { "description": { - "old": original_description, - "new": "Updated description", + "from": original_description, + "to": "Updated description", } }, }, @@ -707,11 +707,11 @@ async def test_update_work_queue_emits_updated_event_for_priority( "prefect.resource.role": "work-queue", }, payload={ - "changed_fields": ["priority"], - "changes": { + "updated_fields": ["priority"], + "updates": { "priority": { - "old": original_priority, - "new": 5, + "from": original_priority, + "to": 5, } }, }, @@ -746,15 +746,15 @@ async def test_update_work_queue_emits_updated_event_for_multiple_fields( "prefect.resource.role": "work-queue", }, payload={ - "changed_fields": ["description", "concurrency_limit"], - "changes": { + "updated_fields": ["description", "concurrency_limit"], + "updates": { "description": { - "old": original_description, - "new": "Multi update", + "from": original_description, + "to": "Multi update", }, "concurrency_limit": { - "old": original_concurrency_limit, - "new": 20, + "from": original_concurrency_limit, + "to": 20, }, }, }, diff --git a/tests/server/orchestration/api/test_workers.py b/tests/server/orchestration/api/test_workers.py index 3e3b63a07f8b..26fab1bb5365 100644 --- a/tests/server/orchestration/api/test_workers.py +++ b/tests/server/orchestration/api/test_workers.py @@ -1287,11 +1287,11 @@ async def test_update_work_queue_to_paused_when_already_paused_does_not_emit_eve "prefect.resource.role": "work-queue", }, payload={ - "changed_fields": ["concurrency_limit"], - "changes": { + "updated_fields": ["concurrency_limit"], + "updates": { "concurrency_limit": { - "old": None, - "new": 3, + "from": None, + "to": 3, } }, }, @@ -1333,11 +1333,11 @@ async def test_update_work_queue_to_unpaused_when_already_unpaused_does_not_emit "prefect.resource.role": "work-queue", }, payload={ - "changed_fields": ["concurrency_limit"], - "changes": { + "updated_fields": ["concurrency_limit"], + "updates": { "concurrency_limit": { - "old": None, - "new": 3, + "from": None, + "to": 3, } }, }, @@ -1588,11 +1588,11 @@ async def test_update_work_queue_via_work_pool_emits_updated_event( "prefect.resource.role": "work-queue", }, payload={ - "changed_fields": ["concurrency_limit"], - "changes": { + "updated_fields": ["concurrency_limit"], + "updates": { "concurrency_limit": { - "old": None, - "new": 15, + "from": None, + "to": 15, } }, }, @@ -1630,11 +1630,11 @@ async def test_update_work_pool_emits_updated_event_for_description( "prefect.resource.role": "work-pool", }, payload={ - "changed_fields": ["description"], - "changes": { + "updated_fields": ["description"], + "updates": { "description": { - "old": original_description, - "new": "Updated description", + "from": original_description, + "to": "Updated description", } }, }, @@ -1662,11 +1662,11 @@ async def test_update_work_pool_emits_updated_event_for_concurrency_limit( "prefect.resource.role": "work-pool", }, payload={ - "changed_fields": ["concurrency_limit"], - "changes": { + "updated_fields": ["concurrency_limit"], + "updates": { "concurrency_limit": { - "old": original_concurrency_limit, - "new": 25, + "from": original_concurrency_limit, + "to": 25, } }, }, @@ -1698,15 +1698,15 @@ async def test_update_work_pool_emits_updated_event_for_multiple_fields( "prefect.resource.role": "work-pool", }, payload={ - "changed_fields": ["description", "concurrency_limit"], - "changes": { + "updated_fields": ["description", "concurrency_limit"], + "updates": { "description": { - "old": original_description, - "new": "Multi update", + "from": original_description, + "to": "Multi update", }, "concurrency_limit": { - "old": original_concurrency_limit, - "new": 30, + "from": original_concurrency_limit, + "to": 30, }, }, },