Apache Airflow® Quickstart - ETL

ETL: An introduction to modern, enhanced ETL development with Airflow.

Step 1: Clone the Astronomer Quickstart repository

  1. Create a new directory for your project and open it:

    mkdir airflow-quickstart-etl && cd airflow-quickstart-etl
  2. Clone the repository and open it:

    git clone -b etl --single-branch https://github.com/astronomer/airflow-quickstart.git && cd airflow-quickstart/etl

    Your directory should have the following structure:

    .
    ├── Dockerfile
    ├── README.md
    ├── dags
    │   └── example_etl_galaxies.py
    ├── include
    │   ├── astronomy.db
    │   ├── custom_functions
    │   │   └── galaxy_functions.py
    │   └── data
    │       └── galaxy_names.txt
    ├── packages.txt
    ├── requirements.txt
    ├── solutions
    │   └── example_etl_galaxies_solution.py
    └── tests
        └── dags
            └── test_dag_example.py

Step 2: Start up Airflow and explore the UI

  1. Start the project using the Astro CLI:

    astro dev start

    The CLI will let you know when all Airflow services are up and running.

  2. If it doesn't launch automatically, navigate your browser to localhost:8080 and sign in to the Airflow UI using username admin and password admin.

  3. Explore the landing page and individual DAG view page to get a sense of the metadata available about the DAG, run, and all task instances. For a deep-dive into the UI's features, see An introduction to the Airflow UI.

    For example, the DAGs view will look like this screenshot:

    Airflow UI DAGs view

    As you start to trigger DAG runs, the graph view will look like this screenshot:

    Example ETL Galaxies DAG graph view

    The Gantt chart will look like this screenshot:

    Example ETL Galaxies DAG Gantt chart

Step 3: Explore the project

Building Extract, Transform, and Load (ETL) workloads is a common pattern in Apache Airflow. This project shows an example pattern for defining an ETL workload using DuckDB as the data warehouse of choice.

As you try out this project, you'll see how easy Airflow makes it to:

  • Write responsive pipelines that change based on user inputs.
  • Perform database operations using SQL.
  • Access and extract data from local files.
  • Execute transformations with Pandas.

You'll write a lean ETL pipeline in easy-to-read Python code!

warning

This project uses DuckDB, an in-memory database. Although this type of database is great for learning Airflow, your data is not guaranteed to persist between executions!

For production applications, use a persistent database instead (consider DuckDB's hosted option MotherDuck or another database like Postgres, MySQL, or Snowflake).

Pipeline structure

An Airflow project can have any number of DAGs (directed acyclic graphs), the main building blocks of Airflow pipelines. This project has one:

example_etl_galaxies

This DAG contains five @task-decorated Python functions:

  • create_galaxy_table_in_duckdb uses a hook to create a database connection and a SQL query to create a database table.

  • extract_galaxy_data returns a dataframe created using a modularized function imported from the project's include directory.

  • transform_galaxy_data gets a user-specified value from the DAG context and uses it to execute a simple data transformation on the dataframe, returning another dataframe.

  • load_galaxy_data uses a database hook to load the dataframe into the database. You can load the dataframe directly in the context of a SQL query. No conversion of the dataframe is required.

  • print_loaded_galaxies executes a SELECT query on the database and prints the data to the logs.
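
Each of these tasks follows the same TaskFlow pattern: a plain Python function decorated with @task, whose return value Airflow passes to the next task. The following is a minimal, self-contained sketch of that pattern, not the project's actual code; the table, column names, and values are illustrative only:

    # Minimal sketch of the TaskFlow ETL pattern used by example_etl_galaxies.
    # Names and values are illustrative; see dags/example_etl_galaxies.py for the real DAG.
    from airflow.decorators import dag, task
    from pendulum import datetime
    import duckdb
    import pandas as pd


    @dag(start_date=datetime(2024, 7, 1), schedule=None, catchup=False)
    def etl_sketch():
        @task
        def extract() -> pd.DataFrame:
            # Return a dataframe; Airflow passes it to the next task as an XCom.
            return pd.DataFrame(
                {"name": ["galaxy_a", "galaxy_b"], "distance_from_milkyway": [1, 3]}
            )

        @task
        def transform(df: pd.DataFrame, min_distance: int = 2) -> pd.DataFrame:
            # Apply a simple, user-adjustable filter to the dataframe.
            return df[df["distance_from_milkyway"] >= min_distance]

        @task
        def load(df: pd.DataFrame, db_path: str = "include/astronomy.db"):
            # DuckDB can query a registered pandas dataframe directly in SQL.
            con = duckdb.connect(db_path)
            con.register("df", df)
            con.sql("CREATE TABLE IF NOT EXISTS galaxy_sketch AS SELECT * FROM df")

        load(transform(extract()))


    etl_sketch()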

Step 4: Get your hands dirty!

With Airflow, you can connect to many external systems and create dynamic and responsive workflows. In this step, you'll learn how to create a connection to an external system.

Create a connection to an external system

Creating connections to interoperate with the many systems Airflow supports is easy.

In the steps that follow, you'll create a connection in the Airflow UI and use it in a new DAG. You can use your own preferred external system or use Postgres for a local option.

  1. A Postgres database has already been added to the project for you. Confirm that port 5432 is available, or, if not, modify the external port in the project's docker-compose.override.yml:

    version: "3.1"
    services:
      example-db:
        image: postgres:14.9
        ports:
          - "5435:5432"
        environment:
          - POSTGRES_USER=example
          - POSTGRES_PASSWORD=example
          - POSTGRES_DB=example
  2. Create a connection in the UI.

    Airflow supports a number of ways to create connections, but most users choose the UI.

    To create a connection in the UI, go to Admin > Connections.

    Connections seen from the DAG view

    Your connection should look like this screenshot, with the password being example:

    Postgres connection in UI
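
    Once the connection is saved, you can optionally confirm that Airflow resolves it before using it in the next step. The snippet below is a throwaway check, not part of the quickstart project; it only reads the connection's metadata (it does not test database connectivity), and it assumes the conn_id postgres_default you entered above:

    # Optional, illustrative check that the postgres_default connection exists.
    from airflow.decorators import dag, task
    from airflow.hooks.base import BaseHook
    from pendulum import datetime


    @dag(start_date=datetime(2024, 7, 1), schedule=None, catchup=False)
    def check_postgres_connection():
        @task
        def show_connection_details():
            # Look up the connection by its conn_id and print its (non-secret) fields.
            conn = BaseHook.get_connection("postgres_default")
            print(f"host={conn.host} port={conn.port} schema={conn.schema} login={conn.login}")

        show_connection_details()


    check_postgres_connection()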

  3. Add a DAG to the project.

    Create a new Python file in the dags directory and paste this DAG code there:

    from airflow.decorators import (
        dag,
        task,
    )  # This DAG uses the TaskFlow API. See: https://www.astronomer.io/docs/learn/airflow-decorators
    from pendulum import datetime, duration
    import duckdb
    import os
    from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
    from include.custom_functions.load_functions import get_sql_query
    from airflow.models.dataset import Dataset

    _DUCKDB_INSTANCE_NAME = os.getenv("DUCKDB_INSTANCE_NAME", "include/astronomy.db")
    _TABLE_NAME = os.getenv("_TABLE_NAME", "galaxy_data")
    _DUCKDB_TABLE_NAME = os.getenv("DUCKDB_TABLE_NAME", "galaxy_data")
    _DUCKDB_TABLE_URI = f"duckdb://{_DUCKDB_INSTANCE_NAME}/{_DUCKDB_TABLE_NAME}"


    @dag(
        start_date=datetime(2024, 7, 1),  # Date after which the DAG can be scheduled
        schedule=[Dataset(_DUCKDB_TABLE_URI)],  # See: https://www.astronomer.io/docs/learn/scheduling-in-airflow for options
        catchup=False,  # See: https://www.astronomer.io/docs/learn/rerunning-dags#catchup
        max_consecutive_failed_dag_runs=5,  # auto-pauses the DAG after 5 consecutive failed runs, experimental
        max_active_runs=1,  # Allow only one concurrent run of this DAG, prevents parallel DuckDB calls
        doc_md=__doc__,  # Add DAG Docs in the UI, see https://www.astronomer.io/docs/learn/custom-airflow-ui-docs-tutorial
        default_args={
            "owner": "Astro",  # owner of this DAG in the Airflow UI
            "retries": 3,  # tasks retry 3 times before they fail
            "retry_delay": duration(seconds=30),  # tasks wait 30s in between retries
        },  # default_args are applied to all tasks in a DAG
        tags=["example", "ETL"],  # Add tags in the UI
        # Warning - in-memory DuckDB is not a persistent database between workers. To move this workflow into production, use a
        # cloud-based database and, based on concurrency capabilities, adjust the two parameters below.
        concurrency=1,  # allow only a single task execution at a time, prevents parallel DuckDB calls
        is_paused_upon_creation=False,  # start running the DAG as soon as it's created
    )
    def example_etl_galaxies_load():  # By default, the dag_id is the name of the decorated function

        @task
        def extract_galaxy_data_duckdb(
            duckdb_instance_name: str = _DUCKDB_INSTANCE_NAME,
            table_name: str = _TABLE_NAME,
        ):
            cursor = duckdb.connect(duckdb_instance_name)
            galaxy_data_df = cursor.sql(f"SELECT * FROM {table_name};").df()

            return galaxy_data_df

        @task
        def create_sql_query(df):
            sql_str = get_sql_query(df, _TABLE_NAME)
            return sql_str

        create_galaxy_table_postgres = SQLExecuteQueryOperator(
            task_id="create_galaxy_table_postgres",
            conn_id="postgres_default",
            sql=f"""
                DROP TABLE IF EXISTS {_TABLE_NAME};
                CREATE TABLE {_TABLE_NAME} (
                    name VARCHAR PRIMARY KEY,
                    distance_from_milkyway INT,
                    distance_from_solarsystem INT,
                    type_of_galaxy VARCHAR,
                    characteristics VARCHAR
                );
            """,
        )

        create_sql_query_obj = create_sql_query(extract_galaxy_data_duckdb())

        load_galaxy_data_postgres = SQLExecuteQueryOperator(
            task_id="load_galaxy_data_postgres",
            conn_id="postgres_default",
            sql=create_sql_query_obj,
        )

        create_galaxy_table_postgres >> load_galaxy_data_postgres


    example_etl_galaxies_load()

    This DAG extracts data from the project's DuckDB database, creates a table in the project's Postgres database, and loads the data into the table. Using an Airflow Dataset trigger, it will run when example_etl_galaxies updates the galaxy_data dataset.

    Connection parameters vary between operators. For SQLExecuteQueryOperator, the parameter is conn_id:

    load_galaxy_data_postgres = SQLExecuteQueryOperator(
        task_id="load_galaxy_data_postgres",
        conn_id="postgres_default",
        sql=create_sql_query_obj,
    )
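
    As noted above, this DAG uses a dataset schedule rather than a time-based one. The quickstart's example_etl_galaxies DAG already updates the galaxy_data dataset, so there is nothing for you to add; the sketch below only illustrates how a producer task can declare a dataset as an outlet, and is not part of the project:

    # Sketch of a dataset producer. Do not add this to the project; example_etl_galaxies
    # already emits the dataset event that triggers the DAG above.
    from airflow.decorators import dag, task
    from airflow.models.dataset import Dataset
    from pendulum import datetime

    _DUCKDB_TABLE_URI = "duckdb://include/astronomy.db/galaxy_data"


    @dag(start_date=datetime(2024, 7, 1), schedule=None, catchup=False)
    def dataset_producer_sketch():
        @task(outlets=[Dataset(_DUCKDB_TABLE_URI)])
        def update_galaxy_data():
            # Write to the galaxy_data table here. When this task succeeds, Airflow
            # records a dataset event, triggering any DAG scheduled on this URI.
            ...

        update_galaxy_data()


    dataset_producer_sketch()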
  4. Trigger your new DAG!

    Trigger the example_etl_galaxies DAG, not the new DAG you just added. Your new DAG will run after the load_galaxy_data task in example_etl_galaxies runs successfully.

    If all goes well, the graph view will look like this screenshot:

    Postgres DAG graph in UI

For more guidance on getting started with connections, see: Integrations & connections.

Next Steps: Run Airflow on Astro

The easiest way to run Airflow in production is with Astro. To get started, create an Astro trial. During your trial signup, you will have the option of choosing the same template project you worked with in this quickstart.
