Apache Airflow is a great orchestration tool for daily use. One could call it a “crontab on steroids.” It allows you to build pipelines of tasks, grouped into so-called DAGs (directed acyclic graphs). Each DAG has its own schedule, defined in cron format. A common ETL/ELT practice is to put all tables from a single source into the same DAG, which lets you limit concurrent requests against that database with a single parameter (concurrency).
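A minimal sketch of that pattern, assuming Airflow 2.x, where the DAG-level `concurrency` argument (renamed `max_active_tasks` in later releases) caps how many tasks of one DAG run at the same time. The dag_id and table names here are made up for illustration:

```python
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

# One DAG per source database: at most 4 of its tasks hit that database at once.
# `concurrency` is the older name; newer Airflow releases call it `max_active_tasks`.
with DAG(
    dag_id="extract_source_db",      # hypothetical dag_id
    schedule_interval="0 2 * * *",
    start_date=days_ago(1),
    concurrency=4,
    catchup=False,
) as dag:
    for table in ["customers", "orders", "invoices"]:  # hypothetical tables
        BashOperator(
            task_id=f"extract_{table}",
            bash_command=f"echo extracting {table}",
        )
```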
As clever as this parameter looks, it is just as limiting. Consider a typical ETL pipeline: extract multiple tables from source databases or files, do some magic with the data, and load it into the target database. Once the monster ETL pipeline is built and the first never-ending test run is over, the optimization phase begins, and you realize that each database in this process has its own maximum concurrency limit! Common sense says to split the still-fresh pipeline into smaller DAGs by database connection and create a “master” DAG to orchestrate the sub-DAGs. Yes, this will work, until you realize that the extracted data could be used by some other process too, but only the master DAG knows when the extraction has finished. So finally, it's time to do it correctly: “asynchronously”.
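One possible shape of such a master DAG, sketched here with TriggerDagRunOperator and made-up DAG ids (the original setup does not prescribe a particular operator, so treat this only as an illustration of the pattern being replaced):

```python
from airflow import DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago

# Hypothetical per-connection extraction DAGs orchestrated by one master DAG.
with DAG(
    dag_id="master_etl",
    schedule_interval="0 2 * * *",
    start_date=days_ago(1),
    catchup=False,
) as dag:
    extract_a = TriggerDagRunOperator(
        task_id="run_extract_source_a",
        trigger_dag_id="extract_source_a",
        wait_for_completion=True,  # only the master knows when extraction ends
    )
    extract_b = TriggerDagRunOperator(
        task_id="run_extract_source_b",
        trigger_dag_id="extract_source_b",
        wait_for_completion=True,
    )
    transform = TriggerDagRunOperator(
        task_id="run_transform_and_load",
        trigger_dag_id="transform_and_load",
    )
    [extract_a, extract_b] >> transform
```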
An asynchronous process consists of multiple stages, where each stage waits until all DAGs from the previous stage are done. For exactly this waiting purpose Airflow has the ExternalTaskSensor, which by definition: “Waits for a different DAG or a task in a different DAG to complete for a specific execution_date.”
Let’s test this example:
```python
from datetime import timedelta
from airflow import DAG
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
from airflow.models import DagRun
from airflow import settings
from airflow import models

args = {
    'owner': 'airflow',
}

# First DAG - sleeps for a while for demonstration purposes
with DAG(
    dag_id="ets_demo_1",
    default_args=args,
    schedule_interval=None,
    start_date=days_ago(2),
    tags=['demo'],
    catchup=False,
) as dag1:
    BashOperator(
        task_id="procrastinate",
        bash_command="sleep 10m"
    )

# Second DAG - echoes something after the first DAG is done
with DAG(
    dag_id="ets_demo_2",
    default_args=args,
    schedule_interval=None,
    start_date=days_ago(2),
    tags=['demo'],
    catchup=False,
) as dag2:
    wait = ExternalTaskSensor(
        task_id="wait_for_DAG1",
        external_dag_id='ets_demo_1'
    )
    something = BashOperator(
        task_id="do_something",
        bash_command="echo something"
    )
    wait >> something
```
Notice that neither DAG has a schedule_interval set, so one would think that triggering the first and then the second should do the job. Apparently not. The first DAG finishes as expected, but the second never starts. Googling “ExternalTaskSensor stuck” returns hundreds of results with the same clear explanation: ExternalTaskSensor assumes that both DAGs are scheduled for the same moment in time. Finally, the closing words of the ExternalTaskSensor definition, “... for a specific execution_date”, become clear.
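Roughly speaking (this is a simplified sketch, not the actual Airflow source), the sensor’s poke with no external_task_id boils down to counting runs of the external DAG that succeeded for the sensor’s own execution_date:

```python
from airflow import settings
from airflow.models import DagRun
from airflow.utils.state import State

def poke_sketch(my_execution_date):
    # Simplified idea of what ExternalTaskSensor checks on every poke.
    session = settings.Session()
    count = (
        session.query(DagRun)
        .filter(
            DagRun.dag_id == "ets_demo_1",
            DagRun.execution_date == my_execution_date,  # exact match required
            DagRun.state == State.SUCCESS,
        )
        .count()
    )
    # A manually triggered run of ets_demo_1 gets a different execution_date,
    # so this count stays 0 and the sensor keeps waiting forever.
    return count > 0
```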
Marc Lamberti gives a nice explanation of why this is so in his video [Apache Airflow: The ExternalTaskSensor demystified](https://www.youtube.com/watch?v=Oemg-3aiAiI). In a nutshell, it is a protection that ensures the sensor is released by the correct run. There is no option to simply wait for the very latest run of DAG1, as one would expect; ExternalTaskSensor just does not work that way.
But what about a real-world situation where the schedule of the first or second DAG is unknown, because they are triggered externally or manually? ExternalTaskSensor has two optional parameters for this (only one of them can be set):

- execution_delta - a fixed time difference subtracted from the sensor's own execution date when looking for the external DAG run,
- execution_date_fn - a function that returns the execution date(s) to wait for.
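For the simpler case, where the two schedules differ by a fixed offset, execution_delta is enough. A sketch, assuming ets_demo_1 were scheduled exactly one hour before ets_demo_2:

```python
from datetime import timedelta
from airflow.sensors.external_task import ExternalTaskSensor

# If ets_demo_1 ran exactly one hour before ets_demo_2, a fixed offset would do:
# the sensor subtracts execution_delta from its own execution_date to find
# the matching run of ets_demo_1.
wait = ExternalTaskSensor(
    task_id="wait_for_DAG1",
    external_dag_id="ets_demo_1",
    execution_delta=timedelta(hours=1),
)
```

With unknown or manual schedules, however, no fixed delta fits, which is where execution_date_fn comes in.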
Fortunately, the Airflow architecture gives nice access to its internal metadata database. The following function returns the execution date of the latest run of DAG1:
```python
def get_execution_date(dt, **kwargs):
    session = settings.Session()
    dr = (
        session.query(DagRun)
        .filter(DagRun.dag_id == kwargs['task'].external_dag_id)
        .order_by(DagRun.execution_date.desc())
        .first()
    )
    return dr.execution_date
```
It does the following:

- opens a session to the Airflow metadata database,
- queries the DagRun table for runs of the DAG referenced by the sensor's external_dag_id,
- sorts them by execution_date in descending order,
- takes the first row, i.e. the latest run, and returns its execution_date.
You can easily extend this functionality by comparing dr.execution_date with dt (the current execution date) to make sure the DagRun it found is within an expected range (e.g. 12 hours); a sketch of that check follows after the full example below. Finally, the working DAGs:
```python
from datetime import timedelta
from airflow import DAG
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
from airflow.models import DagRun
from airflow import settings
from airflow import models

args = {
    'owner': 'airflow',
}

def get_execution_date(dt, **kwargs):
    session = settings.Session()
    dr = (
        session.query(DagRun)
        .filter(DagRun.dag_id == kwargs['task'].external_dag_id)
        .order_by(DagRun.execution_date.desc())
        .first()
    )
    return dr.execution_date

# First DAG - sleeps for a while for demonstration purposes
with DAG(
    dag_id="ets_demo_1",
    default_args=args,
    schedule_interval=None,
    start_date=days_ago(2),
    tags=['demo'],
    catchup=False,
) as dag1:
    BashOperator(
        task_id="procrastinate",
        bash_command="sleep 10m"
    )

# Second DAG - echoes something after the first DAG is done
with DAG(
    dag_id="ets_demo_2",
    default_args=args,
    schedule_interval=None,
    start_date=days_ago(2),
    tags=['demo'],
    catchup=False,
) as dag2:
    wait = ExternalTaskSensor(
        task_id="wait_for_DAG1",
        external_dag_id='ets_demo_1',
        execution_date_fn=get_execution_date
    )
    something = BashOperator(
        task_id="do_something",
        bash_command="echo something"
    )
    wait >> something
```
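For completeness, here is a sketch of the range check mentioned above; the 12-hour window and the fallback return value are arbitrary choices for illustration:

```python
from datetime import timedelta
from airflow import settings
from airflow.models import DagRun

def get_execution_date(dt, **kwargs):
    # Latest run of the external DAG, as before.
    session = settings.Session()
    dr = (
        session.query(DagRun)
        .filter(DagRun.dag_id == kwargs['task'].external_dag_id)
        .order_by(DagRun.execution_date.desc())
        .first()
    )
    # Accept it only if it is reasonably close to our own execution date;
    # the 12-hour window is an arbitrary example.
    if dr and abs(dt - dr.execution_date) <= timedelta(hours=12):
        return dr.execution_date
    # Returning our own execution_date means no matching run will be found,
    # so the sensor keeps poking (and eventually times out).
    return dt
```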