Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airflow/api_connexion/endpoints/pool_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ def get_pool(*, pool_name: str, session: Session = NEW_SESSION) -> APIResponse:
return pool_schema.dump(obj)


@mark_fastapi_migration_done
@security.requires_access_pool("GET")
@format_parameters({"limit": check_limit})
@provide_session
Expand Down
78 changes: 77 additions & 1 deletion airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1163,6 +1163,66 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/pools/:
get:
tags:
- Pool
summary: Get Pools
description: Get all pools entries.
operationId: get_pools
parameters:
- name: limit
in: query
required: false
schema:
type: integer
default: 100
title: Limit
- name: offset
in: query
required: false
schema:
type: integer
default: 0
title: Offset
- name: order_by
in: query
required: false
schema:
type: string
default: id
title: Order By
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/PoolCollectionResponse'
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/providers/:
get:
tags:
Expand Down Expand Up @@ -1227,7 +1287,7 @@ components:
- connections
- total_entries
title: ConnectionCollectionResponse
description: DAG Collection serializer for responses.
description: Connection Collection serializer for responses.
ConnectionResponse:
properties:
connection_id:
Expand Down Expand Up @@ -1960,6 +2020,22 @@ components:
- task_instance_states
title: HistoricalMetricDataResponse
description: Historical Metric Data serializer for responses.
PoolCollectionResponse:
properties:
pools:
items:
$ref: '#/components/schemas/PoolResponse'
type: array
title: Pools
total_entries:
type: integer
title: Total Entries
type: object
required:
- pools
- total_entries
title: PoolCollectionResponse
description: Pool Collection serializer for responses.
PoolResponse:
properties:
name:
Expand Down
36 changes: 34 additions & 2 deletions airflow/api_fastapi/core_api/routes/public/pools.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@
from sqlalchemy.orm import Session
from typing_extensions import Annotated

from airflow.api_fastapi.common.db.common import get_session
from airflow.api_fastapi.common.db.common import get_session, paginated_select
from airflow.api_fastapi.common.parameters import QueryLimit, QueryOffset, SortParam
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.serializers.pools import PoolResponse
from airflow.api_fastapi.core_api.serializers.pools import PoolCollectionResponse, PoolResponse
from airflow.models.pool import Pool

