WEBINARS

Improve Your DAGs with Hidden Airflow Features

Watch Video On Demand

Recorded On

Hosted By

  • Marc Lamberti
  • Viraj Parekh

The CLI level

Airflow standalone

imrpove-dags-hidden-airflow-image2

Airflow standalone runs all parts of an Airflow deployment under one primary process, providing a more manageable entry level than Breeze and a convenient tool for local development.

What can Airflow standalone do?

airflow db shell

airflow db shell → There are various ways you can inspect data, including — if you have direct access to the database — interacting with it using your own tools (often graphical tools showing the database objects). But if you don’t have such tools, you can use the airflow db shell command — this will drop you in the db shell tool for your database, and you will be able to inspect your data.

CLI commands

airflow info → shows information about the current Airflow environment, crucial Airflow details such as version, executor used, connection string, DAGs folder, python version.

tools info → if you want to interact with Git or Postgres from your tasks, you may need to install them. (Notice that Airflow expects that you have those tools installed.)

paths info → especially python_path. If you add a folder with your external functions that you call from your DAGs, you want to make sure that environment is aware of it — otherwise, it won’t work.

providers info → what providers are installed? Which version?

airflow dags test <dag_id> <execution_date> → Executes one single DagRun for a given DAG and execution date using the DebugExecutor.

⚠️ This command generates metadata (DAG runs, task runs etc).

DebugExecutor: meant as a debug tool and can be used from IDE. It is a single process executor that queues TaskInstance and executes it.

airflow tasks test -m <dag_id> <task_id> <execution_date> → Tests a task instance. Will run a task without checking for dependencies or recording its state in the database.

# easy_dag.py
def _my_func():
    my_val = 123
  raise ValueError("exception")
    return(my_val)

⚠️ XCOMs are stored

The UI level

imrpove-dags-hidden-airflow-image4

Airflow allows you to customize the DAG home page header and the page title. It can help distinguish between various installations of Airflow, or simply be used to amend the page text.

Note: the custom title will be applied to both the page header and the page title.

To make this kind of change, simply:

# in .env
AIRFLOW__WEBSERVER__INSTANCE_NAME=DEV
astro dev stop && astro dev start

imrpove-dags-hidden-airflow-image3

Extra alert messages can be shown on the UI dashboard, which is particularly useful for warning about setup issues or announcing changes to end-users.

from airflow.www.utils import UIAlert

#"info", "warning", "error"

DASHBOARD_UIALERTS = [
    UIAlert("Update!", category="info", roles=["User", "Admin"]),
  UIAlert("Check user permissions", category="warning", roles=["Admin"]),
]

astro dev stop && astro dev start

The DAG level

Taskflow API

# taskflow.py

from airflow import DAG
from airflow.decorators import task

from datetime import datetime

with DAG('taskflow',
 start_date=datetime(2022, 1 ,1),
 schedule_interval='@daily',
 catchup=False) as dag:

 @task
 def setup(my_data):
  print(f"setup dataset {my_data}")
  return my_data + 2

 @task
 def load(my_modified_data):
  print(f"load dataset {my_modified_data}")

 load(setup(42))

💡 As with any operator, you can pass parameters to task() like trigger_rule, task_id etc.

💡 You can mention the other decorators — docker, branch, virtualenv — and you can create custom decorators, too.

Taskgroup

# taskflow.py

from airflow import DAG
from airflow.decorators import task, **task_group**

from datetime import datetime

with DAG('taskflow',
 start_date=datetime(2022, 1 ,1),
 schedule_interval='@daily',
 catchup=False) as dag:

 @task
 def setup(my_data):
  print(f"setup dataset {my_data}")
  return my_data + 2

 **@task_group(group_id='ml_tasks')
 def ml_tasks(value):

  @task
  def ml_1(value):
   return value + 42

  @task
  def ml_2(value):
   return value - 42

  return [ml_1(value), ml_2(value)]**

 @task
 def load(my_modified_data):
  print(f"load dataset {my_modified_data}")

 load(**ml_tasks(**setup(42)**)**)

💡 You can use default_args in taskgroup to apply default arguments to tasks present only within that group. Especially useful for pools!

Edgemodifier

You can label the dependency edges between different tasks in the Graph view — this can be especially useful for branching areas of your DAG, allowing you to label the conditions under which certain branches might run.

from airflow import DAG
from airflow.decorators import task, task_group
**from airflow.utils.edgemodifier import Label**

from datetime import datetime

with DAG('taskflow',
    start_date=datetime(2022, 1 ,1),
    schedule_interval='@daily',
    catchup=False) as dag:

    @task
    def setup(my_data):
        print(f"setup dataset {my_data}")
        return my_data + 2

    @task_group(group_id='ml_tasks')
    def ml_tasks(value):

        @task
        def ml_1(value):
            return value + 42

        @task
        def ml_2(value):
            return value - 42

        **return [value >> Label("decision tree") >> ml_1(value),
                value >> Label("random forest") >> ml_2(value)]**

    @task
    def load(my_modified_data):
        print(f"load dataset {my_modified_data}")

    load(ml_tasks(setup(42)))

Airflow Exceptions

from airflow import DAG
from airflow.decorators import task, task_group
from airflow.utils.edgemodifier import Label
**from airflow.exceptions import AirflowFailException, AirflowException, AirflowSkipException**

from datetime import datetime

with DAG('taskflow',
    start_date=datetime(2022, 1 ,1),
    schedule_interval='@daily',
    catchup=False) as dag:

    **@task(retrie=3)**
    def setup(my_data):
    **raise AirflowFailException # Doesn't respect retries, AirflowException does**
        print(f"setup dataset {my_data}")
        return my_data + 2

    @task_group(group_id='ml_tasks')
    def ml_tasks(value):

        @task
        def ml_1(value):
            return value + 42

        @task
        def ml_2(value):
            return value - 42

        return [value >> Label("decision tree") >> ml_1(value),
                value >> Label("random forest") >> ml_2(value)]

    @task
    def load(my_modified_data):
        print(f"load dataset {my_modified_data}")

    load(ml_tasks(setup(42)))

Dynamic DAGs with Jinja

Jinja is a template engine that takes a template file with special placeholders and replaces them with data from a source. Apache Airflow® uses Jinja to build its webpages as well as to render values in DAG files at run time.

There are various methods of generating DAGs dynamically:

→ Single file method with globals → Multiple files method with JSON → Multiple files method with Jinja

imrpove-dags-hidden-airflow-image1

python include/dynamic_dag_jinja/generate_dag.py

Deferrable Operators

Sensors take worker slots, waste resources, time, and possibly cause deadlocks.

Deferrable operators suspend themselves and release worker slots: they reduce resource consumption, lower infra cost, and are more resilient.

Two components:

Trigger: An asynchronous Python function that quickly and continuously evaluates a given condition. An Operator must have its own Trigger code to be async

Triggerer: Responsible for running Triggers and signaling tasks to resume when their conditions have been met. Like the Scheduler, it is designed to be highly available.

💡 In addition to the deferrable operators that are published by the Apache Airflow® open source project, Astronomer maintains astronomer-providers, an open-source collection of deferrable operators bundled as provider packages.

How to stay up to date?

Changelog → https://airflow.apache.org/docs/apache-airflow/stable/changelog.html

Apache Airflow® Slack → #airflow-releases and #announcements

Astronomer Academy → https://academy.astronomer.io/

Marc’s little trick → go to https://github.com/apache/airflow/milestones

Build, run, & observe your data workflows.
All in one place.

Get $300 in free credits during your 14-day trial.