Commit 0cfdaa5

Do not allow Metastore as secret backend in Supervisor
1 parent 39b1604 commit 0cfdaa5

14 files changed, +209 -187 lines changed

.pre-commit-config.yaml

Lines changed: 1 addition & 0 deletions
@@ -1613,6 +1613,7 @@ repos:
 ^airflow-core/src/airflow/operators/subdag\.py$|
 ^airflow-core/src/airflow/plugins_manager\.py$|
 ^airflow-core/src/airflow/providers_manager\.py$|
+^airflow-core/src/airflow/secrets/__init__.py$|
 ^airflow-core/src/airflow/serialization/definitions/[_a-z]+\.py$|
 ^airflow-core/src/airflow/serialization/enums\.py$|
 ^airflow-core/src/airflow/serialization/helpers\.py$|

airflow-core/newsfragments/56583.significant.rst

Lines changed: 22 additions & 14 deletions
@@ -1,33 +1,41 @@
-Fix connection access in API server contexts (plugins, log handlers)
+Fix Connection & Variable access in API server contexts (plugins, log handlers)

 Previously, hooks used in API server contexts (plugins, middlewares, log handlers) would fail with an ``ImportError``
-for ``SUPERVISOR_COMMS``, because ``SUPERVISOR_COMMS`` only exists in worker execution contexts.
+for ``SUPERVISOR_COMMS``, because ``SUPERVISOR_COMMS`` only exists in task runner child processes.

-This has been fixed by implementing automatic context detection with separate secrets backend chains:
+This has been fixed by implementing automatic context detection with three separate secrets backend chains:

 **Context Detection:**

-- **Client contexts** (workers, DAG processors, triggerers): Automatically detected via ``SUPERVISOR_COMMS`` presence
-- **Server contexts** (API server, scheduler, plugins): Automatically detected when ``SUPERVISOR_COMMS`` is not available
-- No configuration required - works regardless of import order or plugin loading timing
+1. **Client contexts** (task runner in worker): Detected via ``SUPERVISOR_COMMS`` presence
+2. **Server contexts** (API server, scheduler): Explicitly marked with ``_AIRFLOW_PROCESS_CONTEXT=server`` environment variable
+3. **Fallback contexts** (supervisor, unknown contexts): Neither marker present, uses minimal safe chain

 **Backend Chains:**

-- **Client**: ``EnvironmentVariablesBackend`` → ``ExecutionAPISecretsBackend`` (routes to Execution API)
-- **Server**: ``EnvironmentVariablesBackend`` → ``MetastoreBackend`` (direct DB access)
+- **Client**: ``EnvironmentVariablesBackend`` → ``ExecutionAPISecretsBackend`` (routes to Execution API via SUPERVISOR_COMMS)
+- **Server**: ``EnvironmentVariablesBackend`` → ``MetastoreBackend`` (direct database access)
+- **Fallback**: ``EnvironmentVariablesBackend`` only (+ external backends from config like AWS Secrets Manager, Vault)

-This maintains the architectural separation where workers access resources only through the Execution API,
-while API server components have direct database access.
+The fallback chain is crucial for supervisor processes (worker-side, before task runner starts) which need to access
+external secrets for remote logging setup but should not use ``MetastoreBackend`` (to maintain worker isolation).
+
+**Architecture Benefits:**
+
+- Workers (supervisor + task runner) never use ``MetastoreBackend``, maintaining strict isolation
+- External secrets backends (AWS Secrets Manager, Vault, etc.) work in all three contexts
+- Supervisor falls back to Execution API client for connections not found in external backends
+- API server and scheduler have direct database access for optimal performance

 **Impact:**

 - Hooks like ``GCSHook``, ``S3Hook`` now work correctly in log handlers and plugins
 - No code changes required for existing plugins or hooks
-- Workers remain isolated from direct database access (network-level blocking still possible)
-- External secrets backends (AWS Secrets Manager, Vault, etc.) continue to work in all contexts
-- Automatic detection works regardless of initialization order
+- Workers remain isolated from direct database access (network-level DB blocking fully supported)
+- External secrets work everywhere (workers, supervisor, API server)
+- Robust handling of unknown contexts with safe minimal chain

-See: `#56120 <https://github.com/apache/airflow/issues/56120>`__, `#56583 <https://github.com/apache/airflow/issues/56583>`__
+See: `#56120 <https://github.com/apache/airflow/issues/56120>`__, `#56583 <https://github.com/apache/airflow/issues/56583>`__, `#51816 <https://github.com/apache/airflow/issues/51816>`__

 * Types of change
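
Note: the three-way selection described in this newsfragment can be sketched as a small pure function. This is a minimal illustration, not the code in this commit. `select_default_chain` and its `has_supervisor_comms` parameter are hypothetical names, and the external backends from config that the newsfragment mentions for every chain are left out here.

```python
from __future__ import annotations

from typing import Mapping

# Dotted paths as they appear in this commit.
ENV_BACKEND = "airflow.secrets.environment_variables.EnvironmentVariablesBackend"
METASTORE_BACKEND = "airflow.secrets.metastore.MetastoreBackend"
EXECUTION_API_BACKEND = (
    "airflow.sdk.execution_time.secrets.execution_api.ExecutionAPISecretsBackend"
)


def select_default_chain(
    has_supervisor_comms: bool, environ: Mapping[str, str]
) -> tuple[str, list[str]]:
    """Hypothetical helper: return (context name, default backend chain)."""
    # 1. Client context: the task runner has a supervisor channel.
    if has_supervisor_comms:
        return "client", [ENV_BACKEND, EXECUTION_API_BACKEND]
    # 2. Server context: the process marked itself explicitly.
    if environ.get("_AIRFLOW_PROCESS_CONTEXT") == "server":
        return "server", [ENV_BACKEND, METASTORE_BACKEND]
    # 3. Fallback context: neither marker is present, so use the minimal chain.
    return "fallback", [ENV_BACKEND]
```

The checks are ordered, so in this sketch a process that has a supervisor channel is treated as a client even if the server marker is also set in its environment.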

airflow-core/src/airflow/api_fastapi/main.py

Lines changed: 4 additions & 0 deletions
@@ -19,6 +19,10 @@

 import os

+# Mark this as a server context before any airflow imports
+# This ensures plugins loaded at import time get the correct secrets backend chain
+os.environ["_AIRFLOW_PROCESS_CONTEXT"] = "server"
+
 from airflow.api_fastapi.app import cached_app

 # There is no way to pass the apps to this file from Airflow CLI
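
Note: the comment in this hunk says the marker must be set before any airflow imports. The sketch below illustrates why ordering matters for code that reads the environment at import time. It does not use Airflow: `PLUGIN_SOURCE` and `import_fake_plugin` are hypothetical stand-ins for a plugin module that resolves its context while it is being loaded.

```python
import os
import types

# Hypothetical plugin body: the lookup runs once, when the module is loaded.
PLUGIN_SOURCE = """
import os
CONTEXT_AT_IMPORT = os.environ.get("_AIRFLOW_PROCESS_CONTEXT", "fallback")
"""


def import_fake_plugin(name: str) -> types.ModuleType:
    """Execute the plugin source in a fresh module object, like an import would."""
    module = types.ModuleType(name)
    exec(PLUGIN_SOURCE, module.__dict__)
    return module


# Loaded before the marker is set: the plugin sees no server context.
os.environ.pop("_AIRFLOW_PROCESS_CONTEXT", None)
loaded_before_marker = import_fake_plugin("loaded_before_marker")

# Loaded after the marker is set: the plugin sees the server context.
os.environ["_AIRFLOW_PROCESS_CONTEXT"] = "server"
loaded_after_marker = import_fake_plugin("loaded_after_marker")

print(loaded_before_marker.CONTEXT_AT_IMPORT, loaded_after_marker.CONTEXT_AT_IMPORT)
```

A value captured at import time does not change when the environment is updated later, which is presumably why the assignment sits above the `cached_app` import.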

airflow-core/src/airflow/jobs/scheduler_job_runner.py

Lines changed: 5 additions & 0 deletions
@@ -1020,6 +1020,11 @@ def set_ti_span_attrs(cls, span, state, ti):
             span.add_event(name="airflow.task.ended", timestamp=datetime_to_nano(ti.end_date))

     def _execute(self) -> int | None:
+        import os
+
+        # Mark this as a server context for secrets backend detection
+        os.environ["_AIRFLOW_PROCESS_CONTEXT"] = "server"
+
         self.log.info("Starting the scheduler")

         reset_signals = self.register_signals()

airflow-core/src/airflow/secrets/__init__.py

Lines changed: 24 additions & 6 deletions
@@ -29,7 +29,7 @@

 from airflow.utils.deprecation_tools import add_deprecated_classes

-__all__ = ["BaseSecretsBackend", "DEFAULT_SECRETS_SEARCH_PATH", "DEFAULT_SECRETS_SEARCH_PATH_WORKERS"]
+__all__ = ["BaseSecretsBackend", "DEFAULT_SECRETS_SEARCH_PATH"]

 from airflow.secrets.base_secrets import BaseSecretsBackend

@@ -38,15 +38,33 @@
     "airflow.secrets.metastore.MetastoreBackend",
 ]

-DEFAULT_SECRETS_SEARCH_PATH_WORKERS = [
-    "airflow.secrets.environment_variables.EnvironmentVariablesBackend",
-    "airflow.sdk.execution_time.secrets.execution_api.ExecutionAPISecretsBackend",
-]
-

 __deprecated_classes = {
     "cache": {
         "SecretCache": "airflow.sdk.execution_time.cache.SecretCache",
     },
 }
 add_deprecated_classes(__deprecated_classes, __name__)
+
+
+def __getattr__(name):
+    if name == "DEFAULT_SECRETS_SEARCH_PATH_WORKERS":
+        import warnings
+
+        warnings.warn(
+            "airflow.secrets.DEFAULT_SECRETS_SEARCH_PATH_WORKERS is moved to the Task SDK. "
+            "Use airflow.sdk.execution_time.secrets.DEFAULT_SECRETS_SEARCH_PATH_WORKERS instead.",
+            DeprecationWarning,
+            stacklevel=2,
+        )
+        try:
+            from airflow.sdk.execution_time.secrets import DEFAULT_SECRETS_SEARCH_PATH_WORKERS
+
+            return DEFAULT_SECRETS_SEARCH_PATH_WORKERS
+        except (ImportError, AttributeError):
+            # Back-compat for older Task SDK clients
+            return [
+                "airflow.secrets.environment_variables.EnvironmentVariablesBackend",
+            ]
+
+    raise AttributeError(f"module '{__name__}' has no attribute '{name}'")
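
Note: the added `__getattr__` is a module-level attribute hook (PEP 562), used here to keep a moved constant importable while emitting a `DeprecationWarning`. The sketch below reproduces the pattern outside Airflow. `legacy_secrets`, `_module_getattr`, and `MOVED_VALUE` are hypothetical names, and the hook is attached to a throwaway module object instead of being defined in a real package.

```python
import types
import warnings

# Hypothetical stand-in for the constant that now lives in another package.
MOVED_VALUE = ["EnvironmentVariablesBackend", "ExecutionAPISecretsBackend"]


def _module_getattr(name: str):
    """Called only when normal attribute lookup on the module fails."""
    if name == "DEFAULT_SECRETS_SEARCH_PATH_WORKERS":
        warnings.warn(
            "DEFAULT_SECRETS_SEARCH_PATH_WORKERS has moved; import it from its new location.",
            DeprecationWarning,
            stacklevel=2,
        )
        return MOVED_VALUE
    raise AttributeError(f"module 'legacy_secrets' has no attribute {name!r}")


# Attach the hook to a module object; in a real package it is a plain
# module-level function named __getattr__.
legacy_secrets = types.ModuleType("legacy_secrets")
legacy_secrets.__getattr__ = _module_getattr

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    value = legacy_secrets.DEFAULT_SECRETS_SEARCH_PATH_WORKERS

print(value, [str(w.message) for w in caught])
```

Raising `AttributeError` for every other name keeps `hasattr` and `from module import name` behaving normally for attributes that do not exist.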

airflow-core/tests/unit/core/test_configuration.py

Lines changed: 1 addition & 1 deletion
@@ -43,7 +43,7 @@
     write_default_airflow_configuration_if_needed,
 )
 from airflow.providers_manager import ProvidersManager
-from airflow.secrets import DEFAULT_SECRETS_SEARCH_PATH_WORKERS
+from airflow.sdk.execution_time.secrets import DEFAULT_SECRETS_SEARCH_PATH_WORKERS

 from tests_common.test_utils.config import conf_vars
 from tests_common.test_utils.markers import skip_if_force_lowest_dependencies_marker

task-sdk/src/airflow/sdk/execution_time/context.py

Lines changed: 4 additions & 0 deletions
@@ -362,12 +362,16 @@ def __eq__(self, other):
         return True

     def get(self, conn_id: str, default_conn: Any = None) -> Any:
+        from airflow.exceptions import AirflowNotFoundException
+
         try:
             return _get_connection(conn_id)
         except AirflowRuntimeError as e:
             if e.error.error == ErrorType.CONNECTION_NOT_FOUND:
                 return default_conn
             raise
+        except AirflowNotFoundException:
+            return default_conn


 class VariableAccessor:
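
Note: after this hunk, `get` returns the default for two different "not found" signals and re-raises any other runtime error. The sketch below shows that control flow with hypothetical stand-ins: `LookupFailed` plays the role of `AirflowRuntimeError` with an error code, `NotFound` plays the role of `AirflowNotFoundException`, and `get_or_default` is not an Airflow function.

```python
from typing import Any, Callable


class LookupFailed(Exception):
    """Hypothetical stand-in for a runtime error that carries an error code."""

    def __init__(self, code: str) -> None:
        super().__init__(code)
        self.code = code


class NotFound(Exception):
    """Hypothetical stand-in for a plain 'not found' exception."""


def get_or_default(lookup: Callable[[str], Any], conn_id: str, default: Any = None) -> Any:
    try:
        return lookup(conn_id)
    except LookupFailed as e:
        # Only the "not found" code maps to the default; anything else propagates.
        if e.code == "CONNECTION_NOT_FOUND":
            return default
        raise
    except NotFound:
        return default


def raise_coded(code: str) -> Callable[[str], Any]:
    """Build a lookup that always fails with the given error code."""

    def lookup(conn_id: str) -> Any:
        raise LookupFailed(code)

    return lookup


def raise_not_found(conn_id: str) -> Any:
    raise NotFound(conn_id)
```

For example, `get_or_default(raise_not_found, "my_conn", "fallback")` returns `"fallback"`, while `get_or_default(raise_coded("PERMISSION_DENIED"), "my_conn")` re-raises the error.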

task-sdk/src/airflow/sdk/execution_time/secrets/__init__.py

Lines changed: 6 additions & 1 deletion
@@ -21,4 +21,9 @@

 from airflow.sdk.execution_time.secrets.execution_api import ExecutionAPISecretsBackend

-__all__ = ["ExecutionAPISecretsBackend"]
+__all__ = ["ExecutionAPISecretsBackend", "DEFAULT_SECRETS_SEARCH_PATH_WORKERS"]
+
+DEFAULT_SECRETS_SEARCH_PATH_WORKERS = [
+    "airflow.secrets.environment_variables.EnvironmentVariablesBackend",
+    "airflow.sdk.execution_time.secrets.execution_api.ExecutionAPISecretsBackend",
+]

task-sdk/src/airflow/sdk/execution_time/supervisor.py

Lines changed: 25 additions & 11 deletions
@@ -1769,30 +1769,44 @@ def ensure_secrets_backend_loaded() -> list[BaseSecretsBackend]:
     Initialize secrets backend with auto-detected context.

     Detection strategy:
-    - If SUPERVISOR_COMMS exists and is set → client chain (ExecutionAPISecretsBackend)
-    - Otherwise → server chain (MetastoreBackend)
+    1. SUPERVISOR_COMMS exists and is set → client chain (ExecutionAPISecretsBackend)
+    2. _AIRFLOW_PROCESS_CONTEXT=server env var → server chain (MetastoreBackend)
+    3. Neither → fallback chain (only env vars + external backends, no MetastoreBackend)

-    Client contexts: workers, DAG processor, triggerer (have SUPERVISOR_COMMS)
-    Server contexts: API server, scheduler (no SUPERVISOR_COMMS)
+    Client contexts: task runner in worker (has SUPERVISOR_COMMS)
+    Server contexts: API server, scheduler (set _AIRFLOW_PROCESS_CONTEXT=server)
+    Fallback contexts: supervisor, unknown contexts (no SUPERVISOR_COMMS, no env var)

-    This approach works regardless of import order or plugin loading timing.
+    The fallback chain ensures supervisor can use external secrets (AWS Secrets Manager,
+    Vault, etc.) while falling back to API client, without trying MetastoreBackend.
     """
+    import os
+
     from airflow.configuration import ensure_secrets_loaded
-    from airflow.secrets import DEFAULT_SECRETS_SEARCH_PATH, DEFAULT_SECRETS_SEARCH_PATH_WORKERS
+    from airflow.sdk.execution_time.secrets import DEFAULT_SECRETS_SEARCH_PATH_WORKERS

-    # Check for client context (SUPERVISOR_COMMS)
+    # 1. Check for client context (SUPERVISOR_COMMS)
     try:
         from airflow.sdk.execution_time import task_runner

         if hasattr(task_runner, "SUPERVISOR_COMMS") and task_runner.SUPERVISOR_COMMS is not None:
-            # Client context: worker, DAG processor, triggerer
+            # Client context: task runner with SUPERVISOR_COMMS
             return ensure_secrets_loaded(default_backends=DEFAULT_SECRETS_SEARCH_PATH_WORKERS)
     except (ImportError, AttributeError):
         pass

-    # Default to server context (no SUPERVISOR_COMMS = server)
-    # Server context: API server, scheduler, plugins
-    return ensure_secrets_loaded(default_backends=DEFAULT_SECRETS_SEARCH_PATH)
+    # 2. Check for explicit server context
+    if os.environ.get("_AIRFLOW_PROCESS_CONTEXT") == "server":
+        # Server context: API server, scheduler
+        # uses the default server list
+        return ensure_secrets_loaded()
+
+    # 3. Fallback for unknown contexts (supervisor, etc.)
+    # Only env vars + external backends from config, no MetastoreBackend, no ExecutionAPISecretsBackend
+    fallback_backends = [
+        "airflow.secrets.environment_variables.EnvironmentVariablesBackend",
+    ]
+    return ensure_secrets_loaded(default_backends=fallback_backends)


 def _configure_logging(log_path: str, client: Client) -> tuple[FilteringBoundLogger, BinaryIO | TextIO]:
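
Note: step 1 of `ensure_secrets_backend_loaded` treats the client marker as present only when the attribute exists and is not `None`. The sketch below isolates that probe. `has_supervisor_comms` is a hypothetical helper, and `fake_task_runner` is a throwaway module object standing in for `airflow.sdk.execution_time.task_runner`; `getattr` with a default is used as an equivalent of the `hasattr(...) and ... is not None` check in the hunk.

```python
import types


def has_supervisor_comms(module: types.ModuleType) -> bool:
    """True only if the module has a SUPERVISOR_COMMS attribute that is not None."""
    return getattr(module, "SUPERVISOR_COMMS", None) is not None


# Stand-in module; not the real task_runner.
fake_task_runner = types.ModuleType("fake_task_runner")

# Case 1: the attribute was never assigned.
never_assigned = has_supervisor_comms(fake_task_runner)

# Case 2: the attribute exists but holds None.
fake_task_runner.SUPERVISOR_COMMS = None
assigned_none = has_supervisor_comms(fake_task_runner)

# Case 3: the attribute holds a channel object (a plain object() here).
fake_task_runner.SUPERVISOR_COMMS = object()
assigned_channel = has_supervisor_comms(fake_task_runner)

print(never_assigned, assigned_none, assigned_channel)
```

Only the third case selects the client chain; the first two fall through to the environment-variable check and then to the fallback chain.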

task-sdk/tests/task_sdk/definitions/test_connection.py

Lines changed: 1 addition & 1 deletion
@@ -28,7 +28,7 @@
 from airflow.sdk import Connection
 from airflow.sdk.exceptions import ErrorType
 from airflow.sdk.execution_time.comms import ConnectionResult, ErrorResponse
-from airflow.secrets import DEFAULT_SECRETS_SEARCH_PATH_WORKERS
+from airflow.sdk.execution_time.secrets import DEFAULT_SECRETS_SEARCH_PATH_WORKERS

 from tests_common.test_utils.config import conf_vars
