
Launch a Pod in an external cluster

If some of your tasks require specific resources, such as a GPU, you might want to run them in a different cluster than the one hosting your Airflow instance. If both clusters belong to the same AWS, Azure, or GCP account, you can manage access between them with roles and permissions.

info

To launch Pods in external clusters from a local Airflow environment, you must additionally mount credentials for the external cluster so that your local Airflow environment has permissions to launch a Pod in the external cluster. See Authenticate to cloud services with user credentials for setup steps.

Prerequisites

Setup

This example shows how to set up an EKS cluster on AWS and run a Pod on it from an Airflow instance where cross-account access is not available.

Step 1: Set up your external cluster

  1. Create an EKS cluster IAM role with a unique name and add the following permission policies:

    • AmazonEKSWorkerNodePolicy
    • AmazonEKS_CNI_Policy
    • AmazonEC2ContainerRegistryReadOnly
  2. Update the trust policy of this new role to include the workload identity of your Deployment. This step ensures that the role can be assumed by your Deployment.

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "AWS": "arn:aws:iam::<aws account id>:<your user>",
            "Service": [
              "ec2.amazonaws.com",
              "eks.amazonaws.com"
            ]
          },
          "Action": "sts:AssumeRole"
        }
      ]
    }
  3. If you don't already have a cluster, create a new EKS cluster and assign the new role to it. One way to script this step is sketched below.
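
The following is a minimal boto3 sketch of this step, not part of the original guide. The cluster name, role ARN, subnet IDs, and security group ID are placeholders, and depending on your setup the role you pass as roleArn may be a dedicated cluster service role rather than the role from step 1. Creating the cluster through the AWS console or eksctl works just as well.

    import boto3

    eks = boto3.client("eks", region_name="<your-region>")

    # Create the EKS cluster and attach an IAM role to it.
    # Subnet and security group IDs are placeholders for your own VPC resources.
    eks.create_cluster(
        name="<cluster-name>",
        roleArn="arn:aws:iam::<aws account id>:role/<your-role-name>",
        resourcesVpcConfig={
            "subnetIds": ["<subnet-id-1>", "<subnet-id-2>"],
            "securityGroupIds": ["<security-group-id>"],
        },
    )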

Step 2: Retrieve the KubeConfig file from the EKS cluster

  1. Use a KubeConfig file to remotely connect to your new cluster. On AWS, you can run the following command to retrieve it:

    aws eks --region <your-region> update-kubeconfig --name <cluster-name>

    This command copies information relating to the new cluster into your existing KubeConfig file at ~/.kube/config.

  2. Check this file before making it available to Airflow. It should appear similar to the following configuration. Add any missing configurations to the file. To confirm that the file works, you can optionally test the connection locally, as shown in the sketch after this configuration.

    apiVersion: v1
    clusters:
    - cluster:
        certificate-authority-data: <your certificate>
        server: <your AWS server address>
      name: <arn of your cluster>
    contexts:
    - context:
        cluster: <arn of your cluster>
        user: <arn of your cluster>
      name: <arn of your cluster>
    current-context: <arn of your cluster>
    kind: Config
    preferences: {}
    users:
    - name: <arn of your cluster>
      user:
        exec:
          apiVersion: client.authentication.k8s.io/v1alpha1
          args:
          - --region
          - <your cluster's AWS region>
          - eks
          - get-token
          - --cluster-name
          - <name of your cluster>
          - --role
          - <your-assume-role-arn>
          command: aws
          interactiveMode: IfAvailable
          provideClusterInfo: false
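
To confirm that the exec-based AWS authentication in this file works before handing it to Airflow, you can optionally list the namespaces of the remote cluster with the official kubernetes Python client. This is a minimal sketch, not part of the original setup; it assumes the kubernetes package and the AWS CLI are installed locally and that the context name matches the ARN that update-kubeconfig added to your file.

    from kubernetes import client, config

    # Load the context that `aws eks update-kubeconfig` added to ~/.kube/config.
    # Replace the placeholder with the context name from your own file.
    config.load_kube_config(context="<arn of your cluster>")

    # Listing namespaces exercises the exec-based AWS authentication end to end.
    for namespace in client.CoreV1Api().list_namespace().items:
        print(namespace.metadata.name)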

Step 3: Create a Kubernetes cluster connection

Astronomer recommends creating a Kubernetes cluster connection because it's more secure than adding an unencrypted kubeconfig file directly to your Astro project.

  1. Convert the kubeconfig configuration you retrieved from your cluster to JSON format, for example with the short script shown after this list.
  2. In either the Airflow UI or the Astro environment manager, create a new connection of the Kubernetes Cluster Connection type. In the Kube config (JSON format) field, paste the JSON version of your kubeconfig.
  3. Click Save.
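
A minimal conversion sketch, assuming PyYAML is installed and your kubeconfig lives at the default ~/.kube/config location; any other YAML-to-JSON converter works as well:

    import json
    from pathlib import Path

    import yaml  # provided by the PyYAML package

    # Read the kubeconfig written by `aws eks update-kubeconfig` and print it as JSON,
    # ready to paste into the Kube config (JSON format) field of the connection.
    kubeconfig = yaml.safe_load((Path.home() / ".kube" / "config").read_text())
    print(json.dumps(kubeconfig, indent=2))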

You can now specify this connection in the configuration of any KubernetesPodOperator task that needs to access your external cluster.

Step 4: Install the AWS CLI in your Astro environment

To connect to your external EKS cluster, you need to install the AWS CLI in your Astro project.

  1. Add the following to your Dockerfile to install the AWS CLI:

    USER root

    RUN curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
    # Note: if you are testing your pipeline locally, you might need to adjust this download (for example, the CPU architecture of the AWS CLI package) to match your local environment
    RUN unzip awscliv2.zip
    RUN ./aws/install

    USER astro
  2. Add the unzip package to your packages.txt file to make the unzip command available in your Docker container:

    unzip

If you are working locally, you need to restart your Astro project to apply the changes.

Step 5: Configure your task

In your KubernetesPodOperator task configuration, ensure that you set cluster_context and namespace for your remote cluster. In the following example, the task launches a Pod in an external cluster based on the configuration defined in the k8s connection.

run_on_EKS = KubernetesPodOperator(
    task_id="run_on_EKS",
    kubernetes_conn_id="k8s",
    cluster_context="<your-cluster-id>",
    namespace="<your-namespace>",
    name="example_pod",
    image="ubuntu",
    cmds=["bash", "-cx"],
    arguments=["echo hello"],
    get_logs=True,
    startup_timeout_seconds=240,
)
Example DAG

The following DAG uses several classes from the Amazon provider package to dynamically spin up and delete Pods for each task in a newly created node group. If your remote Kubernetes cluster already has a node group available, you only need to define your task in the KubernetesPodOperator itself.

The example DAG contains 5 consecutive tasks:

  • Create a node group according to the user's specifications (in this example, a node group with GPU resources).
  • Use a sensor to check that the node group is up and running.
  • Use the KubernetesPodOperator to run any valid Docker image in a Pod on the newly created node group on the remote cluster. The example DAG uses the standard Ubuntu image to print "hello" to the console using a bash command.
  • Delete the node group.
  • Verify that the node group has been deleted.
# import DAG object and utility packages
from airflow import DAG
from pendulum import datetime
from airflow.configuration import conf

# import the KubernetesPodOperator
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)

# import EKS related packages from the Amazon Provider
from airflow.providers.amazon.aws.hooks.eks import EksHook, NodegroupStates
from airflow.providers.amazon.aws.operators.eks import (
    EksCreateNodegroupOperator,
    EksDeleteNodegroupOperator,
)
from airflow.providers.amazon.aws.sensors.eks import EksNodegroupStateSensor


# custom class to create a node group with nodes on EKS
class EksCreateNodegroupWithNodesOperator(EksCreateNodegroupOperator):
    def execute(self, context):
        # instantiate an EksHook on the basis of the AWS connection
        eks_hook = EksHook(
            aws_conn_id=self.aws_conn_id,
            region_name=self.region,
        )

        # define the node group to create
        eks_hook.create_nodegroup(
            clusterName=self.cluster_name,
            nodegroupName=self.nodegroup_name,
            subnets=self.nodegroup_subnets,
            nodeRole=self.nodegroup_role_arn,
            scalingConfig={"minSize": 1, "maxSize": 1, "desiredSize": 1},
            diskSize=20,
            instanceTypes=["g4dn.xlarge"],
            amiType="AL2_x86_64_GPU",  # get GPU resources
            updateConfig={"maxUnavailable": 1},
        )


# instantiate the DAG
with DAG(
    start_date=datetime(2022, 6, 1),
    catchup=False,
    schedule="@daily",
    dag_id="KPO_remote_EKS_cluster_example_dag",
) as dag:
    # task 1 creates the node group
    create_gpu_nodegroup = EksCreateNodegroupWithNodesOperator(
        task_id="create_gpu_nodegroup",
        cluster_name="<your cluster name>",
        nodegroup_name="gpu-nodes",
        nodegroup_subnets=["<your subnet>", "<your subnet>"],
        nodegroup_role_arn="<arn of your EKS role>",
        aws_conn_id="<your aws conn id>",
        region="<your region>",
    )

    # task 2 checks that the node group is up and running
    check_nodegroup_status = EksNodegroupStateSensor(
        task_id="check_nodegroup_status",
        cluster_name="<your cluster name>",
        nodegroup_name="gpu-nodes",
        mode="reschedule",
        timeout=60 * 30,
        exponential_backoff=True,
        aws_conn_id="<your aws conn id>",
        region="<your region>",
    )

    # task 3 the KubernetesPodOperator running a task
    # here, cluster_context and the kubernetes_conn_id are defined at the task level.
    run_on_EKS = KubernetesPodOperator(
        task_id="run_on_EKS",
        cluster_context="<arn of your cluster>",
        namespace="airflow-kpo-default",
        name="example_pod",
        image="ubuntu",
        cmds=["bash", "-cx"],
        arguments=["echo hello"],
        get_logs=True,
        in_cluster=False,
        kubernetes_conn_id="k8s",
        startup_timeout_seconds=240,
    )

    # task 4 deleting the node group
    delete_gpu_nodegroup = EksDeleteNodegroupOperator(
        task_id="delete_gpu_nodegroup",
        cluster_name="<your cluster name>",
        nodegroup_name="gpu-nodes",
        aws_conn_id="<your aws conn id>",
        region="<your region>",
    )

    # task 5 checking that the node group was deleted successfully
    check_nodegroup_termination = EksNodegroupStateSensor(
        task_id="check_nodegroup_termination",
        cluster_name="<your cluster name>",
        nodegroup_name="gpu-nodes",
        aws_conn_id="<your aws conn id>",
        region="<your region>",
        mode="reschedule",
        timeout=60 * 30,
        target_state=NodegroupStates.NONEXISTENT,
    )

    # setting the dependencies
    create_gpu_nodegroup >> check_nodegroup_status >> run_on_EKS
    run_on_EKS >> delete_gpu_nodegroup >> check_nodegroup_termination
