Deploy DAGs from an AWS S3 bucket to Astro using AWS Lambda

Use the following CI/CD template to automate deploying Apache Airflow DAGs from an S3 bucket to Astro using AWS Lambda.

Prerequisites

DAG deploy template

This CI/CD template deploys DAGs from a single S3 bucket to a single Astro Deployment. When you create or modify a DAG in the S3 bucket, a Lambda function triggers, initializes an Astro project, and deploys your DAGs using the Astro CLI.

info

To deploy any non-DAG code changes to Astro, you need to trigger a standard image deploy with your Astro project. When you do this, your Astro project must include the latest version of your DAGs from your S3 bucket. If the dags folder in your Astro project isn't up to date with your S3 DAGs bucket when you trigger this deploy, the deploy reverts your DAGs to the version hosted in your Astro project.

  1. Download the latest Astro CLI binary from GitHub releases, then rename the file to astro_cli.tar.gz. For example, to use Astro CLI version 1.13.0 in your template, download astro_1.13.0_linux_amd64.tar.gz and rename it to astro_cli.tar.gz.

  2. In your S3 bucket, create the following new folders:

    • dags
    • cli_binary
  3. Add astro_cli.tar.gz to cli_binary.
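
Steps 1 through 3 can also be scripted. The following is a minimal sketch, not part of the official template: the helper names `release_asset_name` and `stage_bucket` are hypothetical, and it assumes you run it from a machine whose AWS credentials can write to the bucket (the Lambda role in the next step is read-only).

```python
from typing import Any

CLI_ARCHIVE_NAME = "astro_cli.tar.gz"  # file name the Lambda code in this template expects


def release_asset_name(version: str) -> str:
    """Return the Linux amd64 release file name for an Astro CLI version."""
    return f"astro_{version}_linux_amd64.tar.gz"


def stage_bucket(s3_client: Any, bucket: str, local_archive: str) -> list[str]:
    """Create the dags/ and cli_binary/ folders and upload the CLI archive.

    s3_client is expected to be a boto3 S3 client, for example boto3.client("s3").
    """
    keys = ["dags/", "cli_binary/"]
    for key in keys:
        # A zero-byte object whose key ends in "/" appears as a folder in the S3 console.
        s3_client.put_object(Bucket=bucket, Key=key)
    archive_key = f"cli_binary/{CLI_ARCHIVE_NAME}"
    # Uploading under the new key is what performs the rename.
    s3_client.upload_file(local_archive, bucket, archive_key)
    return keys + [archive_key]
```

For example, `stage_bucket(boto3.client("s3"), "<bucket_name>", release_asset_name("1.13.0"))` uploads a previously downloaded 1.13.0 archive from the current directory.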

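  4. In the AWS IAM console, create a new role for AWS Lambda with the following permissions. Replace <account_id>, <lambda_function_name>, and <bucket_name> with your values. The log resource ARNs assume the us-east-1 Region, so change the Region if your function runs elsewhere.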

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "lambdacreateloggroup",
          "Effect": "Allow",
          "Action": "logs:CreateLogGroup",
          "Resource": "arn:aws:logs:us-east-1:<account_id>:*"
        },
        {
          "Sid": "lambdaputlogevents",
          "Effect": "Allow",
          "Action": [
            "logs:CreateLogStream",
            "logs:PutLogEvents"
          ],
          "Resource": [
            "arn:aws:logs:us-east-1:<account_id>:log-group:/aws/lambda/<lambda_function_name>:*"
          ]
        },
        {
          "Sid": "bucketpermission",
          "Effect": "Allow",
          "Action": [
            "s3:GetObject",
            "s3:ListBucket"
          ],
          "Resource": [
            "arn:aws:s3:::<bucket_name>",
            "arn:aws:s3:::<bucket_name>/*"
          ]
        }
      ]
    }

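
If you manage several buckets or functions, it can help to generate the policy document instead of editing placeholders by hand. The sketch below is illustrative only: `build_lambda_policy` is a hypothetical helper, and the `region` parameter is an assumption added so that the log ARNs are not tied to us-east-1.

```python
import json


def build_lambda_policy(account_id: str, function_name: str, bucket_name: str,
                        region: str = "us-east-1") -> dict:
    """Build the IAM policy document for the Lambda execution role."""
    logs_arn = f"arn:aws:logs:{region}:{account_id}"
    # S3 bucket ARNs leave the Region and account fields empty, hence three colons.
    bucket_arn = f"arn:aws:s3:::{bucket_name}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "lambdacreateloggroup",
                "Effect": "Allow",
                "Action": "logs:CreateLogGroup",
                "Resource": f"{logs_arn}:*",
            },
            {
                "Sid": "lambdaputlogevents",
                "Effect": "Allow",
                "Action": ["logs:CreateLogStream", "logs:PutLogEvents"],
                "Resource": [f"{logs_arn}:log-group:/aws/lambda/{function_name}:*"],
            },
            {
                "Sid": "bucketpermission",
                "Effect": "Allow",
                "Action": ["s3:GetObject", "s3:ListBucket"],
                "Resource": [bucket_arn, f"{bucket_arn}/*"],
            },
        ],
    }


if __name__ == "__main__":
    # Print JSON that can be pasted into the IAM console policy editor.
    print(json.dumps(build_lambda_policy("<account_id>", "<lambda_function_name>", "<bucket_name>"), indent=2))
```
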
  5. Author a new AWS Lambda function from scratch with the following configurations:

    • Function name: Any
    • Runtime: Python 3.9
    • Architecture: Any
    • Execution role: Click Use an existing role and enter the role you created.
  6. Configure the following Lambda environment variables for your Lambda function:

    • ASTRO_HOME: /tmp
    • ASTRO_API_TOKEN: The value for your Workspace or Organization API token.
    • ASTRO_DEPLOYMENT_ID: Your Deployment ID.

    For production Deployments, Astronomer recommends storing your API credentials in AWS Secrets Manager and referencing them from Lambda. See https://docs.aws.amazon.com/lambda/latest/dg/configuration-database.html
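
    One way to follow this recommendation is to read the token from Secrets Manager at the start of the handler. This is a minimal sketch under stated assumptions: `load_api_token` and the `ASTRO_API_TOKEN_SECRET_ID` environment variable are hypothetical names, the secret is assumed to be stored as a plain string, and the Lambda role would additionally need the `secretsmanager:GetSecretValue` permission on that secret.

```python
import os
from typing import Any, Optional


def load_api_token(secrets_client: Optional[Any] = None,
                   secret_id_var: str = "ASTRO_API_TOKEN_SECRET_ID") -> str:
    """Return the Astro API token, preferring Secrets Manager over a plain variable."""
    secret_id = os.environ.get(secret_id_var)
    if not secret_id:
        # No secret configured: fall back to the plain environment variable.
        return os.environ["ASTRO_API_TOKEN"]
    if secrets_client is None:
        import boto3  # available by default in the Lambda Python runtime
        secrets_client = boto3.client("secretsmanager")
    response = secrets_client.get_secret_value(SecretId=secret_id)
    token = response["SecretString"]
    # Export the token so that child processes such as the Astro CLI inherit it.
    os.environ["ASTRO_API_TOKEN"] = token
    return token
```

    Calling `load_api_token()` before any CLI command runs keeps the token out of the function's visible configuration.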

  7. Add the following code to lambda_function.py. Replace <bucket_name> with your value.


    import os
    import subprocess
    import tarfile

    import boto3

    BUCKET = os.environ.get("BUCKET", "<bucket_name>")
    s3 = boto3.resource('s3')
    deploymentId = os.environ.get('ASTRO_DEPLOYMENT_ID')

    def untar(filename: str, destination: str) -> None:
        # Extract the Astro CLI archive into the destination directory.
        with tarfile.open(filename) as file:
            file.extractall(destination)

    def run_command(cmd: str) -> None:
        # "set -x" echoes the command to the Lambda logs before it runs.
        p = subprocess.Popen("set -x; " + cmd, shell=True)
        p.communicate()

    def download_to_local(bucket_name: str, s3_folder: str, local_dir: str = None) -> None:
        """
        Download the contents of a folder in an S3 bucket.
        Args:
            bucket_name: the name of the S3 bucket
            s3_folder: the folder path in the S3 bucket
            local_dir: a relative or absolute directory path in the local file system
        """
        bucket = s3.Bucket(bucket_name)
        for obj in bucket.objects.filter(Prefix=s3_folder):
            target = obj.key if local_dir is None \
                else os.path.join(local_dir, os.path.relpath(obj.key, s3_folder))
            if not os.path.exists(os.path.dirname(target)):
                os.makedirs(os.path.dirname(target))
            # Keys that end in "/" are folder markers, not files.
            if obj.key[-1] == '/':
                continue
            bucket.download_file(obj.key, target)
            print("downloaded file")

    def lambda_handler(event, context) -> dict:
        """Triggered by a change to the S3 bucket.
        :param event: Event payload.
        :param context: Metadata for the event.
        """
        # /tmp is the only writable path in the Lambda file system.
        base_dir = '/tmp/astro'
        if not os.path.isdir(base_dir):
            os.mkdir(base_dir)

        download_to_local(BUCKET, 'dags/', f'{base_dir}/dags')
        download_to_local(BUCKET, 'cli_binary/', base_dir)

        os.chdir(base_dir)
        untar('./astro_cli.tar.gz', '.')

        # Initialize an Astro project around the DAGs, then run a DAG-only deploy.
        run_command('echo y | ./astro dev init')
        run_command(f"./astro deploy {deploymentId} --dags")

        return {"statusCode": 200}

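
    Note that `run_command` in the template does not inspect the exit code, so the handler returns a 200 status even when the Astro CLI fails. If you prefer the Lambda invocation itself to fail in that case, a stricter variant could look like the following sketch. The name `run_command_checked` is hypothetical and this is not part of the official template.

```python
import subprocess


def run_command_checked(cmd: str) -> None:
    """Run a shell command and raise CalledProcessError on a non-zero exit code."""
    # check=True turns a failing command into an exception, which Lambda
    # records as a failed invocation instead of a silent success.
    subprocess.run("set -x; " + cmd, shell=True, check=True)
```

    Swapping this in for `run_command` makes a failed `./astro deploy` visible in the function's error metrics as well as in its logs.
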
  8. Create a trigger for your Lambda function with the following configuration:

    • Source: Select S3.
    • Bucket: Select the bucket that contains your dags directory.
    • Event types: Select All object create events.
    • Prefix: Enter dags/.
    • Suffix: Enter .py.
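
    With this trigger, each invocation receives an S3 event notification listing the objects that changed. The template ignores the payload and re-downloads the whole dags folder, but logging the affected keys can help when debugging. The following sketch assumes the standard S3 notification structure; `changed_dag_keys` is a hypothetical helper.

```python
from urllib.parse import unquote_plus


def changed_dag_keys(event: dict, prefix: str = "dags/", suffix: str = ".py") -> list[str]:
    """Return the object keys in an S3 event that match the trigger's prefix and suffix."""
    keys = []
    for record in event.get("Records", []):
        # Object keys arrive URL-encoded in S3 notifications (spaces become "+").
        key = unquote_plus(record["s3"]["object"]["key"])
        if key.startswith(prefix) and key.endswith(suffix):
            keys.append(key)
    return keys
```

    Inside `lambda_handler`, a line such as `print(changed_dag_keys(event))` would write the triggering DAG files to the function's logs.
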
  9. If you haven't already, deploy your complete Astro project to your Deployment. See Deploy code.

    info

    If you stage multiple commits to DAG files and push them all at once to your remote branch, the template only deploys DAG code changes from the most recent commit. It will miss any code changes made in previous commits.

    To avoid this, either push commits individually or configure your repository to Squash commits for pull requests that merge multiple commits simultaneously.

  10. Add your DAGs to the dags folder in your S3 bucket.

  11. In the Astro UI, select a Workspace, click Deployments, and then select your Deployment. Confirm that your Lambda function worked by checking the Deployment DAG bundle version. The version's name should include the time that you added the DAGs to your S3 bucket.
