Introducing Cosmos 1.6: The Best Way to Run dbt-core in Airflow
Cosmos 1.6.0 is now available, featuring a range of enhancements and new additions to serve the community. This blog post walks you through the features introduced in Cosmos 1.6, released on August 20th.
When to use Cosmos
The open-source provider package Cosmos allows you to integrate dbt-core jobs into Airflow by automatically creating Airflow tasks from dbt models, giving you the ability to turn your dbt-core projects into Airflow task groups and DAGs with just a few lines of code. Since its release in early 2023, Cosmos has become the most popular way to run dbt-core with Airflow, recently passing 1 million monthly downloads.
This blog post dives into the newest Cosmos features released in version 1.6.0. If you are new to Cosmos, check out our tutorial and the documentation for more information.
Airflow Task Representation for dbt Source Nodes
A new way of rendering source nodes as Airflow tasks is now available through the RenderConfig API. The default behavior is set to not render source nodes. You can control this rendering behavior using the source_rendering_behavior parameter in RenderConfig.
With this addition, Cosmos now supports running freshness checks on source nodes, executing source tests, and rendering source nodes as EmptyOperators in the Airflow UI for nodes without tests or freshness checks.
This feature was heavily requested across various channels by community members and developed by Alejandro Rojas Benítez, a data engineer at GlossGenius. We thank all our users for their continued suggestions and inspiration on how we can further improve Cosmos!
Example of how to render dbt Source Nodes
from datetime import datetime
from pathlib import Path

from cosmos import DbtDag, ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig
# New in 1.6: import the SourceRenderingBehavior options
from cosmos.constants import SourceRenderingBehavior
from cosmos.profiles import PostgresUserPasswordProfileMapping

# define the dbt profile
airflow_db = ProfileConfig(
    profile_name="airflow_db",
    target_name="dev",
    profile_mapping=PostgresUserPasswordProfileMapping(
        conn_id="airflow_metadata_db",
        profile_args={"schema": "dbt"},
    ),
)

jaffle_shop_path = Path("/usr/local/airflow/dbt/jaffle_shop")
dbt_executable = Path("/usr/local/airflow/dbt_venv/bin/dbt")

# define the execution configuration
venv_execution_config = ExecutionConfig(
    dbt_executable_path=str(dbt_executable),
)

# create a DAG from a dbt-core project
simple_dag = DbtDag(
    project_config=ProjectConfig(jaffle_shop_path),
    profile_config=airflow_db,
    execution_config=venv_execution_config,
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    dag_id="simple_dag",
    # New in 1.6: set the render config to include source nodes
    render_config=RenderConfig(
        source_rendering_behavior=SourceRenderingBehavior.ALL,
    ),
)
Fig 1. dbt project tree view.
Fig 2. Graph of the Airflow DAG created by the DAG code above.
In the Airflow graph view above, you can see that source nodes are rendered in sky blue on the left side.
For more information on rendering source nodes with Cosmos, check out the documentation.
Persisting the virtual env directory for ExecutionMode.VIRTUALENV
Additionally, Cosmos 1.6 added a new option in the ExecutionConfig API to persist the virtual environment for the virtualenv execution mode on a single worker. This allows the worker to maintain and reuse the virtual environment, improving DAG run time. Before this change, Cosmos would create a new Python virtualenv for every task run, which could significantly increase task execution time.
ExecutionConfig(virtualenv_dir="path/to/virtualenv", ...)
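As a minimal sketch of how this might look in a full DAG (the project path, profile details, dbt adapter requirement, and dag_id below are placeholders, reusing the jaffle_shop setup from the earlier example):

from datetime import datetime
from pathlib import Path

from cosmos import DbtDag, ExecutionConfig, ProfileConfig, ProjectConfig
from cosmos.constants import ExecutionMode
from cosmos.profiles import PostgresUserPasswordProfileMapping

airflow_db = ProfileConfig(
    profile_name="airflow_db",
    target_name="dev",
    profile_mapping=PostgresUserPasswordProfileMapping(
        conn_id="airflow_metadata_db",
        profile_args={"schema": "dbt"},
    ),
)

virtualenv_dag = DbtDag(
    project_config=ProjectConfig(Path("/usr/local/airflow/dbt/jaffle_shop")),
    profile_config=airflow_db,
    execution_config=ExecutionConfig(
        execution_mode=ExecutionMode.VIRTUALENV,
        # New in 1.6: keep and reuse this virtualenv across task runs on the same worker
        virtualenv_dir=Path("/usr/local/airflow/dbt_venv"),
    ),
    # dbt and its adapter are installed into the virtualenv on first use
    operator_args={"py_requirements": ["dbt-postgres"], "py_system_site_packages": False},
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    dag_id="virtualenv_dag",
)

If virtualenv_dir is not set, Cosmos keeps the previous behavior of creating a temporary virtualenv for each task run.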
Support using remote manifest files (from GCS/S3/Azure Blob Storage)
Cosmos 1.6 better supports users who want to keep their dbt project(s) and Airflow in different repositories and different Docker container images. Manifest files hosted in remote object storage on GCS, S3, or Azure Blob Storage can now be used with the Cosmos manifest load method.
For more information, see Load Cosmos dbt manifest file from S3/GCS/ABS in the documentation.
Example implementation of using a remote manifest file
from datetime import datetime

from airflow.decorators import dag
from airflow.providers.cncf.kubernetes.secret import Secret
from cosmos import DbtTaskGroup, ExecutionConfig, ProfileConfig, ProjectConfig
from cosmos.constants import ExecutionMode
from cosmos.profiles import PostgresUserPasswordProfileMapping

postgres_password_secret = Secret(
    deploy_type="env",
    deploy_target="POSTGRES_PASSWORD",
    secret="postgres-secrets",
    key="password",
)

postgres_host_secret = Secret(
    deploy_type="env",
    deploy_target="POSTGRES_HOST",
    secret="postgres-secrets",
    key="host",
)

@dag(
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["simple"],
)
def remote_manifest_k8_mode_task_group() -> None:
    DbtTaskGroup(
        group_id="my_jaffle_shop_project",
        project_config=ProjectConfig(
            # New in 1.6: specify a remote manifest path
            manifest_path="s3://cosmos-manifest-test/manifest.json",
            manifest_conn_id="aws_s3_conn",
            project_name="jaffle_shop",
        ),
        profile_config=ProfileConfig(
            profile_name="airflow_db",
            target_name="dev",
            profile_mapping=PostgresUserPasswordProfileMapping(
                conn_id="airflow_metadata_db",
                profile_args={"schema": "dbt"},
            ),
        ),
        execution_config=ExecutionConfig(
            execution_mode=ExecutionMode.KUBERNETES,
        ),
        operator_args={
            "image": "dbt-jaffle-shop:1.0.0",
            "get_logs": True,
            "is_delete_operator_pod": False,
            "secrets": [postgres_password_secret, postgres_host_secret],
        },
    )

remote_manifest_k8_mode_task_group()
Support remote caching
In addition to being able to store manifests in remote object storage, we have enhanced dbt ls output caching to enable storage in remote systems such as S3, GCS, and Azure Blob Storage. This improvement increases the scalability of dbt ls output caching and reduces the load on the Airflow metadata database, and it is an alternative to the Airflow Variable-based cache introduced in Cosmos 1.5.
This feature utilizes the Airflow ObjectStorage feature, which requires Airflow 2.8.0+.
You can enable remote caching for dbt ls by setting the following environment variables in your Airflow environment. See the Cosmos documentation for more information.
AIRFLOW__COSMOS__REMOTE_CACHE_DIR=gs://cosmos-manifest/
AIRFLOW__COSMOS__REMOTE_CACHE_DIR_CONN_ID=gcp_conn
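Because these variables follow Airflow's standard AIRFLOW__SECTION__KEY naming, the same settings can alternatively live in airflow.cfg under the cosmos section; a minimal sketch, assuming you manage configuration through airflow.cfg rather than environment variables:

[cosmos]
remote_cache_dir = gs://cosmos-manifest/
remote_cache_dir_conn_id = gcp_conn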
Caching of the dbt package
Cosmos 1.6 also adds support for caching the dbt package-lock.yml file to ensure predictable installation of dbt project dependencies and to enhance the performance of dbt deps in Cosmos for LOCAL and VIRTUALENV execution modes.
You can enable or disable package caching using the following environment variable:
AIRFLOW__COSMOS__ENABLE_CACHE_PACKAGE_LOCKFILE=True
Teradata profile mapping
Lastly, this release added support for profile mapping to enable an integration between the Teradata Airflow provider and the dbt-teradata adapter. This enhancement expands Cosmos' capabilities, allowing seamless connections with Teradata Vantage and improved workflow management.
profile_mapping = TeradataUserPasswordProfileMapping(
    conn_id="airflow_teradata_conn",
    profile_args={"schema": "my_schema"},
)
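For context, here is a minimal sketch of plugging that mapping into a ProfileConfig and DbtDag (the connection ID, schema, project path, and DAG settings are placeholders, not taken from the release itself):

from datetime import datetime
from pathlib import Path

from cosmos import DbtDag, ProfileConfig, ProjectConfig
from cosmos.profiles import TeradataUserPasswordProfileMapping

teradata_profile = ProfileConfig(
    profile_name="teradata_profile",
    target_name="dev",
    profile_mapping=TeradataUserPasswordProfileMapping(
        conn_id="airflow_teradata_conn",
        profile_args={"schema": "my_schema"},
    ),
)

teradata_dag = DbtDag(
    project_config=ProjectConfig(Path("/usr/local/airflow/dbt/my_teradata_project")),
    profile_config=teradata_profile,
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    dag_id="teradata_dag",
)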
You can learn more about Teradata profile mapping in the Cosmos Docs.
Conclusion
The 1.6 release of Cosmos added many highly anticipated features. We’d like to thank the Cosmos contributors who made this release possible, in particular Alejandro Rojas Benítez, Daniel Reeves, Giovanni Corsetti Silva, Jaegwon.seo, Micah Victoria, Pankaj Koti, Pankaj Singh, Satish Chinthanippu and Tatiana Al-Chueyr.
By combining Cosmos and dbt Deploys in one platform, dbt on Astro enables teams to observe and deploy their dbt and Airflow code from a single place. Experience the latest release firsthand with our 14-day Free Trial (new signups receive $300 in credits). Sign up today to get started!