Skip to content

[Asset] Documentation for v3.1.6 refers to source_dag_run in triggering_asset_events, but code is missing #60579

@lifnaja

Description

@lifnaja

Apache Airflow version

3.1.6

If "Other Airflow 3 version" selected, which one?

No response

What happened?

The documentation for Airflow version 3.1.6 mentions that users can access the source_dag_run through the triggering_asset_events template variable. However, this attribute appears to be missing in the actual tag release of 3.1.6, resulting in an error when trying to use it in templates.

Document : https://airflow.apache.org/docs/apache-airflow/3.1.6/authoring-and-scheduling/asset-scheduling.html#accessing-triggering-asset-events-with-jinja

  • In v3.1.6 tag, the source_dag_run attribute/function is not yet merged or present in the codebase.
  • In the main branch, the code for source_dag_run exists.

Attempting to use this in a DAG on version 3.1.6 results in an AttributeError or undefined variable error within the Jinja template.

Image

What you think should happen instead?

No response

How to reproduce

import logging

import pendulum
from airflow.sdk import DAG, Asset, task


def _say_hello():
    print("Hello!")


asset_1 = Asset("asset_1")
asset_2 = Asset("asset_2")

with DAG(
    dag_id="example_asset_producer_1",
    schedule=None,
    start_date=pendulum.datetime(2024, 1, 1),
    catchup=False,
    tags=["example", "asset"],
) as dag:

    @task(outlets=[asset_1])
    def say_hello():
        _say_hello()

    say_hello()


with DAG(
    dag_id="example_asset_producer_2",
    schedule=None,
    start_date=pendulum.datetime(2024, 1, 1),
    catchup=False,
    tags=["example", "asset"],
) as dag:

    @task(outlets=[asset_2])
    def say_hello():
        _say_hello()

    say_hello()

with DAG(
    dag_id="example_asset_consumer",
    schedule=[asset_1, asset_2],
    start_date=pendulum.datetime(2024, 1, 1),
    catchup=False,
    tags=["example", "asset"],
) as dag:

    @task()
    def get_asset_detail(x):
        logging.info(x)
        logging.info(type(x))
        logging.info(x.__dict__)
        logging.info(x.name)

    get_asset_detail(x='{{ (triggering_asset_events.values() | first | first).source_dag_run.data_interval_end }}')

Operating System

docker

Versions of Apache Airflow Providers

No response

Deployment

Docker-Compose

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Metadata

Metadata

Assignees

No one assigned

    Labels

    area:corekind:bugThis is a clearly a bugneeds-triagelabel for new issues that we didn't triage yet

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions