Version: Airflow 3.x

Using Airflow to Execute SQL

Executing SQL queries is one of the most common use cases for data pipelines. Whether you're extracting and loading data, calling a stored procedure, or executing a complex query for a report, Airflow can help you orchestrate the process.

In this guide you'll learn about the best practices for executing SQL from your DAG, review the most commonly used Airflow SQL-related operators, and then use sample code to implement a few common SQL use cases.

Assumed knowledge

To get the most out of this guide, you should have an understanding of:

Airflow operators
SQL basics

Best practices for executing SQL from your DAG

No matter which database or SQL dialect you're using, there are many ways to execute your queries with Airflow. Once you determine how to execute your queries, the following tips will help you keep your DAGs clean, readable, and efficient.

Use hooks and operators

The Common SQL provider is a great place to start when looking for SQL-related operators. It includes the SQLExecuteQueryOperator, a generic operator that works with a variety of databases, including Snowflake and Postgres.

For some databases, more specialized operators exist in the service-specific provider package. For example, the SnowflakeSqlApiOperator supports submitting multiple SQL statements in a single request.
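As a sketch, submitting two statements in a single request with the SnowflakeSqlApiOperator might look like the following. The connection ID, table name, and SQL are placeholders for your own values:

from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator

# Sketch only: submits both statements to Snowflake in a single SQL API request.
# The connection ID and table name are placeholders.
run_two_statements = SnowflakeSqlApiOperator(
    task_id="run_two_statements",
    snowflake_conn_id="my_snowflake_conn",
    sql="""
        CREATE OR REPLACE TABLE MY_DATABASE.MY_SCHEMA.TMP_TABLE (id INT);
        INSERT INTO MY_DATABASE.MY_SCHEMA.TMP_TABLE (id) VALUES (1), (2);
    """,
    statement_count=2,  # must match the number of statements in the sql string
)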

Hooks for interacting with a database from an @task decorated function or a PythonOperator are typically part of the provider package for that database. For example, the PostgresHook is part of the Postgres provider package.
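For instance, a minimal sketch of using the PostgresHook inside an @task decorated function; the connection ID, table, and query are placeholders:

from airflow.sdk import task
from airflow.providers.postgres.hooks.postgres import PostgresHook


@task
def fetch_groceries():
    # Sketch only: connection ID and query are placeholders.
    hook = PostgresHook(postgres_conn_id="my_postgres_conn")
    # get_records runs the query and returns the result as a list of tuples
    return hook.get_records("SELECT id, grocery FROM my_table;")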

tip

If you want to run data quality checks on your data, you can use the SQL check operators. See Run data quality checks using SQL check operators for more information.
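For example, a minimal sketch of a column-level check with the SQLColumnCheckOperator from the Common SQL provider; the connection ID and table are placeholders:

from airflow.providers.common.sql.operators.sql import SQLColumnCheckOperator

# Sketch only: fails the task if the id column contains any NULL values.
check_id_column = SQLColumnCheckOperator(
    task_id="check_id_column",
    conn_id="my_snowflake_conn",
    table="MY_DATABASE.MY_SCHEMA.MY_NEW_TABLE",
    column_mapping={"id": {"null_check": {"equal_to": 0}}},
)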

Keep lengthy SQL code out of your DAG

Astronomer recommends avoiding long SQL statements in your DAG file. Instead, keep each lengthy SQL query in its own .sql file.

If you use the Astro CLI, you can store supporting code like SQL scripts in the include/ directory:

├─ dags/  
| └─ example-dag.py
├─ plugins/
├─ include/
| ├─ query1.sql
| └─ query2.sql
├─ Dockerfile
├─ packages.txt
└─ requirements.txt

An exception to this rule could be very short queries (such as SELECT * FROM table). Putting one-line queries like this directly in the DAG is fine if it makes your code more readable.
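For example, a short query like this can be passed to the SQLExecuteQueryOperator as a plain string instead of a .sql file. A sketch, using the table created later in this guide and a placeholder connection ID:

from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

# Sketch only: a one-line query passed directly as a string.
select_all = SQLExecuteQueryOperator(
    task_id="select_all",
    conn_id="my_snowflake_conn",
    sql="SELECT * FROM MY_DATABASE.MY_SCHEMA.MY_NEW_TABLE;",
)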

Examples

This section contains a few examples of how to use Airflow to execute SQL queries. The examples are based on Snowflake, but the concepts apply to most relational databases.

Example 1: Execute a query

In this first example, a DAG executes two simple interdependent queries using the SQLExecuteQueryOperator.

First, make sure you have the Common SQL and Snowflake providers installed. If you use the Astro CLI, you can add the following lines to your requirements.txt file. You need the Snowflake provider to connect to Snowflake and the Common SQL provider to use the SQLExecuteQueryOperator:

apache-airflow-providers-snowflake
apache-airflow-providers-common-sql

Next, you need to define your DAG:

from airflow.sdk import chain, dag
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator


@dag(template_searchpath="/usr/local/airflow/include")  # Path to the SQL files
def execute_snowflake_queries():
    run_query1 = SQLExecuteQueryOperator(
        task_id="run_query1", conn_id="my_snowflake_conn", sql="query1.sql"
    )

    run_query2 = SQLExecuteQueryOperator(
        task_id="run_query2", conn_id="my_snowflake_conn", sql="query2.sql"
    )

    chain(run_query1, run_query2)


execute_snowflake_queries()

The template_searchpath argument in the DAG definition tells the DAG to look in the given folder for scripts, so you can now add two SQL scripts to your project. In this example, those scripts are query1.sql and query2.sql, which contain the following SQL code respectively:

-- query1.sql
CREATE OR REPLACE TABLE MY_DATABASE.MY_SCHEMA.MY_NEW_TABLE (
    id INT,
    grocery STRING
);

-- query2.sql
INSERT INTO MY_DATABASE.MY_SCHEMA.MY_NEW_TABLE (id, grocery)
VALUES
    (1, 'Chocolate'),
    (2, 'Eggs'),
    (3, 'Cake');

Note that the SQL in these files could be any type of query you need to execute.

Finally, you need to set up a connection to your database service, in this case a connection to Snowflake with the connection ID my_snowflake_conn. There are a few ways to manage connections using Astronomer. See the Connections guide for more information on connections in general, and the Snowflake Connection guide for how to set up a Snowflake connection.
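One option is to define the connection as an environment variable in JSON format, for example in your Astro project's .env file. The following is a sketch with placeholder values; adjust the fields to match your Snowflake account:

AIRFLOW_CONN_MY_SNOWFLAKE_CONN='{"conn_type": "snowflake", "login": "<your-username>", "password": "<your-password>", "schema": "<your-schema>", "extra": {"account": "<your-account>", "warehouse": "<your-warehouse>", "database": "<your-database>", "role": "<your-role>"}}'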

Example 2: Execute a query with parameters

Using Airflow, you can also parameterize your SQL queries with information from the Airflow context. For example, consider a query that selects data from a table for a date that you want to update dynamically. You can execute the query using the same setup as in Example 1, with a few adjustments.

Your DAG will look like the following:

from airflow.sdk import dag
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from pendulum import datetime


@dag(
    start_date=datetime(2025, 4, 1),
    schedule="@daily",
)
def parameterized_query():
    opr_param_query = SQLExecuteQueryOperator(
        task_id="param_query",
        conn_id="my_snowflake_conn",
        sql="""
            SELECT *
            FROM ETL_DEMO.DEV.SALES
            WHERE SALE_DATE >= DATEADD(day, -1, '{{ logical_date }}');
        """,
    )

    opr_param_query


parameterized_query()

In this example, the query has been parameterized to dynamically select data for one day before the DAG's logical date.

Astronomer recommends using Airflow context information or macros whenever possible to increase flexibility and make your workflows idempotent. The above example will work with any Airflow context information. For example, you could access a DAG-level param using the params dictionary:

SELECT *
FROM STATE_DATA
WHERE state = '{{ params["my_state"] }}'
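For this reference to resolve, the param needs to be declared at the DAG or task level. A minimal sketch, assuming a my_state param with a default value that can be overridden at trigger time:

from airflow.sdk import dag
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator


@dag(params={"my_state": "Wyoming"})  # default value, can be overridden when triggering the DAG
def param_query():
    SQLExecuteQueryOperator(
        task_id="select_state",
        conn_id="my_snowflake_conn",
        sql="""
            SELECT *
            FROM STATE_DATA
            WHERE state = '{{ params["my_state"] }}';
        """,
    )


param_query()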

You can also pass information to your SQL file using the parameters argument in the SQLExecuteQueryOperator. This is useful if you want to pass a value that is derived from another task in your DAG.

from airflow.sdk import chain, dag, task
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator


@dag
def using_parameters_argument():
    @task
    def get_grocery():
        return "Chocolate"

    _get_grocery = get_grocery()

    opr_param_query = SQLExecuteQueryOperator(
        task_id="param_query",
        conn_id="my_snowflake_conn",
        sql="""
            SELECT *
            FROM DEMO_DB.DEMO_SCHEMA.MY_NEW_TABLE
            WHERE grocery = %(my_grocery)s;
        """,
        parameters={"my_grocery": _get_grocery},
    )

    chain(
        _get_grocery,
        opr_param_query,
    )


using_parameters_argument()
