Apache Airflow® Quickstart - ETL
ETL: An introduction to modern, enhanced ETL development with Airflow.
Step 1: Clone the Astronomer Quickstart repository
1. Create a new directory for your project and open it:

   ```sh
   mkdir airflow-quickstart-etl && cd airflow-quickstart-etl
   ```

2. Clone the repository and open it:

   ```sh
   git clone -b etl --single-branch https://github.com/astronomer/airflow-quickstart.git && cd airflow-quickstart/etl
   ```
Your directory should have the following structure:
```text
.
├── Dockerfile
├── README.md
├── dags
│   └── example_etl_galaxies.py
├── include
│   ├── astronomy.db
│   ├── custom_functions
│   │   └── galaxy_functions.py
│   └── data
│       └── galaxy_names.txt
├── packages.txt
├── requirements.txt
├── solutions
│   └── example_etl_galaxies_solution.py
└── tests
    └── dags
        └── test_dag_example.py
```
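The `tests` directory ships with a basic DAG validation test that you can run with `astro dev pytest`. The exact contents of `test_dag_example.py` may differ, but a minimal sketch of this kind of test looks like the following (the test name is illustrative):

```python
import os

from airflow.models import DagBag


def test_dags_import_without_errors():
    # Parse every file in the project's dags/ folder; any exception raised on
    # import shows up in dag_bag.import_errors instead of crashing the test run.
    dags_path = os.path.join(os.path.dirname(__file__), "..", "..", "dags")
    dag_bag = DagBag(dag_folder=dags_path, include_examples=False)
    assert dag_bag.import_errors == {}, f"DAG import errors: {dag_bag.import_errors}"
```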
Step 2: Start up Airflow and explore the UI
1. Start the project using the Astro CLI:

   ```sh
   astro dev start
   ```

   The CLI will let you know when all Airflow services are up and running.

2. If the Airflow UI doesn't launch automatically, navigate your browser to `localhost:8080` and sign in using the username `admin` and the password `admin`.

3. Explore the landing page and individual DAG view page to get a sense of the metadata available about the DAG, run, and all task instances. For a deep-dive into the UI's features, see An introduction to the Airflow UI.

   For example, the DAGs view will look like this screenshot:

   As you start to trigger DAG runs, the graph view will look like this screenshot:

   The Gantt chart will look like this screenshot:
Step 3: Explore the project
Building Extract, Transform, and Load (ETL) workloads is a common pattern in Apache Airflow. This project shows an example pattern for defining an ETL workload using DuckDB as the data warehouse of choice.
As you try out this project, you'll see how easy Airflow makes it to:
- Write responsive pipelines that change based on user inputs.
- Perform database operations using SQL.
- Access and extract data from local files.
- Execute transformations with Pandas.
You'll write a lean ETL pipeline in easy-to-read Python code!
This project uses DuckDB, an in-memory database. Although this type of database is great for learning Airflow, your data is not guaranteed to persist between executions!
For production applications, use a persistent database instead (consider DuckDB's hosted option MotherDuck or another database like Postgres, MySQL, or Snowflake).
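The difference is visible at the connection level: DuckDB can run purely in memory or be backed by a local file, and neither option gives you the durability of a managed database once you run on multiple workers. A minimal sketch using the `duckdb` Python client (the `include/astronomy.db` path is the file this project ships with; the in-memory call is shown only for contrast):

```python
import duckdb

# File-backed: data is written to a local file, so it survives between runs on a
# single machine, but the file is not shared across workers or deployments.
con_file = duckdb.connect("include/astronomy.db")

# Purely in-memory: everything is gone as soon as the connection closes.
con_mem = duckdb.connect(":memory:")

con_file.close()
con_mem.close()
```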
Pipeline structure
An Airflow project can have any number of DAGs (directed acyclic graphs), the main building blocks of Airflow pipelines. This project has one: `example_etl_galaxies`.

This DAG contains five `@task`-decorated Python functions:

- `create_galaxy_table_in_duckdb` uses a hook to create a database connection and a SQL query to create a database table.
- `extract_galaxy_data` returns a dataframe created using a modularized function imported from the project's `include` directory.
- `transform_galaxy_data` gets a user-specified value from the DAG context and uses it to execute a simple data transformation on the dataframe, returning another dataframe (sketched below).
- `load_galaxy_data` uses a database hook to load the dataframe into the database. You can load the dataframe directly in the context of a SQL query. No conversion of the dataframe is required.
- `print_loaded_galaxies` executes a `SELECT` query on the database and prints the data to the logs.
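For example, `transform_galaxy_data` might look roughly like the sketch below. The param name and the filter are illustrative, not the project's exact code; see `dags/example_etl_galaxies.py` for the real implementation.

```python
from airflow.decorators import task
from airflow.operators.python import get_current_context
import pandas as pd


@task
def transform_galaxy_data(galaxy_df: pd.DataFrame) -> pd.DataFrame:
    # Pull a user-supplied DAG param out of the runtime context.
    # The param name below is illustrative.
    context = get_current_context()
    threshold = context["params"].get("closeness_threshold_light_years", 500000)

    # Simple pandas transformation: keep only the closer galaxies.
    return galaxy_df[galaxy_df["distance_from_milkyway"] < threshold]
```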
Step 4: Get your hands dirty!
With Airflow, you can connect to many external systems and create dynamic and responsive workflows. In this step, you'll learn how to create a connection to an external system.
Create a connection to an external system
Creating connections to interoperate with the many systems Airflow supports is easy.
In the steps that follow, you'll create a connection in the Airflow UI and use it in a new DAG. You can use your own preferred external system or use Postgres for a local option.
1. A Postgres database has already been added to the project for you. Confirm that port `5432` is available, or, if not, modify the external port in the project's `docker-compose.override.yml`:

   ```yaml
   version: "3.1"
   services:
     example-db:
       image: postgres:14.9
       ports:
         - "5435:5432"
       environment:
         - POSTGRES_USER=example
         - POSTGRES_PASSWORD=example
         - POSTGRES_DB=example
   ```

2. Create a connection in the UI.

   Airflow supports a number of ways to create connections, but most users choose the UI.

   To create a connection in the UI, go to Admin > Connections.

   Your connection should look like this screenshot, with the password being `example`. (A quick way to verify the connection from a task is sketched at the end of this step.)
3. Add a DAG to the project.

   Create a new Python file in the `dags` directory and paste this DAG code there:

   ```python
   from airflow.decorators import (
       dag,
       task,
   )  # This DAG uses the TaskFlow API. See: https://www.astronomer.io/docs/learn/airflow-decorators
   from pendulum import datetime, duration
   import duckdb
   import os
   from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
   from include.custom_functions.load_functions import get_sql_query
   from airflow.models.dataset import Dataset

   _DUCKDB_INSTANCE_NAME = os.getenv("DUCKDB_INSTANCE_NAME", "include/astronomy.db")
   _TABLE_NAME = os.getenv("_TABLE_NAME", "galaxy_data")
   _DUCKDB_TABLE_NAME = os.getenv("DUCKDB_TABLE_NAME", "galaxy_data")
   _DUCKDB_TABLE_URI = f"duckdb://{_DUCKDB_INSTANCE_NAME}/{_DUCKDB_TABLE_NAME}"


   @dag(
       start_date=datetime(2024, 7, 1),  # Date after which the DAG can be scheduled
       schedule=[Dataset(_DUCKDB_TABLE_URI)],  # See: https://www.astronomer.io/docs/learn/scheduling-in-airflow for options
       catchup=False,  # See: https://www.astronomer.io/docs/learn/rerunning-dags#catchup
       max_consecutive_failed_dag_runs=5,  # auto-pauses the DAG after 5 consecutive failed runs, experimental
       max_active_runs=1,  # Allow only one concurrent run of this DAG, prevents parallel DuckDB calls
       doc_md=__doc__,  # Add DAG Docs in the UI, see https://www.astronomer.io/docs/learn/custom-airflow-ui-docs-tutorial
       default_args={
           "owner": "Astro",  # owner of this DAG in the Airflow UI
           "retries": 3,  # tasks retry 3 times before they fail
           "retry_delay": duration(seconds=30),  # tasks wait 30s in between retries
       },  # default_args are applied to all tasks in a DAG
       tags=["example", "ETL"],  # Add tags in the UI
       # Warning - in-memory DuckDB is not a persistent database between workers. To move this workflow into
       # production, use a cloud-based database and, based on concurrency capabilities, adjust the two parameters below.
       concurrency=1,  # allow only a single task execution at a time, prevents parallel DuckDB calls
       is_paused_upon_creation=False,  # start running the DAG as soon as it's created
   )
   def example_etl_galaxies_load():  # By default, the dag_id is the name of the decorated function
       @task
       def extract_galaxy_data_duckdb(
           duckdb_instance_name: str = _DUCKDB_INSTANCE_NAME,
           table_name: str = _TABLE_NAME,
       ):
           cursor = duckdb.connect(duckdb_instance_name)
           galaxy_data_df = cursor.sql(f"SELECT * FROM {table_name};").df()

           return galaxy_data_df

       @task
       def create_sql_query(df):
           sql_str = get_sql_query(df, _TABLE_NAME)
           return sql_str

       create_galaxy_table_postgres = SQLExecuteQueryOperator(
           task_id="create_galaxy_table_postgres",
           conn_id="postgres_default",
           sql=f"""
               DROP TABLE IF EXISTS {_TABLE_NAME};
               CREATE TABLE {_TABLE_NAME} (
                   name VARCHAR PRIMARY KEY,
                   distance_from_milkyway INT,
                   distance_from_solarsystem INT,
                   type_of_galaxy VARCHAR,
                   characteristics VARCHAR
               );
           """,
       )

       create_sql_query_obj = create_sql_query(extract_galaxy_data_duckdb())

       load_galaxy_data_postgres = SQLExecuteQueryOperator(
           task_id="load_galaxy_data_postgres",
           conn_id="postgres_default",
           sql=create_sql_query_obj,
       )

       create_galaxy_table_postgres >> load_galaxy_data_postgres


   example_etl_galaxies_load()
   ```

   This DAG extracts data from the project's DuckDB database, creates a table in the project's Postgres database, and loads the data into the table. Using an Airflow Dataset trigger, it will run when `example_etl_galaxies` updates the `galaxy_data` dataset.

   Connection parameters vary between operators. In the case of `SQLExecuteQueryOperator`, it is `conn_id`:

   ```python
   load_galaxy_data_postgres = SQLExecuteQueryOperator(
       task_id="load_galaxy_data_postgres",
       conn_id="postgres_default",
       sql=create_sql_query_obj,
   )
   ```
4. Trigger your new DAG!

   Trigger the `example_etl_galaxies` DAG, not the new one you just added. Your new DAG will run after the `load_galaxy_data` task in `example_etl_galaxies` runs successfully.

   If all goes well, the graph view will look like this screenshot:
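The dataset trigger works because the upstream DAG marks the `galaxy_data` dataset as an outlet of its load task, using the same URI your new DAG listens on in its `schedule`. A minimal sketch of that declaration, assuming the upstream task publishes the outlet this way (the task body is elided):

```python
from airflow.decorators import task
from airflow.models.dataset import Dataset

_DUCKDB_TABLE_URI = "duckdb://include/astronomy.db/galaxy_data"


@task(outlets=[Dataset(_DUCKDB_TABLE_URI)])
def load_galaxy_data(galaxy_df):
    # ... load the dataframe into DuckDB (elided) ...
    # When this task succeeds, Airflow records an update to the dataset above,
    # which is what schedules example_etl_galaxies_load.
    ...
```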
For more guidance on getting started with connections, see: Integrations & connections.
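If you want to verify the `postgres_default` connection independently of the new DAG, you can call it from any task through a hook. A minimal sketch, assuming the Postgres provider package is installed in the project (the DAG and task names here are illustrative):

```python
from pendulum import datetime

from airflow.decorators import dag, task
from airflow.providers.postgres.hooks.postgres import PostgresHook


@dag(start_date=datetime(2024, 7, 1), schedule=None, catchup=False)
def check_postgres_connection():
    @task
    def ping():
        # Opens the connection you configured in the UI and runs a trivial query.
        hook = PostgresHook(postgres_conn_id="postgres_default")
        print(hook.get_first("SELECT 1;"))

    ping()


check_postgres_connection()
```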
Next Steps: Run Airflow on Astro
The easiest way to run Airflow in production is with Astro. To get started, create an Astro trial. During your trial signup, you will have the option of choosing the same template project you worked with in this quickstart.