From 4e5632c711ad72ddd62ccb7d9bde344daf751dd1 Mon Sep 17 00:00:00 2001
From: Ranuga <79456372+Programmer-RD-AI@users.noreply.github.com>
Date: Tue, 21 Oct 2025 20:32:31 +0530
Subject: [PATCH] [v3-1-test] docs: Enhance triggering_asset_event retrieval
 documentation in DAGs (#52666) (#52674)

Closes #52666

(cherry picked from commit 4f7908cc8830be4bdde7f53ca39749a1e1863e77)

Co-authored-by: Ranuga <79456372+Programmer-RD-AI@users.noreply.github.com>
---
 .../asset-scheduling.rst | 88 ++++++++++++++++---
 1 file changed, 76 insertions(+), 12 deletions(-)

diff --git a/airflow-core/docs/authoring-and-scheduling/asset-scheduling.rst b/airflow-core/docs/authoring-and-scheduling/asset-scheduling.rst
index 51ae64c62c7a5..4feaf7e24a97c 100644
--- a/airflow-core/docs/authoring-and-scheduling/asset-scheduling.rst
+++ b/airflow-core/docs/authoring-and-scheduling/asset-scheduling.rst
@@ -152,16 +152,35 @@ Fetching information from a triggering asset event
 A triggered Dag can fetch information from the asset that triggered it using the ``triggering_asset_events`` template or parameter. See more at :ref:`templates-ref`.
 
-Example:
+The ``triggering_asset_events`` object is a dictionary that looks like this:
 
 .. code-block:: python
 
-    example_snowflake_asset = Asset("snowflake://my_db/my_schema/my_table")
+    {
+        Asset("s3://asset-bucket/example.csv"): [
+            AssetEvent(uri="s3://asset-bucket/example.csv", source_dag_run=DagRun(...), ...),
+            ...,
+        ],
+        Asset("s3://another-bucket/another.csv"): [
+            AssetEvent(uri="s3://another-bucket/another.csv", source_dag_run=DagRun(...), ...),
+            ...,
+        ],
+    }
 
-    with DAG(dag_id="load_snowflake_data", schedule="@hourly", ...):
-        SQLExecuteQueryOperator(
-            task_id="load", conn_id="snowflake_default", outlets=[example_snowflake_asset], ...
-        )
+You can access this information in your tasks using Jinja templating or directly in Python functions.
+
+Accessing triggering asset events with Jinja
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+You can use Jinja templating to pass information from the triggering asset events to your operators.
+
+**Example: Single Triggering Asset**
+
+If your DAG is triggered by a single asset, you can access its information like this:
+
+.. code-block:: python
+
+    example_snowflake_asset = Asset("snowflake://my_db/my_schema/my_table")
 
     with DAG(dag_id="query_snowflake_data", schedule=[example_snowflake_asset], ...):
         SQLExecuteQueryOperator(
             task_id="query",
             conn_id="snowflake_default",
             sql="""
               SELECT *
               FROM my_db.my_schema.my_table
               WHERE "updated_at" >= '{{ (triggering_asset_events.values() | first | first).source_dag_run.data_interval_start }}'
               AND "updated_at" < '{{ (triggering_asset_events.values() | first | first).source_dag_run.data_interval_end }}'
             """,
         )
 
@@ -175,13 +194,58 @@ Example:
-    @task
-    def print_triggering_asset_events(triggering_asset_events=None):
-        for asset, asset_list in triggering_asset_events.items():
-            print(asset, asset_list)
-            print(asset_list[0].source_dag_run.dag_id)
+In this example, ``triggering_asset_events.values() | first | first`` does the following (the snippet after the list shows the same lookup in plain Python):
+
+1. ``triggering_asset_events.values()``: Gets the lists of asset events, one list per triggering asset.
+2. ``| first``: Gets the first list of asset events (since we only have one triggering asset).
+3. ``| first``: Gets the first ``AssetEvent`` from that list.
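+
+This is a sketch of the equivalent lookup in plain Python, assuming the dictionary shape shown at the top of this section and a task that receives ``triggering_asset_events`` as a parameter:
+
+.. code-block:: python
+
+    # Equivalent of ``triggering_asset_events.values() | first | first``:
+    # take the event list of the first (and only) triggering asset, then its first event.
+    first_event = next(iter(triggering_asset_events.values()))[0]
+    print(first_event.source_dag_run.data_interval_start)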
+
+**Example: Multiple Triggering Assets**
+
+When your DAG is triggered by multiple assets, you can iterate through them in your Jinja template.
+
+.. code-block:: python
+
+    with DAG(dag_id="process_assets", schedule=[asset1, asset2], ...):
+        BashOperator(
+            task_id="process",
+            bash_command="""
+            {% for asset_uri, events in triggering_asset_events.items() %}
+            echo "Processing asset: {{ asset_uri }}"
+            {% for event in events %}
+            echo "  Triggered by DAG: {{ event.source_dag_run.dag_id }}"
+            echo "  Data interval start: {{ event.source_dag_run.data_interval_start }}"
+            echo "  Data interval end: {{ event.source_dag_run.data_interval_end }}"
+            {% endfor %}
+            {% endfor %}
+            """,
+        )
+
+
+Accessing triggering asset events in Python
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+You can also access ``triggering_asset_events`` directly in a Python function by declaring it as a parameter of your task.
+
+.. code-block:: python
+
+    @task
+    def print_triggering_asset_events(triggering_asset_events=None):
+        if triggering_asset_events:
+            for asset, asset_events in triggering_asset_events.items():
+                print(f"Asset: {asset.uri}")
+                for event in asset_events:
+                    print(f"  - Triggered by DAG run: {event.source_dag_run.dag_id}")
+                    print(
+                        f"    Data interval: {event.source_dag_run.data_interval_start} to {event.source_dag_run.data_interval_end}"
+                    )
+                    print(f"    Run ID: {event.source_dag_run.run_id}")
+                    print(f"    Timestamp: {event.timestamp}")
+
+
+    print_triggering_asset_events()
 
-    print_triggering_asset_events()
+
+.. note::
+    When a DAG is scheduled by multiple assets, there may be multiple asset events for each asset. The logic for handling these events can be complex, and it is up to the DAG author to decide how to process them. For example, you might want to process all new data since the last run, or you might want to process each triggering event individually.
 
 Note that this example is using `(.values() | first | first) `_ to fetch the first of one asset given to the Dag, and the first of one AssetEvent for that asset. An implementation can be quite complex if you
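+
+One way to process every triggering event in a single task is to compute the overall data window the events cover. The following is a minimal sketch (the task name is illustrative), assuming the ``triggering_asset_events`` dictionary shape shown at the top of this section:
+
+.. code-block:: python
+
+    @task
+    def compute_processing_window(triggering_asset_events=None):
+        # Illustrative task: flatten the per-asset event lists into one list.
+        all_events = [
+            event
+            for events in (triggering_asset_events or {}).values()
+            for event in events
+        ]
+        if not all_events:
+            return None
+        # Overall window: earliest start to latest end across all source DAG runs.
+        window_start = min(event.source_dag_run.data_interval_start for event in all_events)
+        window_end = max(event.source_dag_run.data_interval_end for event in all_events)
+        return window_start, window_end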