pools_router = AirflowRouter(tags=["Pool"], prefix="/pools")
Expand Down Expand Up @@ -63,3 +64,34 @@ async def get_pool(
raise HTTPException(404, f"The Pool with name: `{pool_name}` was not found")

return PoolResponse.model_validate(pool, from_attributes=True)


@pools_router.get(
"/",
responses=create_openapi_http_exception_doc([401, 403, 404]),
)
async def get_pools(
limit: QueryLimit,
offset: QueryOffset,
order_by: Annotated[
SortParam,
Depends(SortParam(["id", "name"], Pool).dynamic_depends()),
],
session: Annotated[Session, Depends(get_session)],
) -> PoolCollectionResponse:
"""Get all pools entries."""
pools_select, total_entries = paginated_select(
select(Pool),
[],
order_by=order_by,
offset=offset,
limit=limit,
session=session,
)

pools = session.scalars(pools_select).all()

return PoolCollectionResponse(
pools=[PoolResponse.model_validate(pool, from_attributes=True) for pool in pools],
total_entries=total_entries,
)
2 changes: 1 addition & 1 deletion airflow/api_fastapi/core_api/serializers/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def redact_extra(cls, v: str | None) -> str | None:


class ConnectionCollectionResponse(BaseModel):
"""DAG Collection serializer for responses."""
"""Connection Collection serializer for responses."""

connections: list[ConnectionResponse]
total_entries: int
9 changes: 8 additions & 1 deletion airflow/api_fastapi/core_api/serializers/pools.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def _call_function(function: Callable[[], int]) -> int:
class PoolResponse(BaseModel):
"""Pool serializer for responses."""

pool: str = Field(serialization_alias="name")
pool: str = Field(serialization_alias="name", validation_alias="pool")
slots: int
description: str | None
include_deferred: bool
Expand All @@ -45,3 +45,10 @@ class PoolResponse(BaseModel):
scheduled_slots: Annotated[int, BeforeValidator(_call_function)]
open_slots: Annotated[int, BeforeValidator(_call_function)]
deferred_slots: Annotated[int, BeforeValidator(_call_function)]


class PoolCollectionResponse(BaseModel):
"""Pool Collection serializer for responses."""

pools: list[PoolResponse]
total_entries: int
20 changes: 20 additions & 0 deletions airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,26 @@ export const UsePoolServiceGetPoolKeyFn = (
},
queryKey?: Array<unknown>,
) => [usePoolServiceGetPoolKey, ...(queryKey ?? [{ poolName }])];
export type PoolServiceGetPoolsDefaultResponse = Awaited<
ReturnType<typeof PoolService.getPools>
>;
export type PoolServiceGetPoolsQueryResult<
TData = PoolServiceGetPoolsDefaultResponse,
TError = unknown,
> = UseQueryResult<TData, TError>;
export const usePoolServiceGetPoolsKey = "PoolServiceGetPools";
export const UsePoolServiceGetPoolsKeyFn = (
{
limit,
offset,
orderBy,
}: {
limit?: number;
offset?: number;
orderBy?: string;
} = {},
queryKey?: Array<unknown>,
) => [usePoolServiceGetPoolsKey, ...(queryKey ?? [{ limit, offset, orderBy }])];
export type ProviderServiceGetProvidersDefaultResponse = Awaited<
ReturnType<typeof ProviderService.getProviders>
>;
Expand Down
26 changes: 26 additions & 0 deletions airflow/ui/openapi-gen/queries/prefetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,32 @@ export const prefetchUsePoolServiceGetPool = (
queryKey: Common.UsePoolServiceGetPoolKeyFn({ poolName }),
queryFn: () => PoolService.getPool({ poolName }),
});
/**
* Get Pools
* Get all pools entries.
* @param data The data for the request.
* @param data.limit
* @param data.offset
* @param data.orderBy
* @returns PoolCollectionResponse Successful Response
* @throws ApiError
*/
export const prefetchUsePoolServiceGetPools = (
queryClient: QueryClient,
{
limit,
offset,
orderBy,
}: {
limit?: number;
offset?: number;
orderBy?: string;
} = {},
) =>
queryClient.prefetchQuery({
queryKey: Common.UsePoolServiceGetPoolsKeyFn({ limit, offset, orderBy }),
queryFn: () => PoolService.getPools({ limit, offset, orderBy }),
});
/**
* Get Providers
* Get providers.
Expand Down
35 changes: 35 additions & 0 deletions airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,41 @@ export const usePoolServiceGetPool = <
queryFn: () => PoolService.getPool({ poolName }) as TData,
...options,
});
/**
* Get Pools
* Get all pools entries.
* @param data The data for the request.
* @param data.limit
* @param data.offset
* @param data.orderBy
* @returns PoolCollectionResponse Successful Response
* @throws ApiError
*/
export const usePoolServiceGetPools = <
TData = Common.PoolServiceGetPoolsDefaultResponse,
TError = unknown,
TQueryKey extends Array<unknown> = unknown[],
>(
{
limit,
offset,
orderBy,
}: {
limit?: number;
offset?: number;
orderBy?: string;
} = {},
queryKey?: TQueryKey,
options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
) =>
useQuery<TData, TError>({
queryKey: Common.UsePoolServiceGetPoolsKeyFn(
{ limit, offset, orderBy },
queryKey,
),
queryFn: () => PoolService.getPools({ limit, offset, orderBy }) as TData,
...options,
});
/**
* Get Providers
* Get providers.
Expand Down
35 changes: 35 additions & 0 deletions airflow/ui/openapi-gen/queries/suspense.ts
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,41 @@ export const usePoolServiceGetPoolSuspense = <
queryFn: () => PoolService.getPool({ poolName }) as TData,
...options,
});
/**
* Get Pools
* Get all pools entries.
* @param data The data for the request.
* @param data.limit
* @param data.offset
* @param data.orderBy
* @returns PoolCollectionResponse Successful Response
* @throws ApiError
*/
export const usePoolServiceGetPoolsSuspense = <
TData = Common.PoolServiceGetPoolsDefaultResponse,
TError = unknown,
TQueryKey extends Array<unknown> = unknown[],
>(
{
limit,
offset,
orderBy,
}: {
limit?: number;
offset?: number;
orderBy?: string;
} = {},
queryKey?: TQueryKey,
options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
) =>
useSuspenseQuery<TData, TError>({
queryKey: Common.UsePoolServiceGetPoolsKeyFn(
{ limit, offset, orderBy },
queryKey,
),
queryFn: () => PoolService.getPools({ limit, offset, orderBy }) as TData,
...options,
});
/**
* Get Providers
* Get providers.
Expand Down
22 changes: 21 additions & 1 deletion airflow/ui/openapi-gen/requests/schemas.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ export const $ConnectionCollectionResponse = {
type: "object",
required: ["connections", "total_entries"],
title: "ConnectionCollectionResponse",
description: "DAG Collection serializer for responses.",
description: "Connection Collection serializer for responses.",
} as const;

export const $ConnectionResponse = {
Expand Down Expand Up @@ -1189,6 +1189,26 @@ export const $HistoricalMetricDataResponse = {
description: "Historical Metric Data serializer for responses.",
} as const;

export const $PoolCollectionResponse = {
properties: {
pools: {
items: {
$ref: "#/components/schemas/PoolResponse",
},
type: "array",
title: "Pools",
},
total_entries: {
type: "integer",
title: "Total Entries",
},
},
type: "object",
required: ["pools", "total_entries"],
title: "PoolCollectionResponse",
description: "Pool Collection serializer for responses.",
} as const;

export const $PoolResponse = {
properties: {
name: {
Expand Down
Loading