Run the KubernetesPodOperator on Astro
The KubernetesPodOperator is one of the most customizable Apache Airflow operators. A task using the KubernetesPodOperator runs in a dedicated, isolated Kubernetes Pod that terminates after the task completes. To learn more about the benefits and usage of the KubernetesPodOperator, see the KubernetesPodOperator Learn guide.
On Astro, the infrastructure required to run the KubernetesPodOperator is built into every Deployment and is managed by Astronomer. Astro supports setting a default Pod configuration so that any task Pods without specific resource requests and limits cannot exceed your expected resource usage for the Deployment.
Some task-level configurations will differ on Astro compared to other Airflow environments. Use this document to learn how to configure individual task Pods for different use cases on Astro. To configure the default Pod resources for all KubernetesPodOperator Pods, see Configure Kubernetes Pod resources.
Known limitations
- Cross-account service accounts are not supported on Pods launched in an Astro cluster. To allow access to external data sources, you can provide credentials and secrets to tasks.
- PersistentVolumes (PVs) are not supported on Pods launched in an Astro cluster.
- (Hybrid only) You cannot run a KubernetesPodOperator task in a worker queue or node pool that is different from the worker queue of its parent worker. For example, a KubernetesPodOperator task that is triggered by an `m5.4xlarge` worker on AWS also runs on an `m5.4xlarge` node. To run a task on a different node instance type, you must launch it in an external Kubernetes cluster. If you need assistance launching KubernetesPodOperator tasks in external Kubernetes clusters, contact Astronomer support.
- You can't use an image built for an ARM architecture in the KubernetesPodOperator. To build images using the x86 architecture on a Mac with an Apple chip, include the `--platform` flag in the `FROM` command of the `Dockerfile` that constructs your custom image. For example: `FROM --platform=linux/amd64 postgres:latest`. If you use an ARM image, your KubernetesPodOperator task fails with the error: `[base] exec /usr/bin/psql: exec format error`.
Prerequisites
- An Astro project.
- An Astro Deployment.
Set up the KubernetesPodOperator on Astro
The following snippet is the minimum configuration you'll need to create a KubernetesPodOperator task on Astro:
from airflow.configuration import conf
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

namespace = conf.get("kubernetes", "NAMESPACE")

KubernetesPodOperator(
    namespace=namespace,
    image="<your-docker-image>",
    cmds=["<commands-for-image>"],
    arguments=["<arguments-for-image>"],
    labels={"<pod-label>": "<label-name>"},
    name="<pod-name>",
    task_id="<task-name>",
    get_logs=True,
    in_cluster=True,
)
For each instantiation of the KubernetesPodOperator, you must specify the following values:

- `namespace = conf.get("kubernetes", "NAMESPACE")`: Every Deployment runs on its own Kubernetes namespace within a cluster. Information about this namespace can be programmatically imported as long as you set this variable.
- `image`: The Docker image that the operator uses to run its defined task, commands, and arguments. Astro assumes that this value is an image tag that's publicly available on Docker Hub. To pull an image from a private registry, see Pull images from a Private Registry.
- `in_cluster`: If a Connection object is not passed to the KubernetesPodOperator's `kubernetes_conn_id` parameter, specify `in_cluster=True` to run the task in the Deployment's Astro cluster.
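If you'd rather run the task against a cluster defined in an Airflow connection instead of the Deployment's own cluster, you can pass `kubernetes_conn_id` in place of `in_cluster=True`. The following is a minimal sketch; the connection ID `k8s` is a placeholder for a Kubernetes connection you configure yourself:

```python
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

# Minimal sketch: target a cluster defined in an Airflow connection instead of
# the Deployment's cluster. "k8s" is a hypothetical connection ID.
KubernetesPodOperator(
    task_id="<task-name>",
    kubernetes_conn_id="k8s",  # used in place of in_cluster=True
    image="<your-docker-image>",
    name="<pod-name>",
    get_logs=True,
)
```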
Configure task-level Pod resources
Astro automatically allocates resources to Pods created by the KubernetesPodOperator. Unless otherwise specified in your task-level configuration, the amount of resources your task Pod can use is defined by your default Pod resource configuration. To further optimize your resource usage, Astronomer recommends specifying compute resource requests and limits for each task.
To do so, define a `kubernetes.client.models.V1ResourceRequirements` object and provide it to the `container_resources` argument of the KubernetesPodOperator. For example:
from airflow.configuration import conf
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from kubernetes.client import models as k8s

compute_resources = k8s.V1ResourceRequirements(
    limits={"cpu": "800m", "memory": "3Gi"},
    requests={"cpu": "800m", "memory": "3Gi"}
)

namespace = conf.get("kubernetes", "NAMESPACE")

KubernetesPodOperator(
    namespace=namespace,
    image="<your-docker-image>",
    cmds=["<commands-for-image>"],
    arguments=["<arguments-for-image>"],
    labels={"<pod-label>": "<label-name>"},
    name="<pod-name>",
    container_resources=compute_resources,
    task_id="<task-name>",
    get_logs=True,
    in_cluster=True,
)
Applying the previous code example ensures that when this DAG runs, it launches a Kubernetes Pod with exactly 800m of CPU and 3Gi of memory as long as that infrastructure is available in your Deployment. After the task finishes, the Pod will terminate gracefully.
For Astro Hosted environments, if you set resource requests to be less than the maximum limit, Astro automatically requests the maximum limit that you set. This means that you might consume more resources than you expected if you set the limit much higher than the resource request you need. Check your Billing and usage to view your resource use and associated charges.
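For example, on Astro Hosted the following resource configuration (a minimal sketch with placeholder values) is scheduled and billed as if it requested the full 1 CPU and 2Gi, because the requests are raised to match the limits:

```python
from kubernetes.client import models as k8s

# Minimal sketch with placeholder values: on Astro Hosted, the requests below are
# raised to match the limits, so the Pod effectively requests 1 CPU and 2Gi of memory.
compute_resources = k8s.V1ResourceRequirements(
    requests={"cpu": "500m", "memory": "1Gi"},
    limits={"cpu": "1", "memory": "2Gi"},
)
```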
Mount a temporary directory
Alternative Astro Hybrid setup
On Astro Hybrid, this configuration works only on AWS clusters where you have enabled `m5d` and `m6id` worker types. These worker types have NVMe SSD volumes that tasks can use for ephemeral storage. See Amazon EC2 M6i Instances and Amazon EC2 M5 Instances for the amount of available storage in each node type.
The task that mounts a temporary directory must run on a worker queue that uses either `m5d` or `m6id` worker types. See Modify a cluster for instructions on enabling `m5d` and `m6id` workers on your cluster. See Configure a worker queue to configure a worker queue to use one of these worker types.
To run a KubernetesPodOperator task that uses your Deployment's ephemeral storage, mount an `emptyDir` volume to the KubernetesPodOperator. For example:
from airflow.configuration import conf
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from kubernetes.client import models as k8s

volume = k8s.V1Volume(
    name="cache-volume",
    empty_dir=k8s.V1EmptyDirVolumeSource(),
)

volume_mounts = [
    k8s.V1VolumeMount(
        mount_path="/cache", name="cache-volume"
    )
]

namespace = conf.get("kubernetes", "NAMESPACE")

example_volume_test = KubernetesPodOperator(
    namespace=namespace,
    image="<your-docker-image>",
    cmds=["<commands-for-image>"],
    arguments=["<arguments-for-image>"],
    labels={"<pod-label>": "<label-name>"},
    name="<pod-name>",
    task_id="<task-name>",
    get_logs=True,
    in_cluster=True,
    volume_mounts=volume_mounts,
    volumes=[volume],
)
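As a hypothetical usage example, a task can then write scratch data to the mounted path. The `ubuntu` image and the shell command below are illustrative placeholders:

```python
# Hypothetical usage: write scratch data to the mounted emptyDir volume at /cache.
# The ubuntu image and the shell command are illustrative placeholders.
write_to_cache = KubernetesPodOperator(
    namespace=namespace,
    image="ubuntu",
    cmds=["bash", "-cx"],
    arguments=["echo 'scratch data' > /cache/data.txt && ls -lh /cache"],
    name="write-to-cache",
    task_id="write_to_cache",
    get_logs=True,
    in_cluster=True,
    volume_mounts=volume_mounts,
    volumes=[volume],
)
```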
Run images from a private registry
By default, the KubernetesPodOperator expects to pull a Docker image that's hosted publicly on Docker Hub. If your images are hosted on the container registry native to your cloud provider, you can grant access to the images directly. Otherwise, if you are using any other private registry, you need to create a Kubernetes Secret containing credentials to the registry, then specify the Kubernetes Secret in your DAG.
- Private Registry
- Amazon Elastic Container Registry (ECR)
- Google Artifact Registry
Prerequisites
- An Astro project.
- An Astro Deployment.
- Access to a private Docker registry.
Step 1: Create a Kubernetes Secret
To run Docker images from a private registry on Astro, you first need a Kubernetes Secret that contains credentials to your registry. Injecting this Secret into your Deployment's namespace gives your tasks access to Docker images in your private registry.
Submit a request to Astronomer support to create a Kubernetes Secret that enables pulling images from private registries. Astronomer support can provide you with instructions on how to generate and securely send the credentials.
Step 2: Specify the Kubernetes Secret in your DAG
After Astronomer adds the Kubernetes Secret to your Deployment, you are notified and provided with the name of the Secret.
After you receive the name of your Kubernetes Secret from Astronomer, you can run images from your private registry by importing `models` from `kubernetes.client` and configuring `image_pull_secrets` in your KubernetesPodOperator instantiation:
from kubernetes.client import models as k8s

KubernetesPodOperator(
    namespace=namespace,
    image_pull_secrets=[k8s.V1LocalObjectReference("<your-secret-name>")],
    image="<your-docker-image>",
    cmds=["<commands-for-image>"],
    arguments=["<arguments-for-image>"],
    labels={"<pod-label>": "<label-name>"},
    name="<pod-name>",
    task_id="<task-name>",
    get_logs=True,
    in_cluster=True,
)
Policy-based setup is available only on Astro Hosted dedicated clusters and Astro Hybrid. To run images from a private registry on Astro Hosted standard clusters, follow the steps in Private Registry.
If your Docker image is hosted in an Amazon ECR repository, add a permissions policy to the repository that allows the KubernetesPodOperator to pull the Docker image. You don't need to create a Kubernetes Secret or specify one in your DAG. Docker images hosted in Amazon ECR repositories can only be pulled from AWS clusters.
1. Log in to the Amazon ECR Dashboard and then select Menu > Repositories.

2. Click the Private tab and then click the name of the repository that hosts the Docker image.

3. Click Permissions in the left menu.

4. Click Edit policy JSON.

5. Copy and paste the following policy into the Edit JSON pane:

   {
     "Version": "2008-10-17",
     "Statement": [
       {
         "Sid": "AllowImagePullAstro",
         "Effect": "Allow",
         "Principal": {
           "AWS": "arn:aws:iam::<AstroAccountID>:role/EKS-NodeInstanceRole-<ClusterID>"
         },
         "Action": [
           "ecr:GetDownloadUrlForLayer",
           "ecr:BatchGetImage"
         ]
       }
     ]
   }

   - Replace `<AstroAccountID>` with your Astro AWS account ID.
   - Replace `<ClusterID>` with your Cluster ID. To find the Cluster ID, click Clusters in the Organization section of the Astro UI and then select the cluster. The ID is displayed at the top, along with other information about the Astro Cluster.

6. Click Save to create a new permissions policy named AllowImagePullAstro.

7. Replace `<your-docker-image>` in the instantiation of the KubernetesPodOperator with the Amazon ECR repository URI that hosts the Docker image. To locate the URI:

   - In the Amazon ECR Dashboard, click Repositories in the left menu.
   - Open the Private tab and then copy the URI of the repository that hosts the Docker image.
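For reference, the resulting task configuration looks similar to the following sketch. The account ID, region, repository name, and tag in the image URI are placeholders, and `namespace` is assumed to be defined as shown earlier in this guide:

```python
# Sketch: the image value is replaced with the Amazon ECR repository URI.
# The account ID, region, repository, and tag are placeholders.
KubernetesPodOperator(
    task_id="<task-name>",
    name="<pod-name>",
    namespace=namespace,
    image="<aws-account-id>.dkr.ecr.<region>.amazonaws.com/<repository>:<tag>",
    get_logs=True,
    in_cluster=True,
)
```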
Passwordless setup is available only on Astro Hosted dedicated clusters and Astro Hybrid. For Astro Hosted standard clusters, follow the steps in Private Registry to create a Kubernetes Secret containing your registry credentials.
If your Docker image is hosted in a Google Artifact Registry repository, add a permissions policy to the repository that allows the KubernetesPodOperator to pull the Docker image. You don't need to create a Kubernetes Secret or specify one in your DAG. Docker images hosted in Google Artifact Registry repositories can be pulled only to Deployments hosted on GCP clusters.
Setup
1. Contact Astronomer support to request the Compute Engine default service account ID for your cluster.

2. Log in to Google Artifact Registry.

3. Click the checkbox next to the repository that you want to use.

4. In the Properties pane that appears, click ADD PRINCIPAL in the PERMISSIONS tab.

5. In the Add Principals text box, paste the Compute Engine default service account ID that was provided to you by Astronomer support.

6. In the Assign Roles selector, search for `Artifact Registry Reader` and select the role that appears.

7. Click Save to grant read access for the registry to Astro.

8. Set up the KubernetesPodOperator. When you configure an instantiation of the KubernetesPodOperator, replace `<your-docker-image>` with the Google Artifact Registry image URI. To retrieve the URI:

   - In Google Artifact Registry, click the registry that contains the image.
   - Click the image you want to use.
   - Click the copy icon next to the image in the top corner. The string you copy should be in the format `<GCP Region>-docker.pkg.dev/<Project Name>/<Registry Name>/<Image Name>`.
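For reference, the image URI format in the task configuration looks similar to the following sketch. Every segment of the URI is a placeholder, and `namespace` is assumed to be defined as shown earlier in this guide:

```python
# Sketch: the image value is replaced with the Google Artifact Registry image URI.
# Region, project, registry, and image name are placeholders.
KubernetesPodOperator(
    task_id="<task-name>",
    name="<pod-name>",
    namespace=namespace,
    image="<GCP Region>-docker.pkg.dev/<Project Name>/<Registry Name>/<Image Name>",
    get_logs=True,
    in_cluster=True,
)
```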
Use secret environment variables with the KubernetesPodOperator
Astro environment variables marked as secrets are stored in a Kubernetes Secret called `env-secrets`. To use a secret value in a task running on the Kubernetes executor, you pull the value from `env-secrets` and mount it to the Pod running your task as a new Kubernetes Secret.
1. Add the following import to your DAG file:

   from airflow.kubernetes.secret import Secret

2. Define a Kubernetes `Secret` in your DAG instantiation using the following format:

   secret_env = Secret(deploy_type="env", deploy_target="<VARIABLE_KEY>", secret="env-secrets", key="<VARIABLE_KEY>")
   namespace = conf.get("kubernetes", "NAMESPACE")

3. Reference the key for the environment variable, formatted as `$VARIABLE_KEY`, in the task using the KubernetesPodOperator.
In the following example, a secret named `MY_SECRET` is pulled from `env-secrets` and printed to logs.
import pendulum

from airflow.kubernetes.secret import Secret
from airflow.models import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from airflow.configuration import conf

with DAG(
    dag_id='test-kube-pod-secret',
    start_date=pendulum.datetime(2022, 1, 1, tz="UTC"),
    end_date=pendulum.datetime(2022, 1, 5, tz="UTC"),
    schedule_interval="@once",
) as dag:
    secret_env = Secret(deploy_type="env", deploy_target="MY_SECRET", secret="env-secrets", key="MY_SECRET")

    namespace = conf.get("kubernetes", "NAMESPACE")

    k = KubernetesPodOperator(
        namespace=namespace,
        image="ubuntu:16.04",
        cmds=["bash", "-cx"],
        arguments=["echo $MY_SECRET && sleep 150"],
        name="test-name",
        task_id="test-task",
        get_logs=True,
        in_cluster=True,
        secrets=[secret_env],
    )
Launch a Pod in an external cluster
If some of your tasks require specific resources such as a GPU, you might want to run them in a different cluster than your Airflow instance. In setups where both clusters are used by the same AWS or GCP account, you can manage separate clusters with roles and permissions.
This example shows how to set up an EKS cluster on AWS and run a Pod on it from an Airflow instance where cross-account access is not available. The same process is applicable to other Kubernetes services such as GKE.
To launch Pods in external clusters from a local Airflow environment, you must additionally mount credentials for the external cluster so that your local Airflow environment has permissions to launch a Pod in the external cluster. See Authenticate to cloud services with user credentials for setup steps.
Prerequisites
- A network connection between your Astro Deployment and your external cluster.
Step 1: Set up your external cluster
1. Create an EKS cluster IAM role with a unique name and add the following permission policies:

   - `AmazonEKSWorkerNodePolicy`
   - `AmazonEKS_CNI_Policy`
   - `AmazonEC2ContainerRegistryReadOnly`

2. Update the trust policy of this new role to include the workload identity of your Deployment. This step ensures that the role can be assumed by your Deployment.

   {
     "Version": "2012-10-17",
     "Statement": [
       {
         "Effect": "Allow",
         "Principal": {
           "AWS": "arn:aws:iam::<aws account id>:<your user>",
           "Service": [
             "ec2.amazonaws.com",
             "eks.amazonaws.com"
           ]
         },
         "Action": "sts:AssumeRole"
       }
     ]
   }

3. If you don't already have a cluster, create a new EKS cluster and assign the new role to it.
Step 2: Retrieve the KubeConfig file from the EKS cluster
1. Use a `KubeConfig` file to remotely connect to your new cluster. On AWS, you can run the following command to retrieve it:

   aws eks --region <your-region> update-kubeconfig --name <cluster-name>

   This command copies information relating to the new cluster into your existing `KubeConfig` file at `~/.kube/config`.

2. Check this file before making it available to Airflow. It should appear similar to the following configuration. Add any missing configurations to the file.
apiVersion: v1
clusters:
- cluster:
    certificate-authority-data: <your certificate>
    server: <your AWS server address>
  name: <arn of your cluster>
contexts:
- context:
    cluster: <arn of your cluster>
    user: <arn of your cluster>
  name: <arn of your cluster>
current-context: <arn of your cluster>
kind: Config
preferences: {}
users:
- name: <arn of your cluster>
  user:
    exec:
      apiVersion: client.authentication.k8s.io/v1alpha1
      args:
      - --region
      - <your cluster's AWS region>
      - eks
      - get-token
      - --cluster-name
      - <name of your cluster>
      - --role
      - <your-assume-role-arn>
      command: aws
      interactiveMode: IfAvailable
      provideClusterInfo: false
Step 3: Create a Kubernetes cluster connection
Astronomer recommends creating a Kubernetes cluster connection because it's more secure than adding an unencrypted `kubeconfig` file directly to your Astro project.

1. Convert the `kubeconfig` configuration you retrieved from your cluster to JSON format, for example using the sketch after this list.
2. In either the Airflow UI or the Astro environment manager, create a new Kubernetes Cluster Connection. In the Kube config (JSON format) field, paste the `kubeconfig` configuration you retrieved from your cluster after converting it from YAML to JSON format.
3. Click Save.
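The following is a minimal sketch for the conversion in step 1. It assumes PyYAML is installed and that your kubeconfig lives at `~/.kube/config`:

```python
# Minimal sketch: convert a kubeconfig file from YAML to JSON so that it can be
# pasted into the "Kube config (JSON format)" field of the connection.
# Assumes PyYAML is installed and the kubeconfig is at ~/.kube/config.
import json
from pathlib import Path

import yaml

kubeconfig = yaml.safe_load(Path.home().joinpath(".kube", "config").read_text())
print(json.dumps(kubeconfig))
```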
You can now specify this connection in the configuration of any KubernetesPodOperator task that needs to access your external cluster.
Step 4: Install the AWS CLI in your Astro environment
To connect to your external EKS cluster, you need to install the AWS CLI in your Astro project.
1. Add the following to your `Dockerfile` to install the AWS CLI:

   USER root
   RUN curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
   # Note: if you are testing your pipeline locally you may need to adjust the zip version to your local dev environment
   RUN unzip awscliv2.zip
   RUN ./aws/install
   USER astro

2. Add the `unzip` package to your `packages.txt` file to make the `unzip` command available in your Docker container:

   unzip

If you are working locally, restart your Astro project to apply the changes.
Step 5: Configure your task
In your KubernetesPodOperator task configuration, ensure that you set `cluster_context` and `namespace` for your remote cluster. In the following example, the task launches a Pod in an external cluster based on the configuration defined in the `k8s` connection.
run_on_EKS = KubernetesPodOperator(
    task_id="run_on_EKS",
    kubernetes_conn_id="k8s",
    cluster_context="<your-cluster-id>",
    namespace="<your-namespace>",
    name="example_pod",
    image="ubuntu",
    cmds=["bash", "-cx"],
    arguments=["echo hello"],
    get_logs=True,
    startup_timeout_seconds=240,
)
Example DAG
The following DAG uses several classes from the Amazon provider package to dynamically spin up and delete Pods for each task in a newly created node group. If your remote Kubernetes cluster already has a node group available, you only need to define your task in the KubernetesPodOperator itself.
The example DAG contains 5 consecutive tasks:

- Create a node group according to the user's specifications (in this example, with GPU resources).
- Use a sensor to check that the cluster is running correctly.
- Use the KubernetesPodOperator to run any valid Docker image in a Pod on the newly created node group on the remote cluster. The example DAG uses the standard `Ubuntu` image to print "hello" to the console using a `bash` command.
- Delete the node group.
- Verify that the node group has been deleted.
# import DAG object and utility packages
from airflow import DAG
from pendulum import datetime
from airflow.configuration import conf

# import the KubernetesPodOperator
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)

# import EKS related packages from the Amazon Provider
from airflow.providers.amazon.aws.hooks.eks import EksHook, NodegroupStates
from airflow.providers.amazon.aws.operators.eks import (
    EksCreateNodegroupOperator,
    EksDeleteNodegroupOperator,
)
from airflow.providers.amazon.aws.sensors.eks import EksNodegroupStateSensor


# custom class to create a node group with Nodes on EKS
class EksCreateNodegroupWithNodesOperator(EksCreateNodegroupOperator):
    def execute(self, context):
        # instantiating an EksHook on the basis of the AWS connection (Step 5)
        eks_hook = EksHook(
            aws_conn_id=self.aws_conn_id,
            region_name=self.region,
        )

        # define the node group to create
        eks_hook.create_nodegroup(
            clusterName=self.cluster_name,
            nodegroupName=self.nodegroup_name,
            subnets=self.nodegroup_subnets,
            nodeRole=self.nodegroup_role_arn,
            scalingConfig={"minSize": 1, "maxSize": 1, "desiredSize": 1},
            diskSize=20,
            instanceTypes=["g4dn.xlarge"],
            amiType="AL2_x86_64_GPU",  # get GPU resources
            updateConfig={"maxUnavailable": 1},
        )


# instantiate the DAG
with DAG(
    start_date=datetime(2022, 6, 1),
    catchup=False,
    schedule="@daily",
    dag_id="KPO_remote_EKS_cluster_example_dag",
) as dag:
    # task 1 creates the node group
    create_gpu_nodegroup = EksCreateNodegroupWithNodesOperator(
        task_id="create_gpu_nodegroup",
        cluster_name="<your cluster name>",
        nodegroup_name="gpu-nodes",
        nodegroup_subnets=["<your subnet>", "<your subnet>"],
        nodegroup_role_arn="<arn of your EKS role>",
        aws_conn_id="<your aws conn id>",
        region="<your region>",
    )

    # task 2 checks that the node group is up and running
    check_nodegroup_status = EksNodegroupStateSensor(
        task_id="check_nodegroup_status",
        cluster_name="<your cluster name>",
        nodegroup_name="gpu-nodes",
        mode="reschedule",
        timeout=60 * 30,
        exponential_backoff=True,
        aws_conn_id="<your aws conn id>",
        region="<your region>",
    )

    # task 3 runs a task with the KubernetesPodOperator
    # here, cluster_context and kubernetes_conn_id are defined at the task level.
    run_on_EKS = KubernetesPodOperator(
        task_id="run_on_EKS",
        cluster_context="<arn of your cluster>",
        namespace="airflow-kpo-default",
        name="example_pod",
        image="ubuntu",
        cmds=["bash", "-cx"],
        arguments=["echo hello"],
        get_logs=True,
        in_cluster=False,
        kubernetes_conn_id="k8s",
        startup_timeout_seconds=240,
    )

    # task 4 deletes the node group
    delete_gpu_nodegroup = EksDeleteNodegroupOperator(
        task_id="delete_gpu_nodegroup",
        cluster_name="<your cluster name>",
        nodegroup_name="gpu-nodes",
        aws_conn_id="<your aws conn id>",
        region="<your region>",
    )

    # task 5 checks that the node group was deleted successfully
    check_nodegroup_termination = EksNodegroupStateSensor(
        task_id="check_nodegroup_termination",
        cluster_name="<your cluster name>",
        nodegroup_name="gpu-nodes",
        aws_conn_id="<your aws conn id>",
        region="<your region>",
        mode="reschedule",
        timeout=60 * 30,
        target_state=NodegroupStates.NONEXISTENT,
    )

    # set the dependencies
    create_gpu_nodegroup >> check_nodegroup_status >> run_on_EKS
    run_on_EKS >> delete_gpu_nodegroup >> check_nodegroup_termination