Manage Dependencies Between Airflow Deployments, DAGs, and Tasks

Hosted By

  • Kenten Danas
  • Chris Hronek

GitHub (Repo for this Webinar)

Airflow Docs

Astronomer Docs

Webinar Agenda

Task-Level Dependencies

Cross DAG Dependencies

Cross Deployment Dependencies

Task-Level Dependencies

Simple dependencies

In Airflow, you can set a task's dependencies either with the set_upstream and set_downstream methods or with the bit-shift operators (>> and <<). Best practice is not to mix the two styles in your code.

First you list your tasks:

d1 = DummyOperator(task_id="first_task")
d2 = DummyOperator(task_id="second_task")
d3 = DummyOperator(task_id="third_task")
d4 = DummyOperator(task_id="fourth_task")

And then create a dependency chain.

This (method syntax):

d1.set_downstream(d2)
d2.set_downstream(d3)
d3.set_downstream(d4)

Is the same as (bit-shift operators): d1 >> d2 >> d3 >> d4

This:

d4.set_upstream(d3)
d3.set_upstream(d2)
d2.set_upstream(d1)

Is the same as: d4 << d3 << d2 << d1

All of these create the same chain of dependencies, shown in the webinar as the DAG simple_scheduling: first_task, then second_task, then third_task, then fourth_task.

Parallel simple dependencies

To make tasks run in parallel, put them in a list (square brackets).

start = DummyOperator(task_id="start")
d1 = DummyOperator(task_id="first_task")
d2 = DummyOperator(task_id="second_task")
d3 = DummyOperator(task_id="third_task")
d4 = DummyOperator(task_id="fourth_task")
finish = DummyOperator(task_id="finish")

Chaining the tasks with bit-shift operators alone runs them sequentially: start >> d1 >> d2 >> d3 >> d4 >> finish

Which results in a single linear chain: start, d1, d2, d3, d4, finish.

To run them in parallel instead, put them in a list like so:

start >> [d1, d2, d3, d4] >> finish

Which results in d1, d2, d3, and d4 all running in parallel after start, with finish waiting for all four.

Or, if you want d1 and d2 to run in parallel before d3 and d4, you can combine a list with the bit-shift operators like so:

start >> [d1, d2] >> d3 >> d4 >> finish

Which results in d1 and d2 running in parallel after start, followed by d3, then d4, then finish.
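How a list fits into a bit-shift chain can be illustrated with a toy model. The sketch below is not Airflow's code: ToyTask is a hypothetical stand-in that only records edges, written under the assumption that operator overloading is what turns >> into set_downstream calls, including the case where a list sits on the left of >>.

```python
def _as_list(item):
    """Wrap a single task in a list so both cases can be handled alike."""
    return item if isinstance(item, list) else [item]


class ToyTask:
    """Hypothetical stand-in for an Airflow task; it only tracks dependency edges."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()
        self.downstream = set()

    def set_downstream(self, other):
        for task in _as_list(other):
            self.downstream.add(task.task_id)
            task.upstream.add(self.task_id)

    def set_upstream(self, other):
        for task in _as_list(other):
            task.set_downstream(self)

    def __rshift__(self, other):   # self >> other
        self.set_downstream(other)
        return other               # returning the right side lets the chain continue

    def __lshift__(self, other):   # self << other
        self.set_upstream(other)
        return other

    def __rrshift__(self, other):  # [a, b] >> self (lists define no >> of their own)
        self.set_upstream(other)
        return self


names = ["start", "first_task", "second_task", "third_task", "fourth_task", "finish"]
start, d1, d2, d3, d4, finish = (ToyTask(name) for name in names)

start >> [d1, d2] >> d3 >> d4 >> finish

print(sorted(d3.upstream))  # ['first_task', 'second_task']
```

In this toy model, [d1, d2] >> [d3, d4] raises a TypeError because neither side is a task that defines the operator, which is why list-to-list dependencies need a helper function.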

Defining dependencies between multiple arrays or lists:

Use the chain function imported from the base operator module (airflow.models.baseoperator). Adjacent lists must contain the same number of tasks.

chain(start, [t1, t2], [t3, t4], finish)

Which results in the lists being paired position by position: t1 runs before t3, and t2 runs before t4.

Creating cross-downstream dependencies with the cross_downstream function, also from the base operator module:

start >> [t1, t2]
cross_downstream([t1, t2], [t3, t4])
[t3, t4] >> finish

Which results in both t3 and t4 depending on both t1 and t2.
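The difference between the two helpers is easiest to see as sets of edges. The following sketch does not call Airflow; chain_edges and cross_downstream_edges are hypothetical helpers that reproduce, for two lists of task IDs, the pairing behavior described above.

```python
from itertools import product


def chain_edges(upstream, downstream):
    """Pair tasks position by position, as chain does for two adjacent lists."""
    if len(upstream) != len(downstream):
        raise ValueError("adjacent lists must have the same length")
    return set(zip(upstream, downstream))


def cross_downstream_edges(upstream, downstream):
    """Connect every upstream task to every downstream task."""
    return set(product(upstream, downstream))


print(sorted(chain_edges(["t1", "t2"], ["t3", "t4"])))
# [('t1', 't3'), ('t2', 't4')]
print(sorted(cross_downstream_edges(["t1", "t2"], ["t3", "t4"])))
# [('t1', 't3'), ('t1', 't4'), ('t2', 't3'), ('t2', 't4')]
```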

Simple dependencies in task groups

Sometimes a for loop generates hundreds of tasks, which clutters the UI and makes task dependencies difficult to follow. Task groups address this.

Task groups are a UI grouping concept for organizing your tasks, and they can be added to a bit-shift dependency chain. You can define dependencies within a group and then connect the group as a whole to upstream and downstream tasks.

start = DummyOperator(task_id="start")
finish = DummyOperator(task_id="finish")

with TaskGroup(group_id="group1") as tg1:
    for i in range(1, 11):
        DummyOperator(task_id=f"task_{i}")

t3 = DummyOperator(task_id="task_3")
t4 = DummyOperator(task_id="task_4")

start >> tg1 >> t3 >> t4 >> finish

Which results in group1 appearing as a single collapsible node between start and task_3 in the graph view.

Simple dependencies with branching

The BranchPythonOperator takes a Python function as input; that function returns a task ID (or a list of task IDs) that decides which part of the graph to follow. This lets you route a DAG run down certain paths based on the result of a function.

It's useful when business rules require you to run one pipeline and skip another depending on whether certain criteria are met.

(Figure: a DAG in which a branching task leads to five branch tasks, all followed by finish.)
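The function handed to a BranchPythonOperator is ordinary Python that returns a task ID. As a minimal sketch, the hypothetical choose_branch below picks a path from the day of the week; the task IDs weekday_pipeline and weekend_pipeline are invented for illustration and do not come from the webinar.

```python
from datetime import date


def choose_branch(run_date: date) -> str:
    """Return the task_id of the branch to follow; other branches are skipped."""
    # weekday() is 0 for Monday through 6 for Sunday.
    if run_date.weekday() < 5:
        return "weekday_pipeline"  # hypothetical task_id
    return "weekend_pipeline"      # hypothetical task_id


print(choose_branch(date(2021, 6, 7)))   # a Monday: weekday_pipeline
print(choose_branch(date(2021, 6, 12)))  # a Saturday: weekend_pipeline
```

In a DAG, a function like this would be passed as the python_callable of a BranchPythonOperator, and the strings it returns must match the task IDs of tasks directly downstream of the branching task.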

Trigger rules

In the previous branching example, finish would never run: by default, it waits for all five branch tasks to succeed, when in reality only one of them will run. So we need to set the trigger rule on the finish task to "one_success".

from airflow.utils.trigger_rule import TriggerRule
DummyOperator(task_id="finish", trigger_rule=TriggerRule.ONE_SUCCESS)

Other TriggerRule options include all_success (the default), all_failed, all_done, one_failed, none_failed, and none_skipped.
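Why the default rule blocks finish after a branch can be shown with a simplified model. The sketch below is not Airflow's scheduler logic: would_run is a hypothetical function that covers only two rules and assumes all upstream tasks have already reached a final state.

```python
def would_run(trigger_rule, upstream_states):
    """Simplified model: decide whether a task runs, given final upstream states."""
    if trigger_rule == "all_success":
        return all(state == "success" for state in upstream_states)
    if trigger_rule == "one_success":
        return any(state == "success" for state in upstream_states)
    raise ValueError(f"rule not covered by this sketch: {trigger_rule}")


# One branch was chosen and succeeded; the other four were skipped.
states = ["success", "skipped", "skipped", "skipped", "skipped"]

print(would_run("all_success", states))  # False
print(would_run("one_success", states))  # True
```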

Cross DAG dependencies

TriggerDagRunOperator

The TriggerDagRunOperator triggers a DAG run for a specified dag_id.

from airflow.operators.trigger_dagrun import TriggerDagRunOperator

t = TriggerDagRunOperator(
    task_id="trigger_dependent_dag",  # name of the task in the current DAG (example value)
    trigger_dag_id="dependent_dag",   # dag_id of the DAG you want to trigger (example value)
    execution_date=None,              # execution date for the triggered DAG run (if None, the current execution date)
    reset_dag_run=False,              # whether to clear the DAG run if it already exists (default False)
    wait_for_completion=False,        # whether to wait for the DAG run to complete (default False)
    poke_interval=60,                 # if wait_for_completion=True, seconds between DAG run status checks
    allowed_states=["success"],       # list of allowed states (default ["success"])
    failed_states=None,               # list of failed or disallowed states (default None)
)

Sensors

Sensors are a special type of operator designed to do exactly one thing: wait for something to occur. The trigger can be time-based, a file, or an external event. A sensor simply waits until it happens and then succeeds so that its downstream tasks can run.

ExternalTaskSensor

The ExternalTaskSensor waits for a different DAG, or a task in a different DAG, to complete for a specific execution_date.

from airflow.sensors.external_task import ExternalTaskSensor

t = ExternalTaskSensor(
    task_id="wait_for_parent_task",   # name of the task in the current DAG (example value)
    external_dag_id="parent_dag",     # dag_id of the parent DAG of the task (example value)
    external_task_id="parent_task",   # task_id of the task to wait for (example value)
    timeout=3600,                     # number of seconds after which the sensor gives up (example value)
    allowed_states=["success"],       # list of allowed states (default ["success"])
    failed_states=None,               # list of failed or disallowed states (default None)
    execution_delta=None,             # time difference with the previous execution to look at (a timedelta)
)

It's important to note that both DAGs should be on the same run schedule when you use an ExternalTaskSensor. If they aren't, be sure to set the execution_delta field so that the ExternalTaskSensor knows which task run to look for.
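The arithmetic behind execution_delta can be sketched in a few lines. The schedules here are hypothetical (an upstream DAG at 01:00 and a sensor DAG at 02:00, both daily), and upstream_date_to_check is an illustrative helper, not an Airflow function. It reflects the sensor subtracting execution_delta from its own execution date to find the upstream run.

```python
from datetime import datetime, timedelta


def upstream_date_to_check(sensor_execution_date, execution_delta):
    """Execution date of the upstream run that the sensor looks for."""
    return sensor_execution_date - execution_delta


# Hypothetical schedules: upstream DAG at 01:00, sensor DAG at 02:00.
sensor_run = datetime(2021, 6, 1, 2, 0)

print(upstream_date_to_check(sensor_run, timedelta(hours=1)))
# 2021-06-01 01:00:00
```

With these schedules, execution_delta=timedelta(hours=1) points the sensor at the 01:00 upstream run. Without it, the sensor would look for an upstream run at 02:00 that never exists.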

Cross deployment dependencies

The rule of thumb is to have a separate deployment per team within a company, just as we do at Astronomer.

That means each team has a separate instance of Airflow. Sometimes work in one instance can start only after work in another instance has finished. How do you know whether the other has finished? How do you set up dependencies between tasks owned by separate teams?

Airflow 2.0 introduced the stable Airflow REST API. If you have an API key for a deployment other than your own, you can make REST API calls to that deployment.

See the use case in the GitHub repository.
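As a rough illustration of such a call, the sketch below builds (without sending) a request to the stable REST API endpoint that creates a DAG run. This is not the code from the webinar repository: build_trigger_request is a hypothetical helper, the deployment URL, dag_id, and key are placeholders, and passing the key as a bearer token is an assumption that depends on how the target deployment handles authentication.

```python
import json
import urllib.request


def build_trigger_request(base_url, dag_id, api_key, conf=None):
    """Build, but do not send, a POST request that would create a DAG run."""
    url = f"{base_url.rstrip('/')}/api/v1/dags/{dag_id}/dagRuns"
    body = json.dumps({"conf": conf or {}}).encode("utf-8")
    headers = {
        "Content-Type": "application/json",
        # Assumption: the target deployment accepts the key as a bearer token.
        "Authorization": f"Bearer {api_key}",
    }
    return urllib.request.Request(url, data=body, headers=headers, method="POST")


request = build_trigger_request(
    "https://airflow.example.com",  # hypothetical deployment URL
    "dependent_dag",                # hypothetical dag_id
    "my-api-key",                   # placeholder API key
)

print(request.get_method(), request.full_url)
# POST https://airflow.example.com/api/v1/dags/dependent_dag/dagRuns
```

Passing the request to urllib.request.urlopen would perform the call. A task in the upstream deployment could do this as its last step so that the downstream deployment starts only once the upstream work has finished.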
