Introducing Cosmos 1.6: The Best Way to Run dbt-core in Airflow

  • Pankaj Singh

Cosmos 1.6.0 is now available, featuring a range of enhancements and new additions to serve the community. This blog post walks you through the features introduced in Cosmos 1.6, released on August 20th.

When to use Cosmos

The open-source provider package Cosmos lets you integrate dbt-core jobs into Airflow by automatically creating Airflow tasks from dbt models. With just a few lines of code, you can turn your dbt-core projects into Airflow task groups and DAGs. Since its release in early 2023, Cosmos has become the most popular way to run dbt-core with Airflow, recently passing 1 million monthly downloads.

This blog post dives into the newest Cosmos features released in version 1.6.0. If you are new to Cosmos, check out our tutorial and the documentation for more information.

Airflow Task Representation for dbt Source Nodes

A new way of rendering source nodes as Airflow tasks is now available through the RenderConfig API. The default behavior is set to not render source nodes. You can control this rendering behavior using the source_rendering_behavior parameter in RenderConfig.

With this addition, Cosmos now supports running freshness checks on source nodes, executing source tests, and rendering source nodes as EmptyOperators in the Airflow UI for nodes without tests or freshness checks.

This feature was heavily requested across various channels by community members and developed by Alejandro Rojas Benítez, a data engineer at GlossGenius. We thank all our users for their continued suggestions and inspiration on how we can further improve Cosmos!

Example of how to render dbt Source Nodes

from datetime import datetime
from pathlib import Path

from cosmos import DbtDag, ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig
# New in 1.6: import SourceRenderingBehavior options
from cosmos.constants import SourceRenderingBehavior
from cosmos.profiles import PostgresUserPasswordProfileMapping

# define the dbt profile
airflow_db = ProfileConfig(
    profile_name="airflow_db",
    target_name="dev",
    profile_mapping=PostgresUserPasswordProfileMapping(
        conn_id="airflow_metadata_db",
        profile_args={"schema": "dbt"},
    ),
)

jaffle_shop_path = Path("/usr/local/airflow/dbt/jaffle_shop")
dbt_executable = Path("/usr/local/airflow/dbt_venv/bin/dbt")

# define the execution configuration
venv_execution_config = ExecutionConfig(
    dbt_executable_path=str(dbt_executable),
)

# create a DAG from a dbt-core project
simple_dag = DbtDag(
    project_config=ProjectConfig(jaffle_shop_path),
    profile_config=airflow_db,
    execution_config=venv_execution_config,
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    dag_id="simple_dag",
    # New in 1.6: set the render config to include source nodes
    render_config=RenderConfig(
        source_rendering_behavior=SourceRenderingBehavior.ALL,
    ),
)

Fig 1. dbt project tree view.

Fig 2. Graph of the Airflow DAG created by the DAG code above.

In the Airflow graph view above, you can see that source nodes are rendered in sky blue on the left side.

For more information on rendering source nodes with Cosmos, check out the documentation.
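
The rendering rules described in this section can be summarized in a small, library-free model. This is only a sketch of the behavior the post describes, not the Cosmos implementation: the Behavior enum, the SourceNode class, the render_source function and the task labels are all hypothetical names, and only the two behaviors mentioned in this post (the default of not rendering, and ALL) are modeled.

```python
from dataclasses import dataclass
from enum import Enum


class Behavior(Enum):
    # Hypothetical stand-in for the values of source_rendering_behavior.
    NONE = "none"  # default described in the post: sources are not rendered
    ALL = "all"    # every source node becomes an Airflow task


@dataclass
class SourceNode:
    name: str
    has_freshness: bool = False
    has_tests: bool = False


def render_source(node: SourceNode, behavior: Behavior) -> list:
    """Return labels for the tasks a source node would produce."""
    if behavior is Behavior.NONE:
        return []
    tasks = []
    if node.has_freshness:
        tasks.append(f"{node.name}: freshness check")
    if node.has_tests:
        tasks.append(f"{node.name}: source tests")
    if not tasks:
        # Nothing to execute, so the node is only a placeholder in the graph.
        tasks.append(f"{node.name}: EmptyOperator")
    return tasks


if __name__ == "__main__":
    orders = SourceNode("raw_orders", has_freshness=True)
    payments = SourceNode("raw_payments")
    print(render_source(orders, Behavior.ALL))
    print(render_source(payments, Behavior.ALL))
    print(render_source(orders, Behavior.NONE))
```

In this model, a source with neither tests nor freshness checks still appears in the graph under ALL, which matches the EmptyOperator behavior described above.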

Persisting the virtual env directory for ExecutionMode.VIRTUALENV

Additionally, Cosmos 1.6 added a new option in the ExecutionConfig API to persist the virtual environment used by the virtualenv execution mode on a single worker. This allows the worker to maintain and reuse the virtual environment, which shortens DAG run times. Before this change, Cosmos would create a new Python virtualenv each time a task ran, which could significantly increase task execution time.

ExecutionConfig(
    virtualenv_dir="path/to/virtualenv",
)
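
The benefit of persisting the directory comes from a simple create-once, reuse-afterwards pattern. The standard-library sketch below illustrates that pattern under stated assumptions: the ensure_virtualenv helper and its create parameter are hypothetical names, the presence of pyvenv.cfg is used as the "already built" signal, and none of this is taken from the Cosmos source.

```python
import tempfile
import venv
from pathlib import Path
from typing import Callable


def _create_without_pip(venv_dir: Path) -> None:
    # with_pip=False keeps the sketch fast; a real setup would install dbt here.
    venv.create(venv_dir, with_pip=False)


def ensure_virtualenv(
    venv_dir: Path,
    create: Callable[[Path], None] = _create_without_pip,
) -> bool:
    """Create the virtualenv only if it is missing.

    Returns True when the environment was created and False when an
    existing one was reused.
    """
    if (venv_dir / "pyvenv.cfg").exists():
        return False  # reuse the persisted environment
    create(venv_dir)
    return True


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        target = Path(tmp) / "dbt_venv"
        print(ensure_virtualenv(target))  # first call builds the environment
        print(ensure_virtualenv(target))  # second call finds and reuses it
```

Only the first call pays the creation cost; every later call on the same worker returns immediately, which is the saving the paragraph above describes.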

Support using remote manifest files (from GCS/S3/Azure Blob Storage)

Cosmos 1.6 better supports users who want to keep their dbt project(s) and Airflow in different repositories and in different Docker container images. Manifest files can now be hosted in remote object storage (GCS, S3, or Azure Blob Storage) and used with the Cosmos manifest load method.

For more information, see Load Cosmos dbt manifest file from S3/GCS/ABS in the documentation.

Example implementation of using a remote manifest file

from datetime import datetime

from airflow.decorators import dag
from airflow.providers.cncf.kubernetes.secret import Secret
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, ExecutionConfig
from cosmos.constants import ExecutionMode
from cosmos.profiles import PostgresUserPasswordProfileMapping

# Kubernetes secrets exposed to the dbt pod as environment variables
postgres_password_secret = Secret(
    deploy_type="env",
    deploy_target="POSTGRES_PASSWORD",
    secret="postgres-secrets",
    key="password",
)
postgres_host_secret = Secret(
    deploy_type="env",
    deploy_target="POSTGRES_HOST",
    secret="postgres-secrets",
    key="host",
)


@dag(
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["simple"],
)
def remote_manifest_k8_mode_task_group() -> None:
    DbtTaskGroup(
        group_id="my_jaffle_shop_project",
        project_config=ProjectConfig(
            # New in 1.6: specify a remote manifest path
            manifest_path="s3://cosmos-manifest-test/manifest.json",
            manifest_conn_id="aws_s3_conn",
            project_name="jaffle_shop",
        ),
        profile_config=ProfileConfig(
            profile_name="airflow_db",
            target_name="dev",
            profile_mapping=PostgresUserPasswordProfileMapping(
                conn_id="airflow_metadata_db",
                profile_args={"schema": "dbt"},
            ),
        ),
        execution_config=ExecutionConfig(
            execution_mode=ExecutionMode.KUBERNETES,
        ),
        operator_args={
            "image": "dbt-jaffle-shop:1.0.0",
            "get_logs": True,
            "is_delete_operator_pod": False,
            "secrets": [postgres_password_secret, postgres_host_secret],
        },
    )


remote_manifest_k8_mode_task_group()
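
For intuition on why a manifest file alone is enough to render a task group, the sketch below reads a tiny hand-written manifest and derives task dependencies from it. It assumes only the general shape of a dbt manifest.json (a "nodes" mapping whose entries carry "resource_type" and "depends_on.nodes"); the sample data and the dependency_edges helper are hypothetical, and the Cosmos parser does considerably more than this.

```python
import json

# A hand-written stand-in for the manifest.json that dbt generates.
SAMPLE_MANIFEST = json.dumps({
    "nodes": {
        "model.jaffle_shop.stg_orders": {
            "resource_type": "model",
            "depends_on": {"nodes": ["seed.jaffle_shop.raw_orders"]},
        },
        "model.jaffle_shop.orders": {
            "resource_type": "model",
            "depends_on": {"nodes": ["model.jaffle_shop.stg_orders"]},
        },
        "seed.jaffle_shop.raw_orders": {
            "resource_type": "seed",
            "depends_on": {"nodes": []},
        },
    }
})


def dependency_edges(manifest_text: str) -> list:
    """Return sorted (upstream, downstream) pairs between manifest nodes."""
    nodes = json.loads(manifest_text)["nodes"]
    edges = []
    for node_id, node in nodes.items():
        for upstream in node.get("depends_on", {}).get("nodes", []):
            if upstream in nodes:  # ignore ids that are not listed under "nodes"
                edges.append((upstream, node_id))
    return sorted(edges)


if __name__ == "__main__":
    for upstream, downstream in dependency_edges(SAMPLE_MANIFEST):
        print(f"{upstream} >> {downstream}")
```

Because the graph can be rebuilt from this one file, the Airflow image does not need a copy of the dbt project itself, which is what makes the split-repository setup described above practical.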

Support remote caching

In addition to supporting manifests in remote object storage, we have enhanced dbt ls output caching so that the cache can also be stored in remote systems such as S3, GCS, and Azure Blob Storage. This improvement increases the scalability of dbt ls output caching and reduces the load on the Airflow metadata database. It is an alternative to the Airflow Variable-based cache introduced in Cosmos 1.5.

This feature relies on the Airflow Object Storage feature, which requires Airflow 2.8.0 or later.

You can enable remote caching for dbt ls by setting the following environment variables in your Airflow environment. See the Cosmos documentation for more information.

AIRFLOW__COSMOS__REMOTE_CACHE_DIR=gs://cosmos-manifest/
AIRFLOW__COSMOS__REMOTE_CACHE_DIR_CONN_ID=gcp_conn
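
Both settings follow Airflow's general convention for overriding configuration through the environment: AIRFLOW__{SECTION}__{KEY}, upper-cased, with double underscores on either side of the section name. The helper below is a hypothetical illustration of that convention, not a function from Airflow or Cosmos, and the option names passed to it are inferred from the variable names above.

```python
def airflow_env_var(section: str, key: str) -> str:
    """Build the environment variable name for an Airflow config option."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


if __name__ == "__main__":
    for key in ("remote_cache_dir", "remote_cache_dir_conn_id"):
        print(airflow_env_var("cosmos", key))
```

Under the same convention, these options would correspond to remote_cache_dir and remote_cache_dir_conn_id in a [cosmos] section of airflow.cfg.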

Caching of the dbt package

Cosmos 1.6 also adds support for caching the dbt package-lock.yml file to ensure predictable installation of dbt project dependencies and to enhance the performance of dbt deps in Cosmos for LOCAL and VIRTUALENV execution modes.

You can enable or disable package caching using the following environment variable:

AIRFLOW__COSMOS__ENABLE_CACHE_PACKAGE_LOCKFILE=True

Teradata profile mapping

Lastly, this release added support for profile mapping to enable an integration between the Teradata Airflow provider and the dbt-teradata adapter. This enhancement expands Cosmos' capabilities, allowing seamless connections with Teradata Vantage and improved workflow management.

from cosmos.profiles import TeradataUserPasswordProfileMapping

profile_mapping = TeradataUserPasswordProfileMapping(
    conn_id="airflow_teradata_conn",
    profile_args={"schema": "my_schema"},
)

You can learn more about Teradata profile mapping in the Cosmos Docs.
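
Conceptually, a profile mapping translates the fields of an Airflow connection into the target block of a dbt profile, so credentials only need to live in one place. The sketch below illustrates the idea with plain dictionaries. Everything in it is an assumption made for illustration: the to_dbt_target helper, the connection values, and the choice of fields (type, host, user, password, schema) are not taken from the Cosmos source, so consult the Cosmos and dbt-teradata documentation for the actual mapping.

```python
def to_dbt_target(connection: dict, profile_args: dict) -> dict:
    """Translate Airflow-style connection fields into a dbt target block."""
    target = {
        "type": "teradata",
        "host": connection["host"],
        "user": connection["login"],
        "password": connection["password"],
    }
    # Values passed explicitly take precedence over connection fields.
    target.update(profile_args)
    return target


if __name__ == "__main__":
    # Placeholder values standing in for an Airflow connection.
    connection = {
        "host": "teradata.example.com",
        "login": "dbt_user",
        "password": "not-a-real-password",
    }
    print(to_dbt_target(connection, {"schema": "my_schema"}))
```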

Conclusion

The 1.6 release of Cosmos added many highly anticipated features. We’d like to thank the Cosmos contributors who made this release possible, in particular Alejandro Rojas Benítez, Daniel Reeves, Giovanni Corsetti Silva, Jaegwon.seo, Micah Victoria, Pankaj Koti, Pankaj Singh, Satish Chinthanippu and Tatiana Al-Chueyr.

By combining Cosmos and dbt Deploys in one platform, dbt on Astro enables teams to observe and deploy their dbt and Airflow code from a single place. Experience the latest firsthand with our 14-day Free Trial (new signups receive $300 in credits to get started). Sign up today to get started!
