How Airflow's ExternalTaskSensor Doesn’t Work

(23. 8. 2021) Apache Airflow

Apache Airflow is a great orchestration tool for daily use; one could call it “crontab on steroids”. It allows the creation of pipelines of tasks grouped into so-called DAGs (directed acyclic graphs). Each DAG has its own schedule defined in cron format. A common ETL/ELT practice is to put all tables from a single source into the same DAG, which makes it possible to limit concurrent requests against the database with a single parameter (concurrency).
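As a quick illustration (a minimal sketch; the dag_id, schedule and limit are made up for this example), the limit is set directly on the DAG:

from airflow import DAG
from airflow.utils.dates import days_ago

# a sketch: at most 4 tasks of this DAG run at the same time,
# no matter how many are ready to run
with DAG(
    dag_id="extract_source_db",
    schedule_interval="0 3 * * *",
    start_date=days_ago(1),
    concurrency=4,
) as dag:
    ...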

As clever as this parameter looks, it is just as limiting in practice. Consider a typical ETL pipeline: extract multiple tables from source databases or files, do some magic with the data, and load it into the target database. Once the monster ETL pipeline is built and the first never-ending test run is over, the optimizations begin, and one realizes that each database in this process has its own maximum concurrency limit! The common-sense move is to split the still-fresh pipeline into smaller DAGs by database connection and create a “master” DAG to orchestrate the sub-DAGs. Yes, this works until one realizes that the extracted data could be useful to some other process too, yet the end of the data extraction is known only inside the master DAG. So finally, it is time to do it correctly - “asynchronously”.

An asynchronous process consists of multiple stages, where each stage waits until all DAGs from the previous stage are done. For exactly this waiting purpose Airflow has the ExternalTaskSensor, which by definition: “Waits for a different DAG or a task in a different DAG to complete for a specific execution_date.”

Let’s test this example:

 
from airflow import DAG
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

args = {
    'owner': 'airflow',
}

# First DAG - sleeps for a while for demonstration purposes

with DAG(
    dag_id="ets_demo_1",
    default_args=args,
    schedule_interval=None,
    start_date=days_ago(2),
    tags=['demo'],
    catchup=False,
) as dag1:

    BashOperator(
        task_id="procrastinate",
        bash_command="sleep 10m"
    )

# Second DAG - echoes something after the first DAG is done

with DAG(
    dag_id="ets_demo_2",
    default_args=args,
    schedule_interval=None,
    start_date=days_ago(2),
    tags=['demo'],
    catchup=False,
) as dag2:

    wait = ExternalTaskSensor(
        task_id="wait_for_DAG1",
        external_dag_id='ets_demo_1'
    )

    something = BashOperator(
        task_id="do_someting",
        bash_command="echo something"
    )

    wait >> something

Notice that neither DAG has a schedule_interval set, so one would think that triggering the first and then the second would do the job. Apparently not. The first DAG finishes as expected, but the second never starts. Googling “ExternalTaskSensor stuck” gives hundreds of results with the same clear explanation - ExternalTaskSensor assumes that both DAGs are scheduled for the same moment in time. Suddenly the closing words of the ExternalTaskSensor definition, “... for a specific execution_date”, become clear.
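In other words, on every poke the sensor does not look for “the latest run of DAG1”, it looks for a run of DAG1 at exactly the same execution_date as its own run. Very roughly, and only as a simplified sketch of the idea (not the sensor's actual source code), the check boils down to something like this:

from airflow.models import DagRun
from airflow.utils.state import State

# simplified sketch: what the sensor effectively asks the metadata database
# when external_task_id is not set (it waits for the whole external DAG)
def poke_sketch(session, own_execution_date):
    count = (
        session.query(DagRun)
        .filter(
            DagRun.dag_id == "ets_demo_1",                # external_dag_id
            DagRun.execution_date == own_execution_date,  # exact match required
            DagRun.state == State.SUCCESS,
        )
        .count()
    )
    # a manually triggered run of DAG1 has a different execution_date,
    # so this never becomes true and the sensor keeps poking forever
    return count == 1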

Marc Lamberti gives a nice explanation of why this is so in his video [Apache Airflow: The ExternalTaskSensor demystified](https://www.youtube.com/watch?v=Oemg-3aiAiI). In a nutshell, it is a protection that ensures the sensor is triggered by the correct run. There is no option to simply wait for the very last run of DAG1, as one might expect. ExternalTaskSensor just doesn’t work that way.

But what about a real situation where the schedule of the first or second DAG is unknown, because they are triggered externally or manually? ExternalTaskSensor has two optional parameters for shifting the date it waits for (only one of them can be set): execution_delta, a fixed timedelta between the two DAGs' execution dates, and execution_date_fn, a callable that returns the execution date(s) to wait for.
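When both DAGs run on fixed schedules that are merely offset from each other, execution_delta alone is enough; a minimal sketch (the one-hour offset is an assumption made up for this example, and the sensor is meant to sit inside the second DAG's definition):

from datetime import timedelta
from airflow.sensors.external_task import ExternalTaskSensor

wait = ExternalTaskSensor(
    task_id="wait_for_DAG1",
    external_dag_id="ets_demo_1",
    # assumption: ets_demo_1 is scheduled exactly one hour before this DAG,
    # so the run to wait for has execution_date = own execution_date - 1 hour
    execution_delta=timedelta(hours=1),
)

With unscheduled, manually triggered DAGs a fixed delta does not help, but execution_date_fn leaves a door open.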

Fortunately, Airflow's architecture gives nice access to its internal metadata database.
The following function returns the execution date of the most recent run of DAG1 and can be plugged into the sensor via execution_date_fn:

 
def get_execution_date(dt, **kwargs):
    session = settings.Session()
    dr = session.query(DagRun)\
        .filter(DagRun.dag_id == kwargs['task'].external_dag_id)\
        .order_by(DagRun.execution_date.desc())\
        .first()
    return dr.execution_date

It does the following:

  1. Open a session to the metadata database via settings.
  2. Query the DagRun table.
  3. Filter DagRuns whose dag_id equals the external_dag_id parameter of the invoking task (the ExternalTaskSensor).
  4. Sort the filtered records by execution_date in descending order.
  5. Store the first (newest) record in dr.
  6. Return its execution_date property.
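One small caveat: the session opened with settings.Session() is never closed. The same lookup can be written with Airflow's provide_session decorator, which injects the session and closes it afterwards - a sketch, functionally equivalent to the function above:

from airflow.models import DagRun
from airflow.utils.session import provide_session

@provide_session
def get_execution_date(dt, session=None, **kwargs):
    # the decorator supplies a session and closes it when the function returns
    dr = (
        session.query(DagRun)
        .filter(DagRun.dag_id == kwargs['task'].external_dag_id)
        .order_by(DagRun.execution_date.desc())
        .first()
    )
    return dr.execution_date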

You can easily extend this by comparing dr.execution_date with dt (the current execution date) to make sure the DagRun found is within an expected range (e.g. 12 hours); a sketch of that check follows after the full listing. Finally, the working DAGs:

from airflow import DAG
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
from airflow.models import DagRun
from airflow import settings

args = {
    'owner': 'airflow',
}

def get_execution_date(dt, **kwargs):
    session = settings.Session()
    dr = session.query(DagRun)\
        .filter(DagRun.dag_id == kwargs['task'].external_dag_id)\
        .order_by(DagRun.execution_date.desc())\
        .first()
    return dr.execution_date
        
# First DAG - sleeps for a while for demonstration purposes

with DAG(
    dag_id="ets_demo_1",
    default_args=args,
    schedule_interval=None,
    start_date=days_ago(2),
    tags=['demo'],
    catchup=False,
) as dag1:

    BashOperator(
        task_id="procrastinate",
        bash_command="sleep 10m"
    )

# Second DAG - echoes something after the first DAG is done

with DAG(
    dag_id="ets_demo_2",
    default_args=args,
    schedule_interval=None,
    start_date=days_ago(2),
    tags=['demo'],
    catchup=False,
) as dag2:

    wait = ExternalTaskSensor(
        task_id="wait_for_DAG1",
        external_dag_id='ets_demo_1',
        execution_date_fn=get_execution_date
    )

    something = BashOperator(
        task_id="do_someting",
        bash_command="echo something"
    )

    wait >> something
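As mentioned above, get_execution_date can additionally check that the run it found is not stale; a minimal sketch, assuming a 12-hour window is acceptable (the window size and the decision to fail fast are arbitrary choices made for this example):

from datetime import timedelta

from airflow import settings
from airflow.models import DagRun

def get_execution_date(dt, **kwargs):
    session = settings.Session()
    dr = session.query(DagRun)\
        .filter(DagRun.dag_id == kwargs['task'].external_dag_id)\
        .order_by(DagRun.execution_date.desc())\
        .first()
    # accept the newest run only if it falls within 12 hours before the
    # current execution date; otherwise fail loudly instead of waiting
    # for a run that will never match
    if dr is None or dt - dr.execution_date > timedelta(hours=12):
        raise ValueError("No recent enough run of the external DAG was found")
    return dr.execution_date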

About the author

Ivan Ignáth, BI Team Leader