Manage Dependencies Between Airflow Deployments, DAGs, and Tasks
Webinar Links
GitHub (Repo for this Webinar)
Airflow Docs
Astronomer Docs
- Astronomer Guide: Task Groups
- Astronomer Guide: Managing Dependencies in Airflow
- Astronomer Guide: Cross DAG Dependencies
- Astronomer Registry: TriggerDagRunOperator
- Astronomer Guide: BranchPythonOperator
- Astronomer Registry: PythonSensor
- Astronomer Webinar: SmartSensors
- Astronomer Registry: All Sensors
- Astronomer Registry: ExternalTaskSensor
- Astronomer Registry: SimpleHttpOperator
Webinar Agenda
Task-Level Dependencies
- Basic Dependencies
- Parallel Dependencies
- Crossed Downstream Dependencies
- Setting dependencies for dynamically generated tasks
- Setting dependencies for Task Groups
- Setting dependencies when Branching
- Trigger Rules
Cross DAG Dependencies
- TriggerDagRunOperator
- ExternalTaskSensor
Cross Deployment Dependencies
- Trigger External DAG using Stable REST API
Task-Level Dependencies
Simple dependencies
In Airflow, you can set a task's dependencies either with the set_upstream and set_downstream methods or with the bit-shift operators (>> and <<). Best practice is not to mix the two styles in your code.
First, define your tasks:
d1 = DummyOperator(task_id="first_task")
d2 = DummyOperator(task_id="second_task")
d3 = DummyOperator(task_id="third_task")
d4 = DummyOperator(task_id="fourth_task")
And then create a dependency chain.
This (method syntax):
d1.set_downstream(d2)
d2.set_downstream(d3)
d3.set_downstream(d4)
Is the same as (bit-shift operators):
d1 >> d2 >> d3 >> d4
This:
d4.set_upstream(d3)
d3.set_upstream(d2)
d2.set_upstream(d1)
Is the same as:
d4 << d3 << d2 << d1
All of these create the same chain of dependencies: first_task, then second_task, then third_task, then fourth_task.
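To see why the method calls and the bit-shift operators are interchangeable, here is a toy model in plain Python. The `Task` class and the `edges` helper are hypothetical and are not Airflow's implementation; they only mimic the idea that `>>` and `<<` delegate to `set_downstream` and `set_upstream`.

```python
class Task:
    """Toy stand-in for an Airflow operator (hypothetical, illustration only)."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []  # tasks that run after this one

    def set_downstream(self, other):
        self.downstream.append(other)

    def set_upstream(self, other):
        other.set_downstream(self)

    def __rshift__(self, other):
        # a >> b: b runs after a; return b so the chain can continue
        self.set_downstream(other)
        return other

    def __lshift__(self, other):
        # a << b: a runs after b; return b so the chain can continue
        self.set_upstream(other)
        return other


def edges(tasks):
    """List every (upstream, downstream) pair as task_id tuples."""
    return sorted((t.task_id, d.task_id) for t in tasks for d in t.downstream)


a = [Task(f"task_{i}") for i in range(1, 5)]
a[0] >> a[1] >> a[2] >> a[3]  # chain written left to right

b = [Task(f"task_{i}") for i in range(1, 5)]
b[3] << b[2] << b[1] << b[0]  # same chain written right to left

same_graph = edges(a) == edges(b)
```

Both spellings record the same three edges, which is why the choice between them is a matter of readability.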
Parallel Simple dependencies
To make tasks run in parallel, put them in a list (square brackets).
start = DummyOperator(task_id="start")
d1 = DummyOperator(task_id="first_task")
d2 = DummyOperator(task_id="second_task")
d3 = DummyOperator(task_id="third_task")
d4 = DummyOperator(task_id="fourth_task")
finish = DummyOperator(task_id="finish")
Instead of these tasks running in sequential order by using:
start >> d1 >> d2 >> d3 >> d4 >> finish
Which results in a single path in which each task waits for the one before it.
You can instead have them run in parallel by putting them in a list, like so:
start >> [d1, d2, d3, d4] >> finish
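Which results in d1, d2, d3, and d4 each running after start and before finish, with no ordering among them.
Or, if you wanted d1 and d2 to run in parallel before d3 and d4 run one after the other, you could combine a list with the bit-shift operator like so: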
start >> [d1, d2] >> d3 >> d4 >> finish
Which results in d1 and d2 running in parallel after start, followed by d3, then d4, then finish.
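Lists work in a dependency chain because the task on the other side of `>>` knows how to handle them; two plain Python lists cannot be bit-shifted against each other. The sketch below uses a hypothetical `Task` class (not Airflow's code) to illustrate both points.

```python
class Task:
    """Toy stand-in for an Airflow operator (hypothetical, illustration only)."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = set()  # task_ids that run after this task

    def __rshift__(self, other):
        # Handles task >> task and task >> [tasks]
        targets = other if isinstance(other, list) else [other]
        for t in targets:
            self.downstream.add(t.task_id)
        return other

    def __rrshift__(self, other):
        # Handles [tasks] >> task: a list has no >> of its own,
        # so Python falls back to the right-hand operand
        for t in other:
            t.downstream.add(self.task_id)
        return self


start, d1, d2, d3, d4, finish = (
    Task(name) for name in ["start", "d1", "d2", "d3", "d4", "finish"]
)
start >> [d1, d2] >> d3 >> d4 >> finish

# With a list on both sides there is no task object to handle the operator
try:
    [d1, d2] >> [d3, d4]
except TypeError:
    list_to_list_supported = False
else:
    list_to_list_supported = True
```

That limitation is why a helper function is needed once both sides of a dependency are lists.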
Defining dependencies between multiple lists:
Use the chain function, imported from airflow.models.baseoperator. Adjacent lists must contain the same number of tasks.
chain(start, [t1, t2], [t3, t4], finish)
Which results in start feeding both t1 and t2, t1 feeding t3, t2 feeding t4, and both t3 and t4 feeding finish.
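One way to picture what chain does with adjacent lists is to compute the edges it would create. The `chain_edges` helper below is a hypothetical plain-Python model that works on task names; it is not Airflow's implementation, and the exception type it raises on mismatched list lengths is an assumption.

```python
def chain_edges(*items):
    """Return the (upstream, downstream) pairs a chain-style call would create."""
    edges = []
    for up, down in zip(items, items[1:]):
        ups = up if isinstance(up, list) else [up]
        downs = down if isinstance(down, list) else [down]
        if isinstance(up, list) and isinstance(down, list):
            if len(ups) != len(downs):
                raise ValueError("adjacent lists must be the same length")
            # Two lists side by side are paired element by element
            edges.extend(zip(ups, downs))
        else:
            # A single task next to a list fans out (or fans in)
            edges.extend((u, d) for u in ups for d in downs)
    return edges


result = chain_edges("start", ["t1", "t2"], ["t3", "t4"], "finish")
```

Note that t1 is linked only to t3 and t2 only to t4; the lists are zipped, not fully cross-connected.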
Creating cross-downstream dependencies:
start >> [t1, t2]
cross_downstream([t1, t2], [t3, t4])
[t3, t4] >> finish
Which results in t3 and t4 each depending on both t1 and t2.
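Unlike pairing lists element by element, cross_downstream connects every task in the first list to every task in the second. A minimal plain-Python model of the resulting edges (the helper name is hypothetical, and it works on task names, not real operators):

```python
from itertools import product


def cross_downstream_edges(from_tasks, to_tasks):
    """Every upstream task is linked to every downstream task."""
    return list(product(from_tasks, to_tasks))


edges = cross_downstream_edges(["t1", "t2"], ["t3", "t4"])
```

Two lists of two tasks therefore produce four edges.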
Simple dependencies in task groups
Sometimes a for loop generates hundreds of tasks, which clutters the UI and makes task dependencies hard to follow. Task Groups address this.
Task Groups are a UI grouping concept for organizing tasks, and a group can be used in a bit-shift dependency chain like a single task. You can set dependencies within a group and then connect the whole group to upstream and downstream tasks.
start = DummyOperator(task_id="start")
finish = DummyOperator(task_id="finish")
with TaskGroup(group_id="group1") as tg1:
    for i in range(1, 11):
        DummyOperator(task_id=f"task_{i}")
t3 = DummyOperator(task_id="task_3")
t4 = DummyOperator(task_id="task_4")
start >> tg1 >> t3 >> t4 >> finish
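One detail worth knowing: by default, Airflow prefixes the task_id of each task in a group with the group_id, which is why task_3 inside group1 does not collide with the standalone task_3. The snippet below only reproduces that naming scheme in plain Python as an illustration; it does not call Airflow.

```python
group_id = "group1"

# Ids of the ten tasks created inside the group, as they appear in the DAG
grouped_ids = [f"{group_id}.task_{i}" for i in range(1, 11)]

# Ids of the tasks defined outside the group
standalone_ids = ["start", "task_3", "task_4", "finish"]

all_ids = grouped_ids + standalone_ids
ids_are_unique = len(all_ids) == len(set(all_ids))
```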
Simple dependencies with branching
The BranchPythonOperator takes a Python function as input. That function returns a task id (or a list of task ids), which decides which part of the graph to follow. This lets a DAG go down certain paths based on the result of a function.
It's useful when business rules require you to run one pipeline and skip another depending on whether certain criteria are true or false.
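As a sketch of the kind of callable the BranchPythonOperator expects, the function below returns the task_id of the branch to follow. The business rule (bucketing an order count) and the task ids branch_0 to branch_4 are assumptions made up for this example.

```python
# Hypothetical task ids of five branch tasks defined elsewhere in the DAG
BRANCH_TASK_IDS = [f"branch_{i}" for i in range(5)]


def choose_branch(order_count: int) -> str:
    """Return the task_id of the one branch that should run (made-up rule)."""
    bucket = max(0, min(order_count // 100, len(BRANCH_TASK_IDS) - 1))
    return BRANCH_TASK_IDS[bucket]


# In a DAG file the callable would be handed to the operator roughly like this:
#   BranchPythonOperator(
#       task_id="branching",
#       python_callable=choose_branch,
#       op_kwargs={"order_count": 250},
#   )
chosen = choose_branch(250)
```

Only the task whose id is returned runs; the other branch tasks are skipped.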
Trigger rules
In the branching example, finish would never be triggered: by default it waits for all five branch tasks to succeed, when in reality only one of them will run. So we need to set the trigger rule on the finish task to "one_success".
from airflow.utils.trigger_rule import TriggerRule
DummyOperator(task_id="finish", trigger_rule=TriggerRule.ONE_SUCCESS)
Other TriggerRule options:
- ALL_SUCCESS: default, would need branch 0 through 4 to succeed
- ALL_FAILED: branch 0 through 4 would need to fail
- ALL_DONE: branch 0 through 4 would need to succeed, fail, or skip
- ONE_FAILED: a single failure in branch 0 through 4
- ONE_SUCCESS: a single success in branch 0 through 4
- DUMMY: dependencies are just for show, trigger at will
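To make the rules concrete, here is a simplified plain-Python model that decides whether a task would run once all of its upstream tasks have reached a final state. It is a sketch only: the real scheduler evaluates rules incrementally (for example, one_failed can fire before the other upstream tasks finish), and the function name and state strings are chosen for this illustration.

```python
FAILED = {"failed", "upstream_failed"}
DONE = {"success", "skipped"} | FAILED


def would_run(rule, upstream_states):
    """Simplified check of a trigger rule against final upstream states."""
    checks = {
        "all_success": all(s == "success" for s in upstream_states),
        "all_failed": all(s in FAILED for s in upstream_states),
        "all_done": all(s in DONE for s in upstream_states),
        "one_failed": any(s in FAILED for s in upstream_states),
        "one_success": any(s == "success" for s in upstream_states),
        "dummy": True,  # dependencies are ignored
    }
    return checks[rule]


# After branching: one branch succeeded, the other four were skipped
states = ["success", "skipped", "skipped", "skipped", "skipped"]
default_rule_runs = would_run("all_success", states)
one_success_runs = would_run("one_success", states)
```

With the default rule the finish task does not run after branching, while one_success lets it run as soon as the chosen branch has succeeded.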
Cross DAG dependencies
TriggerDagRunOperator
The TriggerDagRunOperator triggers a DAG run for a specified dag_id.
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
t = TriggerDagRunOperator(
    task_id="trigger_task",       # name of this task in the current DAG (placeholder)
    trigger_dag_id="target_dag",  # dag_id of the DAG you want to trigger (placeholder)
    execution_date=None,          # execution date for the DAG (if blank, current execution date)
    reset_dag_run=False,          # whether to clear the DAG run if it exists (if blank, False)
    wait_for_completion=False,    # whether to wait for DAG run completion (if blank, False)
    poke_interval=60,             # seconds between status checks when wait_for_completion=True (example value)
    allowed_states=["success"],   # list of allowed states, default is ["success"]
    failed_states=None,           # list of failed or disallowed states, default is None
)
Sensors
Sensors are a special type of operator designed to do exactly one thing: wait for something to occur. The trigger can be time-based, a file, or an external event, but all a sensor does is wait until something happens and then succeed so that its downstream tasks can run.
See the PythonSensor in the 2-triggered-by-upstream-dag.py file.
ExternalTaskSensor
The ExternalTaskSensor waits for a different DAG, or a task in a different DAG, to complete for a specific execution_date.
from datetime import timedelta
from airflow.sensors.external_task import ExternalTaskSensor
t = ExternalTaskSensor(
    task_id="wait_for_parent",           # name of this task in the current DAG (placeholder)
    external_dag_id="parent_dag",        # dag_id of the parent DAG of the task (placeholder)
    external_task_id="parent_task",      # task_id of the task to wait for (placeholder)
    timeout=3600,                        # number of seconds after which the task gives up (example value)
    allowed_states=["success"],          # list of allowed states, default is ["success"]
    failed_states=None,                  # list of failed or disallowed states, default is None
    execution_delta=timedelta(hours=1),  # time difference with the previous execution to look at (example value)
)
It's important to note that both DAGs should be on the same run schedule when you use an ExternalTaskSensor. If they aren't, be sure to set the execution_delta parameter so that the ExternalTaskSensor knows which task run to look for.
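For instance, suppose (hypothetically) that the upstream DAG runs daily at 01:30 and the DAG containing the sensor runs daily at 02:00. The sensor looks for the upstream run whose execution date equals its own execution date minus execution_delta, so the delta here would be 30 minutes. The arithmetic, in plain Python with made-up dates:

```python
from datetime import datetime, timedelta

# Execution date of the DAG run that contains the sensor (example value)
sensor_execution_date = datetime(2022, 1, 1, 2, 0)

# How far back the upstream DAG's schedule sits relative to the sensor's DAG
execution_delta = timedelta(minutes=30)

# The upstream run the sensor would look for
upstream_execution_date = sensor_execution_date - execution_delta
print(upstream_execution_date)  # 2022-01-01 01:30:00
```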
Cross deployment dependencies
The rule of thumb is to have a separate deployment per team within a company, as we do at Astronomer.
This means each team has its own Airflow instance, and sometimes a DAG in one instance should only run after a DAG in another instance has finished. How do you know when the other one has finished? How do you set up dependencies between tasks owned by separate teams?
Airflow 2.0 introduced the stable REST API. If you have an API key for a deployment other than your own, you can make REST API calls to that deployment.
See the use case in the GitHub repository.
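As a rough illustration of such a call, the sketch below builds (but does not send) a request to the stable REST API endpoint that creates a DAG run. The base URL, dag_id, and API key are placeholders, the helper name is hypothetical, and the Authorization header format is an assumption: it depends on how the target deployment handles authentication.

```python
import json
from urllib.request import Request


def build_trigger_request(base_url, dag_id, api_key, conf=None):
    """Build a POST request that would create a DAG run in another deployment."""
    url = f"{base_url.rstrip('/')}/api/v1/dags/{dag_id}/dagRuns"
    body = json.dumps({"conf": conf or {}}).encode("utf-8")
    headers = {
        "Content-Type": "application/json",
        "Authorization": api_key,  # assumption: the scheme varies by deployment
    }
    return Request(url, data=body, headers=headers, method="POST")


req = build_trigger_request(
    "https://airflow.example.com",  # placeholder deployment URL
    "downstream_dag",               # placeholder dag_id
    "PLACEHOLDER_API_KEY",
)
# urllib.request.urlopen(req) would send the request to the other deployment
```

In a DAG, a task in the upstream deployment could send this request as its final step so that the downstream deployment starts only after the upstream work is done.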