Improving a Data Quality Process by Adding Great Expectations
As data quality becomes a core concern for leading data companies, the number of data quality checks running on Airflow pipelines is growing rapidly, creating a greater need to integrate Airflow with specialized data quality tools like Great Expectations. We at Astronomer have been working with Superconductive, the team behind Great Expectations, to continuously improve the GreatExpectationsOperator and provide a streamlined, Airflow-centric experience.
Recently, Astronomer took ownership of the Great Expectations Airflow Provider in a continued effort to maintain the operator and provide an Airflow-centric way to use Great Expectations. The first release we made comes with some great improvements and upgrades. Many thanks to the Superconductive team for their help!
The previous versions of the GreatExpectationsOperator required a Data Context, Checkpoint, and Expectations Suite. The new version provides greater ease-of-use and flexibility -- just initialize a project, write your Expectations, and the operator will do the rest.
Updates and refinements include:
- A pre-defined Checkpoint is no longer required. A default Checkpoint is now created by the GreatExpectationsOperator when one isn’t supplied.
- The checkpoint_kwargs parameter offers full Checkpoint customizability.
- Connections are managed with the Airflow backend when using default Checkpoints. This removes the need to define datasources within the Data Context.
- Full backwards compatibility.
Improving a Data Quality Process by Adding Great Expectations
Despite its importance, data quality in pipelines is still largely an afterthought for data teams. With pressure to get data to the warehouse, understanding whether or not it’s the right data becomes a problem for the next sprint, then the next one, and then the one after that. But it shouldn’t be this way. Data quality should be a primary concern when building data pipelines, and it should be easy to include quality checks when a pipeline is first written.
Great Expectations simplifies the data quality check experience by converting SQL and Python checks into simple JSON templates called Expectations. Each Expectation comes with configuration options and is intuitively named in a human-readable way (for example, the expect_column_distinct_values_to_be_in_set Expectation). This makes writing checks both easy and flexible, whether your data is in a relational database table or a Spark or pandas dataframe.
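To make this concrete, here is a minimal sketch of one Expectation as a JSON-style template, together with a plain-Python reading of what it asserts. The expectation_type and its column and value_set kwargs follow Great Expectations naming. The distinct_values_in_set helper and the sample rows are hypothetical, written only to illustrate the meaning of the check, not how Great Expectations evaluates it.

```python
import json

# An Expectation is a small JSON document: a type plus its configuration.
expectation = {
    "expectation_type": "expect_column_distinct_values_to_be_in_set",
    "kwargs": {"column": "status", "value_set": ["active", "inactive"]},
}


def distinct_values_in_set(rows, column, value_set):
    """Hypothetical helper: plain-Python reading of the Expectation above.

    Returns (success, unexpected), where unexpected is the sorted list of
    distinct column values that are not part of value_set.
    """
    observed = {row[column] for row in rows}
    unexpected = sorted(observed - set(value_set))
    return not unexpected, unexpected


# Made-up sample data for illustration only.
rows = [{"status": "active"}, {"status": "inactive"}, {"status": "deleted"}]
success, unexpected = distinct_values_in_set(rows, **expectation["kwargs"])

print(json.dumps(expectation, indent=2))
print(success, unexpected)
```

The point of the template form is that a reader only needs the Expectation name and its kwargs to understand the check, without reading the comparison logic.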
Why Great Expectations Data Quality is Essential for Your Airflow DAGs
If your organization is just starting its data quality journey, or it’s on the fence about switching tools, here are a few reasons why you might consider Great Expectations:
- All data stakeholders will speak the same simple data quality language. That means there’s no need to parse SQL or Python code to understand a data quality check someone else wrote.
- The use of JSON to define Expectations lowers the barrier to entry for writing data quality checks. This broadens the base of stakeholders who can participate.
- You have the ability to write data quality checks on pandas or Spark dataframes, which is a feature not currently supported by Airflow-native operators.
- Great Expectations creates an in-depth analysis of Expectation results in the form of a Data Docs webpage.
At Astronomer, we use Great Expectations for specific use cases within our data quality approach to add more detail about check successes and failures. As Great Expectations integrates natively with OpenLineage, we also get a more robust set of information about our datasets in the Lineage tab of our Astro deployments.
How We Added Great Expectations to Our Environment
Setting up a new project always comes with pain points, so we at Astronomer thought hard about making the configuration of Great Expectations in an Airflow environment as simple as possible. Notably, we’ve removed steps involving Jupyter notebooks and configuring YAML files. With the new GreatExpectationsOperator, all you need to do is init a project and write your Expectations.
Setting up a Great Expectations Environment in Airflow
The steps to add Great Expectations functionality to an Airflow deployment are simple:
- If you don't have an existing Airflow setup, install the open-source Astro CLI and run astro dev init in your project directory.
- Install Great Expectations locally by running pip install great_expectations.
- Build the file structure inside the include folder of your Airflow instance with the command cd /path/to/airflow/project/include && great_expectations init.
- Add the Great Expectations Provider to your Airflow instance’s requirements.txt file.
- Turn on XCom pickling by setting the environment variable AIRFLOW__CORE__ENABLE_XCOM_PICKLING=True. Alternatively, you can use a custom serialization method in a custom XCom backend.
Within minutes you’ll have a functioning Great Expectations environment in your Airflow instance, and you’ll be ready to use the new GreatExpectationsOperator.
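If you want to sanity-check the result of these steps, a small script along the following lines could help. It is only a sketch under stated assumptions: the provider's package name (airflow-provider-great-expectations) and the great_expectations folder created by the init command are assumed here, and missing_setup_steps is a hypothetical helper, not part of Airflow, the Astro CLI, or Great Expectations.

```python
import os
import tempfile
from pathlib import Path

# Assumed names: the provider's PyPI package and the folder that
# `great_expectations init` creates inside `include`.
PROVIDER_PACKAGE = "airflow-provider-great-expectations"
GX_DIR = Path("include") / "great_expectations"


def missing_setup_steps(project_dir, env=None):
    """Hypothetical helper: list the setup steps that still look incomplete."""
    env = os.environ if env is None else env
    project = Path(project_dir)
    missing = []

    if not (project / GX_DIR).is_dir():
        missing.append("run `great_expectations init` inside include/")

    requirements = project / "requirements.txt"
    listed = requirements.is_file() and PROVIDER_PACKAGE in requirements.read_text()
    if not listed:
        missing.append(f"add {PROVIDER_PACKAGE} to requirements.txt")

    # Simplified check: only the literal value "True" (any casing) counts.
    if env.get("AIRFLOW__CORE__ENABLE_XCOM_PICKLING", "").lower() != "true":
        missing.append("set AIRFLOW__CORE__ENABLE_XCOM_PICKLING=True")

    return missing


# Usage on a throwaway directory that mimics a finished setup.
with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / GX_DIR).mkdir(parents=True)
    (Path(tmp) / "requirements.txt").write_text(PROVIDER_PACKAGE + "\n")
    complete = missing_setup_steps(
        tmp, env={"AIRFLOW__CORE__ENABLE_XCOM_PICKLING": "True"}
    )
    incomplete = missing_setup_steps(tmp, env={})

print(complete)
print(incomplete)
```

If you rely on a custom XCom backend instead of pickling, the last hint does not apply and can be ignored.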
Dynamic Task Mapping
In our internal use case, we used automatic datasource creation with the new GreatExpectationsOperator by passing in the connection ID (snowflake_connection) of our preexisting Airflow connection to our Snowflake database. After adding the /path/to/airflow/project/include/great_expectations file path as the data_context_root_dir parameter, we were prepared to pass in the names of our tables together with the corresponding Expectations Suite running checks on each table. By not setting the checkpoint_config or checkpoint_name parameters, we told the operator to create a default Checkpoint for us.
Since most of the configuration of the GreatExpectationsOperator would stay the same between running an Expectation Suite on specific tables, this was a great use case for dynamic task mapping.
The code snippet below shows all the code within our Great Expectations DAG. It’s short but powerful!
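from airflow.decorators import task
from great_expectations_provider.operators.great_expectations import (
    GreatExpectationsOperator,
)

# Path to the Great Expectations project inside the Airflow project
gx_root_dir = "/path/to/airflow/project/include/great_expectations"

# Tables to check, in the format "SCHEMA_NAME.TABLE_NAME"
list_of_checked_tables = [
    "SCHEMA_NAME_1.TABLE_NAME_1",
    "SCHEMA_NAME_1.TABLE_NAME_2",
    "SCHEMA_NAME_2.TABLE_NAME_1",
    # …
]


@task
def create_table_suite_pairs(table):
    """Creates pairs of table and Expectation suite names."""
    return {
        "data_asset_name": table,
        "expectation_suite_name": table.replace(".", "_") + "_SUITE",
    }


# Shared arguments go into .partial(); the per-table arguments are mapped
gx_data_quality_checks = GreatExpectationsOperator.partial(
    task_id="gx_data_quality_checks",
    conn_id="snowflake_connection",
    data_context_root_dir=gx_root_dir,
).expand_kwargs(
    create_table_suite_pairs.expand(
        table=list_of_checked_tables
    )
)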
In this example, we provided a list of tables in the format "SCHEMA_NAME.TABLE_NAME" on which we want to run Expectation Suites. For each of these tables, we created an Expectation Suite in the include/great_expectations/expectations/ folder with the name of SCHEMA_NAME_TABLE_NAME_SUITE.
The create_table_suite_pairs task dynamically maps over the provided list of table names. The task creates a dictionary for each table name that contains matched inputs to two GreatExpectationsOperator parameters: data_asset_name and expectation_suite_name.
The resulting list of dictionaries is passed to the GreatExpectationsOperator to map over dynamically, creating a mapped task instance for every table that is being checked.
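The effect of the mapping can be sketched in plain Python, without Airflow. The snippet below mimics, rather than calls, the partial and expand_kwargs mechanics: it merges the shared arguments with each per-table dictionary to show which arguments each mapped task instance ends up with. The create_table_suite_pair function is a plain-function stand-in for the task, and shared_kwargs and mapped_kwargs are illustrative names only.

```python
list_of_checked_tables = [
    "SCHEMA_NAME_1.TABLE_NAME_1",
    "SCHEMA_NAME_1.TABLE_NAME_2",
    "SCHEMA_NAME_2.TABLE_NAME_1",
]


def create_table_suite_pair(table):
    """Same naming rule as the DAG's task, written as a plain function."""
    return {
        "data_asset_name": table,
        "expectation_suite_name": table.replace(".", "_") + "_SUITE",
    }


# Arguments shared by every mapped task instance (what .partial() holds).
# The root directory is the placeholder path used earlier in this post.
shared_kwargs = {
    "task_id": "gx_data_quality_checks",
    "conn_id": "snowflake_connection",
    "data_context_root_dir": "/path/to/airflow/project/include/great_expectations",
}

# One merged set of arguments per table (what .expand_kwargs() fans out to).
mapped_kwargs = [
    {**shared_kwargs, **create_table_suite_pair(table)}
    for table in list_of_checked_tables
]

for map_index, kwargs in enumerate(mapped_kwargs):
    print(map_index, kwargs["data_asset_name"], kwargs["expectation_suite_name"])
```

Adding a table to the list therefore adds one more mapped task instance, as long as a suite with the matching name exists.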
The graph view in Figure 1 (below) shows the two sequential tasks, each with 13 dynamically mapped task instances.
Figure 1. Screenshot of the Airflow UI with the Graph view of the Great Expectations DAG showing two dynamically mapped tasks.
In the grid view in Figure 2 (below) we can see any check failures in any mapped task instance. A data quality issue was found in the table checked in mapped task instance No. 10.
Figure 2. Screenshot of the Airflow UI with the Grid view of the Great Expectations DAG showing 13 dynamically mapped task instances of the gx_data_quality_checks task. In this example, 12 mapped task instances completed successfully and one mapped task instance failed.
Easy Schema Checking with Great Expectations
Great Expectations simplifies the code to check for changes in table schemas. In the past, when we wanted to make sure the current list of columns of a table in the data warehouse matched a provided list of columns, we used a custom SQL statement with the SQLCheckOperator that included self-joins. With Great Expectations, we’re able to remove that complicated SQL statement in favor of the expect_table_columns_to_match_set Expectation.
This check is crucial because it alerts data quality stakeholders to any schema changes that might prompt the writing of additional data quality checks on a specific table.
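Conceptually, a schema check of this kind comes down to comparing two sets of column names. The sketch below illustrates that idea in plain Python. The compare_columns helper and the column names are hypothetical and are not how Great Expectations implements the check.

```python
def compare_columns(observed, expected):
    """Hypothetical helper: exact set comparison of column names."""
    observed_set, expected_set = set(observed), set(expected)
    return {
        "success": observed_set == expected_set,
        "unexpected": sorted(observed_set - expected_set),
        "missing": sorted(expected_set - observed_set),
    }


# Made-up column names for illustration.
expected_columns = ["id", "name", "created_at"]
observed_columns = ["id", "name", "created_at", "region"]

result = compare_columns(observed_columns, expected_columns)
print(result)
```

A newly added column shows up under "unexpected", which is the signal that new checks may be needed for that table.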
Using this Expectation in a Suite is straightforward -- you supply the name of the Expectation Suite as well as the list of known columns. Here’s an example:
{
  "data_asset_type": null,
  "expectation_suite_name": "<SCHEMA_NAME_TABLE_NAME_SUITE>",
  "expectations": [
    {
      "expectation_context": {
        "description": null
      },
      "expectation_type": "expect_table_columns_to_match_set",
      "ge_cloud_id": null,
      "kwargs": {
        "column_set": ["<column 1>", "<column 2>", …, "<column n>"],
        "exact_match": true
      },
      "meta": {}
    }
  ],
  "ge_cloud_id": null,
  "meta": {
    "great_expectations_version": "0.15.34"
  }
}
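If you maintain many tables, a suite like this could also be generated rather than written by hand. The following is a minimal sketch, assuming the SCHEMA_NAME_TABLE_NAME_SUITE naming used in this post. The build_schema_suite helper and the table and column names are hypothetical, and the sketch leaves out the optional null fields shown above, so treat the output as a starting point rather than a guaranteed-valid suite file.

```python
import json


def build_schema_suite(table, columns):
    """Hypothetical helper: a schema-check suite for a SCHEMA.TABLE name."""
    return {
        "expectation_suite_name": table.replace(".", "_") + "_SUITE",
        "expectations": [
            {
                "expectation_type": "expect_table_columns_to_match_set",
                "kwargs": {"column_set": list(columns), "exact_match": True},
                "meta": {},
            }
        ],
        "meta": {},
    }


# Made-up table and column names for illustration.
suite = build_schema_suite("SCHEMA_NAME_1.TABLE_NAME_1", ["id", "name"])
suite_json = json.dumps(suite, indent=2)
print(suite_json)
```

The resulting JSON would be saved in the include/great_expectations/expectations/ folder under the suite's name.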
Running Great Expectations on Astro
One of the reasons teams choose to integrate Great Expectations with Airflow is the increased observability of data quality checks. Astro takes this integration a step further -- with its built-in data lineage, Astro integrates seamlessly with Great Expectations. You just deploy the changes from your local environment via your existing setup and view Lineage events from the GreatExpectationsOperator in the Astro Lineage tab.
In Figure 3 (below) the Lineage graph shows the connection between tasks using the GreatExpectationsOperator and the datasets they are running data quality checks on (top quarter of the screenshot). The results from checks on each individual column can be viewed in the Quality tab, right down to which individual Expectations succeeded and which failed.
Figure 3. Screenshot of the Lineage graph in the Astro UI showing lineage events recorded from the GreatExpectationsOperator.
Additionally, if you configure your Great Expectations Data Docs to be saved in S3, Azure, or GCS, an operator extra link will appear on the new GreatExpectationsOperator that links to the Data Docs page of each specific Expectations Suite.
Figure 4. Screenshot of the Data Docs page for a validation run of the Expectations Suite on a table called CLUSTERS. Out of 13 Expectations, one was unsuccessful. The schema check shows the mismatch between the expected schema and the observed one: a new column, cluster_type, was added.
Conclusion
You can now benefit from all of Great Expectations’ features by having Airflow handle much of the configuration for you, while still retaining full control and backwards compatibility with previous versions of the operator. Migrating from other data quality approaches, such as Airflow-native data quality check operators, is often only a matter of copying values into Expectations Suites using one of the many pre-defined Expectations.
If you want to improve the trust in your data quality with Great Expectations, check out: Orchestrate Great Expectations with Airflow