
Building Data Pipelines through Apache Airflow (Magic of ExternalTaskSensor)

Objective 

While building data pipelines, developers often realise the need to set up a dependency between two DAGs, where the execution of the second DAG depends on the execution of the first. On that note, Apache Airflow comes with a first-class sensor named ExternalTaskSensor which can be used to model this kind of dependency in an application.

Task Properties

The ExternalTaskSensor task has the following properties.

  • external_dag_id (required, String) – The dag_id that contains the task you want to wait for.
  • external_task_id (optional, String or None) – The task_id of the task you want to wait for. If None (the default value), the sensor waits for the whole DAG.
  • execution_delta (optional, datetime.timedelta) – Time difference with the previous execution to look at; the default is the same execution_date as the current task or DAG. For yesterday, use [positive!] datetime.timedelta(days=1). Either execution_delta or execution_date_fn can be passed to ExternalTaskSensor, but not both.
  • execution_date_fn (optional, callable) – Function that receives the current execution date and returns the desired execution date(s) to query. Either execution_delta or execution_date_fn can be passed to ExternalTaskSensor, but not both.
  • check_existence (optional, boolean) – Set to true to check that the external task exists (when external_task_id is not None) or that the DAG to wait for exists (when external_task_id is None), and immediately cease waiting if the external task or DAG does not exist (default value: false).
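As a quick illustration of these arguments, here is a minimal sketch of a sensor that waits for a whole external DAG rather than a single task, and fails fast if that DAG does not exist. The dag_id WaitForWholeDag is hypothetical, and the import path is the Airflow 1.10.x one (Airflow 2.x moved it to airflow.sensors.external_task).

from datetime import datetime

from airflow import DAG
from airflow.sensors.external_task_sensor import ExternalTaskSensor  # Airflow 2.x: airflow.sensors.external_task

# Minimal host DAG; the dag_id is hypothetical, for illustration only.
dag = DAG(
    dag_id='WaitForWholeDag',
    start_date=datetime(2020, 6, 23),
    schedule_interval='*/10 * * * *',
)

wait_for_dag = ExternalTaskSensor(
    task_id='wait_for_dag',
    external_dag_id='DependentJob',  # the upstream DAG defined in the samples below
    external_task_id=None,           # None => wait for the whole DAG run, not one task
    check_existence=True,            # stop waiting immediately if DependentJob does not exist
    dag=dag,
)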

In Airflow terminology, sensors are a certain type of operator that keep running until a certain criterion is met. Sensors are derived from BaseSensorOperator and run a poke method at a specified poke_interval until it returns True. Moreover, all sensors inherit timeout and poke_interval on top of the BaseOperator attributes, so one can override the following properties of ExternalTaskSensor as required.

  • poke_interval (optional, int) – Time in seconds that the job should wait between each try.
  • timeout (optional, int) – Time in seconds before the task times out and fails.
  • mode (optional, String) – Options are "poke" or "reschedule"; the default is "poke". When set to "poke", the sensor takes up a worker slot for its whole execution time and sleeps between pokes; use this mode if the expected runtime of the sensor is short or if a short poke interval is required. When set to "reschedule", the sensor task frees the worker slot when the criterion is not yet met and is rescheduled at a later time; use this mode if the time before the criterion is met is expected to be quite long. The poke interval should be more than one minute to prevent too much load on the scheduler.
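Putting these knobs together, the following sketch shows an ExternalTaskSensor with the inherited sensor properties overridden. The dag_id SensorTuningExample and the chosen values are illustrative assumptions, not recommendations.

from datetime import datetime

from airflow import DAG
from airflow.sensors.external_task_sensor import ExternalTaskSensor  # Airflow 2.x: airflow.sensors.external_task

# Minimal DAG just to host the sensor; the dag_id is hypothetical.
dag = DAG(
    dag_id='SensorTuningExample',
    start_date=datetime(2020, 6, 23),
    schedule_interval='*/10 * * * *',
)

external_task = ExternalTaskSensor(
    task_id='external_task',
    external_dag_id='DependentJob',
    external_task_id='DependentOperation',
    poke_interval=120,    # poke every 2 minutes (more than a minute, as advised above)
    timeout=60 * 60,      # give up and fail the sensor after one hour of waiting
    mode='reschedule',    # free the worker slot between pokes
    dag=dag,
)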

Sample DAGs to understand how ExternalTaskSensor works

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.trigger_rule import TriggerRule


default_global_args = {
    'owner': 'Tanuj',
    'email': ['xyz@gmail.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'start_date': datetime(2020, 6, 23),
}


# The upstream DAG that the sensor in the second workflow will wait on.
dag = DAG(
    dag_id='DependentJob',
    default_args=default_global_args,
    schedule_interval='*/10 * * * *',  # every 10 minutes
    max_active_runs=10,
)


# Placeholder task; TriggerRule.ALL_SUCCESS is the default, shown here explicitly.
DependentOperation = DummyOperator(
    task_id='DependentOperation',
    dag=dag,
    trigger_rule=TriggerRule.ALL_SUCCESS,
)
Sample DependentJob DAG
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor  # Airflow 2.x: airflow.sensors.external_task


args = {
    'owner': 'Tanuj',
    'depends_on_past': False,
    'start_date': datetime(2020, 6, 23),
    'email': ['xyz@gmail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}


# New DAG definition, on the same 10-minute schedule as DependentJob.
dag = DAG(
    dag_id='ExternalWorkflow',
    default_args=args,
    schedule_interval='*/10 * * * *',
)

# Waits for the DependentOperation task of the DependentJob DAG run that
# has the same execution_date as this DAG run.
external_task = ExternalTaskSensor(
    task_id='external_task',
    external_dag_id='DependentJob',
    external_task_id='DependentOperation',
    dag=dag,
)

newjob = DummyOperator(dag=dag, task_id='newjob')

external_task >> newjob
Sample ExternalWorkflow DAG
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor  # Airflow 2.x: airflow.sensors.external_task


args = {
    'owner': 'Tanuj',
    'depends_on_past': False,
    'start_date': datetime(2020, 6, 23),
    'email': ['xyz@gmail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}


# New DAG definition, on a 15-minute schedule.
dag = DAG(
    dag_id='ExternalWorkWithExecutionDelta',
    default_args=args,
    schedule_interval='*/15 * * * *',
)

# Waits for the DependentJob run whose execution_date is exactly
# 5 minutes earlier than this DAG run's execution_date.
external_task = ExternalTaskSensor(
    task_id='external_task',
    external_dag_id='DependentJob',
    external_task_id='DependentOperation',
    execution_delta=timedelta(minutes=5),
    dag=dag,
)

newjob = DummyOperator(dag=dag, task_id='newjob')

external_task >> newjob
Sample ExternalWorkWithExecutionDelta DAG

Visual Understanding

To understand how ExternalTaskSensor behaves visually, I have created two DAGs named DependentJob and ExternalWorkWithExecutionDelta, the same as defined in the Python blocks above, and captured their runs in the screenshots.

Note: please don't be confused by the timings in the screenshots. There is a difference of 5 hours and 30 minutes because Indian Standard Time (IST) is 5 hours and 30 minutes ahead of Greenwich Mean Time (GMT). So the first picture shows GMT times (2020-06-23T00:15:00) and the second shows the corresponding IST times, such as 6 AM, 7 AM, 8 AM.

  • The DependentJob DAG has one task named DependentOperation, which is simply a DummyOperator.
  • DependentJob runs every 10 minutes starting from 23rd June.
  • ExternalWorkWithExecutionDelta has two tasks named external_task and newjob. The external_task task is an ExternalTaskSensor which waits for the completion of its external task (DependentOperation) in the external DAG (DependentJob), given that both should have the matching execution date.
  • ExternalWorkWithExecutionDelta runs every 15 minutes starting from 23rd June, and its external_task has an execution_delta of 5 minutes. This means the task succeeds only when it finds a DependentJob run whose execution date is exactly 5 minutes earlier than its own.
  • So if you look at the visuals above closely, you will find that the success of the external task relies on the following data points (the date arithmetic behind this matching is sketched in the code after this list):
    • DependentJob DAG's execution times are 12:00 AM, 12:10 AM, 12:20 AM, 12:30 AM, 12:40 AM, 12:50 AM, 01:00 AM, and so on.
    • ExternalWorkWithExecutionDelta DAG's execution times are 12:00 AM, 12:15 AM (execution delta of 5 minutes, so it checks the dependent DAG DependentJob at 12:10 AM; the execution dates match, hence it is marked as success), 12:30 AM (checks DependentJob at 12:25 AM; no match, hence still running), 12:45 AM (checks DependentJob at 12:40 AM; success), 01:00 AM (checks DependentJob at 12:55 AM; no match, hence still running), and so on.
  • In addition, if neither execution_delta nor execution_date_fn is provided, the success of the external task/DAG depends directly on the run with exactly the same execution date: both DAGs' execution dates must match exactly.
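The matching described in the list above is plain date arithmetic: the sensor pokes for a run of the external DAG at execution_date - execution_delta. A small self-contained sketch that reproduces the success/running pattern from the bullets:

from datetime import datetime, timedelta

execution_delta = timedelta(minutes=5)

# Runs of DependentJob (every 10 minutes) during the first hour of 23rd June.
dependent_runs = {datetime(2020, 6, 23, 0, m) for m in range(0, 60, 10)}

# For each run of ExternalWorkWithExecutionDelta (every 15 minutes), the
# sensor looks for a DependentJob run at execution_date - execution_delta.
for minute in range(0, 60, 15):
    execution_date = datetime(2020, 6, 23, 0, minute)
    target = execution_date - execution_delta
    status = 'success' if target in dependent_runs else 'still poking'
    print(f'{execution_date:%H:%M} -> looks for {target:%H:%M}: {status}')

Running this prints that the 12:15 AM and 12:45 AM runs find a match (12:10 AM and 12:40 AM), while the 12:00 AM, 12:30 AM, and 01:00 AM runs keep poking, exactly as in the visuals.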

Keeping the same execution date while triggering the DAGs

One important thing to notice is that if you trigger the DAGs manually, they will run with different execution_dates, which is why the ExternalTaskSensor does not detect the completion of the first DAG's task. So try to run them on the same schedule.

Along the same lines, the execution_delta and execution_date_fn arguments are provided to synchronise the two DAGs with respect to execution date. Moreover, if the DAG's schedule is set to None, trigger both dependent DAGs with the same execution date. A sketch of execution_date_fn used for this purpose follows.
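Instead of a fixed execution_delta, an execution_date_fn can compute the matching run of the dependent DAG. The helper below is a hypothetical sketch: it rounds this DAG's execution date down to DependentJob's 10-minute grid, and the dag_id ExternalWorkWithExecutionDateFn is made up for illustration. (In Airflow 1.10.x the function receives the execution date as its single argument; newer versions may also pass context keyword arguments.)

from datetime import datetime, timedelta

from airflow import DAG
from airflow.sensors.external_task_sensor import ExternalTaskSensor  # Airflow 2.x: airflow.sensors.external_task


def align_to_dependent_schedule(execution_date):
    # Hypothetical helper: round the execution date down to the nearest
    # 10-minute mark so that every run of this DAG maps onto an actual
    # run of DependentJob (which runs on a */10 schedule).
    return execution_date - timedelta(minutes=execution_date.minute % 10)


# Minimal host DAG; the dag_id is hypothetical.
dag = DAG(
    dag_id='ExternalWorkWithExecutionDateFn',
    start_date=datetime(2020, 6, 23),
    schedule_interval='*/15 * * * *',
)

external_task = ExternalTaskSensor(
    task_id='external_task',
    external_dag_id='DependentJob',
    external_task_id='DependentOperation',
    execution_date_fn=align_to_dependent_schedule,
    dag=dag,
)

Unlike the fixed execution_delta used earlier, this maps every 15-minute run (12:15 AM, 12:45 AM, and so on) onto an existing DependentJob run, so the sensor never waits for a run that will never appear.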

Conclusion

As the name ExternalTaskSensor suggests, it senses the completion state of any task or DAG in Airflow. So, on that note, we can use ExternalTaskSensor to set dependencies between our DAGs when building complicated data pipelines, so that one does not run until its dependency is complete. Happy Airflowing 🙂
