DAG writing best practices in Apache Airflow
Because Airflow is 100% code, knowing the basics of Python is all it takes to get started writing DAGs. However, writing DAGs that are efficient, secure, and scalable requires some Airflow-specific finesse. In this guide, you'll learn how you can develop DAGs that make the most of what Airflow has to offer.
In general, best practices fall into one of two categories:
- DAG design
- Using Airflow as an orchestrator
For an in-depth walkthrough and examples of some of the concepts covered in this guide, it's recommended that you review the DAG Writing Best Practices in Apache Airflow webinar and the GitHub repo for DAG examples.
Assumed knowledge
To get the most out of this guide, you should have an understanding of:
- Basic Airflow concepts. See Introduction to Apache Airflow.
- Airflow operators. See Operators 101.
Review idempotency
Idempotency is the foundation for many computing practices, including the Airflow best practices in this guide. A program is considered idempotent if, for a set input, running the program once has the same effect as running the program multiple times.
In the context of Airflow, a DAG is considered idempotent if rerunning the same DAG Run with the same inputs multiple times has the same effect as running it only once. This can be achieved by designing each individual task in your DAG to be idempotent. Designing idempotent DAGs and tasks decreases recovery time from failures and prevents data loss.
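As an illustration, the following is a minimal sketch of an idempotent load task, assuming a hypothetical sales table and a hypothetical run_sql helper. Because the task deletes the slice for its logical date before inserting, rerunning the same DAG run overwrites data instead of duplicating it:
from airflow.decorators import task

@task
def load_daily_data(ds=None):
    # ds is this DAG run's logical date (YYYY-MM-DD), injected by Airflow.
    # Delete-then-insert keyed on the logical date makes reruns safe:
    # running once or many times leaves the table in the same state.
    run_sql(f"DELETE FROM sales WHERE day = '{ds}'")  # run_sql is a hypothetical helper
    run_sql(f"INSERT INTO sales SELECT * FROM staging_sales WHERE day = '{ds}'")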
Idempotency paves the way for one of Airflow's most useful features: Retries.
Set retries
In a distributed environment where task containers are executed on shared hosts, it's possible for tasks to be killed off unexpectedly. When this happens, you might see a zombie process in the Airflow logs.
You can resolve issues like zombies by using task retries. Retries can be set at different levels with the following precedence:
- Tasks: Pass the retries parameter to the task's Operator.
- DAGs: Include retries in a DAG's default_args object.
- Deployments: Set the environment variable AIRFLOW__CORE__DEFAULT_TASK_RETRIES.
Setting retries to 2 will protect a task from most problems common to distributed environments. For more on using retries, see Rerun DAGs and Tasks.
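For example, here's a minimal sketch of the task-level and DAG-level approaches (the DAG name, dates, and retry values are illustrative, and the schedule parameter assumes Airflow 2.4+):
from datetime import timedelta
import pendulum
from airflow.decorators import dag, task

@dag(
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    schedule="@daily",
    # DAG level: default_args applies retries to every task in this DAG
    default_args={"retries": 2, "retry_delay": timedelta(minutes=5)},
)
def retries_example():
    # Task level: overrides the DAG-level default for this task only
    @task(retries=3)
    def flaky_task():
        ...

    flaky_task()

retries_example()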
DAG design
The following DAG design principles will help to make your DAGs idempotent, efficient, and readable.
Keep tasks atomic
When you organize your pipeline into individual tasks, make each task responsible for one operation that can be re-run independently of the others. In an atomic task, a success in part of the task means a success of the entire task.
For example, in an ETL pipeline you would ideally want your Extract, Transform, and Load operations covered by three separate tasks. Atomizing these tasks allows you to rerun each operation in the pipeline independently, which supports idempotence.
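A minimal TaskFlow sketch of this structure, with placeholder logic in each task:
import pendulum
from airflow.decorators import dag, task

@dag(start_date=pendulum.datetime(2023, 1, 1, tz="UTC"), schedule="@daily")
def atomic_etl():
    @task
    def extract():
        # Pull raw records from the source system
        return [{"id": 1, "amount": 100}]

    @task
    def transform(records):
        # Apply business logic to the extracted records
        return [{**r, "amount": r["amount"] * 2} for r in records]

    @task
    def load(records):
        # Write the transformed records to the destination
        print(records)

    load(transform(extract()))

atomic_etl()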
Use template fields, variables, and macros
By using templated fields in Airflow, you can pull values into DAGs using environment variables and Jinja templating. Compared to using Python functions, using templated fields helps keep your DAGs idempotent and ensures you aren't executing functions on every Scheduler heartbeat. See Avoid top level code in your DAG file.
Contrary to these best practices, the following example defines variables based on Python datetime functions:
# Variables used by tasks
# Bad example - Define today's and yesterday's date using the datetime module
from datetime import datetime, timedelta

today = datetime.today()
yesterday = datetime.today() - timedelta(1)
If this code is in a DAG file, these functions are executed on every Scheduler heartbeat, which may not be performant. Even more importantly, this doesn't produce an idempotent DAG: you can't rerun a previously failed DAG run for a past date because datetime.today() is relative to the current date, not the DAG execution date.
A better way of implementing this is to use an Airflow template variable:
# Variables used by tasks
# Good example - Define yesterday's date with an Airflow template variable
# The expression is stored as a string and only rendered at runtime,
# when it is passed to an operator's templated field
yesterday = "{{ yesterday_ds_nodash }}"
You can use one of the Airflow built-in variables and macros, or you can create your own templated field to pass information at runtime. For more information on this topic, see templating and macros in Airflow.
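As a brief sketch, the built-in ds variable can be passed through an operator's templated field. Here, bash_command is templated, so the expression is rendered at runtime with each DAG run's logical date (the DAG and task names are illustrative):
import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="templating_example",
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    schedule="@daily",
):
    # {{ ds }} renders to this run's logical date, not the current date,
    # so reruns for past dates stay idempotent
    fetch_data = BashOperator(
        task_id="fetch_data",
        bash_command="echo 'Processing data for {{ ds }}'",
    )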
Incremental record filtering
You should break out your pipelines into incremental extracts and loads wherever possible. For example, if you have a DAG that runs hourly, each DAG run should process only records from that hour, rather than the whole dataset. When the results in each DAG run represent only a small subset of your total dataset, a failure in one subset of the data won't prevent the rest of your DAG runs from completing successfully. If your DAGs are idempotent, you can rerun a DAG for only the data that failed rather than reprocessing the entire dataset.
There are multiple ways you can achieve incremental pipelines.
Last modified date
Using a last modified date is recommended for incremental loads. Ideally, each record in your source system has a column containing the last time the record was modified. With this design, a DAG run looks for records that were updated within specific dates from this column.
For example, with a DAG that runs hourly, each DAG run is responsible for loading any records that fall between the start and end of its hour. If one of those runs fails, the others aren't affected.
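As a sketch, assuming a source table with a last_modified column and a hypothetical warehouse connection, each run's query can filter on the data_interval_start and data_interval_end template variables, which mark the boundaries of that run's interval (SQLExecuteQueryOperator requires the common SQL provider package):
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

# Each hourly run touches only records modified within its own data interval,
# so rerunning an interval reprocesses exactly the same slice of data
load_hourly_records = SQLExecuteQueryOperator(
    task_id="load_hourly_records",
    conn_id="warehouse",  # hypothetical connection ID
    sql="""
        INSERT INTO target_table
        SELECT * FROM source_table
        WHERE last_modified >= '{{ data_interval_start }}'
          AND last_modified < '{{ data_interval_end }}'
    """,
)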