Use DAG Factory to create DAGs
The DAG Factory is an open source tool managed by Astronomer that allows you to dynamically generate Apache Airflow® DAGs from YAML. While Airflow DAGs are traditionally written exclusively in Python, the DAG Factory makes it easy for people who don't know Python to use Airflow.
This guide includes instructions for installing the DAG Factory package into your Astro project and a sample YAML configuration file that you can use to easily specify the details of your DAG, including its schedule, callbacks, and task names.
The DAG Factory can be used with all Astronomer products and any Apache Airflow installation. To view the source code of the project, see DAG Factory.
Prerequisites
- Python version 3.8.0 or greater
- Apache Airflow version 2.0 or greater
If you're an Astronomer customer, you must have an Astro project with a supported version of Astro Runtime. See Astro Runtime lifecycle schedule for a list of currently supported Runtime versions.
Step 1: Install DAG Factory
To use the DAG Factory, install it as a Python package into your Apache Airflow environment.
If you're an Astronomer customer:
- In your Astro project, open your requirements.txt file.
- Add dag-factory<=1.0.0 to the file.
- Save the changes to your requirements.txt file.
If you're not an Astronomer customer, install the Python package according to the standards at your organization:
pip install "dag-factory<=1.0.0"
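After installing, it can be useful to confirm which version of the package ended up in the environment. The following is a minimal sketch that uses only the Python standard library. The helper name installed_version is hypothetical and is not part of DAG Factory.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_version(package: str) -> Optional[str]:
    """Return the installed version of `package`, or None if it is not installed."""
    try:
        return version(package)
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    # "dag-factory" is the distribution name used in the install command above.
    found = installed_version("dag-factory")
    print(f"dag-factory: {found or 'not installed'}")
```

Run this with the same Python interpreter that Airflow uses, otherwise the result may describe a different environment.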
Step 2: Create a sub-directory for your DAG configuration files and create a YAML file for your DAG
- In the dags directory of your Astro project, create a new sub-directory called configs to store your DAG configuration files defined in YAML. Astronomer recommends keeping these separate from standard DAG files written in Python.
- Within the new sub-directory, create a new YAML file for your DAG called, for example, my_dag.yaml. Copy the contents of the following example configuration file into the YAML file.
<your-DAG-id>:
  default_args:
    owner: 'example_owner'
    start_date: 2024-07-01 # or '2 days'
    end_date: 2030-01-01
    retries: 1
    retry_delay_sec: 300
  schedule_interval: '0 3 * * *'
  catchup: False
  concurrency: 1
  max_active_runs: 1
  dagrun_timeout_sec: 60
  default_view: 'grid' # or 'graph', 'duration', 'gantt', 'landing_times' (for Run Duration in newer versions)
  orientation: 'LR' # or 'TB', 'RL', 'BT'
  description: 'This is an example dag!'
  tasks:
    task_1:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: 'echo 1'
    task_2:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: 'echo 2'
      dependencies: [task_1]
    task_3:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: 'echo 3'
      dependencies: [task_1]
- Modify the example configuration file with parameters for the DAG you want to create, including replacing <your-DAG-id> with a valid dag_id. See DAG-level parameters in Airflow to learn more about each parameter.
- (Optional) Delete any configurations that you don't want to specify. For parameters that aren't specified, your DAG will assume the default values that correspond with your current Apache Airflow or Astro Runtime version.
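When editing the tasks section by hand, a common slip is listing a dependency that does not match any task name. The sketch below checks for that on an already-parsed configuration. It assumes the YAML under your DAG ID has been loaded into a plain dictionary (for example with a YAML parser); the function name find_unknown_dependencies is hypothetical and is not part of DAG Factory.

```python
from typing import Any, Dict, List


def find_unknown_dependencies(dag_config: Dict[str, Any]) -> List[str]:
    """List every 'task -> dependency' pair whose dependency is not a defined task."""
    tasks = dag_config.get("tasks", {})
    problems = []
    for task_name, task in tasks.items():
        for dep in task.get("dependencies", []):
            if dep not in tasks:
                problems.append(f"{task_name} -> {dep}")
    return problems


# The example configuration, written as the dictionary a YAML parser would
# produce for the block under <your-DAG-id>. Only the fields this check
# reads are shown.
example = {
    "schedule_interval": "0 3 * * *",
    "tasks": {
        "task_1": {"bash_command": "echo 1"},
        "task_2": {"bash_command": "echo 2", "dependencies": ["task_1"]},
        "task_3": {"bash_command": "echo 3", "dependencies": ["task_1"]},
    },
}

print(find_unknown_dependencies(example))  # prints []
```

An empty list means every dependency points at a task defined in the same DAG.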
Step 3: Create a DAG Factory file
All YAML files in your dags directory must be parsed and converted into Python in order to run on Apache Airflow. In this step, you will create a new DAG Factory file in your Astro project that includes the conversion logic. You only need to do this once and do not need a separate DAG Factory file for each of your DAGs or YAML files.
- In the dags directory of your Astro project, create a new Python file called dag_factory.py.
- Copy the following contents into your empty Python file. This file represents an Apache Airflow DAG and includes two commands that convert each of your YAML file(s) into DAGs.
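from pathlib import Path

from airflow import DAG
from airflow.configuration import conf as airflow_conf
from dagfactory import load_yaml_dags

# Directory that holds the YAML configuration files created in Step 2.
config_dir = Path(airflow_conf.get("core", "dags_folder")) / "configs"

# Build a DAG from each YAML file and register it in this module's namespace.
load_yaml_dags(globals_dict=globals(), dags_folder=config_dir)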
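If a DAG does not appear after this step, one thing worth checking is whether its configuration file is actually inside the directory passed as dags_folder. The sketch below lists candidate files using only the standard library. It assumes that configuration files are identified by a .yaml or .yml extension, which is an assumption about how files are matched rather than something this guide states. The function name list_yaml_configs and the example path are hypothetical.

```python
from pathlib import Path
from typing import List


def list_yaml_configs(config_dir: Path) -> List[Path]:
    """Return every .yaml or .yml file under config_dir, sorted for stable output."""
    matches = [
        path
        for path in config_dir.rglob("*")
        if path.is_file() and path.suffix in {".yaml", ".yml"}
    ]
    return sorted(matches)


if __name__ == "__main__":
    # Hypothetical location; adjust to your project's dags/configs directory.
    for path in list_yaml_configs(Path("dags/configs")):
        print(path)
```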
Step 4: (Optional) Add a DAG-level callback
To use DAG-level callbacks, add callback parameters to your configuration file. Each callback needs two values: the name of the callback function and the path to the file that contains it.
- In my_dag.yaml, add the following parameters:
on_success_callback_name: placeholder_callback
on_success_callback_file: /usr/local/airflow/dags/callback_func.py
on_failure_callback_name: placeholder_callback
on_failure_callback_file: /usr/local/airflow/dags/callback_func.py
- Create a new file callback_func.py in your dags directory.
- Copy the contents of the following placeholder callback into the file:
def placeholder_callback(context):
    # Airflow passes the run context to callbacks; replace this body with your own logic.
    pass
- Save the file.
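Once the placeholder works, you will likely want a callback that does something observable. The following is a minimal sketch of a failure callback that logs which run failed. It assumes the callback receives the Airflow context as a mapping with a dag_run entry that exposes dag_id and run_id. The function names describe_run and log_failure_callback are hypothetical.

```python
import logging
from typing import Any, Mapping

log = logging.getLogger(__name__)


def describe_run(context: Mapping[str, Any]) -> str:
    """Build a short description of the DAG run from the callback context."""
    dag_run = context.get("dag_run")
    # Fall back to "unknown" so the callback never raises on a sparse context.
    dag_id = getattr(dag_run, "dag_id", "unknown")
    run_id = getattr(dag_run, "run_id", "unknown")
    return f"dag_id={dag_id} run_id={run_id}"


def log_failure_callback(context: Mapping[str, Any]) -> None:
    """Hypothetical failure callback: log which run failed."""
    log.error("DAG run failed: %s", describe_run(context))
```

To try it, add the function to callback_func.py and set on_failure_callback_name to log_failure_callback in your YAML file, leaving on_failure_callback_file unchanged.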