Apache Airflow® Quickstart
It's easy to get your pipelines up and running with Apache Airflow®.
This quickstart offers three learning paths. Choose between these popular use cases:
- Learning Airflow: an introduction to Airflow's lean and dynamic pipelines-as-Python-code
- ETL: an introduction to modern, enhanced ETL development with Airflow
- Generative AI: an introduction to generative AI model development with Airflow
Launch your journey with Airflow by signing up for a trial at astronomer.io! You'll be able to deploy your projects to Astro at the end of this tutorial.
For more help getting started, also check out our step-by-step Get Started with Airflow tutorial.
Time to complete
This quickstart takes approximately 30 minutes to complete.
Assumed knowledge
To get the most out of this quickstart, you should have an understanding of:
- Basic Airflow concepts. See Introduction to Apache Airflow.
- Basic Python. See the Python documentation.
Prerequisites
- The Astro CLI version 1.25.0 or higher.
- An integrated development environment (IDE) for Python development, such as VS Code, Sublime Text, or PyCharm.
- (Optional) A local installation of Python 3 to improve your Python developer experience.
Step 1: Clone the Astronomer Quickstart repository
- Learning Airflow
- ETL
- Generative AI
-
Create a new directory for your project and open it:
mkdir airflow-quickstart-learning && cd airflow-quickstart-learning
-
Clone the repository and open it:
git clone -b learning-airflow --single-branch https://github.com/astronomer/airflow-quickstart.git && cd airflow-quickstart/learning-airflow
Your directory should have the following structure:
.
├── Dockerfile
├── README.md
├── dags
│ └── example_astronauts.py
├── include
├── packages.txt
├── requirements.txt
├── solutions
│ └── example_astronauts_solution.py
└── tests
└── dags
└── test_dag_integrity.py
-
Create a new directory for your project and open it:
mkdir airflow-quickstart-etl && cd airflow-quickstart-etl
-
Clone the repository and open it:
git clone -b etl --single-branch https://github.com/astronomer/airflow-quickstart.git && cd airflow-quickstart/etl
Your directory should have the following structure:
.
├── Dockerfile
├── README.md
├── dags
│ └── example_etl_galaxies.py
├── include
│ ├── astronomy.db
│ ├── custom_functions
│ │ └── galaxy_functions.py
│ └── data
│ └── galaxy_names.txt
├── packages.txt
├── requirements.txt
├── solutions
│ └── example_etl_galaxies_solution.py
└── tests
└── dags
└── test_dag_example.py
-
Create a new directory for your project and open it:
mkdir airflow-quickstart-genai && cd airflow-quickstart-genai
-
Clone the repository and open it:
git clone -b generative-ai --single-branch https://github.com/astronomer/airflow-quickstart.git && cd airflow-quickstart/generative-ai
Your directory should have the following structure:
.
├── Dockerfile
├── README.md
├── airflow_settings.yaml
├── dags
│ └── example_vector_embeddings.py
├── include
│ ├── custom_functions
│ │ └── embedding_func.py
│ └── data
│ └── galaxy_names.txt
├── packages.txt
├── requirements.txt
├── solutions
│ └── example_vector_embeddings_solution.py
└── tests
└── dags
└── test_dag_integrity.py
Step 2: Start up Airflow and explore the UI
- Learning Airflow
- ETL
- Generative AI
-
Start the project using the Astro CLI:
astro dev start
The CLI will let you know when all Airflow services are up and running.
-
In your browser, navigate to
localhost:8080
and sign in to the Airflow UI using usernameadmin
and passwordadmin
. -
Unpause the
example_astronauts
DAG. -
Explore the DAGs view (the landing page) and individual DAG view page to get a sense of the metadata available about the DAG, run, and all task instances. For a deep-dive into the UI's features, see An introduction to the Airflow UI.
For example, the DAGs view will look like this screenshot:
As you start to trigger DAG runs, the graph view will look like this screenshot:
The Gantt chart will look like this screenshot:
-
Start the project using the Astro CLI:
astro dev start
The CLI will let you know when all Airflow services are up and running.
-
In your browser, navigate to
localhost:8080
and sign in to the Airflow UI using usernameadmin
and passwordadmin
. -
Unpause the
example_astronauts
DAG. -
Explore the landing page and individual DAG view page to get a sense of the metadata available about the DAG, run, and all task instances. For a deep-dive into the UI's features, see An introduction to the Airflow UI.
For example, the DAGs view will look like this screenshot:
As you start to trigger DAG runs, the graph view will look like this screenshot:
The Gantt chart will look like this screenshot:
-
Start the project using the Astro CLI:
astro dev start
The CLI will let you know when all Airflow services are up and running.
-
In your browser, navigate to
localhost:8080
and sign in to the Airflow UI using usernameadmin
and passwordadmin
. -
Unpause the
example_astronauts
DAG. -
Explore the DAGs view (landing page) and individual DAG view page to get a sense of the metadata available about the DAG, run, and all task instances. For a deep-dive into the UI's features, see An introduction to the Airflow UI.
For example, the DAGs view will look like this screenshot:
As you start to trigger DAG runs, the graph view will look like this screenshot:
The Gantt chart will look like this screenshot:
Step 3: Explore the project
- Learning Airflow
- ETL
- Generative AI
This Astro project introduces you to the basics of orchestrating pipelines with Airflow. You'll see how easy it is to:
- get data from data sources.
- generate tasks automatically and in parallel.
- trigger downstream workflows automatically.
You'll build a lean, dynamic pipeline serving a common use case: extracting data from an API and loading it into a database!
This project uses DuckDB, an in-memory database. Although this type of database is great for learning Airflow, your data is not guaranteed to persist between executions!
For production applications, use a persistent database instead (consider DuckDB's hosted option MotherDuck or another database like Postgres, MySQL, or Snowflake).
Pipeline structure
An Airflow instance can have any number of DAGs (directed acyclic graphs), your data pipelines in Airflow. This project has two:
example_astronauts
This DAG queries the list of astronauts currently in space from the Open Notify API, prints assorted data about the astronauts, and loads data into an in-memory database.
Tasks in the DAG are Python functions decorated using Airflow's TaskFlow API, which makes it easy to turn arbitrary Python code into Airflow tasks, automatically infer dependencies, and pass data between tasks.
-
get_astronaut_names
andget_astronaut_numbers
make a JSON array and an integer available, respectively, to downstream tasks in the DAG. -
print_astronaut_craft
andprint_astronauts
make use of this data in different ways. The third task uses dynamic task mapping to create a parallel task for each Astronaut in the list retrieved from the API. Airflow lets you do this with just two lines of code:print_astronaut_craft.partial(greeting="Hello! :)").expand(
person_in_space=get_astronaut_names()
),The key feature is the
expand()
function, which makes the DAG automatically adjust the number of tasks each time it runs. -
create_astronauts_table in duckdb
andload_astronauts_in_duckdb
create a DuckDB database table for some of the data and load the data, respectively.
example_extract_astronauts
This DAG queries the database you created for astronaut data in example_astronauts
and prints out some of this data. Changing a single line of code in this DAG can make it run automatically when the other DAG completes a run.
Building Extract, Transform, and Load (ETL) workloads is a common pattern in Apache Airflow. This project shows an example pattern for defining an ETL workload using DuckDB as the data warehouse of choice.
As you try out this project, you'll see how easy Airflow makes it to:
- write responsive pipelines that change based on user inputs.
- perform database operations using SQL.
- access and extract data from local files.
- execute transformations with Pandas.
You'll write a lean ETL pipeline in easy-to-read Python code!
This project uses DuckDB, an in-memory database. Although this type of database is great for learning Airflow, your data is not guaranteed to persist between executions!
For production applications, use a persistent database instead (consider DuckDB's hosted option MotherDuck or another database like Postgres, MySQL, or Snowflake).
Pipeline structure
An Airflow project can have any number of DAGs (directed acyclic graphs), the main building blocks of Airflow pipelines. This project has one:
example_etl_galaxies
This DAG contains five @task
-decorated Python functions:
-
create_galaxy_table_in_duckdb
uses a hook to create a database connection and a SQL query to create a database table. -
extract_galaxy_data
returns a dataframe created using a modularized function imported from the project'sinclude
directory. -
transform_galaxy_data
gets a user-specified value from the DAG context and uses it to execute a simple data transformation on the dataframe, returning another dataframe. -
load_galaxy_data
uses a database hook to load the dataframe into the database. You can load the dataframe directly in the context of a SQL query. No conversion of the dataframe is required. -
print_loaded_galaxies
executes aSELECT
query on the database and prints the data to the logs.
Apache Airflow is one of the most common orchestration engines for AI/Machine Learning jobs, especially for retrieval-augmented generation (RAG). This project shows a simple example of building vector embeddings for text and then performing a semantic search on the embeddings.
The DAG (directed acyclic graph) in the project demonstrates how to leverage Airflow's automation and orchestration capabilities to:
- orchestrate a generative AI pipeline.
- compute vector embeddings of words using Python's
SentenceTransformers
library. - compare the embeddings of a word of interest to a list of words to find the semantically closest match.
You'll write a user-customizable generative-AI pipeline in easy-to-read Python code!
This project uses DuckDB, an in-memory database, for running dbt transformations. Although this type of database is great for learning Airflow, your data is not guaranteed to persist between executions!
For production applications, use a persistent database instead (consider DuckDB's hosted option MotherDuck or another database like Postgres, MySQL, or Snowflake).
Pipeline structure
An Airflow project can have any number of DAGs (directed acyclic graphs), the main building blocks of Airflow pipelines. This project has one:
example_vector_embeddings
This DAG contains six tasks:
-
get_words
gets a list of words from the context to embed. -
create_embeddings
creates embeddings for the list of words. -
create_vector_table
creates a table in the DuckDB database and an HNSW index on the embedding vector. -
insert_words_into_db
inserts the words and embeddings into the table. -
embed_word
embeds a single word and returns the embeddings. -
find_closest_word_match
finds the closest match to a word of interest.
Step 4: Get your hands dirty!
- Learning Airflow
- ETL
- Generative AI
With Airflow, it's easy to create cross-workflow dependencies. In this step, you'll learn how to:
- use Airflow Datasets to create a dependency between DAGs so when one workflow ends another begins. To do this, you'll modify the
example_extract_astronauts
DAG to use a Dataset to trigger a DAG run when theexample_astronauts
DAG updates the table that both DAGs query.
Schedule the example_extract_astronauts
DAG on an Airflow Dataset
With Datasets, DAGs that access the same data can have explicit, visible relationships, and DAGs can be scheduled based on updates to these datasets. This feature helps make Airflow data-aware and expands Airflow scheduling capabilities beyond time-based methods such as cron. Downstream DAGs can be scheduled based on combinations of Dataset updates coming from tasks in the same Airflow instance or calls to the Airflow API.
-
Define the
get_astronaut_names
task as a producer of a Dataset. To do this, pass a Dataset object, encapsulated in a list, to the task'soutlets
parameter:@task(
outlets=[Dataset("current_astronauts")]
)
def get_astronaut_names(**context) -> list[dict]:For more information about Airflow Datasets, see: Datasets and data-aware scheduling in Airflow.
-
Schedule a downstream DAG run using an Airflow Dataset:
Now that you have defined the
get_astronauts
task in theexample_astronauts
DAG as a Dataset producer, you can use that Dataset to schedule downstream DAG runs.Datasets function like an API to communicate when data at a specific location in your ecosystem is ready for use, reducing the code required to create cross-DAG dependencies. For example, with an import and a single line of code, you can schedule a DAG to run when another DAG in the same Airflow environment has updated a Dataset.
To schedule the
example_extract_astronauts
DAG to run whenexample_astronauts
updates thecurrent_astronauts
Dataset, add an import statement to make the Airflow Dataset package available:from airflow import Dataset
-
Then, set the DAG's schedule using the
current_astronauts
Dataset:schedule=[Dataset("current_astronauts")],
-
Rerun the
example_astronauts
DAG in the UI and check the status of the tasks in the individual DAG view. Watch as theexample_extract_astronauts
DAG gets triggered automatically whenexample_astronauts
finishes running.If all goes well, the graph view of the Dataset-triggered DAG run will look like this screenshot:
For more information about Airflow Datasets, see: Datasets and data-aware scheduling in Airflow.
With Airflow, you can connect to many external systems and create dynamic and responsive workflows. In this step, you'll learn how to create a connection to an external system.
Create a connection to an external system
Creating connections to interoperate with the many systems Airflow supports is easy.
In the steps that follow, you'll create a connection in the Airflow UI and use it in a new DAG. You can use your own preferred external system or use Postgres for a local option.
-
A Postgres database has already been added to the project for you. Confirm that port
5432
is available, or, if not, modify the external port in the project'sdocker-compose.override.yml
:version: "3.1"
services:
example-db:
image: postgres:14.9
ports:
- "5435:5432"
environment:
- POSTGRES_USER=example
- POSTGRES_PASSWORD=example
- POSTGRES_DB=example -
Create a connection in the UI.
Airflow supports a number of ways to create connections, but most users choose the UI.
To create a conneciton in the UI, go to Admin > Connections.
Your connection should look like this screenshot, with the password being
example
: -
Add a DAG to the project.
Create a new Python file in the
dags
directory and paste this DAG code there:from airflow.decorators import (
dag,
task,
) # This DAG uses the TaskFlow API. See: https://www.astronomer.io/docs/learn/airflow-decorators
from pendulum import datetime, duration
import duckdb
import os
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from include.custom_functions.load_functions import get_sql_query
from airflow.models.dataset import Dataset
_DUCKDB_INSTANCE_NAME = os.getenv("DUCKDB_INSTANCE_NAME", "include/astronomy.db")
_TABLE_NAME = os.getenv("_TABLE_NAME", "galaxy_data")
_DUCKDB_TABLE_NAME = os.getenv("DUCKDB_TABLE_NAME", "galaxy_data")
_DUCKDB_TABLE_URI = f"duckdb://{_DUCKDB_INSTANCE_NAME}/{_DUCKDB_TABLE_NAME}"
@dag(
start_date=datetime(2024, 7, 1), # Date after which the DAG can be scheduled
schedule=[Dataset(_DUCKDB_TABLE_URI)], # See: https://www.astronomer.io/docs/learn/scheduling-in-airflow for options
catchup=False, # See: https://www.astronomer.io/docs/learn/rerunning-dags#catchup
max_consecutive_failed_dag_runs=5, # auto-pauses the DAG after 5 consecutive failed runs, experimental
max_active_runs=1, # Allow only one concurrent run of this DAG, prevents parallel DuckDB calls
doc_md=__doc__, # Add DAG Docs in the UI, see https://www.astronomer.io/docs/learn/custom-airflow-ui-docs-tutorial
default_args={
"owner": "Astro", # owner of this DAG in the Airflow UI
"retries": 3, # tasks retry 3 times before they fail
"retry_delay": duration(seconds=30), # tasks wait 30s in between retries
}, # default_args are applied to all tasks in a DAG
tags=["example", "ETL"], # Add tags in the UI
# Warning - in-memory DuckDB is not a persistent database between workers. To move this workflow into production, use a
# cloud-based database and, based on concurrency capabilities, adjust the two parameters below.
concurrency=1, # allow only a single task execution at a time, prevents parallel DuckDB calls
is_paused_upon_creation=False, # start running the DAG as soon as it's created
)
def example_etl_galaxies_load(): # By default, the dag_id is the name of the decorated function
@task
def extract_galaxy_data_duckdb(
duckdb_instance_name: str = _DUCKDB_INSTANCE_NAME,
table_name: str = _TABLE_NAME,
):
cursor = duckdb.connect(duckdb_instance_name)
galaxy_data_df = cursor.sql(f"SELECT * FROM {table_name};").df()
return galaxy_data_df
@task
def create_sql_query(df):
sql_str = get_sql_query(df, _TABLE_NAME)
return sql_str
create_galaxy_table_postgres = SQLExecuteQueryOperator(
task_id="create_galaxy_table_postgres",
conn_id="postgres_default",
sql=f"""
DROP TABLE IF EXISTS {_TABLE_NAME};
CREATE TABLE {_TABLE_NAME} (
name VARCHAR PRIMARY KEY,
distance_from_milkyway INT,
distance_from_solarsystem INT,
type_of_galaxy VARCHAR,
characteristics VARCHAR
);
""",
)
create_sql_query_obj = create_sql_query(extract_galaxy_data_duckdb())
load_galaxy_data_postgres = SQLExecuteQueryOperator(
task_id = "load_galaxy_data_postgres",
conn_id = "postgres_default",
sql = create_sql_query_obj,
)
create_galaxy_table_postgres >> load_galaxy_data_postgres
example_etl_galaxies_load()This DAG extracts data from the project's DuckDB database, creates a table in the project's Postgres database, and loads the data into the table. Using an Airflow Dataset trigger, it will run when
example_etl_galaxies
updates thegalaxy_data
dataset.Connection parameters vary between operators. In the case of
SQLExecuteQueryOperator
, it isconn_id
:load_galaxy_data_postgres = SQLExecuteQueryOperator(
task_id = "load_galaxy_data_postgres",
conn_id = "postgres_default",
sql = create_sql_query_obj,
) -
Trigger your new DAG!
Trigger the
example_etl_galaxies
DAG and not the new one you just added. Your new DAG will run after theload_galaxy_task
inexample_etl_galaxies
runs successfully.If all goes well, the graph view will look like this screenshot:
For more guidance on getting started with connections, see: Integrations & connections.
With Airflow, it's easy to test and compare LMs until you find the right model for your generative AI workflows. In this step, you'll learn how to:
- configure a DAG to use different LMs.
- use the Airflow UI to compare the performance of the models you select.
Experiment with different LMs to compare performance
Sentence Transformers (AKA SBERT) is a popular Python module for accessing, using, and training text and image embedding models. It enables a wide range of AI applications, including semantic search, semantic textual similarity, and paraphrase mining. SBERT provides various pre-trained language models via the Sentence Transformers Hugging Face organization. Additionally, over 6,000 community Sentence Transformers models have been publicly released on the Hugging Face Hub.
Try using a different language model from among those provided by SBERT in this project's DAG. Then, explore the metadata in the Airflow UI to compare the performance of the models.
-
Start your experiment by using a different model. Find the
_LM
variable definition in theget_embeddings_one_word
function close to the top of theexample_vector_embeddings
DAG and replace the model string withdistiluse-base-multilingual-cased-v2
:_LM = os.getenv("LM", "distiluse-base-multilingual-cased-v2")
The default is very fast, but this one is slower and lower-performing overall, so the results should be noticeably different. You could also try a model with higher overall performance, such as
all-mpnet-base-v2
. For a list of possible models to choose from, see SBERT's Pretrained models list. -
Next, find the dimensions of the model in the SBERT docs.
For example, the
distiluse-base-multilingual-cased-v2
model has dimensions of 512: -
Use this number to redefine another top-level variable,
_LM_DIMENSIONS
:_LM_DIMENSIONS = os.getenv("LM_DIMS", "512")
This value is used in the vector column type definition in the
create_vector_table
task and the select query in thefind_closest_word_match
task. -
Rerun the DAG. Depending on the models you choose, you might see large differences in the performance of the
create_embeddings
task.For example, using the default
all-MiniLM-L6-v2
model should result in runtimes of around 4s:By contrast, using the
distiluse-base-multilingual-cased-v2
model might result in runtimes three times as long or longer: -
Check the log output from the
find_closest_word_match
task and look for differences between the search result sets.For example, the faster LM
all-MiniML-L6-v2
returnssun, planet, light
:The more performant LM
all-mpnet-base-v2
returnssun, rocket, planet
:
For more information about the SBERT project, library, and models, see the Sentence Transformers Documentation.
Next steps: run Airflow on Astro
The easiest way to run Airflow in production is with Astro. To get started, create an Astro trial. During your trial signup, you will have the option of choosing the same template project you worked with in this quickstart.