Orchestrate dbt Cloud jobs with Airflow
dbt Cloud is a managed service that provides a hosted architecture to run dbt, a tool that helps you build interdependent SQL models for in-warehouse data transformation.
The dbt Cloud Airflow provider allows users to orchestrate and execute actions in dbt Cloud as DAGs. Running dbt with Airflow ensures a reliable, scalable environment for models, as well as the ability to trigger models based on upstream dependencies in your data ecosystem.
For a tutorial on how to use the open-source dbt Core package with Airflow, see Orchestrate dbt Core with Cosmos.
Assumed knowledge
To get the most out of this tutorial, make sure you have an understanding of:
- The basics of dbt. See Getting started with dbt Cloud.
- Airflow fundamentals, such as writing DAGs and defining tasks. See Get started with Apache Airflow.
- Airflow operators. See Operators 101.
Time to complete
This tutorial takes approximately 30 minutes to complete.
Prerequisites
- The Astro CLI.
- A dbt Cloud account. A 14-day free trial is available.
- Access to a data warehouse supported by dbt Cloud. View the dbt documentation for an up-to-date list of adapters.
Step 1: Configure your Astro project
An Astro project contains all of the files you need to run Airflow locally.
- Create a new Astro project:

  ```sh
  $ mkdir astro-dbt-cloud-tutorial && cd astro-dbt-cloud-tutorial
  $ astro dev init
  ```

- Add the dbt Cloud provider to your `requirements.txt` file:

  ```text
  apache-airflow-providers-dbt-cloud
  ```

- Run the following command to start your Astro project:

  ```sh
  $ astro dev start
  ```
Step 2: Configure a dbt connection
- In the Airflow UI, go to Admin -> Connections and click +.

- Create a new connection named `dbt_conn` and choose the `dbt Cloud` connection type. Configure the following values for the connection, then optionally verify it with the snippet after this list:

  - Tenant: The URL under which your dbt Cloud API is hosted. The default value is `cloud.getdbt.com`.
  - Account ID: (Optional) The default dbt Cloud account to use with this connection.
  - API Token: A dbt Cloud user API token.
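Optionally, you can confirm that the connection works before moving on. The following is a minimal sketch, assuming the `dbt_conn` connection above exists and that you run the code inside your Airflow environment (for example, from a throwaway DAG task or a Python shell in the scheduler container):

```python
# Minimal connection smoke test (sketch): lists the dbt Cloud accounts
# visible to the API token stored in the dbt_conn connection.
from airflow.providers.dbt.cloud.hooks.dbt import DbtCloudHook

hook = DbtCloudHook(dbt_cloud_conn_id="dbt_conn")
accounts = hook.list_accounts()  # list of paginated API responses
print(accounts[0].json()["data"])
```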
Step 3: Configure a dbt Cloud job
In the dbt Cloud UI, create a dbt Cloud job. The contents of this job do not matter for this tutorial. Optionally, you can use the jaffle shop example from dbt's Quickstart documentation. Copy the dbt Cloud `job_id` for use in the next step.
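If you prefer to look the `job_id` up programmatically rather than copying it from the dbt Cloud UI, the following sketch lists all jobs visible to your connection. It assumes the `dbt_conn` connection from Step 2 and is run inside your Airflow environment:

```python
# Sketch: print the id and name of every dbt Cloud job the connection can see.
from airflow.providers.dbt.cloud.hooks.dbt import DbtCloudHook

hook = DbtCloudHook(dbt_cloud_conn_id="dbt_conn")
for page in hook.list_jobs():  # responses are paginated
    for job in page.json()["data"]:
        print(job["id"], job["name"])
```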
Step 4: Write a dbt Cloud DAG
- In your `dags` folder, create a file called `check_before_running_dbt_cloud_job.py`.

- Copy the following code into the file, making sure to replace `<your dbt Cloud job id>` with the `job_id` you copied.

  ```python
  from airflow.decorators import dag, task
  from airflow.providers.dbt.cloud.hooks.dbt import DbtCloudHook, DbtCloudJobRunStatus
  from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator
  from pendulum import datetime

  DBT_CLOUD_CONN_ID = "dbt_conn"
  JOB_ID = "<your dbt Cloud job id>"


  @dag(
      start_date=datetime(2022, 2, 10),
      schedule="@daily",
      catchup=False,
  )
  def check_before_running_dbt_cloud_job():
      @task.short_circuit
      def check_job(job_id):
          """
          Retrieves the last run for a given dbt Cloud job and checks
          to see if the job is not currently running.
          """
          hook = DbtCloudHook(DBT_CLOUD_CONN_ID)
          runs = hook.list_job_runs(job_definition_id=job_id, order_by="-id")

          if not runs[0].json().get("data"):
              return True
          else:
              latest_run = runs[0].json()["data"][0]
              return DbtCloudJobRunStatus.is_terminal(latest_run["status"])

      trigger_job = DbtCloudRunJobOperator(
          task_id="trigger_dbt_cloud_job",
          dbt_cloud_conn_id=DBT_CLOUD_CONN_ID,
          job_id=JOB_ID,
          check_interval=600,
          timeout=3600,
      )

      check_job(job_id=JOB_ID) >> trigger_job


  check_before_running_dbt_cloud_job()
  ```

  This DAG shows a simple implementation using the DbtCloudRunJobOperator and DbtCloudHook. The DAG consists of two tasks:
  - `check_job`: Uses the `@task.short_circuit` decorator (the TaskFlow version of the ShortCircuitOperator) to ensure that the dbt Cloud job with the specified `JOB_ID` is not currently running. The most recent runs of the job are retrieved with the `list_job_runs()` method of the `DbtCloudHook`, the `latest_run` is selected, and its `status` is checked for being terminal. If the status of the latest run is terminal, the job is not currently running and the pipeline goes ahead and triggers another run of the job. If the status of the latest run is not terminal, a run with the given `JOB_ID` is still in progress in dbt Cloud, so the task returns `False`, causing the DAG to short circuit and skip all downstream tasks. (The statuses that count as terminal are shown in the snippet after these steps.)
  - `trigger_dbt_cloud_job`: Uses the `DbtCloudRunJobOperator` to trigger a new run of the dbt Cloud job with the specified `JOB_ID`.
- Run the DAG and verify that the dbt Cloud job ran in the dbt Cloud UI.
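For reference, the terminal statuses that the `check_job` task relies on come from the provider's `DbtCloudJobRunStatus` enum. Here is a minimal sketch to inspect them; the exact numeric values are defined by the dbt Cloud API:

```python
# Sketch: a run is "terminal" once it has finished (success, error, or
# cancelled); a non-terminal status means a run is still queued or running.
from airflow.providers.dbt.cloud.hooks.dbt import DbtCloudJobRunStatus

print(DbtCloudJobRunStatus.TERMINAL_STATUSES.value)
print(DbtCloudJobRunStatus.is_terminal(DbtCloudJobRunStatus.SUCCESS.value))  # True
print(DbtCloudJobRunStatus.is_terminal(DbtCloudJobRunStatus.RUNNING.value))  # False
```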
The full code for this example, along with other DAGs that implement the dbt Cloud provider, can be found on the Astronomer Registry.
Congratulations! You've run a DAG which uses the dbt Cloud provider to orchestrate a job run in dbt Cloud.
You can find more examples of how to use dbt Cloud with Airflow in dbt's documentation.
Deferrable dbt Cloud operators
If you are orchestrating long-running dbt Cloud jobs using Airflow, you may benefit from leveraging deferrable operators for cost savings and scalability. The Astronomer providers package contains deferrable versions of several dbt Cloud modules, one of which is shown in the sketch after this list:
- DbtCloudHookAsync: Asynchronous version of the DbtCloudHook.
- DbtCloudRunJobTrigger: Trigger class used in deferrable dbt Cloud operators.
- DbtCloudJobRunSensorAsync: Asynchronously checks the status of dbt Cloud job runs.
- DbtCloudRunJobOperatorAsync: Executes a dbt Cloud job asynchronously and waits for the job to reach a terminal status before completing successfully.
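As a rough sketch, switching the example DAG from Step 4 to the deferrable operator only requires swapping the import and the operator class. This assumes the astronomer-providers package (with its dbt Cloud extras) has been added to `requirements.txt` and that the async operator accepts the same arguments as its synchronous counterpart:

```python
# Sketch: drop-in replacement for the trigger_dbt_cloud_job task from Step 4,
# using the deferrable operator from the astronomer-providers package.
from astronomer.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperatorAsync

trigger_job = DbtCloudRunJobOperatorAsync(
    task_id="trigger_dbt_cloud_job",
    dbt_cloud_conn_id=DBT_CLOUD_CONN_ID,  # "dbt_conn" from Step 2
    job_id=JOB_ID,
    check_interval=600,
    timeout=3600,
)
```

While the task is deferred, its worker slot is released and the polling is handled by the triggerer, which is where the cost and scalability savings come from.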