Astronomer and Fivetran Partner to Release Production-Grade ELT Airflow Provider

  • Viraj Parekh
  • Julian LaNeve

Today, we’re excited to announce a new Fivetran provider for Apache Airflow®, complete with asynchronous functionality, developed and maintained by Astronomer and Fivetran. This represents a joint commitment to provide first-class support for data ingestion, orchestrated by Airflow and powered by Fivetran.

The provider is a succession of the (now) legacy Fivetran provider, which served the community well. The legacy provider will be deprecated in favor of this new provider, but fear not - migrating to the new provider is easy. Read on to learn more about why we’re releasing a new provider and how to migrate your Airflow DAGs to the new provider.

Airflow and Fivetran using the legacy provider

Suppose a data team wants to ingest data into a warehouse as part of a customer billing use case. They need to aggregate data from four sources: their SaaS app, Stripe, Salesforce, and HubSpot.

While the data team may need to write a custom API integration to ingest data from their SaaS app, the other data sources are well-supported by Fivetran. Data teams generally prefer Fivetran when there’s a pre-built connector because they’re not required to maintain custom code for common data sources.

Using the legacy Fivetran provider, a DAG author could use Airflow to orchestrate this ingestion process. While Fivetran offers scheduling functionality, for teams working with more than just Fivetran, it’s helpful to use a single orchestration tool to put their data ingestion in the context of upstream and downstream processes. Additionally, as a mature orchestration tool, Airflow offers helpful functionality around error notifications, scheduling, dependency management, and visibility.

Tactically, a user would add two tasks to their Airflow DAG per ingestion process: the FivetranOperator to begin a sync, and the FivetranSensor to poll Fivetran until the sync is complete.

Rethinking the experience with modern Airflow features

There are two large drawbacks to the old “fire and forget” nature of the Fivetran Operator: you have to declare two Airflow tasks per Fivetran sync, and it consumes Airflow resources inefficiently.

In the scenario above, the task using the FivetranOperator is marked as “success” almost immediately when the request is received by Fivetran, not when the underlying sync finishes. This means that in order to add downstream tasks that are dependent on the sync completing, the user has to add a sensor after the FivetranOperator task to know when the data has arrived. Managing two operators per sync can be cumbersome, particularly for such a common use case.

Separately, running a sensor to wait for the Fivetran sync to finish is quite inefficient; it means that there’s an Airflow worker (with potentially large resourcing) responsible solely for making an API request every few minutes. To address this problem, Airflow 2.2 introduced the triggerer to allow for asynchronous functionality. Now, when an Airflow task is waiting for a condition to be met, it can be deferred to this triggerer instead of consuming a worker slot, resulting in a more scalable and cost-effective architecture.

The new FivetranAsyncOperator addresses these issues by combining the old FivetranOperator and FivetranSensor into a single, asynchronous operator. Instead of the old “fire and forget” method, the new FivetranAsyncOperator kicks off the Fivetran sync, goes into the deferred state while the sync is running, and is only marked completed once the data has arrived.

Further, data-driven scheduling was introduced in Airflow 2.4, allowing for DAGs that access the same dataset to have explicit, visible relationships. DAGs that depend on each other can be scheduled based on updates to the underlying datasets they share.

ELT workloads often have split responsibilities between those responsible for delivering a dataset and those responsible for the analytics transformations required to produce insights from it. Data-driven scheduling allows teams to write DAGs based on separation of concerns and responsibilities; one DAG can “produce” a dataset from a source system (in this case, via Fivetran) into an analytics destination (the “EL” DAG), and another DAG can focus on the transformation layer (the “T” DAG). The “T” DAG can be scheduled to run after the dataset arrives in the “EL” DAG.

To implement data-driven scheduling, the DAG author defines a dataset (e.g. the Fivetran destination), indicates producer tasks (the FivetranOperator task that loads that dataset) by providing it to an outlets parameter, and schedules any consumer DAGs (the transformation DAGs) that are dependent on that dataset by providing it to the schedule parameter.

