diff --git a/airflow/api_connexion/endpoints/pool_endpoint.py b/airflow/api_connexion/endpoints/pool_endpoint.py index 1f39297c36187..2e62ce0f3d3d3 100644 --- a/airflow/api_connexion/endpoints/pool_endpoint.py +++ b/airflow/api_connexion/endpoints/pool_endpoint.py @@ -66,6 +66,7 @@ def get_pool(*, pool_name: str, session: Session = NEW_SESSION) -> APIResponse: return pool_schema.dump(obj) +@mark_fastapi_migration_done @security.requires_access_pool("GET") @format_parameters({"limit": check_limit}) @provide_session diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 3dc9f35443fea..0f862e496e59e 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -1163,6 +1163,66 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + /public/pools/: + get: + tags: + - Pool + summary: Get Pools + description: Get all pools entries. + operationId: get_pools + parameters: + - name: limit + in: query + required: false + schema: + type: integer + default: 100 + title: Limit + - name: offset + in: query + required: false + schema: + type: integer + default: 0 + title: Offset + - name: order_by + in: query + required: false + schema: + type: string + default: id + title: Order By + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/PoolCollectionResponse' + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' /public/providers/: get: tags: @@ -1227,7 +1287,7 @@ components: - connections - total_entries title: ConnectionCollectionResponse - description: DAG Collection serializer for responses. + description: Connection Collection serializer for responses. ConnectionResponse: properties: connection_id: @@ -1960,6 +2020,22 @@ components: - task_instance_states title: HistoricalMetricDataResponse description: Historical Metric Data serializer for responses. + PoolCollectionResponse: + properties: + pools: + items: + $ref: '#/components/schemas/PoolResponse' + type: array + title: Pools + total_entries: + type: integer + title: Total Entries + type: object + required: + - pools + - total_entries + title: PoolCollectionResponse + description: Pool Collection serializer for responses. PoolResponse: properties: name: diff --git a/airflow/api_fastapi/core_api/routes/public/pools.py b/airflow/api_fastapi/core_api/routes/public/pools.py index 0d27f842b1bbb..0f5329a1ccb95 100644 --- a/airflow/api_fastapi/core_api/routes/public/pools.py +++ b/airflow/api_fastapi/core_api/routes/public/pools.py @@ -21,10 +21,11 @@ from sqlalchemy.orm import Session from typing_extensions import Annotated -from airflow.api_fastapi.common.db.common import get_session +from airflow.api_fastapi.common.db.common import get_session, paginated_select +from airflow.api_fastapi.common.parameters import QueryLimit, QueryOffset, SortParam from airflow.api_fastapi.common.router import AirflowRouter from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc -from airflow.api_fastapi.core_api.serializers.pools import PoolResponse +from airflow.api_fastapi.core_api.serializers.pools import PoolCollectionResponse, PoolResponse from airflow.models.pool import Pool pools_router = AirflowRouter(tags=["Pool"], prefix="/pools") @@ -63,3 +64,34 @@ async def get_pool( raise HTTPException(404, f"The Pool with name: `{pool_name}` was not found") return PoolResponse.model_validate(pool, from_attributes=True) + + +@pools_router.get( + "/", + responses=create_openapi_http_exception_doc([401, 403, 404]), +) +async def get_pools( + limit: QueryLimit, + offset: QueryOffset, + order_by: Annotated[ + SortParam, + Depends(SortParam(["id", "name"], Pool).dynamic_depends()), + ], + session: Annotated[Session, Depends(get_session)], +) -> PoolCollectionResponse: + """Get all pools entries.""" + pools_select, total_entries = paginated_select( + select(Pool), + [], + order_by=order_by, + offset=offset, + limit=limit, + session=session, + ) + + pools = session.scalars(pools_select).all() + + return PoolCollectionResponse( + pools=[PoolResponse.model_validate(pool, from_attributes=True) for pool in pools], + total_entries=total_entries, + ) diff --git a/airflow/api_fastapi/core_api/serializers/connections.py b/airflow/api_fastapi/core_api/serializers/connections.py index 1c801607299fe..1cc069cac0cb3 100644 --- a/airflow/api_fastapi/core_api/serializers/connections.py +++ b/airflow/api_fastapi/core_api/serializers/connections.py @@ -51,7 +51,7 @@ def redact_extra(cls, v: str | None) -> str | None: class ConnectionCollectionResponse(BaseModel): - """DAG Collection serializer for responses.""" + """Connection Collection serializer for responses.""" connections: list[ConnectionResponse] total_entries: int diff --git a/airflow/api_fastapi/core_api/serializers/pools.py b/airflow/api_fastapi/core_api/serializers/pools.py index e0b03fd8c1c06..4bfa7137f1231 100644 --- a/airflow/api_fastapi/core_api/serializers/pools.py +++ b/airflow/api_fastapi/core_api/serializers/pools.py @@ -34,7 +34,7 @@ def _call_function(function: Callable[[], int]) -> int: class PoolResponse(BaseModel): """Pool serializer for responses.""" - pool: str = Field(serialization_alias="name") + pool: str = Field(serialization_alias="name", validation_alias="pool") slots: int description: str | None include_deferred: bool @@ -45,3 +45,10 @@ class PoolResponse(BaseModel): scheduled_slots: Annotated[int, BeforeValidator(_call_function)] open_slots: Annotated[int, BeforeValidator(_call_function)] deferred_slots: Annotated[int, BeforeValidator(_call_function)] + + +class PoolCollectionResponse(BaseModel): + """Pool Collection serializer for responses.""" + + pools: list[PoolResponse] + total_entries: int diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index ae21f16a8d067..41b1ff86f154a 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -288,6 +288,26 @@ export const UsePoolServiceGetPoolKeyFn = ( }, queryKey?: Array, ) => [usePoolServiceGetPoolKey, ...(queryKey ?? [{ poolName }])]; +export type PoolServiceGetPoolsDefaultResponse = Awaited< + ReturnType +>; +export type PoolServiceGetPoolsQueryResult< + TData = PoolServiceGetPoolsDefaultResponse, + TError = unknown, +> = UseQueryResult; +export const usePoolServiceGetPoolsKey = "PoolServiceGetPools"; +export const UsePoolServiceGetPoolsKeyFn = ( + { + limit, + offset, + orderBy, + }: { + limit?: number; + offset?: number; + orderBy?: string; + } = {}, + queryKey?: Array, +) => [usePoolServiceGetPoolsKey, ...(queryKey ?? [{ limit, offset, orderBy }])]; export type ProviderServiceGetProvidersDefaultResponse = Awaited< ReturnType >; diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts index d352f290478b1..2e2e9b0e3cf60 100644 --- a/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow/ui/openapi-gen/queries/prefetch.ts @@ -357,6 +357,32 @@ export const prefetchUsePoolServiceGetPool = ( queryKey: Common.UsePoolServiceGetPoolKeyFn({ poolName }), queryFn: () => PoolService.getPool({ poolName }), }); +/** + * Get Pools + * Get all pools entries. + * @param data The data for the request. + * @param data.limit + * @param data.offset + * @param data.orderBy + * @returns PoolCollectionResponse Successful Response + * @throws ApiError + */ +export const prefetchUsePoolServiceGetPools = ( + queryClient: QueryClient, + { + limit, + offset, + orderBy, + }: { + limit?: number; + offset?: number; + orderBy?: string; + } = {}, +) => + queryClient.prefetchQuery({ + queryKey: Common.UsePoolServiceGetPoolsKeyFn({ limit, offset, orderBy }), + queryFn: () => PoolService.getPools({ limit, offset, orderBy }), + }); /** * Get Providers * Get providers. diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index 0cc21b2ee8d8d..5e05eab97352e 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -458,6 +458,41 @@ export const usePoolServiceGetPool = < queryFn: () => PoolService.getPool({ poolName }) as TData, ...options, }); +/** + * Get Pools + * Get all pools entries. + * @param data The data for the request. + * @param data.limit + * @param data.offset + * @param data.orderBy + * @returns PoolCollectionResponse Successful Response + * @throws ApiError + */ +export const usePoolServiceGetPools = < + TData = Common.PoolServiceGetPoolsDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + limit, + offset, + orderBy, + }: { + limit?: number; + offset?: number; + orderBy?: string; + } = {}, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useQuery({ + queryKey: Common.UsePoolServiceGetPoolsKeyFn( + { limit, offset, orderBy }, + queryKey, + ), + queryFn: () => PoolService.getPools({ limit, offset, orderBy }) as TData, + ...options, + }); /** * Get Providers * Get providers. diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts index c4f05bf5d857d..8f032bacd4cbd 100644 --- a/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow/ui/openapi-gen/queries/suspense.ts @@ -453,6 +453,41 @@ export const usePoolServiceGetPoolSuspense = < queryFn: () => PoolService.getPool({ poolName }) as TData, ...options, }); +/** + * Get Pools + * Get all pools entries. + * @param data The data for the request. + * @param data.limit + * @param data.offset + * @param data.orderBy + * @returns PoolCollectionResponse Successful Response + * @throws ApiError + */ +export const usePoolServiceGetPoolsSuspense = < + TData = Common.PoolServiceGetPoolsDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + { + limit, + offset, + orderBy, + }: { + limit?: number; + offset?: number; + orderBy?: string; + } = {}, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useSuspenseQuery({ + queryKey: Common.UsePoolServiceGetPoolsKeyFn( + { limit, offset, orderBy }, + queryKey, + ), + queryFn: () => PoolService.getPools({ limit, offset, orderBy }) as TData, + ...options, + }); /** * Get Providers * Get providers. diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index e8607e24cdd74..1556ae254192e 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -37,7 +37,7 @@ export const $ConnectionCollectionResponse = { type: "object", required: ["connections", "total_entries"], title: "ConnectionCollectionResponse", - description: "DAG Collection serializer for responses.", + description: "Connection Collection serializer for responses.", } as const; export const $ConnectionResponse = { @@ -1189,6 +1189,26 @@ export const $HistoricalMetricDataResponse = { description: "Historical Metric Data serializer for responses.", } as const; +export const $PoolCollectionResponse = { + properties: { + pools: { + items: { + $ref: "#/components/schemas/PoolResponse", + }, + type: "array", + title: "Pools", + }, + total_entries: { + type: "integer", + title: "Total Entries", + }, + }, + type: "object", + required: ["pools", "total_entries"], + title: "PoolCollectionResponse", + description: "Pool Collection serializer for responses.", +} as const; + export const $PoolResponse = { properties: { name: { diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index c77e039bb8060..45c5c98526106 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -46,6 +46,8 @@ import type { DeletePoolResponse, GetPoolData, GetPoolResponse, + GetPoolsData, + GetPoolsResponse, GetProvidersData, GetProvidersResponse, } from "./types.gen"; @@ -683,6 +685,36 @@ export class PoolService { }, }); } + + /** + * Get Pools + * Get all pools entries. + * @param data The data for the request. + * @param data.limit + * @param data.offset + * @param data.orderBy + * @returns PoolCollectionResponse Successful Response + * @throws ApiError + */ + public static getPools( + data: GetPoolsData = {}, + ): CancelablePromise { + return __request(OpenAPI, { + method: "GET", + url: "/public/pools/", + query: { + limit: data.limit, + offset: data.offset, + order_by: data.orderBy, + }, + errors: { + 401: "Unauthorized", + 403: "Forbidden", + 404: "Not Found", + 422: "Validation Error", + }, + }); + } } export class ProviderService { diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 41b6f9235072f..cf70c15ed6158 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -8,7 +8,7 @@ export type BaseInfoSchema = { }; /** - * DAG Collection serializer for responses. + * Connection Collection serializer for responses. */ export type ConnectionCollectionResponse = { connections: Array; @@ -266,6 +266,14 @@ export type HistoricalMetricDataResponse = { task_instance_states: TaskInstanceState; }; +/** + * Pool Collection serializer for responses. + */ +export type PoolCollectionResponse = { + pools: Array; + total_entries: number; +}; + /** * Pool serializer for responses. */ @@ -528,6 +536,14 @@ export type GetPoolData = { export type GetPoolResponse = PoolResponse; +export type GetPoolsData = { + limit?: number; + offset?: number; + orderBy?: string; +}; + +export type GetPoolsResponse = PoolCollectionResponse; + export type GetProvidersData = { limit?: number; offset?: number; @@ -1085,6 +1101,33 @@ export type $OpenApiTs = { }; }; }; + "/public/pools/": { + get: { + req: GetPoolsData; + res: { + /** + * Successful Response + */ + 200: PoolCollectionResponse; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; "/public/providers/": { get: { req: GetProvidersData; diff --git a/tests/api_fastapi/core_api/routes/public/test_connections.py b/tests/api_fastapi/core_api/routes/public/test_connections.py index 8c2e3b6d58f7f..ee9c80219eb18 100644 --- a/tests/api_fastapi/core_api/routes/public/test_connections.py +++ b/tests/api_fastapi/core_api/routes/public/test_connections.py @@ -168,4 +168,4 @@ def test_should_respond_200( body = response.json() assert body["total_entries"] == expected_total_entries - assert [dag["connection_id"] for dag in body["connections"]] == expected_ids + assert [connection["connection_id"] for connection in body["connections"]] == expected_ids diff --git a/tests/api_fastapi/core_api/routes/public/test_pools.py b/tests/api_fastapi/core_api/routes/public/test_pools.py index d75ad6e417e21..e97f85b95dcd4 100644 --- a/tests/api_fastapi/core_api/routes/public/test_pools.py +++ b/tests/api_fastapi/core_api/routes/public/test_pools.py @@ -43,7 +43,7 @@ def _create_pools(session) -> None: session.add_all([pool1, pool2]) -class TestPools: +class TestPoolsEndpoint: @pytest.fixture(autouse=True) def setup(self) -> None: clear_db_pools() @@ -55,7 +55,7 @@ def create_pools(self): _create_pools() -class TestDeletePool(TestPools): +class TestDeletePool(TestPoolsEndpoint): def test_delete_should_respond_204(self, test_client, session): self.create_pools() pools = session.query(Pool).all() @@ -78,7 +78,7 @@ def test_delete_should_respond_404(self, test_client): assert f"The Pool with name: `{POOL1_NAME}` was not found" == body["detail"] -class TestGetPool(TestPools): +class TestGetPool(TestPoolsEndpoint): def test_get_should_respond_200(self, test_client, session): self.create_pools() response = test_client.get(f"/public/pools/{POOL1_NAME}") @@ -101,3 +101,28 @@ def test_get_should_respond_404(self, test_client): assert response.status_code == 404 body = response.json() assert f"The Pool with name: `{POOL1_NAME}` was not found" == body["detail"] + + +class TestGetPools(TestPoolsEndpoint): + @pytest.mark.parametrize( + "query_params, expected_total_entries, expected_ids", + [ + # Filters + ({}, 3, [Pool.DEFAULT_POOL_NAME, POOL1_NAME, POOL2_NAME]), + ({"limit": 1}, 3, [Pool.DEFAULT_POOL_NAME]), + ({"limit": 1, "offset": 1}, 3, [POOL1_NAME]), + # Sort + ({"order_by": "-id"}, 3, [POOL2_NAME, POOL1_NAME, Pool.DEFAULT_POOL_NAME]), + ({"order_by": "id"}, 3, [Pool.DEFAULT_POOL_NAME, POOL1_NAME, POOL2_NAME]), + ], + ) + def test_should_respond_200( + self, test_client, session, query_params, expected_total_entries, expected_ids + ): + self.create_pools() + response = test_client.get("/public/pools/", params=query_params) + assert response.status_code == 200 + + body = response.json() + assert body["total_entries"] == expected_total_entries + assert [pool["name"] for pool in body["pools"]] == expected_ids