Merged
88 changes: 76 additions & 12 deletions airflow-core/docs/authoring-and-scheduling/asset-scheduling.rst
@@ -152,16 +152,35 @@ Fetching information from a triggering asset event

A triggered Dag can fetch information from the asset that triggered it using the ``triggering_asset_events`` template or parameter. See more at :ref:`templates-ref`.

Example:
``triggering_asset_events`` is a dictionary that maps each triggering asset to a list of its asset events. It looks like this:

.. code-block:: python

    example_snowflake_asset = Asset("snowflake://my_db/my_schema/my_table")
    {
        Asset("s3://asset-bucket/example.csv"): [
            AssetEvent(uri="s3://asset-bucket/example.csv", source_dag_run=DagRun(...), ...),
            ...,
        ],
        Asset("s3://another-bucket/another.csv"): [
            AssetEvent(uri="s3://another-bucket/another.csv", source_dag_run=DagRun(...), ...),
            ...,
        ],
    }

    with DAG(dag_id="load_snowflake_data", schedule="@hourly", ...):
        SQLExecuteQueryOperator(
            task_id="load", conn_id="snowflake_default", outlets=[example_snowflake_asset], ...
        )
You can access this information in your tasks using Jinja templating or directly in Python functions.
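
To make the shape of this mapping concrete without a running Airflow deployment, the sketch below models it with plain Python stand-ins. ``FakeAsset``, ``FakeDagRun``, ``FakeAssetEvent``, and ``build_example_events`` are hypothetical names invented for illustration and are not part of Airflow; only the attribute names ``uri``, ``source_dag_run``, and ``dag_id`` are taken from the text above.

```python
from dataclasses import dataclass


# Hypothetical stand-ins: the real objects are supplied by Airflow at runtime.
@dataclass(frozen=True)
class FakeAsset:
    uri: str


@dataclass(frozen=True)
class FakeDagRun:
    dag_id: str


@dataclass(frozen=True)
class FakeAssetEvent:
    uri: str
    source_dag_run: FakeDagRun


def build_example_events():
    """Return a mapping shaped like the one described above: asset -> list of events."""
    asset = FakeAsset("s3://asset-bucket/example.csv")
    event = FakeAssetEvent(uri=asset.uri, source_dag_run=FakeDagRun(dag_id="producer"))
    return {asset: [event]}


triggering_asset_events = build_example_events()
for asset, events in triggering_asset_events.items():
    # Each key is an asset; each value is the list of events recorded for it.
    print(asset.uri, [event.source_dag_run.dag_id for event in events])
```

The stand-ins are frozen dataclasses so that they are hashable and can serve as dictionary keys, mirroring how the mapping is keyed by asset.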

Accessing triggering asset events with Jinja
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

You can use Jinja templating to pass information from the triggering asset events to your operators.

**Example: Single Triggering Asset**

If your DAG is triggered by a single asset, you can access its information like this:

.. code-block:: python

    example_snowflake_asset = Asset("snowflake://my_db/my_schema/my_table")

    with DAG(dag_id="query_snowflake_data", schedule=[example_snowflake_asset], ...):
        SQLExecuteQueryOperator(
@@ -175,13 +194,58 @@ Example:
            """,
        )

    @task
    def print_triggering_asset_events(triggering_asset_events=None):
        for asset, asset_list in triggering_asset_events.items():
            print(asset, asset_list)
            print(asset_list[0].source_dag_run.dag_id)
In this example, ``triggering_asset_events.values() | first | first`` does the following:

1. ``triggering_asset_events.values()``: Gets a list of all lists of asset events.
2. ``| first``: Gets the first list of asset events (since we only have one triggering asset).
3. ``| first``: Gets the first ``AssetEvent`` from that list.
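
The three steps above can be mirrored in plain Python. This is only a sketch: the ``first`` helper below is a hypothetical, simplified stand-in for Jinja's ``first`` filter (it assumes a non-empty input), and the dictionary uses placeholder strings instead of real asset and event objects.

```python
def first(iterable):
    """Rough equivalent of Jinja's ``first`` filter for a non-empty iterable."""
    return next(iter(iterable))


# Hypothetical stand-in data: one triggering asset with two events.
triggering_asset_events = {
    "asset-a": ["event-1", "event-2"],
}

event_lists = triggering_asset_events.values()  # step 1: all lists of events
first_list = first(event_lists)  # step 2: the events of the first (only) asset
first_event = first(first_list)  # step 3: the first event in that list
print(first_event)
```

With more than one triggering asset, step 2 silently picks whichever asset comes first in the mapping, which is why this pattern suits the single-asset case only.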

**Example: Multiple Triggering Assets**

When your DAG is triggered by multiple assets, you can iterate through them in your Jinja template.

.. code-block:: python

    with DAG(dag_id="process_assets", schedule=[asset1, asset2], ...):
        BashOperator(
            task_id="process",
            bash_command="""
            {% for asset, events in triggering_asset_events.items() %}
            echo "Processing asset: {{ asset.uri }}"
            {% for event in events %}
            echo " Triggered by DAG: {{ event.source_dag_run.dag_id }}"
            echo " Data interval start: {{ event.source_dag_run.data_interval_start }}"
            echo " Data interval end: {{ event.source_dag_run.data_interval_end }}"
            {% endfor %}
            {% endfor %}
            """,
        )


Accessing triggering asset events in Python
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

You can also access the ``triggering_asset_events`` directly in a Python function by passing it as a parameter.

.. code-block:: python

    @task
    def print_triggering_asset_events(triggering_asset_events=None):
        if triggering_asset_events:
            for asset, asset_events in triggering_asset_events.items():
                print(f"Asset: {asset.uri}")
                for event in asset_events:
                    print(f" - Triggered by DAG run: {event.source_dag_run.dag_id}")
                    print(
                        f" Data interval: {event.source_dag_run.data_interval_start} to {event.source_dag_run.data_interval_end}"
                    )
                    print(f" Run ID: {event.source_dag_run.run_id}")
                    print(f" Timestamp: {event.timestamp}")


    print_triggering_asset_events()

print_triggering_asset_events()
.. note::
    When a DAG is scheduled by multiple assets, there may be multiple asset events for each asset. The logic for handling these events can be complex. It is up to the DAG author to decide how to process them. For example, you might want to process all new data since the last run, or you might want to process each triggering event individually.
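
The two strategies mentioned in the note might look roughly like the sketch below. It is a minimal illustration under stated assumptions, not a recommended implementation: ``FakeEvent``, ``make_event``, ``batch_windows``, and ``all_events_in_order`` are hypothetical names, the mapping is keyed by URI strings for brevity (the real mapping is keyed by asset objects), and only the ``timestamp`` attribute is taken from the example above.

```python
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass(frozen=True)
class FakeEvent:
    """Hypothetical stand-in for an asset event."""

    uri: str
    timestamp: datetime


def make_event(uri, hour):
    """Build a stand-in event at the given hour on a fixed, arbitrary day."""
    return FakeEvent(uri, datetime(2024, 1, 1, hour, tzinfo=timezone.utc))


def batch_windows(events_by_asset):
    """Strategy 1: treat all events of an asset as one batch.

    Returns, per asset, the (earliest, latest) event timestamps.
    """
    return {
        asset: (
            min(event.timestamp for event in events),
            max(event.timestamp for event in events),
        )
        for asset, events in events_by_asset.items()
        if events
    }


def all_events_in_order(events_by_asset):
    """Strategy 2: flatten every event and handle them one by one, oldest first."""
    flat = [event for events in events_by_asset.values() for event in events]
    return sorted(flat, key=lambda event: event.timestamp)


uri_a = "s3://asset-bucket/example.csv"
uri_b = "s3://another-bucket/another.csv"
events_by_asset = {
    uri_a: [make_event(uri_a, 1), make_event(uri_a, 3)],
    uri_b: [make_event(uri_b, 2)],
}

print(batch_windows(events_by_asset))
print([event.uri for event in all_events_in_order(events_by_asset)])
```

Batching keeps the task simple when only the overall time span matters, while per-event processing preserves the order and identity of each upstream update.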

Note that this example is using `(.values() | first | first) <https://jinja.palletsprojects.com/en/3.1.x/templates/#jinja-filters.first>`_ to
fetch the first of one asset given to the Dag, and the first of one AssetEvent for that asset. An implementation can be quite complex if you