Launch a Pod in an external cluster
If some of your tasks require specific resources, such as a GPU, you might want to run them in a different cluster than your Airflow instance. In setups where both clusters belong to the same AWS, Azure, or GCP account, you can manage access to the separate clusters with roles and permissions.
To launch Pods in external clusters from a local Airflow environment, you must additionally mount credentials for the external cluster so that your local Airflow environment has permissions to launch a Pod in the external cluster. See Authenticate to cloud services with user credentials for setup steps.
Prerequisites
- A network connection between your Astro Deployment and your external cluster.
Setup
Choose the setup that matches your external cluster:
- EKS cluster on AWS
- AKS cluster on Azure
EKS cluster on AWS
This example shows how to set up an EKS cluster on AWS and run a Pod on it from an Airflow instance where cross-account access is not available.
Step 1: Set up your external cluster
- Create an EKS cluster IAM role with a unique name and add the following permission policies:
  - AmazonEKSWorkerNodePolicy
  - AmazonEKS_CNI_Policy
  - AmazonEC2ContainerRegistryReadOnly
- Update the trust policy of this new role to include the workload identity of your Deployment. This step ensures that the role can be assumed by your Deployment.
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::<aws account id>:<your user>",
                "Service": [
                    "ec2.amazonaws.com",
                    "eks.amazonaws.com"
                ]
            },
            "Action": "sts:AssumeRole"
        }
    ]
}
- If you don't already have a cluster, create a new EKS cluster and assign the new role to it. To confirm that your identity can assume the new role, see the sketch below.
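The following minimal sketch is not part of the original setup steps; it assumes boto3 is installed and that your current AWS credentials match a principal listed in the role's trust policy. The role name is a hypothetical placeholder.
import boto3

# Attempt to assume the EKS role. This call succeeds only if the trust policy allows your identity.
sts = boto3.client("sts")
response = sts.assume_role(
    RoleArn="arn:aws:iam::<aws account id>:role/<your-eks-role>",  # hypothetical placeholder
    RoleSessionName="kpo-trust-policy-check",
)
print(response["AssumedRoleUser"]["Arn"])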
Step 2: Retrieve the kubeconfig file from the EKS cluster
- Use a kubeconfig file to remotely connect to your new cluster. On AWS, you can run the following command to retrieve it:
  aws eks --region <your-region> update-kubeconfig --name <cluster-name>
  This command copies information relating to the new cluster into your existing kubeconfig file at ~/.kube/config.
- Check this file before making it available to Airflow. It should appear similar to the following configuration. Add any missing configurations to the file. To test the file before adding it to Airflow, see the sketch after this configuration.
apiVersion: v1
clusters:
- cluster:
    certificate-authority-data: <your certificate>
    server: <your AWS server address>
  name: <arn of your cluster>
contexts:
- context:
    cluster: <arn of your cluster>
    user: <arn of your cluster>
  name: <arn of your cluster>
current-context: <arn of your cluster>
kind: Config
preferences: {}
users:
- name: <arn of your cluster>
  user:
    exec:
      apiVersion: client.authentication.k8s.io/v1alpha1
      args:
      - --region
      - <your cluster's AWS region>
      - eks
      - get-token
      - --cluster-name
      - <name of your cluster>
      - --role
      - <your-assume-role-arn>
      command: aws
      interactiveMode: IfAvailable
      provideClusterInfo: false
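You can check that this file works with the Kubernetes Python client before wiring it into Airflow. This is a minimal sketch, not part of the original steps; it assumes the kubernetes package is installed, that the file sits at the default ~/.kube/config location, and that your role's RBAC permits listing namespaces.
from kubernetes import client, config

# Load the kubeconfig from the default location and select the new cluster's context.
config.load_kube_config(context="<arn of your cluster>")

# A simple API call confirms that authentication against the remote cluster works.
for ns in client.CoreV1Api().list_namespace().items:
    print(ns.metadata.name)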
Step 3: Create a Kubernetes cluster connection
Astronomer recommends creating a Kubernetes cluster connection because it's more secure than adding an unencrypted kubeconfig file directly to your Astro project.
- Convert the kubeconfig configuration you retrieved from your cluster from YAML to JSON format, for example with the conversion sketch below.
- In either the Airflow UI or the Astro Environment Manager, create a new Kubernetes Cluster Connection. In the Kube config (JSON format) field, paste the converted kubeconfig configuration.
- Click Save.
You can now specify this connection in the configuration of any KubernetesPodOperator task that needs to access your external cluster.
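If you prefer to convert the file locally rather than by hand, the following minimal sketch is one way to do it. It assumes PyYAML is installed (it ships as a dependency of Airflow) and that the kubeconfig sits at its default location; paste the printed output into the Kube config (JSON format) field.
import json
import os

import yaml

# Read the kubeconfig from its default location and print it as JSON.
with open(os.path.expanduser("~/.kube/config")) as f:
    print(json.dumps(yaml.safe_load(f), indent=2))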
Step 4: Install the AWS CLI in your Astro environment
To connect to your external EKS cluster, you need to install the AWS CLI in your Astro project.
- Add the following to your Dockerfile to install the AWS CLI:
  USER root
  RUN curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
  # Note: if you are testing your pipeline locally, you might need to adjust the zip version to your local development environment
  RUN unzip awscliv2.zip
  RUN ./aws/install
  USER astro
- Add the unzip package to your packages.txt file to make the unzip command available in your Docker container:
  unzip
If you are working locally, you need to restart your Astro project to apply the changes.
Step 5: Configure your task
In your KubernetesPodOperator task configuration, ensure that you set cluster_context and namespace for your remote cluster. In the following example, the task launches a Pod in an external cluster based on the configuration defined in the k8s connection. To look up the exact context name from your kubeconfig, see the sketch after the example.
run_on_EKS = KubernetesPodOperator(
    task_id="run_on_EKS",
    kubernetes_conn_id="k8s",
    cluster_context="<your-cluster-id>",
    namespace="<your-namespace>",
    name="example_pod",
    image="ubuntu",
    cmds=["bash", "-cx"],
    arguments=["echo hello"],
    get_logs=True,
    startup_timeout_seconds=240,
)
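The cluster_context value must match a context name defined in the kubeconfig stored in your connection. If you are unsure of the exact name, the following minimal sketch (assuming the kubernetes Python package and a local kubeconfig in the default location) prints the available contexts:
from kubernetes import config

# List every context defined in the local kubeconfig and show which one is active.
contexts, active_context = config.list_kube_config_contexts()
print("active:", active_context["name"])
for context in contexts:
    print(context["name"])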
Example DAG
The following DAG uses several classes from the Amazon provider package to dynamically spin up and delete Pods for each task in a newly created node group. If your remote Kubernetes cluster already has a node group available, you only need to define your task in the KubernetesPodOperator itself.
The example DAG contains 5 consecutive tasks:
- Create a node group according to the user's specifications (in this example, a node group that uses GPU resources).
- Use a sensor to check that the node group is up and running.
- Use the KubernetesPodOperator to run any valid Docker image in a Pod on the newly created node group in the remote cluster. The example DAG uses the standard ubuntu image to print "hello" to the console using a bash command.
- Delete the node group.
- Verify that the node group has been deleted.
# import DAG object and utility packages
from airflow import DAG
from pendulum import datetime
from airflow.configuration import conf

# import the KubernetesPodOperator
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)

# import EKS related packages from the Amazon Provider
from airflow.providers.amazon.aws.hooks.eks import EksHook, NodegroupStates
from airflow.providers.amazon.aws.operators.eks import (
    EksCreateNodegroupOperator,
    EksDeleteNodegroupOperator,
)
from airflow.providers.amazon.aws.sensors.eks import EksNodegroupStateSensor


# custom class to create a node group with nodes on EKS
class EksCreateNodegroupWithNodesOperator(EksCreateNodegroupOperator):
    def execute(self, context):
        # instantiate an EksHook using the AWS connection
        eks_hook = EksHook(
            aws_conn_id=self.aws_conn_id,
            region_name=self.region,
        )
        # define the node group to create
        eks_hook.create_nodegroup(
            clusterName=self.cluster_name,
            nodegroupName=self.nodegroup_name,
            subnets=self.nodegroup_subnets,
            nodeRole=self.nodegroup_role_arn,
            scalingConfig={"minSize": 1, "maxSize": 1, "desiredSize": 1},
            diskSize=20,
            instanceTypes=["g4dn.xlarge"],
            amiType="AL2_x86_64_GPU",  # get GPU resources
            updateConfig={"maxUnavailable": 1},
        )


# instantiate the DAG
with DAG(
    start_date=datetime(2022, 6, 1),
    catchup=False,
    schedule="@daily",
    dag_id="KPO_remote_EKS_cluster_example_dag",
) as dag:
    # task 1 creates the node group
    create_gpu_nodegroup = EksCreateNodegroupWithNodesOperator(
        task_id="create_gpu_nodegroup",
        cluster_name="<your cluster name>",
        nodegroup_name="gpu-nodes",
        nodegroup_subnets=["<your subnet>", "<your subnet>"],
        nodegroup_role_arn="<arn of your EKS role>",
        aws_conn_id="<your aws conn id>",
        region="<your region>",
    )

    # task 2 checks that the node group is up and running
    check_nodegroup_status = EksNodegroupStateSensor(
        task_id="check_nodegroup_status",
        cluster_name="<your cluster name>",
        nodegroup_name="gpu-nodes",
        mode="reschedule",
        timeout=60 * 30,
        exponential_backoff=True,
        aws_conn_id="<your aws conn id>",
        region="<your region>",
    )

    # task 3 the KubernetesPodOperator running a task
    # here, cluster_context and the kubernetes_conn_id are defined at the task level.
    run_on_EKS = KubernetesPodOperator(
        task_id="run_on_EKS",
        cluster_context="<arn of your cluster>",
        namespace="airflow-kpo-default",
        name="example_pod",
        image="ubuntu",
        cmds=["bash", "-cx"],
        arguments=["echo hello"],
        get_logs=True,
        in_cluster=False,
        kubernetes_conn_id="k8s",
        startup_timeout_seconds=240,
    )

    # task 4 deletes the node group
    delete_gpu_nodegroup = EksDeleteNodegroupOperator(
        task_id="delete_gpu_nodegroup",
        cluster_name="<your cluster name>",
        nodegroup_name="gpu-nodes",
        aws_conn_id="<your aws conn id>",
        region="<your region>",
    )

    # task 5 checks that the node group was deleted successfully
    check_nodegroup_termination = EksNodegroupStateSensor(
        task_id="check_nodegroup_termination",
        cluster_name="<your cluster name>",
        nodegroup_name="gpu-nodes",
        aws_conn_id="<your aws conn id>",
        region="<your region>",
        mode="reschedule",
        timeout=60 * 30,
        target_state=NodegroupStates.NONEXISTENT,
    )

    # set the dependencies
    create_gpu_nodegroup >> check_nodegroup_status >> run_on_EKS
    run_on_EKS >> delete_gpu_nodegroup >> check_nodegroup_termination
AKS cluster on Azure
This example shows how to configure an Azure Managed Identity (MI) to run a Pod on an AKS cluster from an Airflow instance where cross-account access is not available.
Step 1: Set up Azure Managed Identity
- Create a Microsoft Entra ID tenant with Global Administrator or Application Administrator privileges.
- Create a user-assigned managed identity on Azure.
- Authorize your Astro Deployment to Azure using Azure Managed Identity (MI) by following steps 1 and 2 described in the Deployment workload identity setup.
- Confirm that the OIDC credentials appear in the Managed Identity's Federated credentials tab.
- From the Managed Identity's Properties tab, note the Client ID.
From your Azure Portal, go to Azure Active Directory (Microsoft Entra ID) and note the Tenant ID. Both the Client ID and the Tenant ID are needed in Step 3 to configure your kubeconfig file. To confirm that the federated credential works, see the sketch below.
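The following minimal sketch is not part of the original steps; it assumes the azure-identity package is installed in your Deployment and that the federated token is mounted at the path used in the kubeconfig in Step 3.
from azure.identity import WorkloadIdentityCredential

# Exchange the Deployment's federated token for an Azure AD access token.
credential = WorkloadIdentityCredential(
    tenant_id="<Tenant ID from this step>",
    client_id="<Client ID from this step>",
    token_file_path="/var/run/secrets/azure/tokens/azure-identity-token",
)

# Requesting a management-plane token succeeds only if the federated credential is trusted.
token = credential.get_token("https://management.azure.com/.default")
print("token expires at:", token.expires_on)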
Step 2: Install dependencies in your Astro Runtime Docker Image
To trigger remote Pods on an Azure AKS cluster, you need to add the following packages and dependencies to your Docker image:
- Azure CLI
- Kubectl
- Kubelogin
To do so, add the following commands to your Dockerfile:
FROM quay.io/astronomer/astro-runtime:X.Y.Z
USER root
RUN curl -sL https://aka.ms/InstallAzureCLIDeb | sudo bash
RUN az aks install-cli
USER astro
Step 3: Configure your kubeconfig file
The following configuration is a sample kubeconfig file that allows the Kubernetes command-line tool, kubectl, or other clients to connect to a remote Kubernetes cluster, remote-kpo, using Azure Workload Identity for authentication.
# Specifies the version of the Kubernetes API for this configuration file.
# v1 is the standard version used for kubeconfig files.
apiVersion: v1
# List of Kubernetes clusters that the configuration can connect to.
clusters:
- cluster:
    # base64-encoded certificate for the Kubernetes API server to verify SSL communication.
    certificate-authority-data: <certificate>
    # URL of the Kubernetes API server.
    # This is the endpoint of the remote cluster you want to interact with.
    server: <Azure server address>
  # Name of the cluster, which is referenced in the contexts section.
  name: <AKS cluster>
# List of contexts that define which cluster and user combination to use when interacting with Kubernetes.
contexts:
# Describes the context for connecting to the cluster.
- context:
    # References the cluster from the clusters section.
    cluster: <AKS cluster>
    # Associates the user configuration to be used for authentication with the cluster.
    user: <your user>
  # The name of the context, which is referenced by current-context.
  name: <AKS cluster>
# Specifies the active context that will be used by default when running kubectl commands.
current-context: <AKS cluster>
# Identifies the file type as a Kubernetes Config.
kind: Config
preferences: {}
# List of users and the method they use for authentication.
users:
# Defines the user that is being used in the context.
# This user is responsible for authenticating with the Kubernetes cluster.
- name: <your user>
  user:
    exec:
      apiVersion: client.authentication.k8s.io/v1beta1
      args:
      - get-token
      - --login
      - workloadidentity
      - --tenant-id
      - <Tenant ID from Step 1>
      - --client-id
      - <Client ID from Step 1>
      # The server ID for Azure Kubernetes Service (AKS). This is a static ID representing AKS.
      - --server-id
      - 6dae42f8-4368-4678-94ff-3960e28e3630
      # Specifies the path to the federated token that the managed identity uses to authenticate.
      - --federated-token-file
      - /var/run/secrets/azure/tokens/azure-identity-token
      - --environment
      - AzurePublicCloud
      command: kubelogin
      # Specifies whether kubelogin should provide additional cluster information beyond the authentication token.
      provideClusterInfo: false
Step 4: Create an Airflow connection to use the kubeconfig file
To use the kubeconfig file, you need to create a new Kubernetes Airflow connection.
There are multiple ways to pass the kubeconfig file to your Airflow connection. If your kubeconfig file contains any sensitive information, Astronomer recommends storing it as JSON inside the connection, as described in option 3.
- External file in the default location:
  If the kubeconfig file resides in the default location on the machine (~/.kube/config), you can leave all fields empty in the connection configuration. Airflow automatically uses the kubeconfig from the default location.
  Add the following COPY command at the end of your Dockerfile to add your kubeconfig file inside your Astro Runtime Docker image:
  COPY kubeconfig /usr/local/airflow/.kube/config
- External file with a custom path:
  You can specify a custom path to the kubeconfig file by inserting the path into the Kube config path field of your Airflow connection.
  Add the following COPY command at the end of your Dockerfile to add your kubeconfig file inside your Astro Runtime Docker image:
  COPY kubeconfig /usr/local/airflow/kubeconfig
- JSON format:
  You can convert the kubeconfig file to JSON format and paste it into the Kube config (JSON format) field in the connection configuration.
  Use an online converter like https://jsonformatter.org/yaml-to-json to convert YAML to JSON. Remove any sensitive information first.
Step 5: Configure your task
Run a Pod on your AKS cluster with the KubernetesPodOperator, as shown in the following example DAG.
# import the DAG object
from airflow import DAG

# import the KubernetesPodOperator
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)
from airflow.utils.dates import days_ago

default_args = {
    "owner": "Astronomer",
    "depends_on_past": False,
}

# instantiate the DAG
with DAG(
    dag_id="remote_kpo",
    default_args=default_args,
    schedule_interval=None,
    start_date=days_ago(1),
    tags=["KPO"],
):
    # launch a Pod in the remote Kubernetes cluster
    remote_kpo = KubernetesPodOperator(
        task_id="az_remote_kpo",
        kubernetes_conn_id="<my-az-connection>",
        namespace="<my-aks-namespace>",
        image="debian",
        cmds=["bash", "-cx"],
        arguments=["echo hello world!"],
        name="hello-world",
        get_logs=True,
        in_cluster=False,
    )