Skip to content

Commit 7b60547

Browse files
kaxil authored and GitOps Bot committed
Prevent unnecessary kubernetes client imports in workers (apache#56692)
1 parent 38f3e05 commit 7b60547

File tree

4 files changed

+89
-5
lines changed

4 files changed

+89
-5
lines changed

airflow-core/src/airflow/serialization/serialized_objects.py

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -28,6 +28,7 @@
2828
import logging
2929
import math
3030
import re
31+
import sys
3132
import weakref
3233
from collections.abc import Collection, Iterable, Iterator, Mapping, Sequence
3334
from functools import cached_property, lru_cache
@@ -3803,6 +3804,11 @@ def _has_kubernetes() -> bool:
38033804
if "HAS_KUBERNETES" in globals():
38043805
return HAS_KUBERNETES
38053806

3807+
# Check if kubernetes is already imported before triggering expensive import
3808+
if "kubernetes.client" not in sys.modules:
3809+
HAS_KUBERNETES = False
3810+
return False
3811+
38063812
# Loading kube modules is expensive, so delay it until the last moment
38073813

38083814
try:

airflow-core/tests/unit/serialization/test_serialized_objects.py

Lines changed: 28 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,7 @@
1919

2020
import json
2121
import math
22+
import sys
2223
from collections.abc import Iterator
2324
from datetime import datetime, timedelta
2425

@@ -74,6 +75,7 @@
7475
BaseSerialization,
7576
LazyDeserializedDAG,
7677
SerializedDAG,
78+
_has_kubernetes,
7779
create_scheduler_operator,
7880
)
7981
from airflow.triggers.base import BaseTrigger
@@ -718,3 +720,29 @@ def test_resume_execution(self):
718720
next_kwargs={"error": TriggerFailureReason.TRIGGER_TIMEOUT},
719721
context={},
720722
)
723+
724+
725+
class TestKubernetesImportAvoidance:
726+
"""Test that serialization doesn't import kubernetes unnecessarily."""
727+
728+
def test_has_kubernetes_no_import_when_not_needed(self):
729+
"""Ensure _has_kubernetes() doesn't import k8s when not already loaded."""
730+
# Remove kubernetes from sys.modules if present
731+
k8s_modules = [m for m in list(sys.modules.keys()) if m.startswith("kubernetes")]
732+
if k8s_modules:
733+
pytest.skip("Kubernetes already imported, cannot test import avoidance")
734+
735+
# Call _has_kubernetes() - should check sys.modules and return False without importing
736+
result = _has_kubernetes()
737+
738+
assert result is False
739+
assert "kubernetes.client" not in sys.modules
740+
741+
def test_has_kubernetes_uses_existing_import(self):
742+
"""Ensure _has_kubernetes() uses k8s when it's already imported."""
743+
pytest.importorskip("kubernetes")
744+
745+
# Now k8s is imported, should return True
746+
result = _has_kubernetes()
747+
748+
assert result is True

shared/secrets_masker/src/airflow_shared/secrets_masker/secrets_masker.py

Lines changed: 14 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -158,20 +158,29 @@ def reset_secrets_masker() -> None:
158158
_secrets_masker().reset_masker()
159159

160160

161+
def _is_v1_env_var(v: Any) -> TypeGuard[_V1EnvVarLike]:
162+
"""Check if object is V1EnvVar, avoiding unnecessary imports."""
163+
# Quick check: if k8s not imported, can't be a V1EnvVar instance
164+
if "kubernetes.client" not in sys.modules:
165+
return False
166+
167+
# K8s is loaded, safe to get/cache the type
168+
v1_type = _get_v1_env_var_type_cached()
169+
return isinstance(v, v1_type)
170+
171+
161172
@cache
162-
def _get_v1_env_var_type() -> type:
173+
def _get_v1_env_var_type_cached() -> type:
174+
"""Get V1EnvVar type (cached, only called when k8s is already loaded)."""
163175
try:
164176
from kubernetes.client import V1EnvVar
165177

166178
return V1EnvVar
167179
except ImportError:
180+
# Shouldn't happen since we check sys.modules first
168181
return type("V1EnvVar", (), {})
169182

170183

171-
def _is_v1_env_var(v: Any) -> TypeGuard[_V1EnvVarLike]:
172-
return isinstance(v, _get_v1_env_var_type())
173-
174-
175184
class SecretsMasker(logging.Filter):
176185
"""Redact secrets from logs."""
177186

shared/secrets_masker/tests/secrets_masker/test_secrets_masker.py

Lines changed: 41 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1110,3 +1110,44 @@ def test_merge_round_trip(self):
11101110
assert final_dict["api"]["api_key"] == "new_api_key_67890" # User modification kept
11111111
assert final_dict["api"]["timeout"] == 60 # User modification kept
11121112
assert final_dict["app_name"] == "my_application" # Unchanged
1113+
1114+
1115+
class TestKubernetesImportAvoidance:
1116+
"""Test that secrets masker doesn't import kubernetes unnecessarily."""
1117+
1118+
def test_no_k8s_import_when_not_needed(self):
1119+
"""Ensure kubernetes is not imported when masking non-k8s secrets."""
1120+
# Ensure kubernetes is not already imported
1121+
k8s_modules = [m for m in sys.modules if m.startswith("kubernetes")]
1122+
if k8s_modules:
1123+
pytest.skip("Kubernetes already imported, cannot test import avoidance")
1124+
1125+
masker = SecretsMasker()
1126+
configure_secrets_masker_for_test(masker)
1127+
1128+
masker.add_mask("test_secret", "password")
1129+
redacted = masker.redact({"password": "test_secret", "user": "admin"})
1130+
1131+
assert redacted["password"] == "***"
1132+
assert redacted["user"] == "admin"
1133+
1134+
assert "kubernetes.client" not in sys.modules
1135+
1136+
def test_k8s_objects_still_detected_when_imported(self):
1137+
"""Ensure V1EnvVar objects are still properly detected when k8s is imported."""
1138+
pytest.importorskip("kubernetes")
1139+
1140+
from kubernetes.client import V1EnvVar
1141+
1142+
# Create a V1EnvVar object with a sensitive name
1143+
env_var = V1EnvVar(name="password", value="secret123")
1144+
1145+
masker = SecretsMasker()
1146+
configure_secrets_masker_for_test(masker)
1147+
1148+
# Redact the V1EnvVar object - the name field is sensitive
1149+
redacted = masker.redact(env_var)
1150+
1151+
# Should be redacted since "password" is a sensitive field name
1152+
assert redacted["value"] == "***"
1153+
assert redacted["name"] == "password"

0 commit comments

Comments
 (0)