|
33 | 33 | from airflow.providers.standard.operators.empty import EmptyOperator |
34 | 34 | from airflow.sdk.definitions.asset import Asset |
35 | 35 | from airflow.sdk.definitions.param import Param |
| 36 | +from airflow.timetables.interval import CronDataIntervalTimetable |
36 | 37 | from airflow.utils.session import provide_session |
37 | 38 | from airflow.utils.state import DagRunState, State |
38 | 39 | from airflow.utils.types import DagRunTriggeredByType, DagRunType |
|
50 | 51 |
|
51 | 52 | if TYPE_CHECKING: |
52 | 53 | from airflow.models.dag_version import DagVersion |
| 54 | + from airflow.timetables.base import DataInterval |
53 | 55 |
|
54 | 56 | pytestmark = pytest.mark.db_test |
55 | 57 |
|
| 58 | + |
| 59 | +class CustomTimetable(CronDataIntervalTimetable): |
| 60 | + """Custom timetable that generates custom run IDs.""" |
| 61 | + |
| 62 | + def generate_run_id( |
| 63 | + self, |
| 64 | + *, |
| 65 | + run_type: DagRunType, |
| 66 | + run_after, |
| 67 | + data_interval: DataInterval | None, |
| 68 | + **kwargs, |
| 69 | + ) -> str: |
| 70 | + if data_interval: |
| 71 | + return f"custom_{data_interval.start.strftime('%Y%m%d%H%M%S')}" |
| 72 | + return f"custom_manual_{run_after.strftime('%Y%m%d%H%M%S')}" |
| 73 | + |
| 74 | + |
| 75 | +@pytest.fixture |
| 76 | +def custom_timetable_plugin(monkeypatch): |
| 77 | + """Fixture to register CustomTimetable for serialization.""" |
| 78 | + from airflow import plugins_manager |
| 79 | + from airflow.utils.module_loading import qualname |
| 80 | + |
| 81 | + timetable_class_name = qualname(CustomTimetable) |
| 82 | + existing_timetables = getattr(plugins_manager, "timetable_classes", None) or {} |
| 83 | + |
| 84 | + monkeypatch.setattr(plugins_manager, "initialize_timetables_plugins", lambda: None) |
| 85 | + monkeypatch.setattr( |
| 86 | + plugins_manager, |
| 87 | + "timetable_classes", |
| 88 | + {**existing_timetables, timetable_class_name: CustomTimetable}, |
| 89 | + ) |
| 90 | + |
| 91 | + |
56 | 92 | DAG1_ID = "test_dag1" |
57 | 93 | DAG1_DISPLAY_NAME = "test_dag1" |
58 | 94 | DAG2_ID = "test_dag2" |
@@ -1772,6 +1808,44 @@ def test_should_respond_200_with_null_logical_date(self, test_client): |
1772 | 1808 | "note": None, |
1773 | 1809 | } |
1774 | 1810 |
|
| 1811 | + @time_machine.travel("2025-10-02 12:00:00", tick=False) |
| 1812 | + @pytest.mark.usefixtures("custom_timetable_plugin") |
| 1813 | + def test_custom_timetable_generate_run_id_for_manual_trigger(self, dag_maker, test_client, session): |
| 1814 | + """Test that custom timetable's generate_run_id is used for manual triggers (issue #55908).""" |
| 1815 | + custom_dag_id = "test_custom_timetable_dag" |
| 1816 | + with dag_maker( |
| 1817 | + dag_id=custom_dag_id, |
| 1818 | + schedule=CustomTimetable("0 0 * * *", timezone="UTC"), |
| 1819 | + session=session, |
| 1820 | + serialized=True, |
| 1821 | + ): |
| 1822 | + EmptyOperator(task_id="test_task") |
| 1823 | + |
| 1824 | + session.commit() |
| 1825 | + |
| 1826 | + logical_date = datetime(2025, 10, 1, 0, 0, 0, tzinfo=timezone.utc) |
| 1827 | + response = test_client.post( |
| 1828 | + f"/dags/{custom_dag_id}/dagRuns", |
| 1829 | + json={"logical_date": logical_date.isoformat()}, |
| 1830 | + ) |
| 1831 | + assert response.status_code == 200 |
| 1832 | + run_id_with_logical_date = response.json()["dag_run_id"] |
| 1833 | + assert run_id_with_logical_date.startswith("custom_") |
| 1834 | + |
| 1835 | + run = session.query(DagRun).filter(DagRun.run_id == run_id_with_logical_date).one() |
| 1836 | + assert run.dag_id == custom_dag_id |
| 1837 | + |
| 1838 | + response = test_client.post( |
| 1839 | + f"/dags/{custom_dag_id}/dagRuns", |
| 1840 | + json={"logical_date": None}, |
| 1841 | + ) |
| 1842 | + assert response.status_code == 200 |
| 1843 | + run_id_without_logical_date = response.json()["dag_run_id"] |
| 1844 | + assert run_id_without_logical_date.startswith("custom_manual_") |
| 1845 | + |
| 1846 | + run = session.query(DagRun).filter(DagRun.run_id == run_id_without_logical_date).one() |
| 1847 | + assert run.dag_id == custom_dag_id |
| 1848 | + |
1775 | 1849 |
|
1776 | 1850 | class TestWaitDagRun: |
1777 | 1851 | # The way we init async engine does not work well with FastAPI app init. |
|
0 commit comments