Skip to content

Commit 22df319

Browse files
Kubernetes: allow opt-out of rescheduling on SIGTERM by setting PREFECT_FLOW_RUN_EXECUTE_SIGTERM_BEHAVIOR (#19223)
1 parent a136290 commit 22df319

2 files changed

Lines changed: 42 additions & 4 deletions

File tree

src/integrations/prefect-kubernetes/prefect_kubernetes/worker.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -417,8 +417,9 @@ def _configure_eviction_handling(self):
417417
"""
418418
Configures eviction handling for the job pod. Needs to run before
419419
420-
If `backoffLimit` is set to 0, we'll tell the Runner to reschedule
421-
its flow run when it receives a SIGTERM.
420+
If `backoffLimit` is set to 0 and `PREFECT_FLOW_RUN_EXECUTE_SIGTERM_BEHAVIOR` is
421+
not set in env, we'll tell the Runner to reschedule its flow run when it receives
422+
a SIGTERM.
422423
423424
If `backoffLimit` is set to a positive number, we'll ensure that the
424425
reschedule SIGTERM handling is not set. Having both a `backoffLimit` and
@@ -428,7 +429,8 @@ def _configure_eviction_handling(self):
428429
# its flow run when it receives a SIGTERM.
429430
if self.job_manifest["spec"].get("backoffLimit") == 0:
430431
if isinstance(self.env, dict):
431-
self.env["PREFECT_FLOW_RUN_EXECUTE_SIGTERM_BEHAVIOR"] = "reschedule"
432+
if not self.env.get("PREFECT_FLOW_RUN_EXECUTE_SIGTERM_BEHAVIOR"):
433+
self.env["PREFECT_FLOW_RUN_EXECUTE_SIGTERM_BEHAVIOR"] = "reschedule"
432434
elif not any(
433435
v.get("name") == "PREFECT_FLOW_RUN_EXECUTE_SIGTERM_BEHAVIOR"
434436
for v in self.env
@@ -646,7 +648,9 @@ class KubernetesWorkerVariables(BaseVariables):
646648
title="Backoff Limit",
647649
description=(
648650
"The number of times Kubernetes will retry a job after pod eviction. "
649-
"If set to 0, Prefect will reschedule the flow run when the pod is evicted."
651+
"If set to 0, Prefect will reschedule the flow run when the pod is evicted "
652+
"unless PREFECT_FLOW_RUN_EXECUTE_SIGTERM_BEHAVIOR is set to value "
653+
"different from 'reschedule'."
650654
),
651655
)
652656
finished_job_ttl: Optional[int] = Field(

src/integrations/prefect-kubernetes/tests/test_worker.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2195,6 +2195,40 @@ async def test_sets_environment_variables(
21952195
}.items()
21962196
]
21972197

2198+
async def test_does_not_overwrite_sigterm_behavior_env(
2199+
self,
2200+
flow_run,
2201+
mock_core_client,
2202+
mock_watch,
2203+
mock_pods_stream_that_returns_running_pod,
2204+
mock_batch_client,
2205+
):
2206+
mock_watch.return_value.stream = mock_pods_stream_that_returns_running_pod
2207+
2208+
configuration = await KubernetesWorkerJobConfiguration.from_template_and_values(
2209+
KubernetesWorker.get_default_base_job_template(),
2210+
{"env": {"PREFECT_FLOW_RUN_EXECUTE_SIGTERM_BEHAVIOR": "die"}},
2211+
)
2212+
configuration.prepare_for_flow_run(flow_run)
2213+
2214+
async with KubernetesWorker(work_pool_name="test") as k8s_worker:
2215+
await k8s_worker.run(flow_run, configuration)
2216+
mock_batch_client.return_value.create_namespaced_job.assert_called_once()
2217+
2218+
manifest = mock_batch_client.return_value.create_namespaced_job.call_args[
2219+
0
2220+
][1]
2221+
pod = manifest["spec"]["template"]["spec"]
2222+
env = pod["containers"][0]["env"]
2223+
assert env == [
2224+
{"name": key, "value": value}
2225+
for key, value in {
2226+
**configuration._base_environment(),
2227+
**configuration._base_flow_run_environment(flow_run),
2228+
"PREFECT_FLOW_RUN_EXECUTE_SIGTERM_BEHAVIOR": "die",
2229+
}.items()
2230+
]
2231+
21982232
async def test_uses_custom_env_list_from_base_template(
21992233
self,
22002234
flow_run,

0 commit comments

Comments
 (0)