
Commit ade61f4

Do not allow Metastore as secret backend in Supervisor
1 parent 39b1604 commit ade61f4

File tree

17 files changed: +295 -227 lines changed

.pre-commit-config.yaml
Lines changed: 1 addition & 0 deletions

@@ -1613,6 +1613,7 @@ repos:
             ^airflow-core/src/airflow/operators/subdag\.py$|
             ^airflow-core/src/airflow/plugins_manager\.py$|
             ^airflow-core/src/airflow/providers_manager\.py$|
+            ^airflow-core/src/airflow/secrets/__init__.py$|
             ^airflow-core/src/airflow/serialization/definitions/[_a-z]+\.py$|
             ^airflow-core/src/airflow/serialization/enums\.py$|
             ^airflow-core/src/airflow/serialization/helpers\.py$|

airflow-core/newsfragments/56583.significant.rst
Lines changed: 22 additions & 14 deletions

@@ -1,33 +1,41 @@
-Fix connection access in API server contexts (plugins, log handlers)
+Fix Connection & Variable access in API server contexts (plugins, log handlers)

 Previously, hooks used in API server contexts (plugins, middlewares, log handlers) would fail with an ``ImportError``
-for ``SUPERVISOR_COMMS``, because ``SUPERVISOR_COMMS`` only exists in worker execution contexts.
+for ``SUPERVISOR_COMMS``, because ``SUPERVISOR_COMMS`` only exists in task runner child processes.

-This has been fixed by implementing automatic context detection with separate secrets backend chains:
+This has been fixed by implementing automatic context detection with three separate secrets backend chains:

 **Context Detection:**

-- **Client contexts** (workers, DAG processors, triggerers): Automatically detected via ``SUPERVISOR_COMMS`` presence
-- **Server contexts** (API server, scheduler, plugins): Automatically detected when ``SUPERVISOR_COMMS`` is not available
-- No configuration required - works regardless of import order or plugin loading timing
+1. **Client contexts** (task runner in worker): Detected via ``SUPERVISOR_COMMS`` presence
+2. **Server contexts** (API server, scheduler): Explicitly marked with ``_AIRFLOW_PROCESS_CONTEXT=server`` environment variable
+3. **Fallback contexts** (supervisor, unknown contexts): Neither marker present, uses minimal safe chain

 **Backend Chains:**

-- **Client**: ``EnvironmentVariablesBackend`` → ``ExecutionAPISecretsBackend`` (routes to Execution API)
-- **Server**: ``EnvironmentVariablesBackend`` → ``MetastoreBackend`` (direct DB access)
+- **Client**: ``EnvironmentVariablesBackend`` → ``ExecutionAPISecretsBackend`` (routes to Execution API via SUPERVISOR_COMMS)
+- **Server**: ``EnvironmentVariablesBackend`` → ``MetastoreBackend`` (direct database access)
+- **Fallback**: ``EnvironmentVariablesBackend`` only (+ external backends from config like AWS Secrets Manager, Vault)

-This maintains the architectural separation where workers access resources only through the Execution API,
-while API server components have direct database access.
+The fallback chain is crucial for supervisor processes (worker-side, before task runner starts) which need to access
+external secrets for remote logging setup but should not use ``MetastoreBackend`` (to maintain worker isolation).
+
+**Architecture Benefits:**
+
+- Workers (supervisor + task runner) never use ``MetastoreBackend``, maintaining strict isolation
+- External secrets backends (AWS Secrets Manager, Vault, etc.) work in all three contexts
+- Supervisor falls back to Execution API client for connections not found in external backends
+- API server and scheduler have direct database access for optimal performance

 **Impact:**

 - Hooks like ``GCSHook``, ``S3Hook`` now work correctly in log handlers and plugins
 - No code changes required for existing plugins or hooks
-- Workers remain isolated from direct database access (network-level blocking still possible)
-- External secrets backends (AWS Secrets Manager, Vault, etc.) continue to work in all contexts
-- Automatic detection works regardless of initialization order
+- Workers remain isolated from direct database access (network-level DB blocking fully supported)
+- External secrets work everywhere (workers, supervisor, API server)
+- Robust handling of unknown contexts with safe minimal chain

-See: `#56120 <https://github.com/apache/airflow/issues/56120>`__, `#56583 <https://github.com/apache/airflow/issues/56583>`__
+See: `#56120 <https://github.com/apache/airflow/issues/56120>`__, `#56583 <https://github.com/apache/airflow/issues/56583>`__, `#51816 <https://github.com/apache/airflow/issues/51816>`__

 * Types of change
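To make the three-way split in the newsfragment concrete, here is a minimal Python sketch of the selection logic. The chain contents come straight from the text above; the helper name _pick_secrets_search_path and the exact detection mechanics are illustrative assumptions, not the code this commit adds.

import os
import sys

# Backend chains as described in the newsfragment; external backends configured
# under [secrets] backend would be added to the chain by Airflow itself.
CLIENT_CHAIN = [
    "airflow.secrets.environment_variables.EnvironmentVariablesBackend",
    "airflow.sdk.execution_time.secrets.execution_api.ExecutionAPISecretsBackend",
]
SERVER_CHAIN = [
    "airflow.secrets.environment_variables.EnvironmentVariablesBackend",
    "airflow.secrets.metastore.MetastoreBackend",
]
FALLBACK_CHAIN = [
    "airflow.secrets.environment_variables.EnvironmentVariablesBackend",
]


def _pick_secrets_search_path() -> list[str]:
    """Hypothetical selector mirroring the three-way detection described above."""
    # 1. Client context: the task runner child process has SUPERVISOR_COMMS wired up.
    task_runner = sys.modules.get("airflow.sdk.execution_time.task_runner")
    if task_runner is not None and getattr(task_runner, "SUPERVISOR_COMMS", None) is not None:
        return CLIENT_CHAIN
    # 2. Server context: API server and scheduler export this marker before importing airflow.
    if os.environ.get("_AIRFLOW_PROCESS_CONTEXT") == "server":
        return SERVER_CHAIN
    # 3. Fallback: supervisor or any unknown process; safe minimal chain.
    return FALLBACK_CHAIN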

airflow-core/src/airflow/api_fastapi/main.py
Lines changed: 4 additions & 0 deletions

@@ -19,6 +19,10 @@

 import os

+# Mark this as a server context before any airflow imports
+# This ensures plugins loaded at import time get the correct secrets backend chain
+os.environ["_AIRFLOW_PROCESS_CONTEXT"] = "server"
+
 from airflow.api_fastapi.app import cached_app

 # There is no way to pass the apps to this file from Airflow CLI
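The placement of the assignment above is the whole point: anything imported after it, including plugins pulled in while the FastAPI app is built, already sees the server marker. A custom deployment that embeds the API server would need the same ordering; a rough sketch, assuming cached_app() can be called without arguments as in this file, with my_entrypoint as a purely hypothetical module name:

# Hypothetical custom ASGI entrypoint mirroring api_fastapi/main.py above.
import os

# Must happen before any airflow import, otherwise import-time connection lookups
# (plugins, log handlers) would be resolved with the fallback chain instead.
os.environ["_AIRFLOW_PROCESS_CONTEXT"] = "server"

from airflow.api_fastapi.app import cached_app  # noqa: E402

app = cached_app()  # e.g. served with: uvicorn my_entrypoint:app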

airflow-core/src/airflow/jobs/scheduler_job_runner.py
Lines changed: 5 additions & 0 deletions

@@ -1020,6 +1020,11 @@ def set_ti_span_attrs(cls, span, state, ti):
         span.add_event(name="airflow.task.ended", timestamp=datetime_to_nano(ti.end_date))

     def _execute(self) -> int | None:
+        import os
+
+        # Mark this as a server context for secrets backend detection
+        os.environ["_AIRFLOW_PROCESS_CONTEXT"] = "server"
+
         self.log.info("Starting the scheduler")

         reset_signals = self.register_signals()

airflow-core/src/airflow/secrets/__init__.py
Lines changed: 24 additions & 6 deletions

@@ -29,7 +29,7 @@

 from airflow.utils.deprecation_tools import add_deprecated_classes

-__all__ = ["BaseSecretsBackend", "DEFAULT_SECRETS_SEARCH_PATH", "DEFAULT_SECRETS_SEARCH_PATH_WORKERS"]
+__all__ = ["BaseSecretsBackend", "DEFAULT_SECRETS_SEARCH_PATH"]

 from airflow.secrets.base_secrets import BaseSecretsBackend

@@ -38,15 +38,33 @@
     "airflow.secrets.metastore.MetastoreBackend",
 ]

-DEFAULT_SECRETS_SEARCH_PATH_WORKERS = [
-    "airflow.secrets.environment_variables.EnvironmentVariablesBackend",
-    "airflow.sdk.execution_time.secrets.execution_api.ExecutionAPISecretsBackend",
-]
-

 __deprecated_classes = {
     "cache": {
         "SecretCache": "airflow.sdk.execution_time.cache.SecretCache",
     },
 }
 add_deprecated_classes(__deprecated_classes, __name__)
+
+
+def __getattr__(name):
+    if name == "DEFAULT_SECRETS_SEARCH_PATH_WORKERS":
+        import warnings
+
+        warnings.warn(
+            "airflow.secrets.DEFAULT_SECRETS_SEARCH_PATH_WORKERS is moved to the Task SDK. "
+            "Use airflow.sdk.execution_time.secrets.DEFAULT_SECRETS_SEARCH_PATH_WORKERS instead.",
+            DeprecationWarning,
+            stacklevel=2,
+        )
+        try:
+            from airflow.sdk.execution_time.secrets import DEFAULT_SECRETS_SEARCH_PATH_WORKERS
+
+            return DEFAULT_SECRETS_SEARCH_PATH_WORKERS
+        except (ImportError, AttributeError):
+            # Back-compat for older Task SDK clients
+            return [
+                "airflow.secrets.environment_variables.EnvironmentVariablesBackend",
+            ]
+
+    raise AttributeError(f"module '{__name__}' has no attribute '{name}'")

airflow-core/tests/unit/core/test_configuration.py
Lines changed: 4 additions & 2 deletions

@@ -43,7 +43,7 @@
     write_default_airflow_configuration_if_needed,
 )
 from airflow.providers_manager import ProvidersManager
-from airflow.secrets import DEFAULT_SECRETS_SEARCH_PATH_WORKERS
+from airflow.sdk.execution_time.secrets import DEFAULT_SECRETS_SEARCH_PATH_WORKERS

 from tests_common.test_utils.config import conf_vars
 from tests_common.test_utils.markers import skip_if_force_lowest_dependencies_marker

@@ -923,8 +923,10 @@ def test_initialize_secrets_backends_on_workers(self):
         backends = initialize_secrets_backends(DEFAULT_SECRETS_SEARCH_PATH_WORKERS)
         backend_classes = [backend.__class__.__name__ for backend in backends]

-        assert len(backends) == 2
+        assert len(backends) == 3
         assert "SystemsManagerParameterStoreBackend" in backend_classes
+        assert "EnvironmentVariablesBackend" in backend_classes
+        assert "ExecutionAPISecretsBackend" in backend_classes

     @skip_if_force_lowest_dependencies_marker
     @conf_vars(

task-sdk/src/airflow/sdk/execution_time/context.py
Lines changed: 23 additions & 29 deletions

@@ -190,12 +190,16 @@ async def _async_get_connection(conn_id: str) -> Connection:

     from airflow.sdk.execution_time.supervisor import ensure_secrets_backend_loaded

-    # Try secrets backends using async wrapper (which may include SupervisorCommsSecretsBackend
-    # in worker contexts or MetastoreBackend in API server contexts)
+    # Try secrets backends
     backends = ensure_secrets_backend_loaded()
     for secrets_backend in backends:
         try:
-            conn = await sync_to_async(secrets_backend.get_connection)(conn_id)  # type: ignore[assignment]
+            # Use async method if available, otherwise wrap sync method
+            if hasattr(secrets_backend, "aget_connection"):
+                conn = await secrets_backend.aget_connection(conn_id)  # type: ignore[assignment]
+            else:
+                conn = await sync_to_async(secrets_backend.get_connection)(conn_id)  # type: ignore[assignment]
+
             if conn:
                 SecretCache.save_connection_uri(conn_id, conn.get_uri())
                 _mask_connection_secrets(conn)

@@ -233,7 +237,8 @@ def _get_variable(key: str, deserialize_json: bool) -> Any:
             pass  # Continue to check backends

     backends = ensure_secrets_backend_loaded()
-    # iterate over backends if not in cache (or expired)
+
+    # Iterate over backends if not in cache (or expired)
     for secrets_backend in backends:
         try:
             var_val = secrets_backend.get_variable(key=key)

@@ -253,31 +258,13 @@ def _get_variable(key: str, deserialize_json: bool) -> Any:
                 type(secrets_backend).__name__,
             )

-    if backends:
-        log.debug(
-            "Variable not found in any of the configured Secrets Backends. Trying to retrieve from API server",
-            key=key,
-        )
-
-    # TODO: This should probably be moved to a separate module like `airflow.sdk.execution_time.comms`
-    # or `airflow.sdk.execution_time.variable`
-    # A reason to not move it to `airflow.sdk.execution_time.comms` is that it
-    # will make that module depend on Task SDK, which is not ideal because we intend to
-    # keep Task SDK as a separate package than execution time mods.
-    from airflow.sdk.execution_time.comms import ErrorResponse, GetVariable
-    from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
-
-    msg = SUPERVISOR_COMMS.send(GetVariable(key=key))
-
-    if isinstance(msg, ErrorResponse):
-        raise AirflowRuntimeError(msg)
+    # If no backend found the variable, raise a not found error (mirrors _get_connection)
+    from airflow.sdk.exceptions import AirflowRuntimeError, ErrorType
+    from airflow.sdk.execution_time.comms import ErrorResponse

-    if TYPE_CHECKING:
-        assert isinstance(msg, VariableResult)
-    variable = _convert_variable_result_to_variable(msg, deserialize_json)
-    # Save raw value to ensure cache consistency regardless of deserialize_json parameter
-    SecretCache.save_variable(key, msg.value)
-    return variable.value
+    raise AirflowRuntimeError(
+        ErrorResponse(error=ErrorType.VARIABLE_NOT_FOUND, detail={"message": f"Variable {key} not found"})
+    )


 def _set_variable(key: str, value: Any, description: str | None = None, serialize_json: bool = False) -> None:

@@ -290,18 +277,21 @@ def _set_variable(key: str, value: Any, description: str | None = None, serialize_json: bool = False) -> None:

     from airflow.sdk.execution_time.cache import SecretCache
     from airflow.sdk.execution_time.comms import PutVariable
+    from airflow.sdk.execution_time.secrets.execution_api import ExecutionAPISecretsBackend
     from airflow.sdk.execution_time.supervisor import ensure_secrets_backend_loaded
     from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS

     # check for write conflicts on the worker
     for secrets_backend in ensure_secrets_backend_loaded():
+        if isinstance(secrets_backend, ExecutionAPISecretsBackend):
+            continue
         try:
             var_val = secrets_backend.get_variable(key=key)
             if var_val is not None:
                 _backend_name = type(secrets_backend).__name__
                 log.warning(
                     "The variable %s is defined in the %s secrets backend, which takes "
-                    "precedence over reading from the database. The value in the database will be "
+                    "precedence over reading from the API Server. The value from the API Server will be "
                     "updated, but to read it you have to delete the conflicting variable "
                     "from %s",
                     key,

@@ -362,12 +352,16 @@ def __eq__(self, other):
         return True

     def get(self, conn_id: str, default_conn: Any = None) -> Any:
+        from airflow.exceptions import AirflowNotFoundException
+
         try:
             return _get_connection(conn_id)
         except AirflowRuntimeError as e:
             if e.error.error == ErrorType.CONNECTION_NOT_FOUND:
                 return default_conn
             raise
+        except AirflowNotFoundException:
+            return default_conn


 class VariableAccessor:
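The hasattr branch added to _async_get_connection above is a small but general pattern: prefer a backend's native coroutine method and only fall back to wrapping the blocking call. A self-contained sketch of the same idea, using asyncio.to_thread in place of the sync_to_async wrapper the real code uses; the backend classes are placeholders:

import asyncio


class SyncOnlyBackend:
    def get_connection(self, conn_id: str) -> str:
        return f"sync:{conn_id}"  # stand-in for a real Connection object


class AsyncCapableBackend:
    async def aget_connection(self, conn_id: str) -> str:
        return f"async:{conn_id}"


async def resolve(backend, conn_id: str) -> str:
    # Prefer the native async method when the backend provides one ...
    if hasattr(backend, "aget_connection"):
        return await backend.aget_connection(conn_id)
    # ... otherwise run the blocking lookup in a worker thread.
    return await asyncio.to_thread(backend.get_connection, conn_id)


print(asyncio.run(resolve(AsyncCapableBackend(), "my_conn")))  # async:my_conn
print(asyncio.run(resolve(SyncOnlyBackend(), "my_conn")))      # sync:my_conn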

task-sdk/src/airflow/sdk/execution_time/secrets/__init__.py
Lines changed: 6 additions & 1 deletion

@@ -21,4 +21,9 @@

 from airflow.sdk.execution_time.secrets.execution_api import ExecutionAPISecretsBackend

-__all__ = ["ExecutionAPISecretsBackend"]
+__all__ = ["ExecutionAPISecretsBackend", "DEFAULT_SECRETS_SEARCH_PATH_WORKERS"]
+
+DEFAULT_SECRETS_SEARCH_PATH_WORKERS = [
+    "airflow.secrets.environment_variables.EnvironmentVariablesBackend",
+    "airflow.sdk.execution_time.secrets.execution_api.ExecutionAPISecretsBackend",
+]

task-sdk/src/airflow/sdk/execution_time/secrets/execution_api.py
Lines changed: 55 additions & 4 deletions

@@ -43,7 +43,7 @@ def get_conn_value(self, conn_id: str) -> str | None:
         """
         raise NotImplementedError("Use get_connection instead")

-    def get_connection(self, conn_id: str) -> Connection | None:
+    def get_connection(self, conn_id: str) -> Connection | None:  # type: ignore[override]
         """
         Return connection object by routing through SUPERVISOR_COMMS.

@@ -75,7 +75,7 @@ def get_variable(self, key: str) -> str | None:
         :param key: Variable key
         :return: Variable value or None if not found
         """
-        from airflow.sdk.execution_time.comms import ErrorResponse, GetVariable
+        from airflow.sdk.execution_time.comms import ErrorResponse, GetVariable, VariableResult
         from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS

         try:

@@ -86,8 +86,59 @@
                 return None

             # Extract value from VariableResult
-            if hasattr(msg, "value"):
-                return msg.value
+            if isinstance(msg, VariableResult):
+                return msg.value  # Already a string | None
+            return None
+        except Exception:
+            # If SUPERVISOR_COMMS fails for any reason, return None
+            # to allow fallback to other backends
+            return None
+
+    async def aget_connection(self, conn_id: str) -> Connection | None:  # type: ignore[override]
+        """
+        Return connection object asynchronously via SUPERVISOR_COMMS.
+
+        :param conn_id: connection id
+        :return: Connection object or None if not found
+        """
+        from airflow.sdk.execution_time.comms import ErrorResponse, GetConnection
+        from airflow.sdk.execution_time.context import _process_connection_result_conn
+        from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
+
+        try:
+            msg = await SUPERVISOR_COMMS.asend(GetConnection(conn_id=conn_id))
+
+            if isinstance(msg, ErrorResponse):
+                # Connection not found or error occurred
+                return None
+
+            # Convert ExecutionAPI response to SDK Connection
+            return _process_connection_result_conn(msg)
+        except Exception:
+            # If SUPERVISOR_COMMS fails for any reason, return None
+            # to allow fallback to other backends
+            return None
+
+    async def aget_variable(self, key: str) -> str | None:
+        """
+        Return variable value asynchronously via SUPERVISOR_COMMS.
+
+        :param key: Variable key
+        :return: Variable value or None if not found
+        """
+        from airflow.sdk.execution_time.comms import ErrorResponse, GetVariable, VariableResult
+        from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
+
+        try:
+            msg = await SUPERVISOR_COMMS.asend(GetVariable(key=key))
+
+            if isinstance(msg, ErrorResponse):
+                # Variable not found or error occurred
+                return None
+
+            # Extract value from VariableResult
+            if isinstance(msg, VariableResult):
+                return msg.value  # Already a string | None
             return None
         except Exception:
             # If SUPERVISOR_COMMS fails for any reason, return None
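A pattern worth noting in both the sync and async methods above is the broad except Exception that returns None: a backend that cannot answer (for example when SUPERVISOR_COMMS is not available in the current process) simply steps aside so the next backend in the chain can be tried. A toy illustration of that chain contract; the classes here are placeholders, not Airflow classes:

class UnreachableBackend:
    """Mimics ExecutionAPISecretsBackend when no supervisor channel exists."""

    def get_variable(self, key: str):
        try:
            raise RuntimeError("no supervisor channel in this process")
        except Exception:
            # Swallow the failure and report "not found" so the chain continues.
            return None


class EnvBackend:
    def __init__(self, data: dict[str, str]):
        self.data = data

    def get_variable(self, key: str):
        return self.data.get(key)


def get_variable(chain, key: str) -> str:
    for backend in chain:
        value = backend.get_variable(key)
        if value is not None:
            return value
    raise KeyError(f"Variable {key} not found in any backend")


chain = [UnreachableBackend(), EnvBackend({"MY_VAR": "42"})]
print(get_variable(chain, "MY_VAR"))  # 42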