from airflow.decorators import dag
from airflow.datasets import Dataset
from fivetran_provider_async.operators import FivetranOperatorAsync

my_snowflake_table = Dataset("snowflake://my_snowflake_conn_id/my_schema/my_table")

@dag(schedule="@daily")
def my_first_dag():
    fivetran_task = FivetranOperatorAsync(
        task_id="fivetran_task",
        connector_id="my_connector_id",
        fivetran_conn_id="my_fivetran_conn_id",
        # the outlet defines which dataset the task updates
        outlets=[my_snowflake_table],
    )

# the schedule parameter now takes a list of datasets instead of a time-based schedule
@dag(schedule=[my_snowflake_table])
def my_second_dag():
    # this dag will now run immediately after your fivetran job finishes
    pass

In the example above, the FivetranOperatorAsync will run on a daily schedule. Immediately after the Fivetran job is complete, Airflow will recognize that the Dataset called my_snowflake_table is updated and trigger any DAGs scheduled on that dataset.

Automatic data lineage extraction

Using Airflow to coordinate Fivetran syncs using the new FivetranAsyncOperator brings an additional advantage: the gathering of lineage metadata through OpenLineage. When using the default extractor mechanism provided by OpenLineage version 0.16.1 or later, the operator automatically identifies the input and output datasets and sends them to Astro, Marquez, or another OpenLineage backend of your choice.

The input dataset refers to the data source obtained from the Fivetran connector configuration or the original data origin. The output dataset represents where you have configured the data to be stored and is part of the destination API call. If possible, the output dataset's schema is also extracted from the data obtained through the schema API call. In addition, the sync's Job facet information is collected to provide details about the service the connection is executed for, the group ID, and the connector ID. This ensures that your team has all the necessary information to trace any issues back to their source. Furthermore, the run facet collects error message information and is only provided if the Fivetran sync fails.

Let's consider a common scenario: using Fivetran for data ingestion and dbt for modeling. It's likely that data is sourced from multiple platforms such as GCS, S3, Salesforce, and Google Sheets, making Fivetran an ideal choice. Each Fivetran job will have a different connector, but for simplicity, let's assume they all fall under the same "DWH" group. Our dbt jobs will begin transforming the "raw" tables, which are the outputs of the Fivetran jobs, in a Snowflake database. This data will ultimately be utilized for generating dashboards or performing reverse-ETL.

Prior to the integration of lineage, dbt tasks were displayed with connections to the datasets they produced, but we were unable to extend our view beyond the Snowflake ecosystem in which dbt operates. With the inclusion of Fivetran extractors, we not only gain visibility into the exact data source (confirming that it's Fivetran moving data into Snowflake), but we also establish a complete connection with the datasets and models used by dbt. The result is a much more comprehensive and detailed understanding of data pipelines.

image2.png

Migrating to the new provider

To begin using the new provider, we recommend updating your FivetranOperator and FivetranOperator + FivetranSensor combinations to the new FivetranOperatorAsync. For example, if you previously had the operators:

from fivetran_provider.operators.fivetran import FivetranOperator
from fivetran_provider.sensors.fivetran import FivetranSensor

trigger_fivetran_job = FivetranOperator(...)
wait_for_completion = FivetranSensor(...)

You should now use:

from fivetran_provider_async.operators import FivetranOperatorAsync

trigger_fivetran_and_wait_for_completion = FivetranOperatorAsync(...)

Similarly, if you were previously using just the FivetranSensor to monitor for job completion, you can replace that with the new FivetranSensorAsync.

As the Airflow community continues to innovate around data-driven scheduling, new ways to author DAGs, and so much more, users can expect a first- class story between Fivetran and Airflow, with Astronomer and Fivetran working closely together to support the community for both. The new Fivetran provider is Apache 2.0 licensed and ready for use. And as always, if you’re looking to get started with Airflow, get started with Astro in less than 5 minutes!

For more information on the Fivetran async provider, check out their blog post

Build, run, & observe your data workflows.
All in one place.

Get $300 in free credits during your 14-day trial.

Get Started Free