Native support for Source Node Rendering in Cosmos

  • Pankaj Singh
  • Alejandro Rojas

Over the past year, at GlossGenius, we’ve been using Cosmos to orchestrate our dbt-core workflows with Apache Airflow. Cosmos has been a game-changer, providing seamless integration between the two tools and simplifying how we manage and execute our data workflows. Features like rerunning specific tasks, refreshing downstream models, and leveraging Airflow’s advanced capabilities have made our lives easier.

But as we became more familiar with Cosmos, we noticed an opportunity to improve how it handled source freshness checks, which verify the health of your source data and whether it meets its SLAs.

Background: Challenges with source nodes before Cosmos 1.6

Stale, non-blocking sources could cause unnecessary failures

Initially, we followed the standard approach of running dbt source freshness daily after our dbt build DAG. However, this approach wasn’t ideal:

  • Running dbt source freshness checks every source, regardless of its importance.
  • Checking for freshness before you build your models can create situations where:
    • Source A is stale, but it doesn’t power any critical models.
    • Source B is fresh and supports the company’s core models.
    Yet a failure in Source A would block downstream tasks unnecessarily, delaying critical pipelines.
  • Checking for freshness after you build your models can waste compute if some models’ sources turn out to be stale. Moreover, following the principle that no data is better than wrong data, we prefer to avoid running models powered by stale sources altogether.

Fig 1. Running the freshness check after Cosmos tasks.

Custom rendering exposed compute inefficiencies

When Cosmos introduced custom rendering for nodes like sources and exposures, we saw an opportunity to check freshness at the source level. However, there was a limitation:

  • All sources were being rendered into tasks, even those without freshness checks.
  • These tasks were running dbt commands that effectively did nothing, wasting compute and cluttering our DAGs.
  • An issue open at the time (#630) suggested rendering sources without any checks as EmptyOperators, which, per the Airflow docs, “are evaluated by the scheduler but never processed by the executor”.

The need for native support

While Cosmos already had patterns in place for models, tests, snapshots and more, it lacked native support for sources. To address this, I contributed a feature with the help of Pankaj Singh, extending the existing pattern to render sources natively.

How does native source rendering work?

Cosmos has multiple ways of rendering dbt DAGs into Airflow DAGs, including:

  • Reading directly from a manifest.json file stored locally or in cloud storage.
  • Running dbt ls while Airflow parses the DAG to extract node names, dependencies, types, and other metadata.

To enable custom behavior for source nodes based on freshness checks, it was essential for Cosmos to extract freshness metadata consistently across all supported parsing methods.

  • Manifest files already include freshness values.
  • dbt ls does not return freshness by default.

This required updating the dbt ls parsing method to include freshness data. Luckily, dbt ls supports a rich set of arguments, allowing customization of the returned values. This update was straightforward but restricted the feature to dbt versions 1.5 and above.
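As a rough sketch of what “including freshness data” means in practice, the snippet below parses line-delimited JSON in the shape dbt ls emits with --output json, assuming freshness was among the requested output keys, and keeps the freshness block of each source node. The sample lines and the exact set of keys are assumptions for illustration; they are not captured from a real dbt run and this is not Cosmos’s own parser. The parse_dbt_ls_sources helper is hypothetical.

```python
import json
from typing import Any, Dict, List

# Hypothetical sample: one JSON object per line, roughly the shape of
# `dbt ls --output json` output when freshness is among the requested keys.
SAMPLE_DBT_LS_OUTPUT = """\
{"name": "raw_customers", "unique_id": "source.altered_jaffle_shop.postgres_db.raw_customers", "resource_type": "source", "freshness": {"warn_after": {"count": null, "period": null}, "error_after": {"count": null, "period": null}, "filter": null}}
{"name": "raw_orders", "unique_id": "source.altered_jaffle_shop.postgres_db.raw_orders", "resource_type": "source", "freshness": {"warn_after": {"count": 3650, "period": "day"}, "error_after": {"count": null, "period": null}, "filter": null}}
{"name": "customers", "unique_id": "model.altered_jaffle_shop.customers", "resource_type": "model"}
"""


def parse_dbt_ls_sources(output: str) -> List[Dict[str, Any]]:
    """Return the source nodes (id + freshness) from line-delimited dbt ls JSON output."""
    sources = []
    for line in output.splitlines():
        line = line.strip()
        # Assumption: non-JSON lines (e.g. log output) may be mixed in; skip them
        if not line.startswith("{"):
            continue
        node = json.loads(line)
        if node.get("resource_type") == "source":
            sources.append(
                {
                    "unique_id": node["unique_id"],
                    # freshness is absent if it was not requested from dbt ls
                    "freshness": node.get("freshness"),
                }
            )
    return sources


for source in parse_dbt_ls_sources(SAMPLE_DBT_LS_OUTPUT):
    print(source["unique_id"], source["freshness"]["warn_after"])
```

Note that a source with no configured freshness can still carry a freshness block whose criteria are all null, as raw_customers does in this sample, so the presence of the key alone does not mean a check is needed.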

Key features

Key features of native source rendering include:

Freshness checks only when needed

  • A new attribute, has_freshness, has been added to the DbtNode class.
  • What it does:
    • True: Indicates the source requires a freshness check.
    • False: Indicates the source doesn’t require freshness checks.
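A freshness block can exist with every criterion set to null (compare the error_after entry in the sources.json sample later in this post), so has_freshness has to look inside it. The sketch below is a minimal, hypothetical way to derive the flag, assuming a check is needed only when warn_after or error_after has both a count and a period. SketchDbtNode and is_freshness_effective are illustrative stand-ins, not Cosmos’s DbtNode or its internal helpers.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


def is_freshness_effective(freshness: Optional[Dict[str, Any]]) -> bool:
    """True if any freshness criterion (warn_after / error_after) is actually set."""
    if not freshness:
        return False
    for key in ("warn_after", "error_after"):
        criterion = freshness.get(key) or {}
        # Assumption: a criterion counts only when both count and period are configured
        if criterion.get("count") is not None and criterion.get("period") is not None:
            return True
    return False


@dataclass
class SketchDbtNode:
    """Hypothetical, trimmed-down stand-in for a dbt node with a has_freshness flag."""

    unique_id: str
    resource_type: str
    freshness: Optional[Dict[str, Any]] = None
    has_freshness: bool = field(init=False, default=False)

    def __post_init__(self) -> None:
        # Only source nodes can have freshness checks
        self.has_freshness = self.resource_type == "source" and is_freshness_effective(self.freshness)


orders = SketchDbtNode(
    unique_id="source.altered_jaffle_shop.postgres_db.raw_orders",
    resource_type="source",
    freshness={"warn_after": {"count": 3650, "period": "day"}, "error_after": {"count": None, "period": None}},
)
customers = SketchDbtNode(
    unique_id="source.altered_jaffle_shop.postgres_db.raw_customers",
    resource_type="source",
)
print(orders.has_freshness, customers.has_freshness)
```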

Support for multiple modes

  • all: Cosmos renders every source in the dbt project, using three operators:
    • EmptyOperator: For sources that do not have freshness checks.
    • DbtSourceOperator: For sources that have freshness checks.
    • DbtTestOperator: For the tests defined on a source.
  • none (default): No sources are rendered automatically. Custom converters can still be used.
  • with_tests_or_freshness:
    • Renders only sources that have either tests or freshness checks.
    • Ideal for large dbt projects with many sources, where the all option could render thousands of tasks.
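The mode rules above, together with the outcomes listed in the Results section, can be condensed into a small decision function. This is a hypothetical sketch written from the behavior described in this post, not Cosmos’s internal code; the Mode enum, tasks_for_source, and the string operator labels are illustrative only (in a real DAG you would use cosmos.constants.SourceRenderingBehavior).

```python
from enum import Enum
from typing import List


class Mode(Enum):
    """Illustrative mirror of the three source rendering modes."""

    NONE = "none"
    ALL = "all"
    WITH_TESTS_OR_FRESHNESS = "with_tests_or_freshness"


def tasks_for_source(mode: Mode, has_tests: bool, has_freshness: bool) -> List[str]:
    """Return the operators a source would be rendered as (empty list = not rendered)."""
    if mode is Mode.NONE:
        return []
    if mode is Mode.WITH_TESTS_OR_FRESHNESS and not (has_tests or has_freshness):
        return []
    if mode is Mode.ALL and not has_freshness:
        # Sources without freshness checks become no-op placeholders
        source_task = "EmptyOperator"
    else:
        source_task = "DbtSourceOperator"
    tasks = [source_task]
    if has_tests:
        # Source tests get their own test task
        tasks.append("DbtTestOperator")
    return tasks


# raw_payments from the example: tests, but no freshness check
for mode in Mode:
    print(mode.value, tasks_for_source(mode, has_tests=True, has_freshness=False))
```

Encoding the rules this way makes the one asymmetry easy to see: a source with tests but no freshness check is an EmptyOperator under all, but a DbtSourceOperator under with_tests_or_freshness, which matches the results reported for raw_payments.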

New rendered template field: freshness

  • Contains the sources.json file that dbt generates when running dbt source freshness.
  • Provides detailed information about the freshness checks for debugging or analysis.
{
  "metadata": {
    "dbt_schema_version": "https://schemas.getdbt.com/dbt/sources/v3.json",
    "dbt_version": "1.8.7",
    "generated_at": "2025-03-26T18:23:52.753220Z",
    "invocation_id": "c04024de-2b85-4bb8-b236-5d4ba7206382",
    "env": {}
  },
  "results": [
    {
      "unique_id": "source.altered_jaffle_shop.postgres_db.raw_orders",
      "max_loaded_at": "2018-04-09T00:00:00+00:00",
      "snapshotted_at": "2025-03-26T18:23:52.685384+00:00",
      "max_loaded_at_time_ago_in_s": 219781432.685384,
      "status": "pass",
      "criteria": {
        "warn_after": {
          "count": 3650,
          "period": "day"
        },
        "error_after": {
          "count": null,
          "period": null
        },
        "filter": null
      },
      "adapter_response": {
        "_message": "SELECT 1",
        "code": "SELECT",
        "rows_affected": 1
      },
      "timing": [
        {
          "name": "compile",
          "started_at": "2025-03-26T18:23:52.616401Z",
          "completed_at": "2025-03-26T18:23:52.616403Z"
        },
        {
          "name": "execute",
          "started_at": "2025-03-26T18:23:52.616556Z",
          "completed_at": "2025-03-26T18:23:52.688815Z"
        }
      ],
      "thread_id": "Thread-1",
      "execution_time": 0.07286596298217773
    }
  ],
  "elapsed_time": 0.6606731414794922
}
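Because the freshness field holds plain sources.json content, it is easy to post-process, for example to report how stale each source is. A minimal sketch, assuming you already have that JSON as a string (retrieving it from the rendered template field is out of scope here); summarize_freshness is a hypothetical helper and the input is a trimmed copy of the sample shown above.

```python
import json
from typing import List, Tuple

# Trimmed copy of the sources.json sample shown above
SOURCES_JSON = """
{
  "results": [
    {
      "unique_id": "source.altered_jaffle_shop.postgres_db.raw_orders",
      "max_loaded_at": "2018-04-09T00:00:00+00:00",
      "max_loaded_at_time_ago_in_s": 219781432.685384,
      "status": "pass",
      "criteria": {"warn_after": {"count": 3650, "period": "day"}}
    }
  ]
}
"""

SECONDS_PER_DAY = 86400


def summarize_freshness(raw: str) -> List[Tuple[str, str, float]]:
    """Return (unique_id, status, age in days) for every source result."""
    results = json.loads(raw)["results"]
    return [
        (r["unique_id"], r["status"], round(r["max_loaded_at_time_ago_in_s"] / SECONDS_PER_DAY, 1))
        for r in results
    ]


for unique_id, status, age_days in summarize_freshness(SOURCES_JSON):
    print(f"{unique_id}: {status}, last loaded {age_days} days ago")
```

With the sample data this reports an age of about 2,544 days, below the 3,650-day warn_after threshold, which is consistent with the pass status.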

Support for source tests

  • The feature fully integrates with dbt source tests: tests defined on a source are rendered as DbtTestOperator tasks.

Example: Comparing modes

Here’s an example showcasing the three native source rendering modes:

Source configuration

version: 2

sources:
  - name: postgres_db
    database: "{{ env_var('POSTGRES_DB') }}"
    schema: "{{ env_var('POSTGRES_SCHEMA') }}"
    tables:
      - name: raw_customers
      - name: raw_payments
        columns:
          - name: id
            tests:
              - unique
              - not_null
      - name: raw_orders
        columns:
          - name: id
            tests:
              - unique
              - not_null
        freshness:
          warn_after:
            count: 3650
            period: day
        loaded_at_field: CAST(order_date AS TIMESTAMP)

As you can see, we have three different source tables from the same database:

  • raw_customers has neither tests nor freshness checks.
  • raw_payments has tests.
  • raw_orders has both tests and freshness checks.

Cosmos DAG setup

from datetime import datetime
from pathlib import Path

from cosmos import DbtDag, ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig
from cosmos.profiles import PostgresUserPasswordProfileMapping

# New in 1.6: import SourceRenderingBehavior options
from cosmos.constants import SourceRenderingBehavior

# define the dbt profile
airflow_db = ProfileConfig(
    profile_name="airflow_db",
    target_name="dev",
    profile_mapping=PostgresUserPasswordProfileMapping(
        conn_id="airflow_metadata_db",
        profile_args={"schema": "dbt"},
    ),
)

jaffle_shop_path = Path("/usr/local/airflow/dbt/jaffle_shop")
dbt_executable = Path("/usr/local/airflow/dbt_venv/bin/dbt")

# define the execution configuration
venv_execution_config = ExecutionConfig(
    dbt_executable_path=str(dbt_executable),
)

# create a DAG from a dbt-core project
simple_dag = DbtDag(
    project_config=ProjectConfig(jaffle_shop_path),
    profile_config=airflow_db,
    execution_config=venv_execution_config,
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    dag_id="simple_dag",
    # New in 1.6: set the render config to include source nodes
    render_config=RenderConfig(
        # swap between ALL, NONE or WITH_TESTS_OR_FRESHNESS
        source_rendering_behavior=SourceRenderingBehavior.ALL,
    ),
)

Results

Mode: all

  • raw_customers: Rendered as an EmptyOperator
  • raw_payments: Rendered as an EmptyOperator and its tests rendered as a DbtTestOperator
  • raw_orders: Rendered as a DbtSourceOperator and its tests rendered as a DbtTestOperator

Fig 2. DAG graph view for source rendering behaviour all.

Mode: none

  • No sources are rendered

Fig 3. DAG graph view for source rendering behaviour none.

Mode: with_tests_or_freshness

  • raw_customers: Not rendered
  • raw_payments: Rendered as a DbtSourceOperator and its tests rendered as a DbtTestOperator
  • raw_orders: Rendered as a DbtSourceOperator and its tests rendered as a DbtTestOperator

Fig 4. DAG graph view for source rendering behaviour with_tests_or_freshness.

Getting Started

To use this feature:

  1. Upgrade to Cosmos version 1.6 or higher. Refer to the Cosmos getting started documentation.
  2. Enable the ALL or WITH_TESTS_OR_FRESHNESS option for native source node rendering. Check the Source rendering documentation for more details.
  3. Test the feature with your dbt projects.

Conclusion

This new feature in Cosmos is a step toward making dbt and Airflow workflows more efficient and tailored to real-world use cases. By rendering source nodes natively we can reduce wasted compute, avoid running models with stale data, and build workflows that align better with business priorities.

I’m excited to see how this feature will be used by the community and what further improvements it might inspire. If you have any questions, feedback, or ideas, join the conversation in the Airflow Slack in the airflow-dbt channel.
