
Orchestrate dbt Cloud jobs with Airflow

dbt Cloud is a managed service that provides a hosted architecture to run dbt, a tool that helps you build interdependent SQL models for in-warehouse data transformation.

The dbt Cloud Airflow provider allows users to orchestrate and execute actions in dbt Cloud as DAGs. Running dbt with Airflow ensures a reliable, scalable environment for models, as well as the ability to trigger models based on upstream dependencies in your data ecosystem.

info

For a tutorial on how to use the open-source dbt Core package with Airflow, see Orchestrate dbt Core with Cosmos.

Assumed knowledge

To get the most out of this tutorial, make sure you have an understanding of:

Time to complete

This tutorial takes approximately 30 minutes to complete.

Prerequisites

Step 1: Configure your Astro project

An Astro project contains all of the files you need to run Airflow locally.

  1. Create a new Astro project:

    $ mkdir astro-dbt-cloud-tutorial && cd astro-dbt-cloud-tutorial
    $ astro dev init
  2. Add the dbt Cloud provider to your requirements.txt file.

    apache-airflow-providers-dbt-cloud
  3. Run the following command to start your Astro project:

    $ astro dev start

Step 2: Configure a dbt connection

  1. In the Airflow UI, go to Admin -> Connections and click +.

  2. Create a new connection named dbt_conn and choose the dbt Cloud connection type. Configure the following values for the connection:

    • Tenant: The URL under which your dbt Cloud API is hosted. The default value is cloud.getdbt.com.
    • Account ID: (Optional) The default dbt account to use with this connection.
    • API Token: A dbt user token.
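
Alternatively, you can define the same connection as an environment variable in your Astro project's .env file instead of creating it in the UI. The line below is a minimal sketch using Airflow's JSON connection format (available in Airflow 2.3+), assuming the dbt Cloud connection type identifier is dbt_cloud and that the host, login, and password fields map to the tenant, account ID, and API token respectively:

    AIRFLOW_CONN_DBT_CONN='{"conn_type": "dbt_cloud", "host": "cloud.getdbt.com", "login": "<your account id>", "password": "<your api token>"}'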

Step 3: Configure a dbt Cloud job

In the dbt Cloud UI, create one dbt Cloud job. The contents of this job do not matter for this tutorial. Optionally, you can use the jaffle shop example from dbt's Quickstart documentation. Copy the dbt Cloud job_id for use in the next step.
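
If you prefer, you can also look up job IDs programmatically through the same Airflow connection. The snippet below is a rough sketch, not part of the tutorial DAG: it assumes the dbt_conn connection from Step 2 is available and uses the provider's DbtCloudHook.list_jobs() method, which returns paginated API responses whose data key holds the job records. Run it anywhere your Airflow environment and the dbt Cloud provider are installed.

    from airflow.providers.dbt.cloud.hooks.dbt import DbtCloudHook

    # Print the ID and name of every dbt Cloud job visible to the dbt_conn connection.
    hook = DbtCloudHook(dbt_cloud_conn_id="dbt_conn")
    for page in hook.list_jobs():
        for job in page.json()["data"]:
            print(job["id"], job["name"])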

Step 4: Write a dbt Cloud DAG

  1. In your dags folder, create a file called check_before_running_dbt_cloud_job.py.

  2. Copy the following code into the file, making sure to replace <your dbt Cloud job id> with the job_id you copied.

    from airflow.decorators import dag, task
    from airflow.providers.dbt.cloud.hooks.dbt import DbtCloudHook, DbtCloudJobRunStatus
    from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator
    from pendulum import datetime

    DBT_CLOUD_CONN_ID = "dbt_conn"
    JOB_ID = "<your dbt Cloud job id>"


    @dag(
        start_date=datetime(2022, 2, 10),
        schedule="@daily",
        catchup=False,
    )
    def check_before_running_dbt_cloud_job():
        @task.short_circuit
        def check_job(job_id):
            """
            Retrieves the last run for a given dbt Cloud job and checks
            to see if the job is not currently running.
            """
            hook = DbtCloudHook(DBT_CLOUD_CONN_ID)
            runs = hook.list_job_runs(job_definition_id=job_id, order_by="-id")

            if not runs[0].json().get("data"):
                return True
            else:
                latest_run = runs[0].json()["data"][0]
                return DbtCloudJobRunStatus.is_terminal(latest_run["status"])

        trigger_job = DbtCloudRunJobOperator(
            task_id="trigger_dbt_cloud_job",
            dbt_cloud_conn_id=DBT_CLOUD_CONN_ID,
            job_id=JOB_ID,
            check_interval=600,
            timeout=3600,
        )

        check_job(job_id=JOB_ID) >> trigger_job


    check_before_running_dbt_cloud_job()

    This DAG shows a simple implementation using the DbtCloudRunJobOperator and DbtCloudHook. The DAG consists of two tasks:

    • check_job: Uses the @task.short_circuit decorator (the TaskFlow version of the ShortCircuitOperator) to make sure the dbt Cloud job with the specified JOB_ID is not already running. The job's runs are retrieved with the list_job_runs() method of the DbtCloudHook, ordered so that the most recent run comes first. The status of that latest run is then checked with DbtCloudJobRunStatus.is_terminal(). If the status is terminal, the job is not currently running and the pipeline proceeds to trigger another run. If the status is not terminal, a run of the job is still in progress in dbt Cloud; the task returns False, causing the DAG to short circuit and skip the downstream task.
    • trigger_dbt_cloud_job: Uses the DbtCloudRunJobOperator to trigger a run of the dbt Cloud job with the correct JOB_ID. By default, this operator waits for the job run to reach a terminal state before the task completes; see the sketch after these steps for a non-blocking variation.
  3. Run the DAG and verify that the dbt Cloud job ran in the dbt Cloud UI.

    The full code for this example, along with other DAGs that use the dbt Cloud provider, can be found on the Astronomer Registry.
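
As a variation on the DAG above, the dbt Cloud provider also includes a DbtCloudJobRunSensor, which lets you split triggering and monitoring into two tasks. The following is a minimal sketch, assuming the same DBT_CLOUD_CONN_ID and JOB_ID constants as in the tutorial DAG; trigger_job.output refers to the run ID that the DbtCloudRunJobOperator pushes to XCom:

    from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator
    from airflow.providers.dbt.cloud.sensors.dbt import DbtCloudJobRunSensor

    # Trigger the dbt Cloud job without waiting for it to finish.
    trigger_job = DbtCloudRunJobOperator(
        task_id="trigger_dbt_cloud_job",
        dbt_cloud_conn_id=DBT_CLOUD_CONN_ID,
        job_id=JOB_ID,
        wait_for_termination=False,  # return as soon as the run is queued
    )

    # Monitor the triggered run in a separate task.
    wait_for_job = DbtCloudJobRunSensor(
        task_id="wait_for_dbt_cloud_job",
        dbt_cloud_conn_id=DBT_CLOUD_CONN_ID,
        run_id=trigger_job.output,  # the run ID pushed to XCom by the operator
        timeout=3600,
    )

    trigger_job >> wait_for_job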

Congratulations! You've run a DAG which uses the dbt Cloud provider to orchestrate a job run in dbt Cloud.

info

You can find more examples of how to use dbt Cloud with Airflow in dbt's documentation.

Deferrable dbt Cloud operators

If you are orchestrating long-running dbt Cloud jobs with Airflow, you may benefit from deferrable operators for cost savings and scalability. The Astronomer providers package contains deferrable versions of several dbt modules.
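
For example, a minimal sketch of a deferrable trigger task could look like the following. It assumes the astronomer-providers package is added to your requirements.txt and that it exposes DbtCloudRunJobOperatorAsync, a drop-in replacement that accepts the same arguments as DbtCloudRunJobOperator:

    from astronomer.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperatorAsync

    # Deferrable version: the task frees its worker slot while the dbt Cloud job runs
    # and is resumed by the triggerer once the run reaches a terminal state.
    trigger_job = DbtCloudRunJobOperatorAsync(
        task_id="trigger_dbt_cloud_job",
        dbt_cloud_conn_id=DBT_CLOUD_CONN_ID,
        job_id=JOB_ID,
        check_interval=600,
        timeout=3600,
    )

Depending on the provider version you have installed, the official DbtCloudRunJobOperator may also accept a deferrable argument that achieves the same effect.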

See also
