Datasets and data-aware scheduling in Airflow
With Datasets, DAGs that access the same data can have explicit, visible relationships, and DAGs can be scheduled based on updates to these datasets. This feature helps make Airflow data-aware and expands Airflow scheduling capabilities beyond time-based methods such as cron.
Datasets can help resolve common issues. For example, consider a data engineering team with a DAG that creates a dataset and an analytics team with a DAG that analyzes the dataset. Using datasets, the data analytics DAG runs only when the data engineering team's DAG publishes the dataset.
In this guide, you'll learn about datasets in Airflow and how to use them to implement triggering of DAGs based on dataset updates. You'll also learn how datasets work with the Astro Python SDK.
Datasets are a separate feature from object storage, which allows you to interact with files in cloud and local object storage systems. To learn more about using Airflow to interact with files, see Use Airflow object storage to interact with cloud storage in an ML pipeline.
There are multiple resources for learning about this topic. See also:
- Astronomer Academy: Airflow: Datasets module.
- Webinar: Data Driven Scheduling.
- Use case: Orchestrate machine learning pipelines with Airflow datasets.
Assumed knowledge
To get the most out of this guide, you should have an understanding of:
- Airflow scheduling concepts. See Schedule DAGs in Airflow.
- Creating dependencies between DAGs. See Cross-DAG Dependencies.
- The Astro Python SDK. See Using the Astro Python SDK.
Why use datasets?
Datasets allow you to define explicit dependencies between DAGs and updates to your data. This helps you to:
- Standardize communication between teams. Datasets can function like an API to communicate when data in a specific location has been updated and is ready for use.
- Reduce the amount of code necessary to implement cross-DAG dependencies. Even if your DAGs don't depend on data updates, you can create a dependency that triggers a DAG after a task in another DAG updates a dataset.
- Get better visibility into how your DAGs are connected and how they depend on data. The Datasets tab in the Airflow UI shows a graph of all dependencies between DAGs and datasets in your Airflow environment.
- Reduce costs, because datasets do not take up a worker slot, in contrast to sensors or other implementations of cross-DAG dependencies.
As of Airflow 2.8, you can use listeners to enable Airflow to notify you when certain dataset events occur. There are two listener hooks for the following events:
- on_dataset_created
- on_dataset_changed
For examples, refer to our Create Airflow listeners tutorial.
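For a rough idea of what such a listener can look like, here is a minimal sketch, assuming Airflow 2.8+ and that the listener module is registered through an Airflow plugin. The file paths and names below are illustrative only:

# include/listeners/dataset_listener.py (hypothetical module)
from airflow.datasets import Dataset
from airflow.listeners import hookimpl


@hookimpl
def on_dataset_changed(dataset: Dataset):
    # Called whenever a task in this Airflow environment updates the dataset
    print(f"Dataset changed: {dataset.uri}")

The listener module is then registered through a plugin, for example:

# plugins/dataset_listener_plugin.py (hypothetical plugin)
from airflow.plugins_manager import AirflowPlugin

from include.listeners import dataset_listener


class DatasetListenerPlugin(AirflowPlugin):
    name = "dataset_listener_plugin"
    listeners = [dataset_listener]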
Dataset concepts
You can define datasets in your Airflow environment and use them to create dependencies between DAGs. To define a dataset, instantiate the Dataset class and provide a string that identifies the location of the dataset. This string must be in the form of a valid Uniform Resource Identifier (URI). See What is a valid URI? for detailed information.
Currently, the URI is not used to connect to an external system and Airflow has no awareness of the content or location of the dataset. However, using this naming convention helps you easily identify the datasets that your DAG accesses and ensures compatibility with future Airflow features.
The dataset URI is saved as plain text, so it is recommended that you hide sensitive values using environment variables or a secrets backend.
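For example, a minimal dataset definition is just an instantiation of the Dataset class with a URI-formatted string. The bucket and file name below are placeholders:

from airflow.datasets import Dataset

# The URI only identifies the dataset; Airflow does not validate or connect to this location
example_dataset = Dataset("s3://example-bucket/example_data.csv")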
You can reference a dataset in a task by passing it to the task's outlets parameter. outlets is part of the BaseOperator, so it's available to every Airflow operator.
When you define a task's outlets parameter, Airflow labels the task as a producer task that updates the datasets. It is up to you to determine which tasks should be considered producer tasks for a dataset: as long as a task has an outlet dataset, Airflow considers it a producer task even if that task doesn't operate on the referenced dataset. In the following example, the write_instructions_to_file and write_info_to_file tasks are both producer tasks because they have defined outlets.
TaskFlow API:

from pendulum import datetime
from airflow.datasets import Dataset
from airflow.decorators import dag, task

API = "https://www.thecocktaildb.com/api/json/v1/1/random.php"

INSTRUCTIONS = Dataset("file://localhost/airflow/include/cocktail_instructions.txt")
INFO = Dataset("file://localhost/airflow/include/cocktail_info.txt")


@dag(
    start_date=datetime(2022, 10, 1),
    schedule=None,
    catchup=False,
)
def datasets_producer_dag():
    @task
    def get_cocktail(api):
        import requests

        r = requests.get(api)
        return r.json()

    @task(outlets=[INSTRUCTIONS])
    def write_instructions_to_file(response):
        cocktail_name = response["drinks"][0]["strDrink"]
        cocktail_instructions = response["drinks"][0]["strInstructions"]
        msg = f"See how to prepare {cocktail_name}: {cocktail_instructions}"
        f = open("include/cocktail_instructions.txt", "a")
        f.write(msg)
        f.close()

    @task(outlets=[INFO])
    def write_info_to_file(response):
        import time

        time.sleep(30)
        cocktail_name = response["drinks"][0]["strDrink"]
        cocktail_category = response["drinks"][0]["strCategory"]
        alcohol = response["drinks"][0]["strAlcoholic"]
        msg = f"{cocktail_name} is a(n) {alcohol} cocktail from category {cocktail_category}."
        f = open("include/cocktail_info.txt", "a")
        f.write(msg)
        f.close()

    cocktail = get_cocktail(api=API)
    write_instructions_to_file(cocktail)
    write_info_to_file(cocktail)


datasets_producer_dag()
Traditional syntax:

from pendulum import datetime
from airflow import DAG, Dataset
from airflow.operators.python import PythonOperator

API = "https://www.thecocktaildb.com/api/json/v1/1/random.php"

INSTRUCTIONS = Dataset("file://localhost/airflow/include/cocktail_instructions.txt")
INFO = Dataset("file://localhost/airflow/include/cocktail_info.txt")


def get_cocktail_func(api):
    import requests

    r = requests.get(api)
    return r.json()


def write_instructions_to_file_func(response):
    cocktail_name = response["drinks"][0]["strDrink"]
    cocktail_instructions = response["drinks"][0]["strInstructions"]
    msg = f"See how to prepare {cocktail_name}: {cocktail_instructions}"
    f = open("include/cocktail_instructions.txt", "a")
    f.write(msg)
    f.close()


def write_info_to_file_func(response):
    import time

    time.sleep(30)
    cocktail_name = response["drinks"][0]["strDrink"]
    cocktail_category = response["drinks"][0]["strCategory"]
    alcohol = response["drinks"][0]["strAlcoholic"]
    msg = (
        f"{cocktail_name} is a(n) {alcohol} cocktail from category {cocktail_category}."
    )
    f = open("include/cocktail_info.txt", "a")
    f.write(msg)
    f.close()


with DAG(
    dag_id="datasets_producer_dag",
    start_date=datetime(2022, 10, 1),
    schedule=None,
    catchup=False,
    render_template_as_native_obj=True,
):
    get_cocktail = PythonOperator(
        task_id="get_cocktail",
        python_callable=get_cocktail_func,
        op_kwargs={"api": API},
    )

    write_instructions_to_file = PythonOperator(
        task_id="write_instructions_to_file",
        python_callable=write_instructions_to_file_func,
        op_kwargs={"response": "{{ ti.xcom_pull(task_ids='get_cocktail') }}"},
        outlets=[INSTRUCTIONS],
    )

    write_info_to_file = PythonOperator(
        task_id="write_info_to_file",
        python_callable=write_info_to_file_func,
        op_kwargs={"response": "{{ ti.xcom_pull(task_ids='get_cocktail') }}"},
        outlets=[INFO],
    )

    get_cocktail >> write_instructions_to_file >> write_info_to_file
A consumer DAG runs whenever the dataset(s) it is scheduled on are updated by a producer task, rather than running on a time-based schedule. For example, if you have a DAG that should run when the INSTRUCTIONS and INFO datasets are updated, you define the DAG's schedule using the names of those two datasets.
Any DAG that is scheduled with a dataset is considered a consumer DAG even if that DAG doesn't actually access the referenced dataset. In other words, it's up to you as the DAG author to correctly reference and use datasets.
TaskFlow API:

from pendulum import datetime
from airflow.datasets import Dataset
from airflow.decorators import dag, task

INSTRUCTIONS = Dataset("file://localhost/airflow/include/cocktail_instructions.txt")
INFO = Dataset("file://localhost/airflow/include/cocktail_info.txt")


@dag(
    dag_id="datasets_consumer_dag",
    start_date=datetime(2022, 10, 1),
    schedule=[INSTRUCTIONS, INFO],  # Scheduled on both Datasets
    catchup=False,
)
def datasets_consumer_dag():
    @task
    def read_about_cocktail():
        cocktail = []
        for filename in ("info", "instructions"):
            with open(f"include/cocktail_{filename}.txt", "r") as f:
                contents = f.readlines()
                cocktail.append(contents)

        return [item for sublist in cocktail for item in sublist]

    read_about_cocktail()


datasets_consumer_dag()
Traditional syntax:

from pendulum import datetime
from airflow import DAG, Dataset
from airflow.operators.python import PythonOperator

INSTRUCTIONS = Dataset("file://localhost/airflow/include/cocktail_instructions.txt")
INFO = Dataset("file://localhost/airflow/include/cocktail_info.txt")


def read_about_cocktail_func():
    cocktail = []
    for filename in ("info", "instructions"):
        with open(f"include/cocktail_{filename}.txt", "r") as f:
            contents = f.readlines()
            cocktail.append(contents)

    return [item for sublist in cocktail for item in sublist]


with DAG(
    dag_id="datasets_consumer_dag",
    start_date=datetime(2022, 10, 1),
    schedule=[INSTRUCTIONS, INFO],  # Scheduled on both Datasets
    catchup=False,
):
    PythonOperator(
        task_id="read_about_cocktail",
        python_callable=read_about_cocktail_func,
    )
Any number of datasets can be provided to the schedule parameter, either as a list or as an expression using conditional logic. If the datasets are provided in a list, the DAG is triggered after all of the datasets have received at least one update due to a producing task completing successfully.
When you work with datasets, keep the following considerations in mind:
- Datasets can only be used by DAGs in the same Airflow environment.
- Airflow monitors datasets only within the context of DAGs and tasks. It does not monitor updates to datasets that occur outside of Airflow.
- Consumer DAGs that are scheduled on a dataset are triggered every time a task that updates that dataset completes successfully. For example, if task1 and task2 both produce dataset_a, a consumer DAG of dataset_a runs twice: first when task1 completes, and again when task2 completes.
- Consumer DAGs scheduled on a dataset are triggered as soon as the first task with that dataset as an outlet finishes, even if there are downstream producer tasks that also operate on the dataset.
Airflow 2.9 added several new features to datasets:
- Datasets are now shown in the Graph view of a DAG in the Airflow UI. The upstream1 DAG in the screenshot below is a consumer of the dataset0 dataset, and has one task update_dataset_1 that updates the dataset1 dataset.
For more information about datasets, see Data-aware scheduling.
The Datasets tab and the DAG Dependencies view in the Airflow UI give you observability for datasets and the data dependencies in your DAGs' schedules.
On the DAGs view, you can see that the dataset_downstream_1_2 DAG is scheduled on two datasets, one updated in each of the dataset_upstream1 and dataset_upstream2 DAGs. When datasets are provided as a list, the DAG is scheduled to run after all datasets in the list have received at least one update. In the following screenshot, the dataset_downstream_1_2 DAG's next run is pending one dataset update: at this point the dataset_upstream1 DAG has run and updated its dataset, but the dataset_upstream2 DAG has not.
The Datasets tab shows a list of all datasets in your Airflow environment and a graph showing how your DAGs and datasets are connected. You can filter the list of datasets by recent updates.
Click one of the datasets to display a list of task instances that updated the dataset and a highlighted view of that dataset and its connections on the graph.
The DAG Dependencies view (found under the Browse tab) shows a graph of all dependencies between DAGs (in green) and datasets (in orange) in your Airflow environment.
DAGs that are triggered by datasets do not have the concept of a data interval. If you need information about the triggering event in your downstream DAG, you can use the triggering_dataset_events parameter from the task context. This parameter provides a list of all triggering dataset events, each with the parameters [timestamp, source_dag_id, source_task_id, source_run_id, source_map_index].
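For example, a task in the consumer DAG can read these events from the task context. A minimal sketch, assuming the DAG is dataset-triggered; the task name and print statements are illustrative only:

from airflow.decorators import task


@task
def print_triggering_dataset_events(**context):
    # Dict keyed by dataset URI; each value is a list of DatasetEvent objects
    triggering_dataset_events = context["triggering_dataset_events"]
    for uri, events in triggering_dataset_events.items():
        for event in events:
            print(uri, event.timestamp, event.source_dag_id, event.source_run_id)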
Updating a dataset
As of Airflow 2.9, there are three ways to update a dataset:
- A task with an outlet parameter that references the dataset completes successfully.
- A POST request to the datasets endpoint of the Airflow REST API (see the sketch after this list).
- A manual update in the Airflow UI.
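For the REST API option, you create a dataset event by sending a POST request to the dataset events endpoint of the stable REST API (available in Airflow 2.9+). The following is a minimal sketch using the requests library and basic auth; the host, credentials, and dataset URI are placeholders:

import requests

# Placeholder host, credentials, and dataset URI -- replace with your own values
response = requests.post(
    "http://localhost:8080/api/v1/datasets/events",
    json={"dataset_uri": "s3://example-bucket/example_data.csv"},
    auth=("admin", "admin"),
)
response.raise_for_status()
print(response.json())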
Conditional dataset scheduling
In Airflow 2.9 and later, you can use logical operators to combine any number of datasets provided to the schedule parameter. The supported logical operators are | for OR and & for AND.
For example, to schedule a DAG on an update to either dataset1, dataset2, dataset3, or dataset4, you can use the following syntax. Note that the full statement is wrapped in ().
TaskFlow API:

from airflow.decorators import dag
from airflow.datasets import Dataset
from pendulum import datetime


@dag(
    start_date=datetime(2024, 3, 1),
    schedule=(
        Dataset("dataset1")
        | Dataset("dataset2")
        | Dataset("dataset3")
        | Dataset("dataset4")
    ),  # Use () instead of [] to be able to use conditional dataset scheduling!
    catchup=False,
)
def downstream1_on_any():
    pass  # your tasks here


downstream1_on_any()
Traditional syntax:

from airflow.models import DAG
from airflow.datasets import Dataset
from pendulum import datetime

with DAG(
    dag_id="downstream1_on_any",
    start_date=datetime(2024, 3, 1),
    schedule=(
        Dataset("dataset1")
        | Dataset("dataset2")
        | Dataset("dataset3")
        | Dataset("dataset4")
    ),  # Use () instead of [] to be able to use conditional dataset scheduling!
    catchup=False,
):
    pass  # your tasks here
The downstream1_on_any DAG is triggered whenever any of the datasets dataset1, dataset2, dataset3, or dataset4 is updated. When you click x of 4 Datasets updated in the DAGs view, you can see the dataset expression that defines the schedule.
You can also combine the logical operators to create more complex expressions. For example, to schedule a DAG on an update to either dataset1 or dataset2 and either dataset3 or dataset4, you can use the following syntax:
TaskFlow API:

from airflow.decorators import dag
from airflow.datasets import Dataset
from pendulum import datetime


@dag(
    start_date=datetime(2024, 3, 1),
    schedule=(
        (Dataset("dataset1") | Dataset("dataset2"))
        & (Dataset("dataset3") | Dataset("dataset4"))
    ),  # Use () instead of [] to be able to use conditional dataset scheduling!
    catchup=False,
)
def downstream2_one_in_each_group():
    pass  # your tasks here


downstream2_one_in_each_group()
Traditional syntax:

from airflow.models import DAG
from airflow.datasets import Dataset
from pendulum import datetime

with DAG(
    dag_id="downstream2_one_in_each_group",
    start_date=datetime(2024, 3, 1),
    schedule=(
        (Dataset("dataset1") | Dataset("dataset2"))
        & (Dataset("dataset3") | Dataset("dataset4"))
    ),  # Use () instead of [] to be able to use conditional dataset scheduling!
    catchup=False,
):
    pass  # your tasks here
The dataset expression this schedule creates is:
{
    "all": [
        {
            "any": [
                "dataset1",
                "dataset2"
            ]
        },
        {
            "any": [
                "dataset3",
                "dataset4"
            ]
        }
    ]
}
Combined dataset and time-based scheduling
In Airflow 2.9 and later, you can combine dataset-based scheduling with time-based scheduling using the DatasetOrTimeSchedule timetable. A DAG scheduled with this timetable runs either when its timetable condition is met or when its dataset condition is met.
The DAG shown below runs on a time-based schedule defined by the 0 0 * * * cron expression, which is every day at midnight. The DAG also runs when either dataset3 or dataset4 is updated.
TaskFlow API:

from airflow.decorators import dag, task
from airflow.datasets import Dataset
from pendulum import datetime
from airflow.timetables.datasets import DatasetOrTimeSchedule
from airflow.timetables.trigger import CronTriggerTimetable


@dag(
    start_date=datetime(2024, 3, 1),
    schedule=DatasetOrTimeSchedule(
        timetable=CronTriggerTimetable("0 0 * * *", timezone="UTC"),
        datasets=(Dataset("dataset3") | Dataset("dataset4")),
        # Use () instead of [] to be able to use conditional dataset scheduling!
    ),
    catchup=False,
)
def toy_downstream3_dataset_and_time_schedule():
    pass  # your tasks here


toy_downstream3_dataset_and_time_schedule()
Traditional syntax:

from airflow.models import DAG
from airflow.datasets import Dataset
from pendulum import datetime
from airflow.timetables.datasets import DatasetOrTimeSchedule
from airflow.timetables.trigger import CronTriggerTimetable

with DAG(
    dag_id="toy_downstream3_dataset_and_time_schedule",
    start_date=datetime(2024, 3, 1),
    schedule=DatasetOrTimeSchedule(
        timetable=CronTriggerTimetable("0 0 * * *", timezone="UTC"),
        datasets=(Dataset("dataset3") | Dataset("dataset4")),
        # Use () instead of [] to be able to use conditional dataset scheduling!
    ),
    catchup=False,
):
    pass  # your tasks here
Datasets with the Astro Python SDK
If you are using the Astro Python SDK version 1.1 or later, you do not need to make any code updates to use datasets. Datasets are automatically registered for any functions with output tables, and you do not need to define any outlet parameters.
The following example DAG results in three registered datasets: one for each load_file function and one for the resulting data from the transform function.
from pendulum import datetime
from airflow.decorators import dag
from astro.files import File
from astro.sql import (
    load_file,
    transform,
)
from astro.sql.table import Table

SNOWFLAKE_CONN_ID = "snowflake_conn"
AWS_CONN_ID = "aws_conn"


# The first transformation combines data from the two source tables
@transform
def extract_data(homes1: Table, homes2: Table):
    return """
    SELECT *
    FROM {{homes1}}
    UNION
    SELECT *
    FROM {{homes2}}
    """


@dag(start_date=datetime(2021, 12, 1), schedule="@daily", catchup=False)
def example_sdk_datasets():
    # Initial load of homes data csv's from S3 into Snowflake
    homes_data1 = load_file(
        task_id="load_homes1",
        input_file=File(path="s3://airflow-kenten/homes1.csv", conn_id=AWS_CONN_ID),
        output_table=Table(name="HOMES1", conn_id=SNOWFLAKE_CONN_ID),
        if_exists="replace",
    )

    homes_data2 = load_file(
        task_id="load_homes2",
        input_file=File(path="s3://airflow-kenten/homes2.csv", conn_id=AWS_CONN_ID),
        output_table=Table(name="HOMES2", conn_id=SNOWFLAKE_CONN_ID),
        if_exists="replace",
    )

    # Define task dependencies
    extracted_data = extract_data(
        homes1=homes_data1,
        homes2=homes_data2,
        output_table=Table(name="combined_homes_data"),
    )


example_sdk_datasets = example_sdk_datasets()