Note: This webinar was recorded in September 2021 and while the information in it is still accurate, many more ways to test your Airflow DAGs have been added in more recent releases. See our Test Airflow DAGs guide.
Webinar links:
- GitHub Repo for this webinar
- Presentation
- Pytest fixture scope
- Pytest builtin fixtures
- Pytest autouse
- Pytest conftest.py
- pytest-docker-tools plugin
- The Airflow source code
Agenda
- Testing in Python
- Testing in Airflow
- DAG integrity test
- Unit testing in Airflow 101
- Unit testing with templated arguments
- Unit testing using mocking
- Unit testing using Docker
- Integration testing
- DTAP environments
- Airflow CLI
- CI/CD
1. Testing in Python
Python testing frameworks:
- unittest (Python builtin)
- pytest (used by Airflow)
- many more
Typical project structure for testing:
├── dags
│ ├── foo_dag.py
│ ├── bar_dag.py
│ └── hello_world_dag.py
├── mypackage
│ ├── __init__.py
│ └── magic.py
└── tests
├── dags
│ └── test_dag_integrity.py
├── mypackage
│ └── test_magic.py
├── test_1.py
└── test_2.py
A good test structure mirrors the structure of your actual project.
Why pytest? Pytest fixtures provide reusable building blocks, which give you flexibility. For example, checking whether your multiply function indeed multiplies correctly:
import pytest
def multiply(a, b):
return a * b
@pytest.fixture
def a():
return 2
@pytest.fixture
def b():
return 3
def test_multiply(a, b):
assert multiply(a, b) == 6
2. DAG Integrity Test
a) Will filter out errors such as:
- Missing required arguments (such as forgotten DAG ID)
- Duplicate DAG ids
- Cycles in DAGs
Filters out a lot of silly programming mistakes, so it’s super useful in the beginning of testing.
from airflow.models import DagBag
def test_dagbag():
dag_bag = DagBag(include_examples=False) #Loads all DAGs in $AIRFLOW_HOME/dags
assert not dag_bag.import_errors #Import errors aren't raised but captured to ensure all DAGs are parsed
b) Can be used to enforce conventions, e.g. “does each DAG have a tag?”
from airflow.models import DagBag
def test_dagbag():
dag_bag = DagBag(include_examples=False)
assert not dag_bag.import_errors
for dag_id, dag in dag_bag.dags.items():
assert dag.tags #Assert dag.tags is not empty
c) Performing additional tests on all the DAGs
The DagBag has an attribute called dags, which maps DAG IDs to DAG objects. It allows you to check certain things, such as: does each DAG have a tag?
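For example, a minimal sketch of such a check (the retries rule here is just an illustration of a convention you might enforce, not something Airflow requires):

from airflow.models import DagBag

def test_dag_conventions():
    dag_bag = DagBag(include_examples=False)
    assert not dag_bag.import_errors
    for dag_id, dag in dag_bag.dags.items():  # dag_bag.dags maps DAG id -> DAG object
        for task in dag.tasks:
            # Example convention: every task should be configured to retry at least once
            assert task.retries >= 1, f"Task {task.task_id} in DAG {dag_id} has no retries set"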
3. Unit testing in Airflow
a) Simplest way to test an operator in Airflow:
call execute() on the operator
Context is required, but it can be an empty dictionary.
from airflow.operators.bash import BashOperator

def test_bash_operator():
    test = BashOperator(task_id="test", bash_command="echo hello")
    result = test.execute(context={})  # Each operator implements execute()
    assert result == "hello"  # Check result
b) PythonOperator unit test with task context
Context can be supplied manually: if you want to do anything with the execution date in your task, you can simply provide it by hand.
import datetime

from airflow.operators.python import PythonOperator

def test_python_operator():
    def return_today(**context):
        return f"Today is {context['execution_date'].strftime('%d-%m-%Y')}"

    test = PythonOperator(task_id="test", python_callable=return_today)
    result = test.execute(context={"execution_date": datetime.datetime(2021, 1, 1)})
    assert result == "Today is 01-01-2021"
The point where this basic unit testing falls short is templating: execute() does not render templates, so testing templated arguments requires something a bit more complex.
For example, this test fails because the template is never rendered:
{% raw %}def test_bash_operator_template():
    test = BashOperator(task_id="test", bash_command="echo 'Today is {{ execution_date }}'")
    result = test.execute(context={"execution_date": datetime.datetime(2021, 1, 1)})
    assert result == "Today is 01-01-2021"

AssertionError: assert 'Today is {{ execution_date }}' == 'Today is 01-01-2021'{% endraw %}
4. Unit testing with templated arguments
- Templating of arguments happens before execute()
- Therefore you cannot use execute() when testing templated arguments - call run() instead
- Match start_date and end_date to run one task instance (for a predictable outcome)
Requires:
- Airflow metastore
- DAG
Manually creating a (local) metastore for testing:
airflow db init
This will create files in your home directory:
- airflow.db
- airflow.cfg
- webserver_config.py
Not ideal - you probably don’t want to clutter your home directory, so:
export AIRFLOW_HOME=[your project dir]
airflow db init
(make sure AIRFLOW_HOME is also set when running pytest)
But it’s still a bit impractical to initialize the metastore for every test…
So instead, you can use this magical fixture in tests/conftest.py to automatically reset the Airflow metastore once per pytest session:
import os
import pytest
os.environ["AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS"] = "False"
os.environ["AIRFLOW__CORE__LOAD_EXAMPLES"] = "False"
os.environ["AIRFLOW__CORE__UNIT_TEST_MODE"] = "True"
os.environ["AIRFLOW_HOME"] = os.path.dirname(os.path.dirname(__file__))
@pytest.fixture(autouse=True, scope="session")
def reset_db():
from airflow.utils import db
db.resetdb()
yield
Best practices:
- Keep the test environment separate (unit test mode)
- Do not include default connections
- Do not include the example DAGs
- Optionally, clean up the temp files
- Task context is fetched from various places, including the DAG, which is why you need a DAG; to avoid state left over from old or other task runs, call dag.clear()
Oops! There’s no return value from run(), because it can be responsible for running multiple task instances, so it doesn’t return a single result - what to do?
- Therefore: write to temp file
- Read file content to assert result
- tmp_path is a pytest builtin fixture
{% raw %}import datetime
import pathlib
from airflow.models import DAG
from airflow.operators.bash import BashOperator
def test_bash_operator(tmp_path):
with DAG(dag_id="test_dag", start_date=datetime.datetime(2021, 1, 1), schedule_interval="@daily") as dag:
output_file = tmp_path / "output.txt"
test = BashOperator(task_id="test", bash_command="echo {{ ds_nodash }} > " + str(output_file))
dag.clear()
test.run(
start_date=dag.start_date, end_date=dag.start_date, ignore_first_depends_on_past=True, ignore_ti_state=True
)
assert output_file.read_text() == "20210101\n"{% endraw %}
5. Unit testing using mocking
Mocking is typically useful when the code you’re testing talks to an external system, but you’re running the test on your laptop and don’t have access to a production database or other production systems.
Mocking = replace functionality with “fake” behaviour
- Avoid call to external system
- Return predictable outcome
If, for example, you call an API that returns random results, you can use mocking to return one single predictable outcome.
Mocking requires knowledge of the internal code: you need to know at what point, or in which method, the call to the external system (e.g. the metastore) is made.
Different ways to specify mock.patch():
- As context manager
with mock.patch("airflow.models.variable.Variable.get") as variable_get_mock:
variable_get_mock.do_something()
- As decorator
- mock objects provided as arguments in inverse order
- avoids nested with … as … blocks
@mock.patch("airflow.providers.postgres.hooks.postgres.PostgresHook.get_connection")
@mock.patch("airflow.hooks.base.BaseHook.get_connection")
def test_postgres_demo(base_hook_mock, pg_hook_mock, other_fixture):
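As a minimal end-to-end sketch (the variable key and the callable are made up for illustration), here is a test that mocks Variable.get so no metastore lookup happens:

from unittest import mock

from airflow.models import Variable
from airflow.operators.python import PythonOperator

def test_variable_mock():
    def read_variable():
        return Variable.get("my_key")  # would normally query the metastore

    task = PythonOperator(task_id="test", python_callable=read_variable)

    with mock.patch("airflow.models.variable.Variable.get") as variable_get_mock:
        variable_get_mock.return_value = "fake_value"  # predictable outcome instead of a real lookup
        result = task.execute(context={})

    assert result == "fake_value"
    variable_get_mock.assert_called_once_with("my_key")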
6. Unit testing using Docker
Sometimes there are things that cannot be tested by mocking because:
- They require a lot of knowledge of the internals of a certain system
- There’s just no nice mocking library available
Docker allows testing against “real” system.
The pytest-docker-tools plugin provides convenience fixtures for defining Docker containers.
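For example, a rough sketch of spinning up a throwaway Postgres container with pytest-docker-tools (the image tag and credentials are arbitrary, and the fixture API may differ slightly between plugin versions):

from pytest_docker_tools import container, fetch

postgres_image = fetch(repository="postgres:13-alpine")

postgres = container(
    image="{postgres_image.id}",
    environment={"POSTGRES_USER": "test", "POSTGRES_PASSWORD": "test"},
    ports={"5432/tcp": None},  # bind to a random free port on the host
)

def test_postgres_container(postgres):
    # The fixture starts the container before the test and removes it afterwards
    assert postgres.ports["5432/tcp"][0]  # a host port was assigned

In a real test you would then point an Airflow connection at localhost and the mapped port and run your operator against that throwaway database.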
7. Integration testing
At some point you’re done with testing individual operators, and you would like to know if multiple operators together actually do what you expect them to do.
You can run a complete DAG in a test.
You need a bit of knowledge of Airflow internals, for example to know that a DAG run has a state attribute.
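A rough sketch of what that can look like, reusing the run() pattern from the templating section (the two-task DAG here is invented purely for illustration):

import datetime

from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.state import State

def test_complete_dag(tmp_path):
    output_file = tmp_path / "output.txt"
    with DAG(dag_id="integration_test_dag", start_date=datetime.datetime(2021, 1, 1), schedule_interval="@daily") as dag:
        first = BashOperator(task_id="first", bash_command=f"echo step1 > {output_file}")
        second = BashOperator(task_id="second", bash_command=f"echo step2 >> {output_file}")
        first >> second

    dag.clear()
    # Run the tasks in dependency order for a single schedule interval
    for task in [first, second]:
        task.run(start_date=dag.start_date, end_date=dag.start_date,
                 ignore_first_depends_on_past=True, ignore_ti_state=True)

    # Both operators together should have produced the combined output
    assert output_file.read_text() == "step1\nstep2\n"
    # Task instance state lives in the metastore; every run should have succeeded
    tis = dag.get_task_instances(start_date=dag.start_date, end_date=dag.start_date)
    assert all(ti.state == State.SUCCESS for ti in tis)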
8. DTAP environments
Sometimes you just want to test against a real system. Ideally you have more than one system to do that, as you don’t want to deploy into a production system and then trigger the DAG.
If you want to bring your DAG from a branch into production, you typically have multiple branches, each corresponding to a specific Airflow environment. Then, via pull requests, you merge from one branch into the next, all the way up to production, until your DAG runs in the production environment.
Your test environment should mimic your production environment - remember?
9. Airflow CLI
airflow tasks test [dag_id] [task_id] [execution_date]
- Can be useful if you need to debug in production
- The run won’t show up in the UI (the task instance is saved in the DB though)
airflow dags test [dag_id] [execution_date]
- Run complete DAG on terminal using DebugExecutor
10. CI/CD
- Static checks (Flake8, Black, Pylint, …) - run these first, since they are small and quick to run
- Testing (generally done as the second step)
- Deploying the code
GitHub Actions example: https://github.com/astronomer/airflow-testing-skeleton/blob/master/.github/workflows/ci.yaml