Event-driven scheduling
Event-driven scheduling is a sub-type of data-aware scheduling where a DAG is triggered when messages are posted to a message queue. This is useful for scenarios where you want to trigger a DAG based on events that occur outside of Airflow, such as data delivery to an external system or IoT sensor events, and is key to inference execution pipelines.
In this guide you will learn about the concepts used in event-driven scheduling, common usage patterns, and how to implement an example using Amazon SQS.
Assumed knowledge
To get the most out of this guide, you should have an understanding of:
- Airflow assets. See Assets and data-aware scheduling in Airflow.
Concepts
There are a number of concepts that are important to understand when using event-driven scheduling.
- Data-aware scheduling: Data-aware or data-driven scheduling refers to all ways you can schedule a DAG based on updates to assets. Outside of event-driven scheduling, asset updates occur when tasks in the same Airflow instance complete successfully, manually through the Airflow UI, or through a call to the Airflow REST API. See Assets and data-aware scheduling in Airflow.
- Event-driven scheduling: Event-driven scheduling is a sub-type of data-aware scheduling where a DAG is run based on messages posted to a message queue. This message in the queue is triggered by an event that occurs outside of Airflow.
- Message queue: A message queue is a service that allows you to send and receive messages between different systems. Examples of message queues include Amazon SQS, RabbitMQ, and Apache Kafka. Event-driven scheduling in Airflow 3.0 is supported for Amazon SQS with support for other message queues planned for future releases.
- Trigger: A trigger is an asynchronous Python function running in the Airflow triggerer component. Triggers that inherit from `BaseEventTrigger` can be used in AssetWatchers for event-driven scheduling. The trigger is responsible for polling the message queue for new messages; when a new message is found, a `TriggerEvent` is created and the message is deleted from the queue.
- AssetWatcher: An AssetWatcher is a class in Airflow that watches one or more triggers for events. When a trigger fires a `TriggerEvent`, the AssetWatcher updates the asset it is associated with, creating an `AssetEvent`. The payload of the trigger is attached to the `AssetEvent` in its `extra` dictionary.
- AssetEvent: An `Asset` is an object in Airflow that represents a concrete or abstract data entity. For example, an asset can be a file, a table in a database, or not tied to any specific data. An `AssetEvent` represents one update to an asset. In the context of event-driven scheduling, an `AssetEvent` represents one message having been detected in the message queue.
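To make these concepts concrete, the following minimal sketch shows how a trigger, an AssetWatcher, and an asset are wired together. It uses the `MessageQueueTrigger` covered later in this guide; the queue URL and asset name are placeholders.

```python
# Minimal sketch of how a trigger, an AssetWatcher, and an asset relate.
# The queue URL and names are placeholders.
from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
from airflow.sdk import Asset, AssetWatcher

# Trigger: polls the message queue and fires a TriggerEvent for each new message
trigger = MessageQueueTrigger(
    queue="https://sqs.<region>.amazonaws.com/<acct id>/<queue name>"
)

# AssetWatcher: turns each TriggerEvent from the trigger into an AssetEvent
watcher = AssetWatcher(name="my_queue_watcher", trigger=trigger)

# Asset: any DAG scheduled on this asset runs once per AssetEvent; the message
# payload is attached to the AssetEvent in its `extra` dictionary
my_queue_asset = Asset("my_queue_asset", watchers=[watcher])
```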
When to use event-driven scheduling
Basic and advanced data-aware scheduling are great for use cases where updates to assets occur within Airflow or can be accomplished via a call to the Airflow REST API. However, there are scenarios where you need a DAG to run based on events in external systems. Two common patterns exist for event-driven scheduling:
- Data delivery to an external system: Data is delivered to an external system, such as manually by a domain expert, and a data-ready event is sent to a message queue. The DAG in Airflow is scheduled based on this message event and runs an extract-transform-load (ETL) pipeline that processes the data in the external system.
- IoT sensor events: An Internet of Things (IoT) device sends a sensor event to a message queue. A DAG in Airflow is scheduled based on this message event and consumes the message to evaluate the sensor value. If the evaluation determines that an alert is warranted, an alert event is published to another message queue.
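In the IoT pattern, the evaluation and alerting logic is just a regular task in the DAG that is scheduled on the queue-backed asset. The sketch below shows one possible shape of such a task; the threshold, alert queue URL, and message fields are hypothetical placeholders, and the payload structure is assumed to match the SQS example later in this guide.

```python
# Hypothetical evaluation task for the IoT pattern. The threshold, alert queue
# URL, and message fields are placeholders; the payload structure is assumed to
# match the SQS example later in this guide.
import json

import boto3
from airflow.sdk import task

TEMPERATURE_THRESHOLD = 42.0  # hypothetical alerting threshold
ALERT_QUEUE_URL = "https://sqs.<region>.amazonaws.com/<acct id>/<alert queue>"


@task
def evaluate_sensor_event(**context):
    # The triggering AssetEvents carry the original queue message in their `extra` dictionary
    for events in context["triggering_asset_events"].values():
        for event in events:
            body = json.loads(event.extra["payload"]["message_batch"][0]["Body"])
            if body.get("temperature", 0) > TEMPERATURE_THRESHOLD:
                # Publish an alert event to a second message queue
                boto3.client("sqs").send_message(
                    QueueUrl=ALERT_QUEUE_URL,
                    MessageBody=json.dumps(
                        {"alert": "temperature_high", "reading": body}
                    ),
                )
```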
One common use case for event-driven scheduling is inference execution, where the DAG that is triggered involves a call to a machine learning model. Airflow can be used to orchestrate inference execution pipelines of all types, including in Generative AI applications.
A key change enabling inference execution in Airflow 3.0 is that a DAG can be triggered with `None` provided as the `logical_date`, meaning simultaneous triggering of multiple DAG runs is possible.
Currently, the `MessageQueueTrigger` works with Amazon SQS out of the box, with support for other message queues planned for future releases. You can create your own triggers to use with AssetWatchers by inheriting from the `BaseEventTrigger` class. See the Airflow documentation for more information on supported triggers for event-driven scheduling.
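As a starting point for a custom trigger, the sketch below assumes that `BaseEventTrigger` follows the standard Airflow trigger interface (a `serialize` method plus an async `run` generator that yields `TriggerEvent` objects) and that it can be imported from `airflow.triggers.base`; the endpoint and polling logic are hypothetical.

```python
# Minimal sketch of a custom event trigger, assuming BaseEventTrigger exposes
# the standard trigger interface and lives in airflow.triggers.base.
import asyncio
from typing import Any, AsyncIterator

from airflow.triggers.base import BaseEventTrigger, TriggerEvent


class HypotheticalHttpPollTrigger(BaseEventTrigger):
    """Hypothetical trigger that polls an HTTP endpoint for new data."""

    def __init__(self, endpoint: str, poke_interval: float = 30.0) -> None:
        super().__init__()
        self.endpoint = endpoint
        self.poke_interval = poke_interval

    def serialize(self) -> tuple[str, dict[str, Any]]:
        # Classpath and kwargs used to re-instantiate the trigger in the triggerer
        return (
            "include.triggers.HypotheticalHttpPollTrigger",
            {"endpoint": self.endpoint, "poke_interval": self.poke_interval},
        )

    async def run(self) -> AsyncIterator[TriggerEvent]:
        while True:
            payload = await self._check_endpoint()  # hypothetical async poll
            if payload is not None:
                # The payload ends up in the AssetEvent's `extra` dictionary
                yield TriggerEvent({"payload": payload})
            await asyncio.sleep(self.poke_interval)

    async def _check_endpoint(self) -> dict | None:
        # Placeholder for your own polling logic (for example, an aiohttp request)
        return None
```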
Example: Amazon SQS
This example shows how to configure a DAG to run as soon as a message is posted to an Amazon SQS queue.
- Create an Amazon SQS queue. See Amazon Simple Queue Service Documentation for instructions.
- Add the Airflow Common Messaging provider and the Airflow Amazon provider to your Airflow instance. When using the Astro CLI, you can add the providers to your `requirements.txt` file:

  ```text
  apache-airflow-providers-amazon
  apache-airflow-providers-common-messaging
  aiobotocore
  ```
- Set the connection to your Amazon SQS queue in your Airflow instance. Note that the connection needs to use the connection ID `aws_default` and needs to include the `region_name` in the `extra` field. Replace `<your access key>`, `<your secret key>`, and `<your region>` with your AWS credentials and region. For other authentication options, see the Airflow Amazon provider documentation.

  ```text
  AIRFLOW_CONN_AWS_DEFAULT='{
      "conn_type": "aws",
      "login": "<your access key>",
      "password": "<your secret key>",
      "extra": {
          "region_name": "<your region>"
      }
  }'
  ```
- Create a new file in the `dags` folder of your Airflow project and add the following code. Replace `SQS_QUEUE` with the URL of your message queue.

  ```python
  from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
  from airflow.sdk import Asset, AssetWatcher, dag, task

  # Define the SQS queue URL
  SQS_QUEUE = "https://sqs.<region>.amazonaws.com/<acct id>/<queue name>"

  # Define a trigger that listens to an external message queue (AWS SQS in this case)
  trigger = MessageQueueTrigger(queue=SQS_QUEUE)

  # Define an asset that watches for messages on the queue
  sqs_queue_asset = Asset(
      "sqs_queue_asset", watchers=[AssetWatcher(name="sqs_watcher", trigger=trigger)]
  )

  # Schedule the DAG to run when the asset is triggered
  @dag(schedule=[sqs_queue_asset])
  def event_driven_dag():
      @task
      def process_message(**context):
          # Extract the triggering asset events from the context
          triggering_asset_events = context["triggering_asset_events"]
          for event in triggering_asset_events[sqs_queue_asset]:
              # Get the message body from the TriggerEvent payload
              print(
                  f"Processing message: {event.extra['payload']['message_batch'][0]['Body']}"
              )

      process_message()


  event_driven_dag()
  ```

  This DAG is scheduled to run as soon as the `sqs_queue_asset` asset is updated. The asset uses one `AssetWatcher` named `sqs_watcher` that watches one `MessageQueueTrigger`, which polls the provided SQS queue for new messages. The `process_message` task gets the triggering asset events from the Airflow context and prints the message body from the triggering message. The `process_message` task is a placeholder for your own task that processes the message.
- Create a new message in the SQS queue, for example with the AWS console or with boto3 as shown in the sketch after these steps. It will trigger the DAG to run, and the `process_message` task will print the message body.
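If you want to post a test message from outside the AWS console, the following is a minimal sketch using boto3. It assumes your AWS credentials are available in the environment; the region and queue URL are placeholders you need to replace with your own.

```python
# Minimal sketch for sending a test message to the SQS queue with boto3.
# Assumes AWS credentials are configured in your environment; replace the
# region and queue URL placeholders with your own values.
import boto3

sqs = boto3.client("sqs", region_name="<your region>")

sqs.send_message(
    QueueUrl="https://sqs.<region>.amazonaws.com/<acct id>/<queue name>",
    MessageBody="Hello from outside of Airflow!",
)
```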