Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions airflow-core/src/airflow/api/common/trigger_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,10 @@ def _trigger_dag(
else:
data_interval = None

run_id = run_id or DagRun.generate_run_id(
run_id = run_id or dag.timetable.generate_run_id(
run_type=DagRunType.MANUAL,
logical_date=coerced_logical_date,
run_after=timezone.coerce_datetime(run_after),
data_interval=data_interval,
)

# This intentionally does not use 'session' in the current scope because it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
from airflow._shared.timezones import timezone
from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel
from airflow.api_fastapi.core_api.datamodels.dag_versions import DagVersionResponse
from airflow.models import DagRun
from airflow.timetables.base import DataInterval
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunTriggeredByType, DagRunType
Expand Down Expand Up @@ -129,10 +128,10 @@ def validate_context(self, dag: SerializedDAG) -> dict:
)
run_after = data_interval.end

run_id = self.dag_run_id or DagRun.generate_run_id(
run_type=DagRunType.SCHEDULED,
logical_date=coerced_logical_date,
run_after=run_after,
run_id = self.dag_run_id or dag.timetable.generate_run_id(
run_type=DagRunType.MANUAL,
run_after=timezone.coerce_datetime(run_after),
data_interval=data_interval,
)
return {
"run_id": run_id,
Expand All @@ -143,14 +142,6 @@ def validate_context(self, dag: SerializedDAG) -> dict:
"note": self.note,
}

@model_validator(mode="after")
def validate_dag_run_id(self):
if not self.dag_run_id:
self.dag_run_id = DagRun.generate_run_id(
run_type=DagRunType.MANUAL, logical_date=self.logical_date, run_after=self.run_after
)
return self


class DAGRunsBatchBody(StrictBaseModel):
"""List DAG Runs body for batch endpoint."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk.definitions.asset import Asset
from airflow.sdk.definitions.param import Param
from airflow.timetables.interval import CronDataIntervalTimetable
from airflow.utils.session import provide_session
from airflow.utils.state import DagRunState, State
from airflow.utils.types import DagRunTriggeredByType, DagRunType
Expand All @@ -50,9 +51,44 @@

if TYPE_CHECKING:
from airflow.models.dag_version import DagVersion
from airflow.timetables.base import DataInterval

pytestmark = pytest.mark.db_test


class CustomTimetable(CronDataIntervalTimetable):
"""Custom timetable that generates custom run IDs."""

def generate_run_id(
self,
*,
run_type: DagRunType,
run_after,
data_interval: DataInterval | None,
**kwargs,
) -> str:
if data_interval:
return f"custom_{data_interval.start.strftime('%Y%m%d%H%M%S')}"
return f"custom_manual_{run_after.strftime('%Y%m%d%H%M%S')}"


@pytest.fixture
def custom_timetable_plugin(monkeypatch):
"""Fixture to register CustomTimetable for serialization."""
from airflow import plugins_manager
from airflow.utils.module_loading import qualname

timetable_class_name = qualname(CustomTimetable)
existing_timetables = getattr(plugins_manager, "timetable_classes", None) or {}

monkeypatch.setattr(plugins_manager, "initialize_timetables_plugins", lambda: None)
monkeypatch.setattr(
plugins_manager,
"timetable_classes",
{**existing_timetables, timetable_class_name: CustomTimetable},
)


DAG1_ID = "test_dag1"
DAG1_DISPLAY_NAME = "test_dag1"
DAG2_ID = "test_dag2"
Expand Down Expand Up @@ -1772,6 +1808,44 @@ def test_should_respond_200_with_null_logical_date(self, test_client):
"note": None,
}

@time_machine.travel("2025-10-02 12:00:00", tick=False)
@pytest.mark.usefixtures("custom_timetable_plugin")
def test_custom_timetable_generate_run_id_for_manual_trigger(self, dag_maker, test_client, session):
"""Test that custom timetable's generate_run_id is used for manual triggers (issue #55908)."""
custom_dag_id = "test_custom_timetable_dag"
with dag_maker(
dag_id=custom_dag_id,
schedule=CustomTimetable("0 0 * * *", timezone="UTC"),
session=session,
serialized=True,
):
EmptyOperator(task_id="test_task")

session.commit()

logical_date = datetime(2025, 10, 1, 0, 0, 0, tzinfo=timezone.utc)
response = test_client.post(
f"/dags/{custom_dag_id}/dagRuns",
json={"logical_date": logical_date.isoformat()},
)
assert response.status_code == 200
run_id_with_logical_date = response.json()["dag_run_id"]
assert run_id_with_logical_date.startswith("custom_")

run = session.query(DagRun).filter(DagRun.run_id == run_id_with_logical_date).one()
assert run.dag_id == custom_dag_id

response = test_client.post(
f"/dags/{custom_dag_id}/dagRuns",
json={"logical_date": None},
)
assert response.status_code == 200
run_id_without_logical_date = response.json()["dag_run_id"]
assert run_id_without_logical_date.startswith("custom_manual_")

run = session.query(DagRun).filter(DagRun.run_id == run_id_without_logical_date).one()
assert run.dag_id == custom_dag_id


class TestWaitDagRun:
# The way we init async engine does not work well with FastAPI app init.
Expand Down