WEBINARS

Testing Airflow to Bullet Proof Your Code

Watch Video On Demand

Hosted By

  • Bas Harenslak

Note: This webinar was recorded in September 2021 and while the information in it is still accurate, many more ways to test your Airflow DAGs have been added in more recent releases. See our Test Airflow DAGs guide.

Agenda

1. Testing in Python

Python testing frameworks:

Typical project structure for testing:

├── dags
│   ├── foo_dag.py
│   ├── bar_dag.py
│   └── hello_world_dag.py
├── mypackage
│   ├── __init__.py
│   └── magic.py
└── tests
    ├── dags
    │   └── test_dag_integrity.py
    ├── mypackage
    │   └── test_magic.py
    ├── test_1.py
    └── test_2.py

A good project structure mimics the structure of your actual project.

Example - checking if your multiplying method indeed multiplies correctly:

testing-airflow-bullet-proof-image4

Why Pytest? Pytest fixtures provide reusable building blocks, which give you flexibility:

import pytest
def multiply(a, b):
   return a * b
@pytest.fixture
def a():
   return 2
@pytest.fixture
def b():
   return 3
def test_multiply(a, b):
   assert multiply(a, b) == 6

2. DAG Integrity Test

a) Will filter out errors such as:

Filters out a lot of silly programming mistakes, so it’s super useful in the beginning of testing.

from airflow.models import DagBag

def test_dagbag():
    dag_bag = DagBag(include_examples=False) #Loads all DAGs in $AIRFLOW_HOME/dags
    assert not dag_bag.import_errors #Import errors aren't raised but captured to ensure all DAGs are parsed

b) Can be used to enforce conventions, e.g. “does each DAG have a tag?”

from airflow.models import DagBag

def test_dagbag():
    dag_bag = DagBag(include_examples=False)
    assert not dag_bag.import_errors
    for dag_id, dag in dag_bag.dags.items():
        assert dag.tags #Assert dag.tags is not empty

c) performing additional tests on all the DAGs

On the dagbag there’s an attribute called bags. It allows you to check certain things, such as: does each DAG have a tag?

3. Unit testing in Airflow

a) Simplest way to test an operator in Airflow:

call execute() on operator

Context is required, but it can be an empty dictionary.

def test_bash_operator():
   test = BashOperator(task_id="test", bash_command="echo hello")
   result = test.execute(context={}) #Each operator implements execute()
   assert result == "hello" #Check result

b) PythonOperator unit test with task context

Context can be supplied manually - if you want to do anything with the execution date in your task, you can provide it just by hand.

def test_python_operator():

    def return_today(**context):
        return f"Today is {context['execution_date'].strftime('%d-%m-%Y')}"

    test = PythonOperator(task_id="test", python_callable=return_today)
    result = test.execute(context={"execution_date": datetime.datetime(2021, 1, 1)})
    assert result == "Today is 01-01-2021"

The point where this basic unit testing fails is when you want to do templating, as execute() doesn’t do templates. Testing the templates requires something a bit more complex.

{% raw %}AssertionError: assert 'Today is {{ execution_date }}' == 'Today is 01-01-2021'

def test_bash_operator_template():
    test = BashOperator(task_id="test", bash_command="echo 'Today is {{ execution_date }}'")
    result = test.execute(context={"execution_date": datetime.datetime(2021, 1, 1)})
    assert result == "Today is 01-01-2021"{% endraw %}

4. Unit testing with templated arguments

Requires:

Manually creating a (local) metastore for testing:

airflow db init

This will create files in your home directory:

Not ideal - probably you don’t want to spam your home folder, so:

export AIRFLOW_HOME=[your project dir]
airflow db init

(make sure AIRFLOW_HOME is also set when running pytest)

But still, it’s a bit impractical to initiate metastore for every test…

So instead you can use this magical fixture to automatically reset Airflow metastore for every session with pytest (tests/conftest.py):

import os

import pytest

os.environ["AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS"] = "False"
os.environ["AIRFLOW__CORE__LOAD_EXAMPLES"] = "False"
os.environ["AIRFLOW__CORE__UNIT_TEST_MODE"] = "True"
os.environ["AIRFLOW_HOME"] = os.path.dirname(os.path.dirname(__file__))

@pytest.fixture(autouse=True, scope="session")
def reset_db():
    from airflow.utils import db

    db.resetdb()
    yield

Best practices:

Oops! No return value from run(), because it can be responsible for running multiple thoughts, and as a result it doesn’t return any failure - what to do?

{% raw %}import datetime
import pathlib

from airflow.models import DAG
from airflow.operators.bash import BashOperator

def test_bash_operator(tmp_path):
   with DAG(dag_id="test_dag", start_date=datetime.datetime(2021, 1, 1), schedule_interval="@daily") as dag:
       output_file = tmp_path / "output.txt"
       test = BashOperator(task_id="test", bash_command="echo {{ ds_nodash }} > " + str(output_file))
       dag.clear()
       test.run(
           start_date=dag.start_date, end_date=dag.start_date, ignore_first_depends_on_past=True, ignore_ti_state=True
       )

       assert output_file.read_text() == "20210101\n"{% endraw %}

5. Unit testing using mocking

Mocking: typically useful when you want to talk to any external system in your test by running a test on your laptop, but you don’t have access to any production database or production systems.

Mocking = replace functionality with “fake” behaviour

If you have, for example, an API that you’re calling and that always returned some sort of random results you could use mocking to return one single predictable outcome.

Need knowledge of internal code - you need to know at what point or what method is responsible for making a call to an external system (metastore).

Different ways to specify mock.patch():

with mock.patch("airflow.models.variable.Variable.get") as variable_get_mock:
    variable_get_mock.do_something()
@mock.patch("airflow.providers.postgres.hooks.postgres.PostgresHook.get_connection")
@mock.patch("airflow.hooks.base.BaseHook.get_connection")
def test_postgres_demo(base_hook_mock, pg_hook_mock, other_fixture):

6. Unit testing using Docker

Sometimes there are things that cannot be tested by mocking because:

Docker allows testing against “real” system.

7. Integration testing

At some point you’re done with testing individual operators, and you would like to know if multiple operators together actually do what you expect them to do.

You can run a complete DAG in a test.

testing-airflow-bullet-proof-image3

You need a little bit of a knowledge of the internals of airflow to understand that there’s, for example, a state attribute when your DAG runs etc.

8. DTAP environments

Sometimes you just want to test against a real system. Ideally you have more than one system to do that, as you don’t want to deploy into a production system and then trigger the DAG.

If you want to bring your DAG from a branch into production, you typically have multiple branches, each corresponding to a specific Airflow. Then via pull request, you would merge from one branch into another all the way into production until you run in your production environments.

Your test environment should mimic your production environment - remember?

testing-airflow-bullet-proof-image5

9. Airflow CLI

airflow tasks test [dag_id] [task_id] [execution_date]

airflow dags test [dag_id] [execution_date]

10. CI/CD

testing-airflow-bullet-proof-image2

  1. Static checks (Flake8, Black, Pylint, …) - run first, small and quick to run
  2. Testing (generally done in 2nd step)
  3. Deployment code

GitHub Actions example: https://github.com/astronomer/airflow-testing-skeleton/blob/master/.github/workflows/ci.yaml

Build, run, & observe your data workflows.
All in one place.

Get $300 in free credits during your 14-day trial.

Get Started Free