Skip to content

Commit fdbb9a7

Browse files
committed
Add configurable timeout for Execution API requests
Workers can now configure the timeout for HTTP requests to the Execution API server via the `[workers] execution_api_timeout` configuration option. The default remains 5 seconds (httpx default) to preserve existing behavior. https://www.python-httpx.org/advanced/timeouts/ This prevents timeout errors in high-load environments where API servers may take longer than 5 seconds to respond. Users experiencing timeout issues can now increase this value without code changes. The timeout controls how long a worker waits for a single API request to complete, which is different from execution_api_retries that controls retry behavior after failures. Fixes #56571
1 parent c3f53b1 commit fdbb9a7

File tree

3 files changed

+32
-1
lines changed

3 files changed

+32
-1
lines changed

airflow-core/src/airflow/config_templates/config.yml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1618,6 +1618,15 @@ workers:
16181618
type: float
16191619
example: ~
16201620
default: "90.0"
1621+
execution_api_timeout:
1622+
description: |
1623+
The timeout (in seconds) for HTTP requests from workers to the Execution API server.
1624+
This controls how long a worker will wait for a response from the API server before
1625+
timing out. Increase this value if you experience timeout errors under high load.
1626+
version_added: 3.1.1
1627+
type: float
1628+
example: ~
1629+
default: "5.0"
16211630
socket_cleanup_timeout:
16221631
description: |
16231632
Number of seconds to wait after a task process exits before forcibly closing any

task-sdk/src/airflow/sdk/api/client.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -829,6 +829,7 @@ def noop_handler(request: httpx.Request) -> httpx.Response:
829829
API_RETRY_WAIT_MIN = conf.getfloat("workers", "execution_api_retry_wait_min")
830830
API_RETRY_WAIT_MAX = conf.getfloat("workers", "execution_api_retry_wait_max")
831831
API_SSL_CERT_PATH = conf.get("api", "ssl_cert")
832+
API_TIMEOUT = conf.getfloat("workers", "execution_api_timeout")
832833

833834

834835
class Client(httpx.Client):
@@ -848,6 +849,10 @@ def __init__(self, *, base_url: str | None, dry_run: bool = False, token: str, *
848849
if API_SSL_CERT_PATH:
849850
ctx.load_verify_locations(API_SSL_CERT_PATH)
850851
kwargs["verify"] = ctx
852+
853+
# Set timeout if not explicitly provided
854+
kwargs.setdefault("timeout", API_TIMEOUT)
855+
851856
pyver = f"{'.'.join(map(str, sys.version_info[:3]))}"
852857
super().__init__(
853858
auth=auth,

task-sdk/tests/task_sdk/api/test_client.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
from uuid6 import uuid7
3131

3232
from airflow.sdk import timezone
33-
from airflow.sdk.api.client import RemoteValidationError, ServerResponseError
33+
from airflow.sdk.api.client import Client, RemoteValidationError, ServerResponseError
3434
from airflow.sdk.api.datamodels._generated import (
3535
AssetEventsResponse,
3636
AssetResponse,
@@ -99,6 +99,23 @@ def handle_request(request: httpx.Request) -> httpx.Response:
9999

100100
assert isinstance(err.value, FileNotFoundError)
101101

102+
@mock.patch("airflow.sdk.api.client.API_TIMEOUT", 60.0)
103+
def test_timeout_configuration(self):
104+
def handle_request(request: httpx.Request) -> httpx.Response:
105+
return httpx.Response(status_code=200)
106+
107+
client = make_client(httpx.MockTransport(handle_request))
108+
assert client.timeout == httpx.Timeout(60.0)
109+
110+
def test_timeout_can_be_overridden(self):
111+
def handle_request(request: httpx.Request) -> httpx.Response:
112+
return httpx.Response(status_code=200)
113+
114+
client = Client(
115+
base_url="test://server", token="", transport=httpx.MockTransport(handle_request), timeout=120.0
116+
)
117+
assert client.timeout == httpx.Timeout(120.0)
118+
102119
def test_error_parsing(self):
103120
responses = [
104121
httpx.Response(422, json={"detail": [{"loc": ["#0"], "msg": "err", "type": "required"}]})

0 commit comments

Comments
 (0)