Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -417,8 +417,9 @@ def _configure_eviction_handling(self):
"""
Configures eviction handling for the job pod. Needs to run before

If `backoffLimit` is set to 0, we'll tell the Runner to reschedule
its flow run when it receives a SIGTERM.
If `backoffLimit` is set to 0 and `PREFECT_FLOW_RUN_EXECUTE_SIGTERM_BEHAVIOR` is
not set in env, we'll tell the Runner to reschedule its flow run when it receives
a SIGTERM.

If `backoffLimit` is set to a positive number, we'll ensure that the
reschedule SIGTERM handling is not set. Having both a `backoffLimit` and
Expand All @@ -428,7 +429,8 @@ def _configure_eviction_handling(self):
# its flow run when it receives a SIGTERM.
if self.job_manifest["spec"].get("backoffLimit") == 0:
if isinstance(self.env, dict):
self.env["PREFECT_FLOW_RUN_EXECUTE_SIGTERM_BEHAVIOR"] = "reschedule"
if not self.env.get("PREFECT_FLOW_RUN_EXECUTE_SIGTERM_BEHAVIOR"):
self.env["PREFECT_FLOW_RUN_EXECUTE_SIGTERM_BEHAVIOR"] = "reschedule"
elif not any(
v.get("name") == "PREFECT_FLOW_RUN_EXECUTE_SIGTERM_BEHAVIOR"
for v in self.env
Expand Down Expand Up @@ -646,7 +648,9 @@ class KubernetesWorkerVariables(BaseVariables):
title="Backoff Limit",
description=(
"The number of times Kubernetes will retry a job after pod eviction. "
"If set to 0, Prefect will reschedule the flow run when the pod is evicted."
"If set to 0, Prefect will reschedule the flow run when the pod is evicted "
"unless PREFECT_FLOW_RUN_EXECUTE_SIGTERM_BEHAVIOR is set to value "
"different from 'reschedule'."
),
)
finished_job_ttl: Optional[int] = Field(
Expand Down
34 changes: 34 additions & 0 deletions src/integrations/prefect-kubernetes/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2195,6 +2195,40 @@ async def test_sets_environment_variables(
}.items()
]

async def test_does_not_overwrite_sigterm_behavior_env(
self,
flow_run,
mock_core_client,
mock_watch,
mock_pods_stream_that_returns_running_pod,
mock_batch_client,
):
mock_watch.return_value.stream = mock_pods_stream_that_returns_running_pod

configuration = await KubernetesWorkerJobConfiguration.from_template_and_values(
KubernetesWorker.get_default_base_job_template(),
{"env": {"PREFECT_FLOW_RUN_EXECUTE_SIGTERM_BEHAVIOR": "die"}},
)
configuration.prepare_for_flow_run(flow_run)

async with KubernetesWorker(work_pool_name="test") as k8s_worker:
await k8s_worker.run(flow_run, configuration)
mock_batch_client.return_value.create_namespaced_job.assert_called_once()

manifest = mock_batch_client.return_value.create_namespaced_job.call_args[
0
][1]
pod = manifest["spec"]["template"]["spec"]
env = pod["containers"][0]["env"]
assert env == [
{"name": key, "value": value}
for key, value in {
**configuration._base_environment(),
**configuration._base_flow_run_environment(flow_run),
"PREFECT_FLOW_RUN_EXECUTE_SIGTERM_BEHAVIOR": "die",
}.items()
]

async def test_uses_custom_env_list_from_base_template(
self,
flow_run,
Expand Down