
Event-driven scheduling

Event-driven scheduling is a sub-type of data-aware scheduling where a DAG is triggered when messages are posted to a message queue. This is useful for scenarios where you want to trigger a DAG based on events that occur outside of Airflow, such as data delivery to an external system or IoT sensor events, and is key to inference execution pipelines.

In this guide you will learn about the concepts used in event-driven scheduling, common usage patterns, and how to implement an example using Amazon SQS.

Assumed knowledge

To get the most out of this guide, you should have an understanding of:

  • Airflow DAGs.
  • Assets and data-aware scheduling in Airflow.

Concepts

There are a number of concepts that are important to understand when using event-driven scheduling.

  • Data-aware scheduling: Data-aware or data-driven scheduling refers to all ways you can schedule a DAG based on updates to assets. Updates to assets outside of event-driven scheduling occur by tasks in the same Airflow instance completing successfully, manually through the Airflow UI, or through a call to the Airflow REST API. See Assets and data-aware scheduling in Airflow.
  • Event-driven scheduling: Event-driven scheduling is a sub-type of data-aware scheduling where a DAG is run based on messages posted to a message queue. This message in the queue is triggered by an event that occurs outside of Airflow.
  • Message queue: A message queue is a service that allows you to send and receive messages between different systems. Examples of message queues include Amazon SQS, RabbitMQ, and Apache Kafka. Event-driven scheduling in Airflow 3.0 is supported for Amazon SQS with support for other message queues planned for future releases.
  • Trigger: A trigger is an asynchronous Python function running in the Airflow triggerer component. Triggers that inherit from BaseEventTrigger can be used in AssetWatchers for event-driven scheduling. The trigger is responsible for polling the message queue for new messages. When a new message is found, a TriggerEvent is created and the message is deleted from the queue.
  • AssetWatcher: An AssetWatcher is a class in Airflow that watches one or more triggers for events. When a trigger fires a TriggerEvent, the AssetWatcher updates the asset it is associated with, creating an AssetEvent. The payload of the TriggerEvent is attached to the AssetEvent in its extra dictionary. See the sketch after this list for how these pieces fit together.
  • AssetEvent: An Asset is an object in Airflow that represents a concrete or abstract data entity; for example, an asset can be a file, a table in a database, or an abstract entity not tied to any specific data. An AssetEvent represents one update to an asset. In the context of event-driven scheduling, an AssetEvent represents one message having been detected in the message queue.
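
The sketch below shows how these concepts relate in code: a trigger watched by an AssetWatcher updates an asset, and a DAG scheduled on that asset runs once the asset is updated. It mirrors the structure of the full Amazon SQS example later in this guide; the queue URL, asset name, and DAG name are placeholders chosen for illustration only.

    from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
    from airflow.sdk import Asset, AssetWatcher, dag, task

    # A trigger polls an external message queue (placeholder SQS URL)
    trigger = MessageQueueTrigger(queue="https://sqs.<region>.amazonaws.com/<acct id>/<queue name>")

    # An AssetWatcher connects the trigger to an asset: every TriggerEvent
    # fired by the trigger creates an AssetEvent on this asset
    my_asset = Asset("my_asset", watchers=[AssetWatcher(name="my_watcher", trigger=trigger)])


    # A DAG scheduled on the asset runs when the asset is updated
    @dag(schedule=[my_asset])
    def concept_overview_dag():
        @task
        def consume(**context):
            # The TriggerEvent payload is attached to each AssetEvent's extra dictionary
            for event in context["triggering_asset_events"][my_asset]:
                print(event.extra)

        consume()


    concept_overview_dag()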

When to use event-driven scheduling

Basic and advanced data-aware scheduling are great for use cases where updates to assets occur within Airflow or can be accomplished via a call to the Airflow REST API. However, there are scenarios where you need a DAG to run based on events in external systems. Two common patterns exist for event-driven scheduling:

  1. Data delivery to an external system: Data is delivered to an external system, such as manually by a domain expert, and a data-ready event is sent to a message queue. The DAG in Airflow is scheduled based on this message event and runs an extract-transform-load (ETL) pipeline that processes the data in the external system.

  2. IoT sensor events: An Internet of Things (IoT) device sends a sensor event to a message queue. A DAG in Airflow is scheduled based on this message event and consumes the message to evaluate the sensor value. If the evaluation determines that an alert is warranted, an alert event is published to another message queue. A minimal sketch of this pattern follows the figure below.

Two patterns using event-driven scheduling.
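
The following is a minimal sketch of the second pattern. It assumes the sensor events arrive on an SQS queue watched by an asset, as in the example later in this guide, and that alerts are published to a second SQS queue using the Amazon provider's SqsPublishOperator. The queue URLs, message format, and threshold are placeholders, so adapt the evaluation logic to your own sensor data.

    from airflow.providers.amazon.aws.operators.sqs import SqsPublishOperator
    from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
    from airflow.sdk import Asset, AssetWatcher, dag, task

    SENSOR_QUEUE = "https://sqs.<region>.amazonaws.com/<acct id>/<sensor queue>"  # placeholder
    ALERT_QUEUE = "https://sqs.<region>.amazonaws.com/<acct id>/<alert queue>"  # placeholder

    # The asset is updated whenever a sensor event lands on the sensor queue
    sensor_asset = Asset(
        "sensor_events",
        watchers=[AssetWatcher(name="sensor_watcher", trigger=MessageQueueTrigger(queue=SENSOR_QUEUE))],
    )


    @dag(schedule=[sensor_asset])
    def iot_alerting():
        @task
        def evaluate_sensor_value(**context) -> list[str]:
            # Collect the bodies of triggering messages that exceed a placeholder threshold
            alerts = []
            for event in context["triggering_asset_events"][sensor_asset]:
                body = event.extra["payload"]["message_batch"][0]["Body"]
                if float(body) > 100:  # placeholder threshold and message format
                    alerts.append(f"Sensor value too high: {body}")
            return alerts

        # Publish one alert message per flagged sensor reading to the alert queue
        SqsPublishOperator.partial(
            task_id="publish_alert",
            sqs_queue=ALERT_QUEUE,
            aws_conn_id="aws_default",
        ).expand(message_content=evaluate_sensor_value())


    iot_alerting()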

One common use case for event-driven scheduling is inference execution, where the triggered DAG involves a call to a machine learning model. Airflow can be used to orchestrate inference execution pipelines of all types, including in generative AI applications. A key change enabling inference execution in Airflow 3.0 is that a DAG can be triggered with None as the logical_date, which makes it possible to trigger multiple simultaneous runs of the same DAG.

info

Currently, the MessageQueueTrigger works with Amazon SQS out of the box, with support for other message queues planned for future releases. You can create your own triggers to use with AssetWatchers by inheriting from the BaseEventTrigger class. See the Airflow documentation for more information on supported triggers for event-driven scheduling.
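
As a rough illustration of that extension point, the sketch below outlines a custom trigger. It assumes BaseEventTrigger can be imported from airflow.triggers.base and follows the standard trigger contract (a serialize method returning a classpath and kwargs, and an async run method yielding TriggerEvent objects). The class name, module path, polling helper, and payload shape are all hypothetical placeholders, so treat this as a starting point rather than a reference implementation.

    import asyncio
    from typing import Any, AsyncIterator, Optional

    from airflow.triggers.base import BaseEventTrigger, TriggerEvent  # assumed import path


    class MyQueueTrigger(BaseEventTrigger):
        """Hypothetical trigger that polls an external message source for new messages."""

        def __init__(self, endpoint: str, poll_interval: float = 30.0) -> None:
            super().__init__()
            self.endpoint = endpoint
            self.poll_interval = poll_interval

        def serialize(self) -> tuple[str, dict[str, Any]]:
            # The triggerer re-creates the trigger from this classpath and these kwargs
            return (
                "include.triggers.MyQueueTrigger",  # hypothetical module path
                {"endpoint": self.endpoint, "poll_interval": self.poll_interval},
            )

        async def run(self) -> AsyncIterator[TriggerEvent]:
            while True:
                message = await self._poll(self.endpoint)  # hypothetical async polling helper
                if message is not None:
                    # The event payload ends up in the AssetEvent's extra dictionary
                    yield TriggerEvent({"payload": message})
                await asyncio.sleep(self.poll_interval)

        async def _poll(self, endpoint: str) -> Optional[dict]:
            # Placeholder: replace with an async call to your message source
            return None

An instance of such a trigger could then be passed to an AssetWatcher in the same way as the MessageQueueTrigger in the example below.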

Example: Amazon SQS

This example shows how to configure a DAG to run as soon as a message is posted to an Amazon SQS queue.

  1. Create an Amazon SQS queue. See Amazon Simple Queue Service Documentation for instructions.

  2. Add the Airflow Amazon provider, the Airflow Common Messaging provider, and the aiobotocore package to your Airflow instance. When using the Astro CLI, you can add them to your requirements.txt file:

    apache-airflow-providers-amazon
    apache-airflow-providers-common-messaging
    aiobotocore
  3. Set the connection to your Amazon SQS queue in your Airflow instance. Note that the connection needs to use the connection ID aws_default and needs to include the region_name in the extra field. Replace <your access key>, <your secret key>, and <your region> with your AWS credentials and region. For other authentication options, see the Airflow Amazon provider documentation.

    AIRFLOW_CONN_AWS_DEFAULT='{
        "conn_type": "aws",
        "login": "<your access key>",
        "password": "<your secret key>",
        "extra": {
            "region_name": "<your region>"
        }
    }'
  4. Create a new file in the dags folder of your Airflow project and add the following code. Replace SQS_QUEUE with the URL of your message queue.

    from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
    from airflow.sdk import Asset, AssetWatcher, dag, task

    # Define the SQS queue URL
    SQS_QUEUE = "https://sqs.<region>.amazonaws.com/<acct id>/<queue name>"

    # Define a trigger that listens to an external message queue (AWS SQS in this case)
    trigger = MessageQueueTrigger(queue=SQS_QUEUE)

    # Define an asset that watches for messages on the queue
    sqs_queue_asset = Asset(
        "sqs_queue_asset", watchers=[AssetWatcher(name="sqs_watcher", trigger=trigger)]
    )


    # Schedule the DAG to run when the asset is triggered
    @dag(schedule=[sqs_queue_asset])
    def event_driven_dag():
        @task
        def process_message(**context):
            # Extract the triggering asset events from the context
            triggering_asset_events = context["triggering_asset_events"]
            for event in triggering_asset_events[sqs_queue_asset]:
                # Get the message from the TriggerEvent payload
                print(
                    f"Processing message: {event.extra['payload']['message_batch'][0]['Body']}"
                )

        process_message()


    event_driven_dag()

    This DAG is scheduled to run as soon as the sqs_queue_asset asset is updated. This asset uses one AssetWatcher with the name sqs_watcher that watches one MessageQueueTrigger, which polls the provided SQS queue for new messages.

    The process_message task gets the triggering asset events from the Airflow context and prints the message body from the triggering message. The process_message task is a placeholder for your own task that processes the message.

  5. Send a new message to the SQS queue. The message triggers the DAG to run, and the process_message task prints the message body.
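
    You can send a test message from the AWS console or programmatically. The snippet below is one way to do it with boto3; the queue URL and message body are placeholders, and your AWS credentials must be available to boto3 (for example, through environment variables).

    import boto3

    # Placeholder values: use your own region and queue URL
    sqs = boto3.client("sqs", region_name="<your region>")
    sqs.send_message(
        QueueUrl="https://sqs.<region>.amazonaws.com/<acct id>/<queue name>",
        MessageBody="Hello from the test message!",
    )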
