Orchestrate Snowflake Queries with Airflow

Snowflake is one of the most commonly used data warehouses, and orchestrating Snowflake queries as part of a data pipeline is one of the most common Airflow use cases. Two Airflow provider packages, the Snowflake Airflow provider and the Common SQL provider, contain hooks and operators that make it easy to interact with Snowflake from Airflow.

This tutorial covers an example of executing Snowflake operations with Airflow, including configuring a Snowflake connection, running queries stored in SQL files, executing multiple statements in a single task, and checking data quality.

Additionally, More on the Airflow Snowflake integration offers further information on general best practices and considerations when interacting with Snowflake from Airflow.

Time to complete

This tutorial takes approximately 20 minutes to complete.

Assumed knowledge

To get the most out of this tutorial, make sure you have an understanding of Airflow fundamentals, such as writing DAGs and using operators and connections, as well as basic SQL.

Prerequisites

  • The Astro CLI.
  • A Snowflake account. A 30-day free trial is available. You need at least one database, one schema, and one warehouse set up in your Snowflake account, as well as a user with the necessary permissions to create tables and run queries in the schema.

Step 1: Configure your Astro project

Use the Astro CLI to create and run an Airflow project on your local machine.

  1. Create a new Astro project:

    $ mkdir astro-snowflake-tutorial && cd astro-snowflake-tutorial
    $ astro dev init
  2. In the requirements.txt file, add the Snowflake Airflow provider and the Common SQL provider.

    apache-airflow-providers-snowflake==5.7.0
    apache-airflow-providers-common-sql==1.16.0
  3. Run the following command to start your Airflow project:

    astro dev start

Step 2: Configure a Snowflake connection

There are several options for authenticating to Snowflake. The SnowflakeSqlApiOperator used in this tutorial requires key-pair authentication, which is also the preferred method. Key-pair authentication requires you to generate a public/private key pair, add the public key to your user in Snowflake, and use the private key in your Airflow connection.

info

For Astro customers, Astronomer recommends taking advantage of the Astro Environment Manager to store connections in an Astro-managed secrets backend. These connections can be shared across multiple deployed and local Airflow environments. See Manage Astro connections in branch-based deploy workflows. Note that when you use the Astro Environment Manager for your Snowflake connection, you can paste the private key directly into the Private Key Content field in the UI without modifying it.

  1. In your terminal, run the following command to generate a private RSA key using OpenSSL. While there are other ways to generate a key pair, Snowflake has specific requirements for the key format and may not accept keys generated with other tools. Make sure to note the key passphrase, as you will need it later. An optional check for the generated key is shown after this list.

    openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8
  2. Generate the associated public key using the following command:

    openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub 
  3. In the Snowflake UI, run the following SQL command to add the public key to your user. You can paste the public key directly from the rsa_key.pub file without needing to modify it.

    ALTER USER <your user> SET RSA_PUBLIC_KEY='<your public key>';
  4. In the Airflow UI, go to Admin -> Connections and click + to create a new connection. Choose the Snowflake connection type and enter the following information:

    • Connection ID: snowflake_conn

    • Schema: Your Snowflake schema. The example DAG uses DEMO_SCHEMA.

    • Login: Your Snowflake user name. Make sure to capitalize the user name, as the SnowflakeSqlApiOperator requires it.

    • Password: Your private key passphrase.

    • Extra: Enter the following JSON object, filling in your own Snowflake account identifier, warehouse, database, region, and role (properly capitalized).

      {
          "account": "<your account id in the form of abc12345>",
          "warehouse": "<your warehouse>",
          "database": "DEMO_DB",
          "region": "<your region>",
          "role": "<your role in capitalized format>",
          "private_key_content": "-----BEGIN ENCRYPTED PRIVATE KEY-----\nABC...ABC\nABC...ABC=\n-----END ENCRYPTED PRIVATE KEY-----\n"
      }

      For the private_key_content field, paste the contents of the rsa_key.p8 file. Make sure to replace newlines with \n. You can use the script below to format the key correctly:

      def format_private_key(private_key_path):
          with open(private_key_path, "r") as key_file:
              private_key = key_file.read()
          return private_key.replace("\n", "\\n")

      # the key generated in step 1 is rsa_key.p8
      formatted_key = format_private_key("rsa_key.p8")
      print(formatted_key)

    Your connection should look something like the screenshot below.

    [Screenshot: Snowflake connection in the Airflow UI]
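
Before you save the connection, you can optionally confirm that the generated private key and your passphrase match. Below is a minimal sketch, assuming the cryptography Python package is installed locally and rsa_key.p8 is in your current directory:

    from cryptography.hazmat.primitives import serialization

    # load the encrypted private key generated in step 1;
    # replace the placeholder with the passphrase you noted down
    with open("rsa_key.p8", "rb") as key_file:
        private_key = serialization.load_pem_private_key(
            key_file.read(), password=b"<your key passphrase>"
        )

    # load_pem_private_key raises an error if the passphrase is wrong
    print(private_key.key_size)  # prints 2048 for the key generated above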

tip

When using JSON format to set your connection, use the following parameters:

AIRFLOW_CONN_SNOWFLAKE_DEFAULT='{
    "conn_type": "snowflake",
    "login": "<your user, properly capitalized>",
    "password": "<your private key passphrase>",
    "schema": "DEMO_SCHEMA",
    "extra": {
        "account": "<your account id in the form of abc12345>",
        "warehouse": "<your warehouse>",
        "database": "DEMO_DB",
        "region": "<your region>",
        "role": "<your role, properly capitalized>",
        "private_key_content": "-----BEGIN ENCRYPTED PRIVATE KEY-----\nABC...ABC\nABC...ABC=\n-----END ENCRYPTED PRIVATE KEY-----\n"
    }
}'
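
Once the connection is saved, a quick way to confirm it works is to run a one-off query through the SnowflakeHook, for example from a throwaway DAG or a Python shell inside the scheduler container. A minimal sketch, assuming the snowflake_conn connection from this tutorial:

    from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook

    # run a trivial query through the connection configured above
    hook = SnowflakeHook(snowflake_conn_id="snowflake_conn")
    print(hook.get_first("SELECT CURRENT_ACCOUNT(), CURRENT_REGION(), CURRENT_VERSION()"))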

Step 3: Add your SQL statements

The DAG you will create in Step 4 runs multiple SQL statements against your Snowflake data warehouse. While it is possible to add SQL statements directly in your DAG file, it is common practice to store them in separate files. When you initialized your Astro project with the Astro CLI, an include folder was created. The contents of this folder are automatically made available to your Airflow environment, which makes it the standard location for storing supporting files such as SQL scripts.

  1. Create a folder called sql in your include folder.

  2. Create a new file in include/sql called insert_data.sql and copy the following code:

    INSERT INTO {{ params.db_name }}.{{ params.schema_name }}.{{ params.table_name }} (ID, NAME)
    VALUES
    (1, 'Avery');

    This file contains one SQL statement that inserts a row into a table. The database, schema, and table names are parameterized so that you can pass them to the operator at runtime. A quick way to preview how these parameters render is shown after this list.

  3. The SnowflakeSqlApiOperator can run multiple SQL statements in a single task. Create a new file in include/sql called multiple_statements_query.sql and copy the following code:

    INSERT INTO {{ params.db_name }}.{{ params.schema_name }}.{{ params.table_name }} (ID, NAME)
    VALUES
    (2, 'Peanut'),
    (3, 'Butter');


    INSERT INTO {{ params.db_name }}.{{ params.schema_name }}.{{ params.table_name }} (ID, NAME)
    VALUES
    (4, 'Vega'),
    (5, 'Harper');

    This file contains two SQL statements that insert multiple rows into a table.
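
    Because the SQL files are Jinja templates, you can preview how a file renders before running it in Airflow. The snippet below is a rough sketch that renders insert_data.sql with plain jinja2 rather than Airflow's own template engine, so it only approximates what the operator executes:

    from jinja2 import Template

    # render the template with the same params the DAG passes at runtime
    with open("include/sql/insert_data.sql") as f:
        rendered_sql = Template(f.read()).render(
            params={
                "db_name": "DEMO_DB",
                "schema_name": "DEMO_SCHEMA",
                "table_name": "DEMO_TABLE",
            }
        )

    print(rendered_sql)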

tip

When running SQL statements from Airflow operators, you can store the SQL code in individual SQL files, in a combined SQL file, or as strings in a Python module. Astronomer recommends storing lengthy SQL statements in a dedicated file to keep your DAG files clean and readable.
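
For example, if you prefer keeping short statements next to your Python code, a hypothetical include/sql_statements.py module could look like the sketch below, with the DAG importing the constant instead of referencing a .sql file:

    # include/sql_statements.py (hypothetical module name)
    # short SQL statements stored as Python strings; Jinja params still work
    INSERT_SINGLE_ROW = """
    INSERT INTO {{ params.db_name }}.{{ params.schema_name }}.{{ params.table_name }} (ID, NAME)
    VALUES (1, 'Avery');
    """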

Step 4: Write a Snowflake DAG

  1. Create a new file in your dags directory called my_snowflake_dag.py.

  2. Copy and paste the code below into the file:

    """
    ### Snowflake Tutorial DAG

    This DAG demonstrates how to use the SQLExecuteQueryOperator,
    SnowflakeSqlApiOperator and SQLColumnCheckOperator to interact with Snowflake.
    """

    from airflow.decorators import dag
    from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
    from airflow.providers.common.sql.operators.sql import SQLColumnCheckOperator
    from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator
    from airflow.models.baseoperator import chain
    from pendulum import datetime, duration
    import os

    _SNOWFLAKE_CONN_ID = "snowflake_conn"
    _SNOWFLAKE_DB = "DEMO_DB"
    _SNOWFLAKE_SCHEMA = "DEMO_SCHEMA"
    _SNOWFLAKE_TABLE = "DEMO_TABLE"


    @dag(
    dag_display_name="Snowflake Tutorial DAG ❄️",
    start_date=datetime(2024, 9, 1),
    schedule=None,
    catchup=False,
    default_args={"owner": "airflow", "retries": 1, "retry_delay": duration(seconds=5)},
    doc_md=__doc__,
    tags=["tutorial"],
    template_searchpath=[
    os.path.join(os.path.dirname(os.path.abspath(__file__)), "../include/sql")
    ], # path to the SQL templates
    )
    def my_snowflake_dag():

    # you can execute SQL queries directly using the SQLExecuteQueryOperator
    create_or_replace_table = SQLExecuteQueryOperator(
    task_id="create_or_replace_table",
    conn_id=_SNOWFLAKE_CONN_ID,
    database="DEMO_DB",
    sql=f"""
    CREATE OR REPLACE TABLE {_SNOWFLAKE_SCHEMA}.{_SNOWFLAKE_TABLE} (
    ID INT,
    NAME VARCHAR
    )
    """,
    )

    # you can also execute SQL queries from a file, make sure to add the path to the template_searchpath
    insert_data = SQLExecuteQueryOperator(
    task_id="insert_data",
    conn_id=_SNOWFLAKE_CONN_ID,
    database="DEMO_DB",
    sql="insert_data.sql",
    params={
    "db_name": _SNOWFLAKE_DB,
    "schema_name": _SNOWFLAKE_SCHEMA,
    "table_name": _SNOWFLAKE_TABLE,
    },
    )

    # you can also execute multiple SQL statements using the SnowflakeSqlApiOperator
    # make sure to set the statement_count parameter to the number of statements in the SQL file
    # and that your connection details are in their proper capitalized form!
    insert_data_multiple_statements = SnowflakeSqlApiOperator(
    task_id="insert_data_multiple_statements",
    snowflake_conn_id=_SNOWFLAKE_CONN_ID,
    sql="multiple_statements_query.sql",
    database=_SNOWFLAKE_DB,
    schema=_SNOWFLAKE_SCHEMA,
    params={
    "db_name": _SNOWFLAKE_DB,
    "schema_name": _SNOWFLAKE_SCHEMA,
    "table_name": _SNOWFLAKE_TABLE,
    },
    statement_count=2, # needs to match the number of statements in the SQL file
    autocommit=True,
    )

    # use SQLCheck operators to check the quality of your data
    data_quality_check = SQLColumnCheckOperator(
    task_id="data_quality_check",
    conn_id=_SNOWFLAKE_CONN_ID,
    database=_SNOWFLAKE_DB,
    table=f"{_SNOWFLAKE_SCHEMA}.{_SNOWFLAKE_TABLE}",
    column_mapping={
    "ID": {"null_check": {"equal_to": 0}, "distinct_check": {"geq_to": 3}}
    },
    )

    chain(
    create_or_replace_table,
    insert_data,
    insert_data_multiple_statements,
    data_quality_check,
    )


    my_snowflake_dag()

    The DAG completes the following steps:

    • Uses the SQLExecuteQueryOperator to run an in-line SQL statement that creates a table in Snowflake.
    • Uses the SQLExecuteQueryOperator to run an SQL file containing a single SQL statement that inserts data into the table.
    • Uses the SnowflakeSqlApiOperator to run an SQL file containing multiple SQL statements that insert data into the table. You can set this operator to run in deferrable mode by adding deferrable=True.
    • Uses the SQLColumnCheckOperator to run a data quality check on the table, verifying that the ID column contains no NULL values and has at least 3 distinct values. To learn more about SQL check operators, see Run data quality checks using SQL check operators.

    The chain() function at the end of the DAG sets the task dependencies. It is commonly used instead of bitshift operators (>>) because it is easier to read when setting dependencies between many tasks.
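
    For reference, chain() with a linear sequence of tasks is equivalent to chaining the same tasks with bitshift operators:

    # these two ways of declaring the dependencies are interchangeable
    chain(
        create_or_replace_table,
        insert_data,
        insert_data_multiple_statements,
        data_quality_check,
    )

    # ...is the same as:
    (
        create_or_replace_table
        >> insert_data
        >> insert_data_multiple_statements
        >> data_quality_check
    )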

Step 5: Run the DAG

  1. In the Airflow UI, click the play button to manually run your DAG.

  2. Open the logs for the data_quality_check task to see the results of the data quality check, confirming that the table was created and populated correctly.

    [2024-09-12, 16:03:09 UTC] {sql.py:469} INFO - Record: [('ID', 'null_check', 0), ('ID', 'distinct_check', 5)]
    [2024-09-12, 16:03:09 UTC] {sql.py:492} INFO - All tests have passed

More on the Airflow Snowflake integration

This section provides additional information on orchestrating actions in Snowflake with Airflow.

Snowflake Operators and Hooks

Several open source packages contain operators used to orchestrate Snowflake in Airflow.

The Common SQL provider package contains operators that you can use with several SQL databases, including Snowflake, such as the SQLExecuteQueryOperator and the SQLColumnCheckOperator used in this tutorial.

The Snowflake provider package contains:

  • SnowflakeSqlApiOperator: Executes multiple SQL statements in a single task. Note that this operator uses the Snowflake SQL API, which requires connection parameters such as the role and user to be properly capitalized. The operator can be set to be deferrable using deferrable=True.
  • CopyFromExternalStageToSnowflakeOperator: Copies data from an external stage to a Snowflake table. Note that the prefix parameter will be added to the full stage path defined in Snowflake. See the ELT with Snowflake and Apache Airflow® for eCommerce reference architecture for an example of how to use this operator.
  • SnowflakeHook: A client for interacting with Snowflake, commonly used when building custom operators or tasks that need direct access to Snowflake; see the sketch after this list.
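
As an illustration, here is a minimal sketch of using the SnowflakeHook inside a custom task, assuming the snowflake_conn connection and demo table from this tutorial:

    from airflow.decorators import task
    from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook


    @task
    def get_row_count() -> int:
        """Return the number of rows in the demo table using the SnowflakeHook."""
        hook = SnowflakeHook(snowflake_conn_id="snowflake_conn")
        # get_first returns the first row of the query result as a tuple
        row = hook.get_first("SELECT COUNT(*) FROM DEMO_DB.DEMO_SCHEMA.DEMO_TABLE")
        return row[0]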

Best practices and considerations

The following are some best practices and considerations to keep in mind when orchestrating Snowflake queries from Airflow:

  • To reduce costs and improve the scalability of your Airflow environment, consider using the SnowflakeSqlApiOperator in deferrable mode for long running queries.
  • Set your default Snowflake query specifications, such as warehouse, role, and schema, in the Airflow connection, then override those parameters for specific tasks as necessary in your operator definitions (see the sketch after this list). This is cleaner and easier to read than adding USE WAREHOUSE xyz; statements within your queries. If you are an Astro customer, use the Astro Environment Manager to define your base connection and add overrides for specific deployments and tasks.
  • Pay attention to which Snowflake compute resources your tasks are using, as overtaxing your assigned resources can cause slowdowns in your Airflow tasks. It is generally recommended to have different warehouses devoted to your different Airflow environments to ensure DAG development and testing does not interfere with DAGs running in production. If you want to optimize your Snowflake usage, consider using SnowPatrol to detect anomalies in your Snowflake spend.
  • Make use of Snowflake stages together with the CopyFromExternalStageToSnowflakeOperator when loading large amounts of data from an external system using Airflow. To see how to use this operator, check out the ELT with Snowflake and Apache Airflow® for eCommerce reference architecture.
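
As an example of the second point, the sketch below overrides connection-level defaults for a single heavy query. LARGE_WH and REPORTING_ROLE are placeholder names; hook_params are passed through to the SnowflakeHook, which uses them in place of the defaults stored in the connection:

    from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

    # run one expensive query on a bigger warehouse without changing the connection
    heavy_aggregation = SQLExecuteQueryOperator(
        task_id="heavy_aggregation",
        conn_id="snowflake_conn",
        sql="SELECT NAME, COUNT(*) FROM DEMO_DB.DEMO_SCHEMA.DEMO_TABLE GROUP BY NAME",
        hook_params={"warehouse": "LARGE_WH", "role": "REPORTING_ROLE"},
    )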

Conclusion

Congratulations! You've connected Airflow to Snowflake and executed Snowflake queries from your Airflow DAGs. You've also learned about best practices and considerations when orchestrating Snowflake queries from Airflow.
