Commit 33ef960

kaxil authored and GitOps Bot committed
Fix Connection or Variable access in Server context (apache#56602)
Hooks used in API server contexts (plugins, middlewares, log handlers) previously failed with `ImportError` for `SUPERVISOR_COMMS`, because `SUPERVISOR_COMMS` only exists in task execution contexts (worker, DAG processor, triggerer). This prevented using hooks like GCSHook or S3Hook in plugins and broke log retrieval.

Implemented automatic context detection using separate secrets backend chains for client and server processes:

- Client contexts (workers, DAG processors, triggerers) are detected via `SUPERVISOR_COMMS` presence and use `ExecutionAPISecretsBackend` to route through the Execution API
- Server contexts (API server, scheduler, plugins) are detected when `SUPERVISOR_COMMS` is unavailable and use `MetastoreBackend` for direct database access

Fixes apache#56120
Fixes apache#56583
1 parent 895fd6a commit 33ef960
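As a hedged sketch of the detection order described in the commit message (illustrative only, not code from this commit; `pick_search_path` and the `supervisor_comms_available` flag are hypothetical stand-ins):

```python
import os

CLIENT_CHAIN = [
    "airflow.secrets.environment_variables.EnvironmentVariablesBackend",
    "airflow.sdk.execution_time.secrets.execution_api.ExecutionAPISecretsBackend",
]
SERVER_CHAIN = [
    "airflow.secrets.environment_variables.EnvironmentVariablesBackend",
    "airflow.secrets.metastore.MetastoreBackend",
]
FALLBACK_CHAIN = [
    "airflow.secrets.environment_variables.EnvironmentVariablesBackend",
]


def pick_search_path(supervisor_comms_available: bool) -> list[str]:
    """Hypothetical helper: map the process context onto a secrets backend chain."""
    if supervisor_comms_available:
        # Task runner inside a worker: route Connection/Variable reads via the Execution API.
        return CLIENT_CHAIN
    if os.environ.get("_AIRFLOW_PROCESS_CONTEXT") == "server":
        # API server / scheduler / plugins: read the metadata database directly.
        return SERVER_CHAIN
    # Supervisor or unknown context: minimal safe chain (external backends come from config).
    return FALLBACK_CHAIN
```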

File tree

16 files changed: +620 additions, -82 deletions


.pre-commit-config.yaml

Lines changed: 1 addition & 0 deletions
@@ -1613,6 +1613,7 @@ repos:
         ^airflow-core/src/airflow/operators/subdag\.py$|
         ^airflow-core/src/airflow/plugins_manager\.py$|
         ^airflow-core/src/airflow/providers_manager\.py$|
+        ^airflow-core/src/airflow/secrets/__init__.py$|
         ^airflow-core/src/airflow/serialization/definitions/[_a-z]+\.py$|
         ^airflow-core/src/airflow/serialization/enums\.py$|
         ^airflow-core/src/airflow/serialization/helpers\.py$|
Lines changed: 49 additions & 0 deletions
@@ -0,0 +1,49 @@
+Fix Connection & Variable access in API server contexts (plugins, log handlers)
+
+Previously, hooks used in API server contexts (plugins, middlewares, log handlers) would fail with an ``ImportError``
+for ``SUPERVISOR_COMMS``, because ``SUPERVISOR_COMMS`` only exists in task runner child processes.
+
+This has been fixed by implementing automatic context detection with three separate secrets backend chains:
+
+**Context Detection:**
+
+1. **Client contexts** (task runner in worker): Detected via ``SUPERVISOR_COMMS`` presence
+2. **Server contexts** (API server, scheduler): Explicitly marked with ``_AIRFLOW_PROCESS_CONTEXT=server`` environment variable
+3. **Fallback contexts** (supervisor, unknown contexts): Neither marker present, uses minimal safe chain
+
+**Backend Chains:**
+
+- **Client**: ``EnvironmentVariablesBackend`` → ``ExecutionAPISecretsBackend`` (routes to Execution API via SUPERVISOR_COMMS)
+- **Server**: ``EnvironmentVariablesBackend`` → ``MetastoreBackend`` (direct database access)
+- **Fallback**: ``EnvironmentVariablesBackend`` only (+ external backends from config like AWS Secrets Manager, Vault)
+
+The fallback chain is crucial for supervisor processes (worker-side, before task runner starts) which need to access
+external secrets for remote logging setup but should not use ``MetastoreBackend`` (to maintain worker isolation).
+
+**Architecture Benefits:**
+
+- Workers (supervisor + task runner) never use ``MetastoreBackend``, maintaining strict isolation
+- External secrets backends (AWS Secrets Manager, Vault, etc.) work in all three contexts
+- Supervisor falls back to Execution API client for connections not found in external backends
+- API server and scheduler have direct database access for optimal performance
+
+**Impact:**
+
+- Hooks like ``GCSHook``, ``S3Hook`` now work correctly in log handlers and plugins
+- No code changes required for existing plugins or hooks
+- Workers remain isolated from direct database access (network-level DB blocking fully supported)
+- External secrets work everywhere (workers, supervisor, API server)
+- Robust handling of unknown contexts with safe minimal chain
+
+See: `#56120 <https://github.com/apache/airflow/issues/56120>`__, `#56583 <https://github.com/apache/airflow/issues/56583>`__, `#51816 <https://github.com/apache/airflow/issues/51816>`__
+
+* Types of change
+
+* [ ] Dag changes
+* [ ] Config changes
+* [ ] API changes
+* [ ] CLI changes
+* [ ] Behaviour changes
+* [ ] Plugin changes
+* [ ] Dependency changes
+* [ ] Code interface changes
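To illustrate the behaviour change described in the newsfragment above: code along these lines, running inside the API server process (for example in a plugin or a task-log handler), previously raised `ImportError` for `SUPERVISOR_COMMS`; after this change the connection resolves through the server chain. This is a hedged sketch, and `my_s3_logs` is an assumed connection id, not something defined by this commit.

```python
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


def read_remote_log(bucket: str, key: str) -> str:
    # The connection is resolved when the hook first needs it
    # (EnvironmentVariablesBackend, then MetastoreBackend, in a server context).
    hook = S3Hook(aws_conn_id="my_s3_logs")
    return hook.read_key(key=key, bucket_name=bucket)
```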

airflow-core/src/airflow/api_fastapi/main.py

Lines changed: 4 additions & 0 deletions
@@ -19,6 +19,10 @@

 import os

+# Mark this as a server context before any airflow imports
+# This ensures plugins loaded at import time get the correct secrets backend chain
+os.environ["_AIRFLOW_PROCESS_CONTEXT"] = "server"
+
 from airflow.api_fastapi.app import cached_app

 # There is no way to pass the apps to this file from Airflow CLI
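For anyone embedding the API server app in their own entrypoint, the same ordering applies. A minimal sketch, assuming the module-level marker is all that is needed (mirroring `main.py` above):

```python
import os

# Must be set before any airflow import so plugins see the server backend chain
os.environ["_AIRFLOW_PROCESS_CONTEXT"] = "server"

from airflow.api_fastapi.app import cached_app  # noqa: E402

app = cached_app()
```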

airflow-core/src/airflow/jobs/scheduler_job_runner.py

Lines changed: 5 additions & 0 deletions
@@ -1020,6 +1020,11 @@ def set_ti_span_attrs(cls, span, state, ti):
         span.add_event(name="airflow.task.ended", timestamp=datetime_to_nano(ti.end_date))

     def _execute(self) -> int | None:
+        import os
+
+        # Mark this as a server context for secrets backend detection
+        os.environ["_AIRFLOW_PROCESS_CONTEXT"] = "server"
+
         self.log.info("Starting the scheduler")

         reset_signals = self.register_signals()

airflow-core/src/airflow/secrets/__init__.py

Lines changed: 24 additions & 5 deletions
@@ -29,7 +29,7 @@

 from airflow.utils.deprecation_tools import add_deprecated_classes

-__all__ = ["BaseSecretsBackend", "DEFAULT_SECRETS_SEARCH_PATH", "DEFAULT_SECRETS_SEARCH_PATH_WORKERS"]
+__all__ = ["BaseSecretsBackend", "DEFAULT_SECRETS_SEARCH_PATH"]

 from airflow.secrets.base_secrets import BaseSecretsBackend

@@ -38,14 +38,33 @@
     "airflow.secrets.metastore.MetastoreBackend",
 ]

-DEFAULT_SECRETS_SEARCH_PATH_WORKERS = [
-    "airflow.secrets.environment_variables.EnvironmentVariablesBackend",
-]
-

 __deprecated_classes = {
     "cache": {
         "SecretCache": "airflow.sdk.execution_time.cache.SecretCache",
     },
 }
 add_deprecated_classes(__deprecated_classes, __name__)
+
+
+def __getattr__(name):
+    if name == "DEFAULT_SECRETS_SEARCH_PATH_WORKERS":
+        import warnings
+
+        warnings.warn(
+            "airflow.secrets.DEFAULT_SECRETS_SEARCH_PATH_WORKERS is moved to the Task SDK. "
+            "Use airflow.sdk.execution_time.secrets.DEFAULT_SECRETS_SEARCH_PATH_WORKERS instead.",
+            DeprecationWarning,
+            stacklevel=2,
+        )
+        try:
+            from airflow.sdk.execution_time.secrets import DEFAULT_SECRETS_SEARCH_PATH_WORKERS
+
+            return DEFAULT_SECRETS_SEARCH_PATH_WORKERS
+        except (ImportError, AttributeError):
+            # Back-compat for older Task SDK clients
+            return [
+                "airflow.secrets.environment_variables.EnvironmentVariablesBackend",
+            ]
+
+    raise AttributeError(f"module '{__name__}' has no attribute '{name}'")

airflow-core/tests/unit/core/test_configuration.py

Lines changed: 4 additions & 2 deletions
@@ -43,7 +43,7 @@
     write_default_airflow_configuration_if_needed,
 )
 from airflow.providers_manager import ProvidersManager
-from airflow.secrets import DEFAULT_SECRETS_SEARCH_PATH_WORKERS
+from airflow.sdk.execution_time.secrets import DEFAULT_SECRETS_SEARCH_PATH_WORKERS

 from tests_common.test_utils.config import conf_vars
 from tests_common.test_utils.markers import skip_if_force_lowest_dependencies_marker
@@ -923,8 +923,10 @@ def test_initialize_secrets_backends_on_workers(self):
         backends = initialize_secrets_backends(DEFAULT_SECRETS_SEARCH_PATH_WORKERS)
         backend_classes = [backend.__class__.__name__ for backend in backends]

-        assert len(backends) == 2
+        assert len(backends) == 3
         assert "SystemsManagerParameterStoreBackend" in backend_classes
+        assert "EnvironmentVariablesBackend" in backend_classes
+        assert "ExecutionAPISecretsBackend" in backend_classes

     @skip_if_force_lowest_dependencies_marker
     @conf_vars(

task-sdk/src/airflow/sdk/execution_time/context.py

Lines changed: 40 additions & 63 deletions
@@ -149,7 +149,8 @@ def _get_connection(conn_id: str) -> Connection:
     except SecretCache.NotPresentException:
         pass  # continue to backends

-    # iterate over configured backends if not in cache (or expired)
+    # Iterate over configured backends (which may include SupervisorCommsSecretsBackend
+    # in worker contexts or MetastoreBackend in API server contexts)
     backends = ensure_secrets_backend_loaded()
     for secrets_backend in backends:
         try:
@@ -165,26 +166,10 @@ def _get_connection(conn_id: str) -> Connection:
                 type(secrets_backend).__name__,
             )

-    if backends:
-        log.debug(
-            "Connection not found in any of the configured Secrets Backends. Trying to retrieve from API server",
-            conn_id=conn_id,
-        )
-
-    # TODO: This should probably be moved to a separate module like `airflow.sdk.execution_time.comms`
-    # or `airflow.sdk.execution_time.connection`
-    # A reason to not move it to `airflow.sdk.execution_time.comms` is that it
-    # will make that module depend on Task SDK, which is not ideal because we intend to
-    # keep Task SDK as a separate package than execution time mods.
-    # Also applies to _async_get_connection.
-    from airflow.sdk.execution_time.comms import GetConnection
-    from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
-
-    msg = SUPERVISOR_COMMS.send(GetConnection(conn_id=conn_id))
+    # If no backend found the connection, raise an error
+    from airflow.exceptions import AirflowNotFoundException

-    conn = _process_connection_result_conn(msg)
-    SecretCache.save_connection_uri(conn_id, conn.get_uri())
-    return conn
+    raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined")


 async def _async_get_connection(conn_id: str) -> Connection:
@@ -201,34 +186,36 @@ async def _async_get_connection(conn_id: str) -> Connection:
         _mask_connection_secrets(conn)
         return conn
     except SecretCache.NotPresentException:
-        pass  # continue to API
+        pass  # continue to backends

-    from airflow.sdk.execution_time.comms import GetConnection
     from airflow.sdk.execution_time.supervisor import ensure_secrets_backend_loaded
-    from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS

-    # Try secrets backends first using async wrapper
+    # Try secrets backends
     backends = ensure_secrets_backend_loaded()
     for secrets_backend in backends:
         try:
-            conn = await sync_to_async(secrets_backend.get_connection)(conn_id)  # type: ignore[assignment]
+            # Use async method if available, otherwise wrap sync method
+            if hasattr(secrets_backend, "aget_connection"):
+                conn = await secrets_backend.aget_connection(conn_id)  # type: ignore[assignment]
+            else:
+                conn = await sync_to_async(secrets_backend.get_connection)(conn_id)  # type: ignore[assignment]
+
             if conn:
-                # TODO: this should probably be in get conn
-                if conn.password:
-                    mask_secret(conn.password)
-                if conn.extra:
-                    mask_secret(conn.extra)
+                SecretCache.save_connection_uri(conn_id, conn.get_uri())
+                _mask_connection_secrets(conn)
                 return conn
         except Exception:
             # If one backend fails, try the next one
-            continue
+            log.exception(
+                "Unable to retrieve connection from secrets backend (%s). "
+                "Checking subsequent secrets backend.",
+                type(secrets_backend).__name__,
+            )
+
+    # If no backend found the connection, raise an error
+    from airflow.exceptions import AirflowNotFoundException

-    # If no secrets backend has the connection, fall back to API server
-    msg = await SUPERVISOR_COMMS.asend(GetConnection(conn_id=conn_id))
-    conn = _process_connection_result_conn(msg)
-    SecretCache.save_connection_uri(conn_id, conn.get_uri())
-    _mask_connection_secrets(conn)
-    return conn
+    raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined")


 def _get_variable(key: str, deserialize_json: bool) -> Any:
@@ -250,7 +237,8 @@ def _get_variable(key: str, deserialize_json: bool) -> Any:
         pass  # Continue to check backends

     backends = ensure_secrets_backend_loaded()
-    # iterate over backends if not in cache (or expired)
+
+    # Iterate over backends if not in cache (or expired)
     for secrets_backend in backends:
         try:
             var_val = secrets_backend.get_variable(key=key)
@@ -270,31 +258,13 @@ def _get_variable(key: str, deserialize_json: bool) -> Any:
                 type(secrets_backend).__name__,
             )

-    if backends:
-        log.debug(
-            "Variable not found in any of the configured Secrets Backends. Trying to retrieve from API server",
-            key=key,
-        )
-
-    # TODO: This should probably be moved to a separate module like `airflow.sdk.execution_time.comms`
-    # or `airflow.sdk.execution_time.variable`
-    # A reason to not move it to `airflow.sdk.execution_time.comms` is that it
-    # will make that module depend on Task SDK, which is not ideal because we intend to
-    # keep Task SDK as a separate package than execution time mods.
-    from airflow.sdk.execution_time.comms import ErrorResponse, GetVariable
-    from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
-
-    msg = SUPERVISOR_COMMS.send(GetVariable(key=key))
-
-    if isinstance(msg, ErrorResponse):
-        raise AirflowRuntimeError(msg)
+    # If no backend found the variable, raise a not found error (mirrors _get_connection)
+    from airflow.sdk.exceptions import AirflowRuntimeError, ErrorType
+    from airflow.sdk.execution_time.comms import ErrorResponse

-    if TYPE_CHECKING:
-        assert isinstance(msg, VariableResult)
-    variable = _convert_variable_result_to_variable(msg, deserialize_json)
-    # Save raw value to ensure cache consistency regardless of deserialize_json parameter
-    SecretCache.save_variable(key, msg.value)
-    return variable.value
+    raise AirflowRuntimeError(
+        ErrorResponse(error=ErrorType.VARIABLE_NOT_FOUND, detail={"message": f"Variable {key} not found"})
+    )


 def _set_variable(key: str, value: Any, description: str | None = None, serialize_json: bool = False) -> None:
@@ -307,18 +277,21 @@ def _set_variable(key: str, value: Any, description: str | None = None, serializ

     from airflow.sdk.execution_time.cache import SecretCache
     from airflow.sdk.execution_time.comms import PutVariable
+    from airflow.sdk.execution_time.secrets.execution_api import ExecutionAPISecretsBackend
     from airflow.sdk.execution_time.supervisor import ensure_secrets_backend_loaded
     from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS

     # check for write conflicts on the worker
     for secrets_backend in ensure_secrets_backend_loaded():
+        if isinstance(secrets_backend, ExecutionAPISecretsBackend):
+            continue
         try:
             var_val = secrets_backend.get_variable(key=key)
             if var_val is not None:
                 _backend_name = type(secrets_backend).__name__
                 log.warning(
                     "The variable %s is defined in the %s secrets backend, which takes "
-                    "precedence over reading from the database. The value in the database will be "
+                    "precedence over reading from the API Server. The value from the API Server will be "
                     "updated, but to read it you have to delete the conflicting variable "
                     "from %s",
                     key,
@@ -379,12 +352,16 @@ def __eq__(self, other):
         return True

     def get(self, conn_id: str, default_conn: Any = None) -> Any:
+        from airflow.exceptions import AirflowNotFoundException
+
         try:
             return _get_connection(conn_id)
         except AirflowRuntimeError as e:
             if e.error.error == ErrorType.CONNECTION_NOT_FOUND:
                 return default_conn
             raise
+        except AirflowNotFoundException:
+            return default_conn


 class VariableAccessor:
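A hedged sketch of the new failure mode in `_get_connection` (a private helper in this module): when no configured backend yields a result, the lookup now raises instead of falling back to a `SUPERVISOR_COMMS` round-trip, and `ConnectionAccessor.get()` converts that into the supplied default. The conn id below is assumed to be undefined, and the snippet assumes it runs in a context where the secrets backends can be loaded.

```python
from airflow.exceptions import AirflowNotFoundException
from airflow.sdk.execution_time.context import _get_connection

try:
    _get_connection("a_conn_id_that_is_not_defined_anywhere")
except AirflowNotFoundException as err:
    print(err)  # The conn_id `a_conn_id_that_is_not_defined_anywhere` isn't defined
```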
task-sdk/src/airflow/sdk/execution_time/secrets/__init__.py

Lines changed: 29 additions & 0 deletions

@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Secrets backends for task execution contexts."""
+
+from __future__ import annotations
+
+from airflow.sdk.execution_time.secrets.execution_api import ExecutionAPISecretsBackend
+
+__all__ = ["ExecutionAPISecretsBackend", "DEFAULT_SECRETS_SEARCH_PATH_WORKERS"]
+
+DEFAULT_SECRETS_SEARCH_PATH_WORKERS = [
+    "airflow.secrets.environment_variables.EnvironmentVariablesBackend",
+    "airflow.sdk.execution_time.secrets.execution_api.ExecutionAPISecretsBackend",
+]