Skip to content

Commit c0f9d11

Browse files
authored
Improve DAG and task missing error handling in callbacks (#56725)
When callbacks are executed but the DAG or task is missing from the `DagBag`, users now receive clear error messages explaining what went wrong instead of cryptic `KeyError` exceptions. This typically occurs due to race conditions where DAGs are removed or fail to parse between scheduling and callback execution. Fixes #56701
1 parent 5ae3eaf commit c0f9d11

File tree

2 files changed

+186
-6
lines changed

2 files changed

+186
-6
lines changed

airflow-core/src/airflow/dag_processing/processor.py

Lines changed: 46 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
)
3737
from airflow.configuration import conf
3838
from airflow.dag_processing.dagbag import DagBag
39+
from airflow.exceptions import TaskNotFound
3940
from airflow.sdk.execution_time.comms import (
4041
ConnectionResult,
4142
DeleteVariable,
@@ -71,7 +72,10 @@
7172

7273
from airflow.api_fastapi.execution_api.app import InProcessExecutionAPI
7374
from airflow.sdk.api.client import Client
75+
from airflow.sdk.bases.operator import BaseOperator
7476
from airflow.sdk.definitions.context import Context
77+
from airflow.sdk.definitions.dag import DAG
78+
from airflow.sdk.definitions.mappedoperator import MappedOperator
7579
from airflow.typing_compat import Self
7680

7781

@@ -234,6 +238,39 @@ def _serialize_dags(
234238
return serialized_dags, serialization_import_errors
235239

236240

241+
def _get_dag_with_task(
    dagbag: DagBag, dag_id: str, task_id: str | None = None
) -> tuple[DAG, BaseOperator | MappedOperator | None]:
    """
    Look up a DAG in the DagBag and, when requested, one of its tasks.

    :param dagbag: DagBag whose ``dags`` mapping is searched
    :param dag_id: ID of the DAG to look up
    :param task_id: ID of a task to fetch from the DAG; ``None`` skips the task lookup
    :return: ``(dag, task)`` where ``task`` is ``None`` when no ``task_id`` was given
    :raises ValueError: If the DAG is not in the DagBag, or the task is not in the DAG
    """
    known_dags = dagbag.dags
    if dag_id not in known_dags:
        raise ValueError(
            f"DAG '{dag_id}' not found in DagBag. "
            "This typically indicates a race condition where the DAG was removed or failed to parse."
        )
    found_dag = known_dags[dag_id]

    # No task requested: the caller only needs the DAG itself.
    if task_id is None:
        return found_dag, None

    try:
        found_task = found_dag.get_task(task_id)
    except TaskNotFound:
        # ``from None`` suppresses the chained TaskNotFound so only the
        # explanatory ValueError is reported.
        raise ValueError(
            f"Task '{task_id}' not found in DAG '{dag_id}'. "
            "This typically indicates a race condition where the task was removed or the DAG structure changed."
        ) from None
    return found_dag, found_task
272+
273+
237274
def _execute_callbacks(
238275
dagbag: DagBag, callback_requests: list[CallbackRequest], log: FilteringBoundLogger
239276
) -> None:
@@ -250,8 +287,7 @@ def _execute_callbacks(
250287
def _execute_dag_callbacks(dagbag: DagBag, request: DagCallbackRequest, log: FilteringBoundLogger) -> None:
251288
from airflow.sdk.api.datamodels._generated import TIRunContext
252289

253-
dag = dagbag.dags[request.dag_id]
254-
290+
dag, _ = _get_dag_with_task(dagbag, request.dag_id)
255291
callbacks = dag.on_failure_callback if request.is_failure_callback else dag.on_success_callback
256292
if not callbacks:
257293
log.warning("Callback requested, but dag didn't have any", dag_id=request.dag_id)
@@ -303,8 +339,10 @@ def _execute_task_callbacks(dagbag: DagBag, request: TaskCallbackRequest, log: F
303339
)
304340
return
305341

306-
dag = dagbag.dags[request.ti.dag_id]
307-
task = dag.get_task(request.ti.task_id)
342+
dag, task = _get_dag_with_task(dagbag, request.ti.dag_id, request.ti.task_id)
343+
344+
if TYPE_CHECKING:
345+
assert task is not None
308346

309347
if request.task_callback_type is TaskInstanceState.UP_FOR_RETRY:
310348
callbacks = task.on_retry_callback
@@ -356,8 +394,10 @@ def get_callback_representation(callback):
356394

357395
def _execute_email_callbacks(dagbag: DagBag, request: EmailRequest, log: FilteringBoundLogger) -> None:
358396
"""Execute email notification for task failure/retry."""
359-
dag = dagbag.dags[request.ti.dag_id]
360-
task = dag.get_task(request.ti.task_id)
397+
dag, task = _get_dag_with_task(dagbag, request.ti.dag_id, request.ti.task_id)
398+
399+
if TYPE_CHECKING:
400+
assert task is not None
361401

362402
if not task.email:
363403
log.warning(

airflow-core/tests/unit/dag_processing/test_processor.py

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -901,6 +901,25 @@ def fake_collect_dags(self, *args, **kwargs):
901901
# Should log warning about no callback found
902902
log.warning.assert_called_once_with("Callback requested, but dag didn't have any", dag_id="test_dag")
903903

904+
def test_execute_dag_callbacks_missing_dag(self):
    """Check that _execute_dag_callbacks raises ValueError when the requested DAG is not in the DagBag."""
    bag = DagBag()
    logger = structlog.get_logger()

    # The request names a dag_id that this test never registers in the bag.
    callback_request = DagCallbackRequest(
        dag_id="missing_dag",
        run_id="test_run",
        filepath="test.py",
        bundle_name="testing",
        bundle_version=None,
        msg="Test failure message",
        is_failure_callback=True,
    )

    with pytest.raises(ValueError, match="DAG 'missing_dag' not found in DagBag"):
        _execute_dag_callbacks(bag, callback_request, logger)
922+
904923
@pytest.mark.parametrize(
905924
"xcom_operation,expected_message_type,expected_message,mock_response",
906925
[
@@ -1277,6 +1296,58 @@ def fake_collect_dags(self, *args, **kwargs):
12771296

12781297
assert call_count == 2
12791298

1299+
@pytest.mark.parametrize(
    "dag_exists,task_exists,expected_error",
    [
        (False, False, "DAG 'missing_dag' not found in DagBag"),
        (True, False, "Task 'missing_task' not found in DAG 'test_dag'"),
    ],
)
def test_execute_task_callbacks_missing_dag_or_task(
    self, spy_agency, dag_exists, task_exists, expected_error
):
    """Check that _execute_task_callbacks raises ValueError when the DAG or the task cannot be found."""
    if not dag_exists:
        # Scenario 1: the request points at a dag_id the bag was never given.
        dagbag = DagBag()
        dag_id, task_id = "missing_dag", "test_task"
    else:
        # Scenario 2: the DAG is registered, but the requested task_id is not one of its tasks.
        with DAG(dag_id="test_dag") as dag:
            BaseOperator(task_id="existing_task")

        def fake_collect_dags(self, *args, **kwargs):
            self.dags[dag.dag_id] = dag

        spy_agency.spy_on(DagBag.collect_dags, call_fake=fake_collect_dags, owner=DagBag)

        dagbag = DagBag()
        dagbag.collect_dags()
        dag_id, task_id = "test_dag", "missing_task"

    callback_request = TaskCallbackRequest(
        filepath="test.py",
        msg="Task failed",
        ti=TIDataModel(
            id=uuid.uuid4(),
            dag_id=dag_id,
            task_id=task_id,
            run_id="test_run",
            try_number=1,
            dag_version_id=uuid.uuid4(),
        ),
        bundle_name="testing",
        bundle_version=None,
        task_callback_type=TaskInstanceState.FAILED,
    )

    logger = structlog.get_logger()

    with pytest.raises(ValueError, match=expected_error):
        _execute_task_callbacks(dagbag, callback_request, logger)
1350+
12801351

12811352
class TestExecuteEmailCallbacks:
12821353
"""Test the email callback execution functionality."""
@@ -1517,6 +1588,75 @@ def test_execute_email_callbacks_email_disabled_for_type(self, mock_send_email):
15171588
info_call = log.info.call_args[0][0]
15181589
assert "Email not sent - task configured with email_on_" in info_call
15191590

1591+
@pytest.mark.parametrize(
    "dag_exists,task_exists,expected_error",
    [
        (False, False, "DAG 'missing_dag' not found in DagBag"),
        (True, False, "Task 'missing_task' not found in DAG 'test_dag'"),
    ],
)
def test_execute_email_callbacks_missing_dag_or_task(
    self, spy_agency, dag_exists, task_exists, expected_error
):
    """
    Test _execute_email_callbacks raises ValueError for missing DAG or task.

    ``task_exists`` is ``False`` in every case and is not read by the body; it
    only labels the scenario in the parametrization.
    """
    if dag_exists:
        with DAG(dag_id="test_dag") as dag:
            # Valid placeholder address. The request below asks for
            # "missing_task", so this task is never the one looked up.
            BaseOperator(task_id="existing_task", email="test@example.com")

        def fake_collect_dags(self, *args, **kwargs):
            self.dags[dag.dag_id] = dag

        spy_agency.spy_on(DagBag.collect_dags, call_fake=fake_collect_dags, owner=DagBag)

        dagbag = DagBag()
        dagbag.collect_dags()
        dag_id = "test_dag"
        task_id = "missing_task"
    else:
        dagbag = DagBag()
        dag_id = "missing_dag"
        task_id = "test_task"

    ti_data = TIDataModel(
        id=uuid.uuid4(),
        dag_id=dag_id,
        task_id=task_id,
        run_id="test_run",
        try_number=1,
        dag_version_id=uuid.uuid4(),
    )

    # A single timestamp is reused for every date field of the dag run.
    current_time = timezone.utcnow()
    request = EmailRequest(
        filepath="test.py",
        bundle_name="testing",
        bundle_version=None,
        ti=ti_data,
        context_from_server=TIRunContext(
            dag_run=DRDataModel(
                dag_id=dag_id,
                run_id="test_run",
                logical_date=current_time,
                data_interval_start=current_time,
                data_interval_end=current_time,
                run_after=current_time,
                start_date=current_time,
                end_date=None,
                run_type="manual",
                state="running",
                consumed_asset_events=[],
            ),
            max_tries=2,
        ),
        email_type="failure",
        msg="Task failed",
    )

    log = structlog.get_logger()

    with pytest.raises(ValueError, match=expected_error):
        _execute_email_callbacks(dagbag, request, log)
1659+
15201660

15211661
class TestDagProcessingMessageTypes:
15221662
def test_message_types_in_dag_processor(self):

0 commit comments

Comments
 (0)