Pass data between tasks

Sharing data between tasks is a very common use case in Airflow. If you've been writing DAGs, you probably know that breaking them up into smaller tasks is a best practice for debugging and recovering quickly from failures. What do you do when one of your downstream tasks requires metadata about an upstream task, or processes the results of the task immediately before it?

There are a few methods you can use to implement data sharing between your Airflow tasks. In this guide, you'll walk through the two most commonly used methods, learn when to use them, and use some example DAGs to understand how they can be implemented.

Best practices

Before you write DAGs that pass data between tasks, there are a couple of important concepts to understand.

Ensure idempotency

An important concept for any data pipeline, including an Airflow DAG, is idempotency. This is the property whereby an operation can be applied multiple times without changing the result. This concept is often associated with your entire DAG. If you execute the same DAGRun multiple times, you will get the same result. However, this concept also applies to tasks within your DAG. If every task in your DAG is idempotent, your full DAG is idempotent as well.

When designing a DAG that passes data between tasks, it's important that you ensure that each task is idempotent. This helps with recovery and ensures no data is lost if a failure occurs.
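
As a minimal sketch of what idempotency can look like in practice, the following plain Python example derives its output location from the run's logical date and overwrites instead of appending. The `store` dictionary, the `write_partition` function, and the `sales/` key prefix are hypothetical names used only for illustration; in a real DAG the store would be a bucket or a table.

```python
from datetime import date

# Hypothetical in-memory stand-in for an external store such as a bucket or table.
store: dict[str, list[int]] = {}


def write_partition(logical_date: date, rows: list[int]) -> str:
    """Overwrite the partition for one logical date, so re-runs are safe."""
    key = f"sales/{logical_date.isoformat()}.json"
    store[key] = list(rows)  # overwrite, never append
    return key


first = write_partition(date(2021, 1, 1), [1, 2, 3])
second = write_partition(date(2021, 1, 1), [1, 2, 3])  # simulated retry
```

Because the key depends only on the logical date, the simulated retry leaves the store in exactly the same state as the first run. A task that appended rows or used the current wall-clock time in the key would not have this property.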

Consider the size of your data

Knowing the size of the data you are passing between Airflow tasks is important when deciding which implementation method to use. As you'll learn, XComs are one method of passing data between tasks, but they are only appropriate for small amounts of data. Large data sets require intermediary storage, and possibly an external processing framework.

XCom

The first method for passing data between Airflow tasks is to use XCom, which is a key Airflow feature for sharing task data.

What is XCom

XCom is a built-in Airflow feature. XComs allow tasks to exchange task metadata or small amounts of data. They are defined by a key, value, and timestamp.

XComs can be "pushed", meaning sent by a task, or "pulled", meaning received by a task. When an XCom is pushed, it is stored in the Airflow metadata database and made available to all other tasks. Any time a task returns a value (for example, when your Python callable for your PythonOperator has a return), that value is automatically pushed to XCom. Tasks can also be configured to push XComs by calling the xcom_push() method. Similarly, xcom_pull() can be used in a task to receive an XCom.
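
The explicit push and pull calls described above can be sketched as follows. The two callables use the `xcom_push()` and `xcom_pull()` methods on the task instance, which Airflow passes in as `ti` when a Python callable declares that parameter. The `StubTaskInstance` class, the task IDs, and the `row_count` key are hypothetical and exist only so the sketch can run outside of Airflow; they are not part of Airflow.

```python
def push_row_count(ti):
    """Explicit push: store a value under a custom key."""
    ti.xcom_push(key="row_count", value=42)


def report_row_count(ti):
    """Explicit pull: read the value pushed by the upstream task."""
    row_count = ti.xcom_pull(task_ids="push_row_count", key="row_count")
    return f"upstream task processed {row_count} rows"


class StubTaskInstance:
    """Hypothetical stand-in for Airflow's task instance, for local runs only."""

    def __init__(self, task_id, shared):
        self.task_id = task_id
        self._shared = shared  # plays the role of the metadata database

    def xcom_push(self, key, value):
        self._shared[(self.task_id, key)] = value

    def xcom_pull(self, task_ids, key="return_value"):
        return self._shared.get((task_ids, key))


shared = {}
push_row_count(StubTaskInstance("push_row_count", shared))
message = report_row_count(StubTaskInstance("report_row_count", shared))
```

In a DAG, you would pass `push_row_count` and `report_row_count` as the Python callables of two tasks whose task IDs match the ones used in `xcom_pull()`, and Airflow would supply the real task instance.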

You can view your XComs in the Airflow UI by going to Admin > XComs. You should see something like this:

XCom UI

When to use XComs

XComs should be used to pass small amounts of data between tasks. For example, task metadata, dates, model accuracy, and single-value query results are all ideal data to use with XCom.

While you can technically pass large amounts of data with XCom, be very careful when doing so and consider using a custom XCom backend and scaling your Airflow resources.

When you use the standard XCom backend, the size limit for an XCom is determined by your metadata database. Common sizes are:

  • Postgres: 1 GB
  • SQLite: 2 GB
  • MySQL: 64 KB

You can see that these limits aren't very big. If you think the data you pass via XCom might exceed the size limit of your metadata database, use either a custom XCom backend or intermediary data storage.
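
One way to guard against oversized XComs is to estimate the size of a value before returning it from a task. The sketch below measures the UTF-8 encoded JSON form of a value. This is only an approximation, because Airflow's own serialization may produce a different size. The `MAX_XCOM_BYTES` budget and both function names are assumptions made for this example.

```python
import json

# Assumed budget for this sketch; choose one that suits your metadata database.
MAX_XCOM_BYTES = 64 * 1024


def xcom_payload_size(value) -> int:
    """Return the size in bytes of the value encoded as UTF-8 JSON."""
    return len(json.dumps(value).encode("utf-8"))


def fits_in_xcom(value, limit: int = MAX_XCOM_BYTES) -> bool:
    """Check whether the estimated payload size is within the budget."""
    return xcom_payload_size(value) <= limit


small = {"cat_fact": "Cats sleep a lot."}
large = {"rows": ["x" * 100] * 1000}  # roughly 100 KB of text

small_ok = fits_in_xcom(small)
large_ok = fits_in_xcom(large)
```

A task could call a check like this and fall back to intermediary storage when the value does not fit.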

The second limitation in using the standard XCom backend is that only certain types of data can be serialized. If you need to serialize other data types, you can do so using a custom XCom backend.

Custom XCom backends

Using a custom XCom backend means you can push and pull XComs to and from an external system such as S3, GCS, or HDFS, rather than the default of Airflow's metadata database. You can also implement your own serialization and deserialization methods to define how XComs are handled. To learn how to implement a custom XCom backend using Amazon S3, Google Cloud Storage, or Azure Blob Storage, follow this step-by-step tutorial.
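
The core idea behind a custom backend can be sketched without Airflow: the value is written to external storage, and only a short reference is kept where the XCom would normally be stored. In a real backend, this logic would usually live in the `serialize_value` and `deserialize_value` methods of a subclass of Airflow's `BaseXCom` class. The functions below only borrow those names. The `object_store` dictionary and the `xcom_store://` prefix are hypothetical stand-ins for a bucket and a reference format.

```python
import json
import uuid

# Hypothetical stand-in for an object store such as an S3 bucket.
object_store: dict[str, str] = {}
REFERENCE_PREFIX = "xcom_store://"  # assumed marker for stored references


def serialize_value(value) -> str:
    """Write the value to the external store and return a short reference."""
    key = f"{uuid.uuid4()}.json"
    object_store[key] = json.dumps(value)
    return REFERENCE_PREFIX + key


def deserialize_value(reference: str):
    """Resolve a reference produced by serialize_value back into the value."""
    key = reference[len(REFERENCE_PREFIX):]
    return json.loads(object_store[key])


reference = serialize_value({"facts": ["fact one", "fact two"]})
restored = deserialize_value(reference)
```

Only the reference string would end up in the metadata database, so the database size limits no longer constrain the payload itself.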

Example DAG using XComs

In this section, you'll review a DAG that uses XCom to pass data between tasks. The DAG uses XComs to analyze cat facts that are retrieved from an API. To implement this use case, the first task makes a request to the cat facts API and pulls the fact parameter from the results. The second task takes the results from the first task and performs an analysis. This is a valid use case for XCom, because the data being passed between the tasks is a short string.

You can use the TaskFlow API to push and pull values to and from XCom. To push a value to XCom, return it at the end of your task, as with traditional operators. To retrieve a value from XCom, provide the object created by the upstream task as an input to your downstream task.

Using the TaskFlow API usually requires less code to pass data between tasks than working with the traditional syntax.

from airflow.decorators import dag, task
from pendulum import datetime

import requests
import json

url = "http://catfact.ninja/fact"

default_args = {"start_date": datetime(2021, 1, 1)}


@dag(schedule="@daily", default_args=default_args, catchup=False)
def xcom_taskflow_dag():
    # multiple_outputs=True pushes each dictionary key as its own XCom,
    # so the fact is stored under the key "cat_fact"
    @task(multiple_outputs=True)
    def get_a_cat_fact():
        """
        Gets a cat fact from the CatFacts API
        """
        res = requests.get(url)
        return {"cat_fact": json.loads(res.text)["fact"]}

    @task
    def print_the_cat_fact(cat_fact: str):
        """
        Prints the cat fact
        """
        print("Cat fact for today:", cat_fact)
        # run some further cat analysis here

    # Invoke functions to create tasks and define dependencies.
    # Indexing the upstream result passes only the "cat_fact" string downstream.
    print_the_cat_fact(get_a_cat_fact()["cat_fact"])


xcom_taskflow_dag()

If you run this DAG and then go to the XComs page in the Airflow UI, you'll see that a new row has been added for your get_a_cat_fact task with the key cat_fact and the value returned from the API.

Example XCom

In the logs for the print_the_cat_fact task, you can see that the value from the prior task was printed, meaning the value was successfully retrieved from XCom.

Example XCom Log

Intermediary data storage

As mentioned previously, XCom is a great option for sharing data between tasks because it doesn't rely on any tools external to Airflow itself. However, it is only designed to be used for very small amounts of data. What if the data you need to pass is a little bit larger, for example a small dataframe?

The best way to manage this use case is to use intermediary data storage. This means saving your data to some system external to Airflow at the end of one task, then reading it in from that system in the next task. This is commonly done using cloud file storage such as S3, GCS, or Azure Blob Storage, but it could also be done by loading the data in either a temporary or persistent table in a database.

While this is a great way to pass data that is too large to be managed with XCom, you should still exercise caution. Airflow is meant to be an orchestrator, not an execution framework. If your data is very large, it is probably a good idea to complete any processing using a framework like Spark, a compute-optimized data warehouse like Snowflake, or a transformation tool like dbt that runs inside your warehouse.
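
The intermediary storage pattern can be sketched in plain Python: the first step writes its output to storage and returns only the location, and the second step reads the data back from that location. In this sketch a temporary directory stands in for a bucket, and the `extract` and `transform` functions, the file naming, and the sample row are all hypothetical.

```python
import csv
import tempfile
from pathlib import Path

# Hypothetical stand-in for a bucket; a real DAG would use S3, GCS, or similar.
STORAGE_DIR = Path(tempfile.mkdtemp())


def extract(run_id: str) -> str:
    """Write the raw rows to storage and return only their location."""
    path = STORAGE_DIR / f"raw_{run_id}.csv"
    with path.open("w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=["fact", "length"])
        writer.writeheader()
        writer.writerow({"fact": "Cats purr.", "length": 10})
    return str(path)


def transform(raw_path: str) -> list[str]:
    """Read the rows back from storage and keep only the fact column."""
    with open(raw_path, newline="") as f:
        return [row["fact"] for row in csv.DictReader(f)]


raw_path = extract("2021-01-01")  # a short string, small enough for XCom
facts = transform(raw_path)
```

Only the path travels between the two steps, which keeps the value that would be stored as an XCom small regardless of how large the file grows.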

Example DAG

Building on the previous cat fact example, you are now interested in getting more cat facts and processing them. This case would not be ideal for XCom, but since the data returned is a small dataframe, it can be processed with Airflow.

from pendulum import datetime, duration
from io import StringIO

import pandas as pd
import requests
from airflow.decorators import dag, task
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

S3_CONN_ID = "aws_conn"
BUCKET = "myexamplebucketone"


@task
def upload_to_s3(cat_fact_number):
    # Instantiate
    s3_hook = S3Hook(aws_conn_id=S3_CONN_ID)

    # Base URL
    url = "http://catfact.ninja/fact"

    # Grab data
    res = requests.get(url).json()

    # Convert JSON to csv
    res_df = pd.DataFrame.from_dict([res])
    res_csv = res_df.to_csv()

    # Take string, upload to S3 using predefined method
    s3_hook.load_string(
        res_csv,
        "cat_fact_{0}.csv".format(cat_fact_number),
        bucket_name=BUCKET,
        replace=True,
    )


@task
def process_data(cat_fact_number):
    """Reads data from S3, processes, and saves to new S3 file"""
    # Connect to S3
    s3_hook = S3Hook(aws_conn_id=S3_CONN_ID)

    # Read data
    data = StringIO(
        s3_hook.read_key(
            key="cat_fact_{0}.csv".format(cat_fact_number), bucket_name=BUCKET
        )
    )
    df = pd.read_csv(data, sep=",")

    # Process data
    processed_data = df[["fact"]]
    print(processed_data)

    # Save processed data to CSV on S3
    s3_hook.load_string(
        processed_data.to_csv(),
        "cat_fact_{0}_processed.csv".format(cat_fact_number),
        bucket_name=BUCKET,
        replace=True,
    )


@dag(
    start_date=datetime(2021, 1, 1),
    max_active_runs=1,
    schedule="@daily",
    default_args={"retries": 1, "retry_delay": duration(minutes=1)},
    catchup=False,
)
def intermediary_data_storage_dag():
    upload_to_s3(cat_fact_number=1) >> process_data(cat_fact_number=1)


intermediary_data_storage_dag()

In this DAG, you used the S3Hook to save data retrieved from the API to a CSV on S3 in the upload_to_s3 task. The process_data task then takes the data from S3, converts it to a dataframe for processing, and saves the processed data back to a new CSV on S3.