
Orchestrate Ray jobs on Anyscale with Apache Airflow®

Anyscale is a compute platform for AI/ML workloads built on the open-source Ray framework, which provides the layer for parallel processing and distributed computing. The Anyscale provider package for Apache Airflow® lets you interact with Anyscale from your Airflow DAGs. This tutorial walks through a simple example of using the Anyscale provider package to orchestrate Ray jobs on Anyscale with Airflow. For more in-depth information, see the Anyscale provider documentation.

For instructions on how to run open-source Ray jobs with Airflow, see the Orchestrate Ray jobs with Apache Airflow® tutorial.

tip

This tutorial shows a simple implementation of the Anyscale provider package. For a more complex example, see the Processing User Feedback: an LLM-fine-tuning reference architecture with Ray on Anyscale reference architecture.

Time to complete

This tutorial takes approximately 30 minutes to complete.

Assumed knowledge

To get the most out of this tutorial, make sure you have an understanding of:

Prerequisites

Step 1: Configure your Astro project

Use the Astro CLI to create and run an Airflow project on your local machine.

  1. Create a new Astro project:

    $ mkdir astro-anyscale-tutorial && cd astro-anyscale-tutorial
    $ astro dev init
  2. In the requirements.txt file, add the Anyscale provider package:

    astro-provider-anyscale==1.0.1
  3. Run the following command to start your Airflow project:

    astro dev start
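
    To confirm that the provider was installed correctly, you can list the providers registered in your local Airflow environment. The command below is a quick check using the Astro CLI, which forwards Airflow CLI commands to your local environment:

    astro dev run providers list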

Step 2: Configure a Ray connection

info

For Astro customers, Astronomer recommends taking advantage of the Astro Environment Manager to store connections in an Astro-managed secrets backend. These connections can be shared across multiple deployed and local Airflow environments. See Manage Astro connections in branch-based deploy workflows.

  1. In the Airflow UI, go to Admin -> Connections and click +.

  2. Create a new connection and choose the Anyscale connection type. Enter the following information:

  3. Click Save.
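
Alternatively, you can define the connection as an environment variable, for example in your Astro project's .env file. The sketch below assumes the connection only requires your Anyscale API token stored in the password field; check the Anyscale provider documentation for the exact connection schema. The connection ID anyscale_conn matches the one used in the DAG in the next step:

    # .env - hypothetical example; replace the token with your own
    AIRFLOW_CONN_ANYSCALE_CONN='{"conn_type": "anyscale", "password": "<your-anyscale-api-token>"}'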

Step 3: Write a DAG to orchestrate Anyscale jobs

  1. Create a new file in your dags directory called anyscale_script.py and add the following code:

    # anyscale_script.py
    import argparse

    import numpy as np
    import ray


    @ray.remote
    def square(x):
        # Runs as a distributed Ray task
        return x**2


    def main(data):
        ray.init()
        data = np.array(data)
        # Square each element in parallel, then collect the results
        futures = [square.remote(x) for x in data]
        results = ray.get(futures)
        mean = np.mean(results)
        print(f"Mean squared value: {mean}")
        return mean


    if __name__ == "__main__":
        parser = argparse.ArgumentParser(description="Process some integers.")
        parser.add_argument(
            "data", nargs="+", type=float, help="List of numbers to process"
        )
        args = parser.parse_args()
        data = args.data
        main(data)
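
    Before orchestrating the script from Airflow, you can sanity-check it on your machine, assuming ray and numpy are installed locally. Squaring 1, 2, and 3 and taking the mean should print a value of roughly 4.67:

    python anyscale_script.py 1 2 3
    # Mean squared value: 4.666666666666667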

  2. Create a new file in your dags directory called anyscale_tutorial.py.

  3. Copy and paste the code below into the file:

    """
    ## Anyscale Tutorial

    This tutorial demonstrates how to use the Anyscale provider in Airflow to
    parallelize a task using Ray on Anyscale.
    """

    from airflow.decorators import dag
    from airflow.operators.python import PythonOperator
    from anyscale_provider.operators.anyscale import SubmitAnyscaleJob
    from airflow.models.baseoperator import chain
    from pathlib import Path

    CONN_ID = "anyscale_conn"
    FOLDER_PATH = Path(__file__).parent


    def _generate_data() -> list:
    """
    Generate sample data
    Returns:
    list: List of integers
    """
    import random

    return [random.randint(1, 100) for _ in range(10)]


    @dag(
    start_date=None,
    schedule=None,
    catchup=False,
    tags=["ray", "example"],
    doc_md=__doc__,
    )
    def anyscale_tutorial():

    data = PythonOperator(
    task_id="generate_data",
    python_callable=_generate_data,
    )

    get_mean_squared_value = SubmitAnyscaleJob(
    task_id="SubmitRayJob",
    conn_id=CONN_ID,
    name="AstroJob",
    image_uri="< your image uri >", # e.g. "anyscale/ray:2.35.0-slim-py312-cpu"
    compute_config="< your compute config >", # e.g. airflow-integration-testing:1
    entrypoint="python anyscale_script.py {{ ti.xcom_pull(task_ids='generate_data') | join(' ') }}",
    working_dir=str(FOLDER_PATH), # the folder containing the script
    requirements=["requests", "pandas", "numpy", "torch"],
    max_retries=1,
    job_timeout_seconds=3000,
    poll_interval=30,
    )

    chain(data, get_mean_squared_value)


    anyscale_tutorial()
    • The generate_data task randomly generates a list of 10 integers.
    • The get_mean_squared_value task submits a Ray job on Anyscale to calculate the mean squared value of the list of integers.
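
For example, if the generate_data task returned the (hypothetical) list [4, 9, 16], the Jinja template in entrypoint would render to the following command before the job is submitted to Anyscale:

    python anyscale_script.py 4 9 16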

Step 4: Run the DAG

  1. In the Airflow UI, click the play button to manually run your DAG.

  2. After the DAG runs successfully, check your Anyscale account to see the job submitted by Airflow.

    Anyscale showing a Job completed successfully.
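
If you prefer the command line, you can also trigger the DAG with the Astro CLI, which forwards the dags trigger command to the Airflow CLI in your local environment:

    astro dev run dags trigger anyscale_tutorial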

Conclusion

Congratulations! You've run a Ray job on Anyscale using Apache Airflow. You can now use the Anyscale provider package to orchestrate more complex jobs; see Processing User Feedback: an LLM-fine-tuning reference architecture with Ray on Anyscale for an example.
