diff --git a/src/prefect/server/models/events.py b/src/prefect/server/models/events.py index c524f1705be7..bb3c62d22293 100644 --- a/src/prefect/server/models/events.py +++ b/src/prefect/server/models/events.py @@ -419,6 +419,73 @@ async def work_pool_status_event( ) +async def work_pool_updated_event( + session: AsyncSession, + work_pool: "ORMWorkPool", + changed_fields: Dict[ + str, Dict[str, Any] + ], # {"field_name": {"from": old_value, "to": new_value}} + occurred: DateTime, +) -> Event: + """Create an event for work pool field updates (non-status).""" + return Event( + occurred=occurred, + event="prefect.work-pool.updated", + resource={ + "prefect.resource.id": f"prefect.work-pool.{work_pool.id}", + "prefect.resource.name": work_pool.name, + "prefect.work-pool.type": work_pool.type, + "prefect.resource.role": "work-pool", + }, + payload={ + "updated_fields": list(changed_fields.keys()), + "updates": changed_fields, + }, + id=uuid7(), + ) + + +async def work_queue_updated_event( + session: AsyncSession, + work_queue: "ORMWorkQueue", + changed_fields: Dict[str, Dict[str, Any]], + occurred: DateTime, +) -> Event: + """Create an event for work queue field updates (non-status).""" + related_work_pool_info: List[Dict[str, Any]] = [] + + if work_queue.work_pool_id: + work_pool = await models.workers.read_work_pool( + session=session, + work_pool_id=work_queue.work_pool_id, + ) + if work_pool and work_pool.id and work_pool.name: + related_work_pool_info.append( + { + "prefect.resource.id": f"prefect.work-pool.{work_pool.id}", + "prefect.resource.name": work_pool.name, + "prefect.work-pool.type": work_pool.type, + "prefect.resource.role": "work-pool", + } + ) + + return Event( + occurred=occurred, + event="prefect.work-queue.updated", + resource={ + "prefect.resource.id": f"prefect.work-queue.{work_queue.id}", + "prefect.resource.name": work_queue.name, + "prefect.resource.role": "work-queue", + }, + related=related_work_pool_info, + payload={ + "updated_fields": 
list(changed_fields.keys()), + "updates": changed_fields, + }, + id=uuid7(), + ) + + def _get_recent_preceding_work_pool_event_id( work_pool: Optional["ORMWorkPool"], ) -> Optional[UUID]: diff --git a/src/prefect/server/models/work_queues.py b/src/prefect/server/models/work_queues.py index 1eebb5cb09b5..2ee68390b112 100644 --- a/src/prefect/server/models/work_queues.py +++ b/src/prefect/server/models/work_queues.py @@ -5,8 +5,10 @@ import datetime from typing import ( + Any, Awaitable, Callable, + Dict, Iterable, Optional, Sequence, @@ -32,7 +34,10 @@ ) from prefect.server.events.clients import PrefectServerEventsClient from prefect.server.exceptions import ObjectNotFoundError -from prefect.server.models.events import work_queue_status_event +from prefect.server.models.events import ( + work_queue_status_event, + work_queue_updated_event, +) from prefect.server.models.workers import ( DEFAULT_AGENT_WORK_POOL_NAME, bulk_update_work_queue_priorities, @@ -242,22 +247,28 @@ async def update_work_queue( # exclude_unset=True allows us to only update values provided by # the user, ignoring any defaults on the model update_data = work_queue.model_dump_for_orm(exclude_unset=True) + current_work_queue = await read_work_queue( + session=session, work_queue_id=work_queue_id + ) + if current_work_queue is None: + return False - if "is_paused" in update_data: - wq = await read_work_queue(session=session, work_queue_id=work_queue_id) - if wq is None: - return False + session.expunge(current_work_queue) + if "is_paused" in update_data: # Only update the status to paused if it's not already paused. This ensures a work queue that is already # paused will not get a status update if it's paused again - if update_data.get("is_paused") and wq.status != WorkQueueStatus.PAUSED: + if ( + update_data.get("is_paused") + and current_work_queue.status != WorkQueueStatus.PAUSED + ): update_data["status"] = WorkQueueStatus.PAUSED # If unpausing, only update status if it's currently paused. 
This ensures a work queue that is already # unpaused will not get a status update if it's unpaused again if ( update_data.get("is_paused") is False - and wq.status == WorkQueueStatus.PAUSED + and current_work_queue.status == WorkQueueStatus.PAUSED ): # Default status if unpaused update_data["status"] = WorkQueueStatus.NOT_READY @@ -267,7 +278,7 @@ async def update_work_queue( if "last_polled" in update_data: last_polled = cast(DateTime, update_data["last_polled"]) else: - last_polled = wq.last_polled + last_polled = current_work_queue.last_polled # Check if last polled is recent and set status to READY if so if is_last_polled_recent(last_polled): @@ -282,9 +293,42 @@ async def update_work_queue( updated = result.rowcount > 0 if updated: + wq = await read_work_queue(session=session, work_queue_id=work_queue_id) + assert wq is not None + assert current_work_queue is not wq + WORK_QUEUE_EVENT_FIELDS = { + "name", + "description", + "concurrency_limit", + "priority", + # Exclude "is_paused" - handled with status + # Exclude "last_polled" - usually auto-updated + # Exclude "filter" - deprecated + } + # Detect which fields actually changed + changed_fields = {} + for field in update_data.keys(): + if field not in WORK_QUEUE_EVENT_FIELDS or field == "status": + continue + + old_value = getattr(current_work_queue, field, None) + new_value = getattr(wq, field, None) + + if old_value != new_value: + changed_fields[field] = { + "from": old_value, + "to": new_value, + } + + # Emit event for non-status field changes + if changed_fields: + await emit_work_queue_updated_event( + session=session, + work_queue=wq, + changed_fields=changed_fields, + ) + if "status" in update_data and emit_status_change: - wq = await read_work_queue(session=session, work_queue_id=work_queue_id) - assert wq await emit_status_change(wq) return updated @@ -621,12 +665,31 @@ async def emit_work_queue_status_event( db: PrefectDBInterface, work_queue: orm_models.WorkQueue, ) -> None: + """Emit an event 
when a work queue's status changes.""" async with db.session_context() as session: event = await work_queue_status_event( session=session, work_queue=work_queue, occurred=now("UTC"), ) - async with PrefectServerEventsClient() as events_client: await events_client.emit(event) + + +async def emit_work_queue_updated_event( + session: AsyncSession, + work_queue: orm_models.WorkQueue, + changed_fields: Dict[str, Dict[str, Any]], +) -> None: + if not changed_fields: + return + + async with PrefectServerEventsClient() as events_client: + await events_client.emit( + await work_queue_updated_event( + session=session, + work_queue=work_queue, + changed_fields=changed_fields, + occurred=now("UTC"), + ) + ) diff --git a/src/prefect/server/models/workers.py b/src/prefect/server/models/workers.py index 18f952d143b0..910017fe55d2 100644 --- a/src/prefect/server/models/workers.py +++ b/src/prefect/server/models/workers.py @@ -5,6 +5,7 @@ import datetime from typing import ( + Any, Awaitable, Callable, Dict, Iterable, Optional, Sequence, @@ -24,7 +25,10 @@ from prefect.server.database import PrefectDBInterface, db_injector, orm_models from prefect.server.events.clients import PrefectServerEventsClient from prefect.server.exceptions import ObjectNotFoundError -from prefect.server.models.events import work_pool_status_event +from prefect.server.models.events import ( + work_pool_status_event, + work_pool_updated_event, +) from prefect.server.schemas.statuses import WorkQueueStatus from prefect.server.utilities.database import UUID as PrefectUUID from prefect.types._datetime import DateTime, now @@ -257,6 +261,39 @@ async def update_work_pool( assert wp is not None assert current_work_pool is not wp + # Detect which fields actually changed (excluding status and internal fields) + # Fields that should trigger update events (user-updatable fields) + WORK_POOL_EVENT_FIELDS = { + # "is_paused", # Handled with status + "description", + "base_job_template", + "concurrency_limit", + "storage_configuration", + 
} + + changed_fields = {} + for field in update_data.keys(): + if field not in WORK_POOL_EVENT_FIELDS or field == "status": + continue + + old_value = getattr(current_work_pool, field, None) + new_value = getattr(wp, field, None) + + # Compare values (handle different types) + if old_value != new_value: + changed_fields[field] = { + "from": old_value, + "to": new_value, + } + + # Emit event for non-status field changes + if changed_fields: + await emit_work_pool_updated_event( + session=session, + work_pool=wp, + changed_fields=changed_fields, + ) + if "status" in update_data and emit_status_change: await emit_status_change( event_id=update_data["last_status_event_id"], # type: ignore @@ -588,20 +625,25 @@ async def update_work_queue( update_values = work_queue.model_dump_for_orm(exclude_unset=True) - if "is_paused" in update_values: - if (wq := await session.get(db.WorkQueue, work_queue_id)) is None: - return False + current_work_queue = await session.get(db.WorkQueue, work_queue_id) + if current_work_queue is None: + return False + session.expunge(current_work_queue) + if "is_paused" in update_values: # Only update the status to paused if it's not already paused. This ensures a work queue that is already # paused will not get a status update if it's paused again - if update_values.get("is_paused") and wq.status != WorkQueueStatus.PAUSED: + if ( + update_values.get("is_paused") + and current_work_queue.status != WorkQueueStatus.PAUSED + ): update_values["status"] = WorkQueueStatus.PAUSED # If unpausing, only update status if it's currently paused. 
This ensures a work queue that is already # unpaused will not get a status update if it's unpaused again if ( update_values.get("is_paused") is False - and wq.status == WorkQueueStatus.PAUSED + and current_work_queue.status == WorkQueueStatus.PAUSED ): # Default status if unpaused update_values["status"] = default_status @@ -610,7 +652,7 @@ async def update_work_queue( if "last_polled" in update_values: last_polled = update_values["last_polled"] else: - last_polled = wq.last_polled + last_polled = current_work_queue.last_polled # Check if last polled is recent and set status to READY if so if is_last_polled_recent(last_polled): @@ -626,10 +668,45 @@ async def update_work_queue( updated = result.rowcount > 0 if updated: - if "priority" in update_values or "status" in update_values: - updated_work_queue = await session.get(db.WorkQueue, work_queue_id) - assert updated_work_queue + updated_work_queue = await session.get(db.WorkQueue, work_queue_id) + assert updated_work_queue is not None + assert current_work_queue is not updated_work_queue + + # Fields that should trigger update events (user-updatable fields) + WORK_QUEUE_EVENT_FIELDS = { + "name", + "description", + "concurrency_limit", + "priority", + # Exclude "is_paused" - handled with status + # Exclude "last_polled" - usually auto-updated + # Exclude "filter" - deprecated + } + + changed_fields = {} + for field in update_values.keys(): + if field not in WORK_QUEUE_EVENT_FIELDS or field == "status": + continue + + old_value = getattr(current_work_queue, field, None) + new_value = getattr(updated_work_queue, field, None) + + if old_value != new_value: + changed_fields[field] = { + "from": old_value, + "to": new_value, + } + + if changed_fields: + from prefect.server.models.work_queues import emit_work_queue_updated_event + + await emit_work_queue_updated_event( + session=session, + work_queue=updated_work_queue, + changed_fields=changed_fields, + ) + if "priority" in update_values or "status" in update_values: if 
"priority" in update_values: await bulk_update_work_queue_priorities( session, @@ -799,6 +876,26 @@ async def delete_worker( return result.rowcount > 0 +async def emit_work_pool_updated_event( + session: AsyncSession, + work_pool: orm_models.WorkPool, + changed_fields: Dict[str, Dict[str, Any]], +) -> None: + """Emit an event when work pool fields are updated.""" + if not changed_fields: + return + + async with PrefectServerEventsClient() as events_client: + await events_client.emit( + await work_pool_updated_event( + session=session, + work_pool=work_pool, + changed_fields=changed_fields, + occurred=now("UTC"), + ) + ) + + async def emit_work_pool_status_event( event_id: UUID, occurred: DateTime, diff --git a/tests/server/orchestration/api/test_work_queues.py b/tests/server/orchestration/api/test_work_queues.py index 41d56dc19162..20d8b905f900 100644 --- a/tests/server/orchestration/api/test_work_queues.py +++ b/tests/server/orchestration/api/test_work_queues.py @@ -204,7 +204,12 @@ async def test_update_work_queue( events = [ event for client in AssertingEventsClient.all for event in client.events ] - assert len(events) == 1 + # Expect both status event and updated event for concurrency_limit + assert len(events) == 2 + # Verify we have both a status event and an updated event + event_types = {event.event for event in events} + assert "prefect.work-queue.paused" in event_types + assert "prefect.work-queue.updated" in event_types async def test_update_work_queue_to_paused( self, @@ -312,8 +317,26 @@ async def test_update_work_queue_to_paused_when_already_paused_does_not_emit_eve assert work_queue_response.status_code == 200 assert work_queue_response.json()["status"] == "PAUSED" - # ensure no events emitted for already paused work queue - AssertingEventsClient.assert_emitted_event_count(0) + # Since concurrency_limit changed, we should get an updated event + # (status didn't change, so no status event) + AssertingEventsClient.assert_emitted_event_count(1) + 
AssertingEventsClient.assert_emitted_event_with( + event="prefect.work-queue.updated", + resource={ + "prefect.resource.id": f"prefect.work-queue.{paused_work_queue.id}", + "prefect.resource.name": paused_work_queue.name, + "prefect.resource.role": "work-queue", + }, + payload={ + "updated_fields": ["concurrency_limit"], + "updates": { + "concurrency_limit": { + "from": None, + "to": 3, + } + }, + }, + ) async def test_update_work_queue_to_unpaused_when_already_unpaused_does_not_emit_event( self, @@ -339,8 +362,26 @@ async def test_update_work_queue_to_unpaused_when_already_unpaused_does_not_emit assert work_queue_response.status_code == 200 assert work_queue_response.json()["status"] == "READY" - # ensure no events emitted for already unpaused work queue - AssertingEventsClient.assert_emitted_event_count(0) + # Since concurrency_limit changed, we should get an updated event + # (status didn't change, so no status event) + AssertingEventsClient.assert_emitted_event_count(1) + AssertingEventsClient.assert_emitted_event_with( + event="prefect.work-queue.updated", + resource={ + "prefect.resource.id": f"prefect.work-queue.{ready_work_queue.id}", + "prefect.resource.name": ready_work_queue.name, + "prefect.resource.role": "work-queue", + }, + payload={ + "updated_fields": ["concurrency_limit"], + "updates": { + "concurrency_limit": { + "from": None, + "to": 3, + } + }, + }, + ) async def test_update_work_queue_to_unpaused_with_no_last_polled_sets_not_ready_status( self, @@ -557,6 +598,198 @@ async def test_update_work_queue_to_unpaused_with_recent_last_polled_sets_ready_ ], ) + async def test_update_work_queue_emits_updated_event_for_concurrency_limit( + self, + client, + work_queue, + ): + """Test that updating concurrency_limit emits prefect.work-queue.updated event.""" + assert work_queue.concurrency_limit is None + + new_data = schemas.actions.WorkQueueUpdate(concurrency_limit=10).model_dump( + mode="json", exclude_unset=True + ) + + response = await client.patch( 
+ f"/work_queues/{work_queue.id}", + json=new_data, + ) + + assert response.status_code == 204 + + AssertingEventsClient.assert_emitted_event_with( + event="prefect.work-queue.updated", + resource={ + "prefect.resource.id": f"prefect.work-queue.{work_queue.id}", + "prefect.resource.name": work_queue.name, + "prefect.resource.role": "work-queue", + }, + payload={ + "updated_fields": ["concurrency_limit"], + "updates": { + "concurrency_limit": { + "from": None, + "to": 10, + } + }, + }, + related=[ + { + "prefect.resource.id": f"prefect.work-pool.{work_queue.work_pool.id}", + "prefect.resource.name": work_queue.work_pool.name, + "prefect.work-pool.type": work_queue.work_pool.type, + "prefect.resource.role": "work-pool", + } + ], + ) + + async def test_update_work_queue_emits_updated_event_for_description( + self, + client, + work_queue, + ): + """Test that updating description emits prefect.work-queue.updated event.""" + original_description = work_queue.description or "" + + new_data = schemas.actions.WorkQueueUpdate( + description="Updated description" + ).model_dump(mode="json", exclude_unset=True) + + response = await client.patch( + f"/work_queues/{work_queue.id}", + json=new_data, + ) + + assert response.status_code == 204 + + AssertingEventsClient.assert_emitted_event_with( + event="prefect.work-queue.updated", + resource={ + "prefect.resource.id": f"prefect.work-queue.{work_queue.id}", + "prefect.resource.name": work_queue.name, + "prefect.resource.role": "work-queue", + }, + payload={ + "updated_fields": ["description"], + "updates": { + "description": { + "from": original_description, + "to": "Updated description", + } + }, + }, + ) + + async def test_update_work_queue_emits_updated_event_for_priority( + self, + client, + work_queue, + ): + """Test that updating priority emits prefect.work-queue.updated event.""" + original_priority = work_queue.priority + + new_data = schemas.actions.WorkQueueUpdate(priority=5).model_dump( + mode="json", exclude_unset=True 
+ ) + + response = await client.patch( + f"/work_queues/{work_queue.id}", + json=new_data, + ) + + assert response.status_code == 204 + + AssertingEventsClient.assert_emitted_event_with( + event="prefect.work-queue.updated", + resource={ + "prefect.resource.id": f"prefect.work-queue.{work_queue.id}", + "prefect.resource.name": work_queue.name, + "prefect.resource.role": "work-queue", + }, + payload={ + "updated_fields": ["priority"], + "updates": { + "priority": { + "from": original_priority, + "to": 5, + } + }, + }, + ) + + async def test_update_work_queue_emits_updated_event_for_multiple_fields( + self, + client, + work_queue, + ): + """Test that updating multiple fields emits single event with all changes.""" + original_description = work_queue.description or "" + original_concurrency_limit = work_queue.concurrency_limit + + new_data = schemas.actions.WorkQueueUpdate( + description="Multi update", + concurrency_limit=20, + ).model_dump(mode="json", exclude_unset=True) + + response = await client.patch( + f"/work_queues/{work_queue.id}", + json=new_data, + ) + + assert response.status_code == 204 + + AssertingEventsClient.assert_emitted_event_with( + event="prefect.work-queue.updated", + resource={ + "prefect.resource.id": f"prefect.work-queue.{work_queue.id}", + "prefect.resource.name": work_queue.name, + "prefect.resource.role": "work-queue", + }, + payload={ + "updated_fields": ["description", "concurrency_limit"], + "updates": { + "description": { + "from": original_description, + "to": "Multi update", + }, + "concurrency_limit": { + "from": original_concurrency_limit, + "to": 20, + }, + }, + }, + ) + + async def test_update_work_queue_no_event_for_noop_update( + self, + client, + work_queue, + ): + """Test that no event is emitted when updating to the same value.""" + # Set initial concurrency limit + await client.patch( + f"/work_queues/{work_queue.id}", + json={"concurrency_limit": 10}, + ) + AssertingEventsClient.reset() + + # Update to same value + 
response = await client.patch( + f"/work_queues/{work_queue.id}", + json={"concurrency_limit": 10}, + ) + + assert response.status_code == 204 + + # Should not emit updated event (only status event if status changed) + events = [ + event + for client in AssertingEventsClient.all + for event in client.events + if event.event == "prefect.work-queue.updated" + ] + assert len(events) == 0 + class TestReadWorkQueue: async def test_read_work_queue(self, client, work_queue): diff --git a/tests/server/orchestration/api/test_workers.py b/tests/server/orchestration/api/test_workers.py index 93814c8c4e9b..26fab1bb5365 100644 --- a/tests/server/orchestration/api/test_workers.py +++ b/tests/server/orchestration/api/test_workers.py @@ -452,7 +452,14 @@ async def test_update_work_pool(self, client, session, work_pool): assert result.is_paused is True assert result.concurrency_limit == 5 - assert_status_events(work_pool.name, ["prefect.work-pool.paused"]) + # Expect both status event and updated event for concurrency_limit + found_events = [ + event for item in AssertingEventsClient.all for event in item.events + ] + assert len(found_events) == 2 + event_types = {event.event for event in found_events} + assert "prefect.work-pool.paused" in event_types + assert "prefect.work-pool.updated" in event_types async def test_update_work_pool_storage_configuration(self, client, work_pool): bundle_upload_step = { @@ -1194,7 +1201,14 @@ async def test_update_work_queue_to_paused( assert response.json()["concurrency_limit"] == 3 assert response.json()["status"] == "PAUSED" - assert_status_events(work_queue_1.name, ["prefect.work-queue.paused"]) + # Expect both status event and updated event for concurrency_limit + found_events = [ + event for item in AssertingEventsClient.all for event in item.events + ] + assert len(found_events) == 2 + event_types = {event.event for event in found_events} + assert "prefect.work-queue.paused" in event_types + assert "prefect.work-queue.updated" in 
event_types async def test_update_work_queue_to_paused_sets_paused_status( self, @@ -1228,7 +1242,14 @@ async def test_update_work_queue_to_paused_sets_paused_status( assert work_queue_response.status_code == 200 assert work_queue_response.json()["status"] == "PAUSED" - assert_status_events(work_queue_1.name, ["prefect.work-queue.paused"]) + # Expect both status event and updated event for concurrency_limit + found_events = [ + event for item in AssertingEventsClient.all for event in item.events + ] + assert len(found_events) == 2 + event_types = {event.event for event in found_events} + assert "prefect.work-queue.paused" in event_types + assert "prefect.work-queue.updated" in event_types async def test_update_work_queue_to_paused_when_already_paused_does_not_emit_event( self, @@ -1255,8 +1276,26 @@ async def test_update_work_queue_to_paused_when_already_paused_does_not_emit_eve assert work_queue_response.status_code == 200 assert work_queue_response.json()["status"] == "PAUSED" - # ensure no events emitted for already paused work queue - AssertingEventsClient.assert_emitted_event_count(0) + # Since concurrency_limit changed, we should get an updated event + # (status didn't change, so no status event) + AssertingEventsClient.assert_emitted_event_count(1) + AssertingEventsClient.assert_emitted_event_with( + event="prefect.work-queue.updated", + resource={ + "prefect.resource.id": f"prefect.work-queue.{paused_work_queue.id}", + "prefect.resource.name": paused_work_queue.name, + "prefect.resource.role": "work-queue", + }, + payload={ + "updated_fields": ["concurrency_limit"], + "updates": { + "concurrency_limit": { + "from": None, + "to": 3, + } + }, + }, + ) async def test_update_work_queue_to_unpaused_when_already_unpaused_does_not_emit_event( self, @@ -1283,8 +1322,26 @@ async def test_update_work_queue_to_unpaused_when_already_unpaused_does_not_emit assert work_queue_response.status_code == 200 assert work_queue_response.json()["status"] == "READY" - # ensure no 
events emitted for already unpaused work queue - AssertingEventsClient.assert_emitted_event_count(0) + # Since concurrency_limit changed, we should get an updated event + # (status didn't change, so no status event) + AssertingEventsClient.assert_emitted_event_count(1) + AssertingEventsClient.assert_emitted_event_with( + event="prefect.work-queue.updated", + resource={ + "prefect.resource.id": f"prefect.work-queue.{ready_work_queue.id}", + "prefect.resource.name": ready_work_queue.name, + "prefect.resource.role": "work-queue", + }, + payload={ + "updated_fields": ["concurrency_limit"], + "updates": { + "concurrency_limit": { + "from": None, + "to": 3, + } + }, + }, + ) async def test_update_work_queue_to_unpaused_with_no_last_polled_sets_not_ready_status( self, @@ -1504,6 +1561,157 @@ async def test_update_work_queue_to_unpaused_with_recent_last_polled_sets_ready_ ], ) + async def test_update_work_queue_via_work_pool_emits_updated_event( + self, client, work_pool + ): + """Test that updating work queue via work pool endpoint emits updated event.""" + # Create work pool queue + create_response = await client.post( + f"/work_pools/{work_pool.name}/queues", + json=dict(name="test-queue"), + ) + assert create_response.status_code == status.HTTP_201_CREATED + create_result = parse_obj_as(WorkQueue, create_response.json()) + + # Update concurrency limit + update_response = await client.patch( + f"/work_pools/{work_pool.name}/queues/test-queue", + json=dict(concurrency_limit=15), + ) + assert update_response.status_code == status.HTTP_204_NO_CONTENT + + AssertingEventsClient.assert_emitted_event_with( + event="prefect.work-queue.updated", + resource={ + "prefect.resource.id": f"prefect.work-queue.{create_result.id}", + "prefect.resource.name": "test-queue", + "prefect.resource.role": "work-queue", + }, + payload={ + "updated_fields": ["concurrency_limit"], + "updates": { + "concurrency_limit": { + "from": None, + "to": 15, + } + }, + }, + related=[ + { + 
"prefect.resource.id": f"prefect.work-pool.{work_pool.id}", + "prefect.resource.name": work_pool.name, + "prefect.work-pool.type": work_pool.type, + "prefect.resource.role": "work-pool", + } + ], + ) + + +class TestUpdateWorkPoolEvents: + async def test_update_work_pool_emits_updated_event_for_description( + self, client, work_pool + ): + """Test that updating work pool description emits prefect.work-pool.updated event.""" + original_description = work_pool.description + + response = await client.patch( + f"/work_pools/{work_pool.name}", + json={"description": "Updated description"}, + ) + + assert response.status_code == status.HTTP_204_NO_CONTENT + + AssertingEventsClient.assert_emitted_event_with( + event="prefect.work-pool.updated", + resource={ + "prefect.resource.id": f"prefect.work-pool.{work_pool.id}", + "prefect.resource.name": work_pool.name, + "prefect.work-pool.type": work_pool.type, + "prefect.resource.role": "work-pool", + }, + payload={ + "updated_fields": ["description"], + "updates": { + "description": { + "from": original_description, + "to": "Updated description", + } + }, + }, + ) + + async def test_update_work_pool_emits_updated_event_for_concurrency_limit( + self, client, work_pool + ): + """Test that updating work pool concurrency_limit emits prefect.work-pool.updated event.""" + original_concurrency_limit = work_pool.concurrency_limit + + response = await client.patch( + f"/work_pools/{work_pool.name}", + json={"concurrency_limit": 25}, + ) + + assert response.status_code == status.HTTP_204_NO_CONTENT + + AssertingEventsClient.assert_emitted_event_with( + event="prefect.work-pool.updated", + resource={ + "prefect.resource.id": f"prefect.work-pool.{work_pool.id}", + "prefect.resource.name": work_pool.name, + "prefect.work-pool.type": work_pool.type, + "prefect.resource.role": "work-pool", + }, + payload={ + "updated_fields": ["concurrency_limit"], + "updates": { + "concurrency_limit": { + "from": original_concurrency_limit, + "to": 25, + } + 
}, + }, + ) + + async def test_update_work_pool_emits_updated_event_for_multiple_fields( + self, client, work_pool + ): + """Test that updating multiple work pool fields emits single event with all changes.""" + original_description = work_pool.description + original_concurrency_limit = work_pool.concurrency_limit + + response = await client.patch( + f"/work_pools/{work_pool.name}", + json={ + "description": "Multi update", + "concurrency_limit": 30, + }, + ) + + assert response.status_code == status.HTTP_204_NO_CONTENT + + AssertingEventsClient.assert_emitted_event_with( + event="prefect.work-pool.updated", + resource={ + "prefect.resource.id": f"prefect.work-pool.{work_pool.id}", + "prefect.resource.name": work_pool.name, + "prefect.work-pool.type": work_pool.type, + "prefect.resource.role": "work-pool", + }, + payload={ + "updated_fields": ["description", "concurrency_limit"], + "updates": { + "description": { + "from": original_description, + "to": "Multi update", + }, + "concurrency_limit": { + "from": original_concurrency_limit, + "to": 30, + }, + }, + }, + ) + class TestWorkPoolStatus: async def test_work_pool_status_with_online_worker(self, client, work_pool):