Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import logging
import math
import re
import sys
import weakref
from collections.abc import Collection, Iterable, Iterator, Mapping, Sequence
from functools import cached_property, lru_cache
Expand Down Expand Up @@ -3814,6 +3815,11 @@ def _has_kubernetes() -> bool:
if "HAS_KUBERNETES" in globals():
return HAS_KUBERNETES

# Check if kubernetes is already imported before triggering expensive import
if "kubernetes.client" not in sys.modules:
HAS_KUBERNETES = False
return False

# Loading kube modules is expensive, so delay it until the last moment

try:
Expand Down
28 changes: 28 additions & 0 deletions airflow-core/tests/unit/serialization/test_serialized_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import json
import math
import sys
from collections.abc import Iterator
from datetime import datetime, timedelta

Expand Down Expand Up @@ -74,6 +75,7 @@
BaseSerialization,
LazyDeserializedDAG,
SerializedDAG,
_has_kubernetes,
create_scheduler_operator,
)
from airflow.triggers.base import BaseTrigger
Expand Down Expand Up @@ -718,3 +720,29 @@ def test_resume_execution(self):
next_kwargs={"error": TriggerFailureReason.TRIGGER_TIMEOUT},
context={},
)


class TestKubernetesImportAvoidance:
"""Test that serialization doesn't import kubernetes unnecessarily."""

def test_has_kubernetes_no_import_when_not_needed(self):
"""Ensure _has_kubernetes() doesn't import k8s when not already loaded."""
# Remove kubernetes from sys.modules if present
k8s_modules = [m for m in list(sys.modules.keys()) if m.startswith("kubernetes")]
if k8s_modules:
pytest.skip("Kubernetes already imported, cannot test import avoidance")

# Call _has_kubernetes() - should check sys.modules and return False without importing
result = _has_kubernetes()

assert result is False
assert "kubernetes.client" not in sys.modules

def test_has_kubernetes_uses_existing_import(self):
"""Ensure _has_kubernetes() uses k8s when it's already imported."""
pytest.importorskip("kubernetes")

# Now k8s is imported, should return True
result = _has_kubernetes()

assert result is True
Original file line number Diff line number Diff line change
Expand Up @@ -158,20 +158,29 @@ def reset_secrets_masker() -> None:
_secrets_masker().reset_masker()


def _is_v1_env_var(v: Any) -> TypeGuard[_V1EnvVarLike]:
"""Check if object is V1EnvVar, avoiding unnecessary imports."""
# Quick check: if k8s not imported, can't be a V1EnvVar instance
if "kubernetes.client" not in sys.modules:
return False

# K8s is loaded, safe to get/cache the type
v1_type = _get_v1_env_var_type_cached()
return isinstance(v, v1_type)


@cache
def _get_v1_env_var_type() -> type:
def _get_v1_env_var_type_cached() -> type:
"""Get V1EnvVar type (cached, only called when k8s is already loaded)."""
try:
from kubernetes.client import V1EnvVar

return V1EnvVar
except ImportError:
# Shouldn't happen since we check sys.modules first
return type("V1EnvVar", (), {})


def _is_v1_env_var(v: Any) -> TypeGuard[_V1EnvVarLike]:
return isinstance(v, _get_v1_env_var_type())


class SecretsMasker(logging.Filter):
"""Redact secrets from logs."""

Expand Down
41 changes: 41 additions & 0 deletions shared/secrets_masker/tests/secrets_masker/test_secrets_masker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1110,3 +1110,44 @@ def test_merge_round_trip(self):
assert final_dict["api"]["api_key"] == "new_api_key_67890" # User modification kept
assert final_dict["api"]["timeout"] == 60 # User modification kept
assert final_dict["app_name"] == "my_application" # Unchanged


class TestKubernetesImportAvoidance:
"""Test that secrets masker doesn't import kubernetes unnecessarily."""

def test_no_k8s_import_when_not_needed(self):
"""Ensure kubernetes is not imported when masking non-k8s secrets."""
# Ensure kubernetes is not already imported
k8s_modules = [m for m in sys.modules if m.startswith("kubernetes")]
if k8s_modules:
pytest.skip("Kubernetes already imported, cannot test import avoidance")

masker = SecretsMasker()
configure_secrets_masker_for_test(masker)

masker.add_mask("test_secret", "password")
redacted = masker.redact({"password": "test_secret", "user": "admin"})

assert redacted["password"] == "***"
assert redacted["user"] == "admin"

assert "kubernetes.client" not in sys.modules

def test_k8s_objects_still_detected_when_imported(self):
"""Ensure V1EnvVar objects are still properly detected when k8s is imported."""
pytest.importorskip("kubernetes")

from kubernetes.client import V1EnvVar

# Create a V1EnvVar object with a sensitive name
env_var = V1EnvVar(name="password", value="secret123")

masker = SecretsMasker()
configure_secrets_masker_for_test(masker)

# Redact the V1EnvVar object - the name field is sensitive
redacted = masker.redact(env_var)

# Should be redacted since "password" is a sensitive field name
assert redacted["value"] == "***"
assert redacted["name"] == "password"
Loading