# Deploy DAGs from an AWS S3 bucket to Astro using AWS Lambda

Use the following CI/CD template to automate deploying Apache Airflow DAGs from an AWS S3 bucket to Astro using AWS Lambda.

## Prerequisites

- An AWS S3 bucket.
- An Astro Deployment with DAG-only deploys enabled.
- A Deployment API token, Workspace API token, or Organization API token.
- An Astro project containing your project configurations.

## DAG deploy template

This CI/CD template deploys DAGs from a single S3 bucket to a single Astro Deployment. When you create or modify a DAG in the S3 bucket, a Lambda function triggers, initializes an Astro project, and deploys your DAGs using the Astro CLI.

To deploy any non-DAG code changes to Astro, you need to trigger a standard image deploy with your Astro project. When you do this, your Astro project must include the latest version of the DAGs in your S3 bucket. If the `dags` folder in your Astro project isn't up to date with your S3 bucket when you trigger the deploy, your DAGs revert to the version hosted in your Astro project.
- Download the latest Astro CLI binary from GitHub releases, then rename the file to `astro_cli.tar.gz`. For example, to use Astro CLI version 1.13.0 in your template, download `astro_1.13.0_linux_amd64.tar.gz` and rename it to `astro_cli.tar.gz`.
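If you'd rather script the download and rename, the following is a minimal Python sketch. The GitHub release URL pattern for the linux amd64 tarball is an assumption here; verify the exact asset name on the astro-cli releases page before relying on it.

```python
import urllib.request

CLI_VERSION = "1.13.0"  # assumed version; pin the release you actually want


def cli_release_url(version: str) -> str:
    """Build the assumed GitHub release URL for the linux amd64 CLI tarball."""
    return (
        "https://github.com/astronomer/astro-cli/releases/download/"
        f"v{version}/astro_{version}_linux_amd64.tar.gz"
    )


def download_cli(version: str = CLI_VERSION, dest: str = "astro_cli.tar.gz") -> None:
    """Download the tarball and save it directly under the required name."""
    urllib.request.urlretrieve(cli_release_url(version), dest)
```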
- In your S3 bucket, create the following new folders:

  - `dags`
  - `cli_binary`
- Add `astro_cli.tar.gz` to `cli_binary`.
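If you prefer to script this upload, the following is a small boto3 sketch. The bucket name is a placeholder, and `upload_file` requires AWS credentials with write access to the bucket.

```python
def cli_binary_key(filename: str = "astro_cli.tar.gz") -> str:
    """Object key for the CLI tarball inside the cli_binary/ folder."""
    return f"cli_binary/{filename}"


def upload_cli_binary(bucket_name: str = "my-astro-bucket",
                      local_path: str = "astro_cli.tar.gz") -> None:
    """Upload the renamed tarball to the cli_binary/ folder of the bucket."""
    import boto3  # AWS SDK; available wherever you run this setup script
    s3 = boto3.client("s3")
    s3.upload_file(local_path, bucket_name, cli_binary_key())
```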
- In the AWS IAM console, create a new role for AWS Lambda with the following permissions. Replace `<account_id>`, `<lambda_function_name>`, and `<bucket_name>` with your values.

  ```json
  {
      "Version": "2012-10-17",
      "Statement": [
          {
              "Sid": "lambdacreateloggroup",
              "Effect": "Allow",
              "Action": "logs:CreateLogGroup",
              "Resource": "arn:aws:logs:us-east-1:<account_id>:*"
          },
          {
              "Sid": "lambdaputlogevents",
              "Effect": "Allow",
              "Action": [
                  "logs:CreateLogStream",
                  "logs:PutLogEvents"
              ],
              "Resource": [
                  "arn:aws:logs:us-east-1:<account_id>:log-group:/aws/lambda/<lambda_function_name>:*"
              ]
          },
          {
              "Sid": "bucketpermission",
              "Effect": "Allow",
              "Action": [
                  "s3:GetObject",
                  "s3:ListBucket"
              ],
              "Resource": [
                  "arn:aws:s3:::<bucket_name>",
                  "arn:aws:s3:::<bucket_name>/*"
              ]
          }
      ]
  }
  ```
- Author a new AWS Lambda function from scratch with the following configurations:

  - Function name: Any
  - Runtime: Python 3.9
  - Architecture: Any
  - Execution role: Click Use an existing role and enter the role you created.
- Configure the following environment variables for your Lambda function:

  - `ASTRO_HOME`: `/tmp`
  - `ASTRO_API_TOKEN`: The value for your Workspace or Organization API token.
  - `ASTRO_DEPLOYMENT_ID`: Your Deployment ID.

  For production Deployments, Astronomer recommends storing your API credentials in AWS Secrets Manager and referencing them from Lambda. See https://docs.aws.amazon.com/lambda/latest/dg/configuration-database.html.
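The Secrets Manager approach could be sketched like this. The secret name `astro/api-token` and the secret's shape (either the raw token string, or a JSON object with an `ASTRO_API_TOKEN` key) are assumptions; adapt both to your setup.

```python
import json
import os


def parse_token(secret_string: str) -> str:
    """Accept either a raw token string or a JSON secret such as
    {"ASTRO_API_TOKEN": "..."} (an assumed secret shape)."""
    try:
        data = json.loads(secret_string)
    except json.JSONDecodeError:
        return secret_string
    if isinstance(data, dict):
        return data.get("ASTRO_API_TOKEN", secret_string)
    return secret_string


def get_astro_api_token(secret_id: str = "astro/api-token") -> str:
    """Read the token from Secrets Manager and export it for the Astro CLI.

    Call this at the top of lambda_handler instead of setting ASTRO_API_TOKEN
    as a plain environment variable. The role also needs
    secretsmanager:GetSecretValue on the secret.
    """
    import boto3  # AWS SDK, available in the Lambda Python runtime
    client = boto3.client("secretsmanager")
    response = client.get_secret_value(SecretId=secret_id)
    token = parse_token(response["SecretString"])
    os.environ["ASTRO_API_TOKEN"] = token
    return token
```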
- Add the following code to `lambda_function.py`. Replace `<bucket_name>` with your value.

  ```python
  import boto3
  import subprocess
  import os
  import tarfile

  BUCKET = os.environ.get("BUCKET", "<bucket_name>")
  s3 = boto3.resource('s3')
  deployment_id = os.environ.get('ASTRO_DEPLOYMENT_ID')


  def untar(filename: str, destination: str) -> None:
      """Extract the CLI tarball."""
      with tarfile.open(filename) as file:
          file.extractall(destination)


  def run_command(cmd: str) -> None:
      """Run a shell command and wait for it to finish."""
      p = subprocess.Popen("set -x; " + cmd, shell=True)
      p.communicate()


  def download_to_local(bucket_name: str, s3_folder: str, local_dir: str = None) -> None:
      """Download the contents of an S3 folder to a local directory.

      Args:
          bucket_name: the name of the S3 bucket
          s3_folder: the folder path in the S3 bucket
          local_dir: a relative or absolute directory path in the local file system
      """
      bucket = s3.Bucket(bucket_name)
      for obj in bucket.objects.filter(Prefix=s3_folder):
          target = obj.key if local_dir is None \
              else os.path.join(local_dir, os.path.relpath(obj.key, s3_folder))
          if not os.path.exists(os.path.dirname(target)):
              os.makedirs(os.path.dirname(target))
          if obj.key[-1] == '/':
              continue
          bucket.download_file(obj.key, target)
          print("downloaded file")


  def lambda_handler(event, context) -> dict:
      """Triggered by an object create event in the S3 bucket.

      :param event: Event payload.
      :param context: Metadata for the event.
      """
      base_dir = '/tmp/astro'
      if not os.path.isdir(base_dir):
          os.mkdir(base_dir)
      # Pull the current DAGs and the CLI tarball from the bucket.
      download_to_local(BUCKET, 'dags/', f'{base_dir}/dags')
      download_to_local(BUCKET, 'cli_binary/', base_dir)
      os.chdir(base_dir)
      untar('./astro_cli.tar.gz', '.')
      # Initialize an Astro project, then push only the DAGs to the Deployment.
      run_command('echo y | ./astro dev init')
      run_command(f"./astro deploy {deployment_id} --dags")
      return {"statusCode": 200}
  ```
- Create a trigger for your Lambda function with the following configuration:

  - Source: Select S3.
  - Bucket: Select the bucket that contains your `dags` directory.
  - Event types: Select All object create events.
  - Prefix: Enter `dags/`.
  - Suffix: Enter `.py`.
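The same trigger can also be created programmatically with boto3's `put_bucket_notification_configuration`. In this sketch the bucket name and function ARN are placeholders; note that the Lambda console grants S3 the resource-based invoke permission automatically, while this script does not, so you would also need to add that permission yourself.

```python
def build_notification_config(function_arn: str) -> dict:
    """Notification config invoking the Lambda function for object create
    events under dags/ that end in .py (mirrors the console settings above)."""
    return {
        "LambdaFunctionConfigurations": [
            {
                "LambdaFunctionArn": function_arn,
                "Events": ["s3:ObjectCreated:*"],
                "Filter": {
                    "Key": {
                        "FilterRules": [
                            {"Name": "prefix", "Value": "dags/"},
                            {"Name": "suffix", "Value": ".py"},
                        ]
                    }
                },
            }
        ]
    }


def apply_notification_config(bucket_name: str, function_arn: str) -> None:
    """Attach the notification config to the bucket."""
    import boto3  # AWS SDK; available wherever you run this setup script
    s3 = boto3.client("s3")
    s3.put_bucket_notification_configuration(
        Bucket=bucket_name,
        NotificationConfiguration=build_notification_config(function_arn),
    )
```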
- If you haven't already, deploy your complete Astro project to your Deployment. See Deploy code.
  :::info

  If you stage multiple commits to DAG files and push them all at once to your remote branch, the template only deploys DAG code changes from the most recent commit and misses any code changes made in previous commits. To avoid this, either push commits individually or configure your repository to squash commits for pull requests that merge multiple commits simultaneously.

  :::
- Add your DAGs to the `dags` folder in your S3 bucket.
- In the Astro UI, select a Workspace, click Deployments, and then select your Deployment. Confirm that your Lambda function worked by checking the Deployment's DAG bundle version. The version's name should include the time that you added the DAGs to your S3 bucket.