Configure task log collection and exporting to ElasticSearch

Airflow task logs are stored in a logging backend to ensure you can access them after your Pods terminate. By default, Astronomer uses Fluentd to collect task logs and export them to an ElasticSearch instance.

You can configure how Astronomer collects Deployment task logs and exports them to ElasticSearch. The following are the supported methods for exporting task logs to ElasticSearch:

  • Using a Fluentd DaemonSet pod on each Kubernetes node in your cluster.
  • Using container sidecars for Deployment components.

Export task logs using a Fluentd DaemonSet

By default, Astronomer Software uses a Fluentd DaemonSet to aggregate task logs. This is the workflow for the default implementation:

  • Deployments write task logs to stdout.
  • Kubernetes takes the output from stdout and writes it to the Deployment’s node.
  • A Fluentd pod reads logs from the node and forwards them to ElasticSearch.

This implementation is recommended for organizations that:

  • Run longer tasks using the Celery executor.
  • Run Astronomer Software in a dedicated cluster.
  • Run privileged containers in a cluster with a ClusterRole.

This approach is not suited for organizations that run many small tasks using the Kubernetes executor. Because task logs exist only for the lifetime of the pod, your pods running small tasks might complete before Fluentd can collect their task logs.

Export logs using container sidecars

You can use a logging sidecar container to collect and export logs. In this implementation:

  • Each container running an Airflow component for a Deployment receives its own Vector sidecar.
  • Task logs are written to a shared directory.
  • The Vector sidecar reads logs from the shared directory and writes them to ElasticSearch.
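
The flow above can be pictured with a small, self-contained Python sketch. It is only an illustration: a temporary directory stands in for the shared directory, `write_task_log` stands in for an Airflow component, and `collect_logs` stands in for the Vector sidecar. These names, the JSON line format, and the returned list (in place of ElasticSearch) are assumptions made for the example, not Astronomer's implementation.

```python
import json
import tempfile
from pathlib import Path


def write_task_log(shared_dir: Path, name: str, records: list[dict]) -> Path:
    """Stand-in for an Airflow component appending JSON lines to a log file."""
    path = shared_dir / f"{name}.log"
    with path.open("a", encoding="utf-8") as handle:
        for record in records:
            handle.write(json.dumps(record) + "\n")
    return path


def collect_logs(shared_dir: Path) -> list[dict]:
    """Stand-in for the sidecar: read every *.log file from the beginning."""
    collected = []
    for path in sorted(shared_dir.glob("*.log")):
        with path.open(encoding="utf-8") as handle:
            for line in handle:
                line = line.strip()
                if line:
                    collected.append(json.loads(line))
    return collected


def demo() -> list[dict]:
    # The temporary directory plays the role of the shared directory.
    with tempfile.TemporaryDirectory() as tmp:
        shared_dir = Path(tmp)
        write_task_log(shared_dir, "worker", [{"message": "task started"}])
        write_task_log(shared_dir, "worker", [{"message": "task finished"}])
        # The reader picks up both lines from the file after the writer returns.
        return collect_logs(shared_dir)


if __name__ == "__main__":
    for record in demo():
        print(record)
```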

This implementation is recommended for organizations that:

  • Run Astronomer Software in a multi-tenant cluster, where security is a concern.
  • Use the Kubernetes executor to run many short-lived tasks, which requires improved reliability.

With this implementation, each Vector sidecar uses 100m of CPU and 384Mi of memory. Exporting logs with sidecars therefore uses more CPU and memory than exporting them with a Fluentd DaemonSet.
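
As a rough way to size that overhead, the following sketch multiplies the per-sidecar figures quoted above by a sidecar count. The function name and the count of 20 are hypothetical, and the sketch treats the figures as flat per-sidecar costs, which may not match how your cluster schedules or limits resources.

```python
CPU_MILLICORES_PER_SIDECAR = 100  # 100m CPU, the figure quoted above
MEMORY_MIB_PER_SIDECAR = 384      # 384Mi memory, the figure quoted above


def sidecar_overhead(sidecar_count: int) -> dict:
    """Return the combined CPU (cores) and memory (GiB) for N sidecars."""
    if sidecar_count < 0:
        raise ValueError("sidecar_count must be non-negative")
    return {
        "cpu_cores": sidecar_count * CPU_MILLICORES_PER_SIDECAR / 1000,
        "memory_gib": sidecar_count * MEMORY_MIB_PER_SIDECAR / 1024,
    }


if __name__ == "__main__":
    # Hypothetical example: 20 component containers, each with one sidecar.
    print(sidecar_overhead(20))
```

Under these assumptions, 20 sidecars add up to 2 CPU cores and 7.5 GiB of memory.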

Configure logging sidecars

  1. Retrieve your values.yaml file. See Apply a config change.

  2. Add the following entry to your values.yaml file:

    global:
      fluentdEnabled: false
      loggingSidecar:
        enabled: true
        name: sidecar-log-consumer

    If you’re migrating from Fluentd, additionally set the following configuration so that Astronomer Software can retain logs:

    global:
      logging:
        indexNamePrefix: <your-index-prefix>

  3. Push the configuration change. See Apply a config change.

To revert to the default behavior and export task logs using a Fluentd DaemonSet, remove this configuration from your values.yaml file and reapply the file.

Customize Vector logging sidecars

You can customize the default Astronomer Vector logging sidecar to use different transformations and sinks based on your team’s requirements. This is useful if you want to annotate, filter, or otherwise customize your logs before sending them to your logging platform.

  1. Add the following entry to your values.yaml file:

    global:
      loggingSidecar:
        enabled: true
        name: sidecar-log-consumer
        customConfig: true

  2. Push the configuration change to your cluster. See Apply a config change.

  3. Create a custom Vector configuration YAML file to change how and where sidecars forward your logs. The following example is a template configuration that forwards logs to Elasticsearch. For the complete default logging sidecar configmap, see the Astronomer GitHub.

    log_schema:
      timestamp_key: "@timestamp"
    data_dir: "${SIDECAR_LOGS}"
    sources:
      airflow_log_files:
        type: file
        include:
          - "${SIDECAR_LOGS}/*.log"
        read_from: beginning
    transforms:
      transform_airflow_logs:
        type: remap
        inputs:
          - airflow_log_files
        source: |
          .component = "${COMPONENT:--}"
          .workspace = "${WORKSPACE:--}"
          .release = "${RELEASE:--}"
          .date_nano = parse_timestamp!(.@timestamp, format: "%Y-%m-%dT%H:%M:%S.%f%Z")

      filter_common_logs:
        type: filter
        inputs:
          - transform_airflow_logs
        condition:
          type: "vrl"
          source: '!includes(["worker","scheduler"], .component)'

      filter_scheduler_logs:
        type: filter
        inputs:
          - transform_airflow_logs
        condition:
          type: "vrl"
          source: 'includes(["scheduler"], .component)'

      filter_worker_logs:
        type: filter
        inputs:
          - transform_airflow_logs
        condition:
          type: "vrl"
          source: 'includes(["worker"], .component)'

      filter_gitsyncrelay_logs:
        type: filter
        inputs:
          - transform_airflow_logs
        condition:
          type: "vrl"
          source: 'includes(["git-sync-relay"], .component)'

      transform_task_log:
        type: remap
        inputs:
          - filter_worker_logs
          - filter_scheduler_logs
        source: |-
          . = parse_json(.message) ?? .
          .@timestamp = parse_timestamp(.timestamp, "%Y-%m-%dT%H:%M:%S%Z") ?? now()
          .check_log_id = exists(.log_id)
          if .check_log_id != true {
            .log_id = join!([to_string!(.dag_id), to_string!(.task_id), to_string!(.execution_date), to_string!(.try_number)], "_")
          }
          .offset = to_int(now()) * 1000000000 + to_unix_timestamp(now()) * 1000000

      final_task_log:
        type: remap
        inputs:
          - transform_task_log
        source: |
          .component = "${COMPONENT:--}"
          .workspace = "${WORKSPACE:--}"
          .release = "${RELEASE:--}"
          .date_nano = parse_timestamp!(.@timestamp, format: "%Y-%m-%dT%H:%M:%S.%f%Z")

      transform_remove_fields:
        type: remap
        inputs:
          - final_task_log
          - filter_common_logs
          - filter_gitsyncrelay_logs
        source: |
          del(.host)
          del(.file)
    # Configuration for ElasticSearch sink.
    sinks:
      out:
        type: elasticsearch
        # Specify the transforms you want to run before your logs are exported.
        inputs:
          - transform_remove_fields
        mode: bulk
        compression: none
        endpoint: "http://example-host:<example-port>"
        auth:
          strategy: "basic"
          user: "example-user"
          password: "example-pass"
        bulk:
          index: "vector.${RELEASE:--}.%Y.%m.%d"
          action: create

  4. Run the following command to add the configuration file to your cluster as a Kubernetes secret:

    kubectl create secret generic sidecar-config --from-file=vector-values.yaml=vector-values.yaml

  5. Run the following command to annotate the secret so that it’s automatically applied to all new Deployments:

    kubectl annotate secret sidecar-config astronomer.io/commander-sync="platform-release=astronomer"

  6. Run the following command to sync existing Deployments with the new configuration:

    kubectl create job --from=cronjob/astronomer-config-syncer sync-secrets -n astronomer
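
To make the template configuration in step 3 easier to follow, here is a rough Python rendering of three pieces of it: the four filter conditions, the log_id fallback in transform_task_log, and the sink's index name. It is a reading aid only, not how Vector executes VRL. The function names are invented for this sketch, and the fallback is simplified: it omits the check_log_id field and the timestamp handling.

```python
from datetime import datetime


def matching_filters(component: str) -> list[str]:
    """Mirror the four VRL filter conditions in the template."""
    conditions = {
        "filter_common_logs": component not in ("worker", "scheduler"),
        "filter_scheduler_logs": component == "scheduler",
        "filter_worker_logs": component == "worker",
        "filter_gitsyncrelay_logs": component == "git-sync-relay",
    }
    return [name for name, matched in conditions.items() if matched]


def ensure_log_id(event: dict) -> dict:
    """Mirror the log_id fallback: join four fields with "_" when it is absent."""
    if "log_id" not in event:
        parts = ("dag_id", "task_id", "execution_date", "try_number")
        # Raises KeyError if a field is missing; this sketch does not handle that.
        event = {**event, "log_id": "_".join(str(event[p]) for p in parts)}
    return event


def index_name(release: str, day: datetime) -> str:
    """Mirror the sink index template vector.${RELEASE:--}.%Y.%m.%d."""
    # ${RELEASE:--} falls back to "-" when RELEASE is unset or empty.
    return f"vector.{release or '-'}.{day:%Y.%m.%d}"


if __name__ == "__main__":
    print(matching_filters("worker"))
    print(ensure_log_id({"dag_id": "example_dag", "task_id": "extract",
                         "execution_date": "2024-01-05T00:00:00+00:00",
                         "try_number": 1}))
    print(index_name("my-release", datetime(2024, 1, 5)))
```

In this reading, worker and scheduler events are the only ones that pass through transform_task_log and final_task_log before the sink. The release name `my-release` and the DAG fields are made-up values.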

Use an external Elasticsearch instance for Airflow task log management

Add Airflow task logs from your Astronomer Deployment to an existing Elasticsearch instance on Elastic Cloud to centralize log management and analysis. Centralized log management allows you to quickly identify, troubleshoot, and resolve task failure issues. Although these examples use Elastic Cloud, you can also use AWS Managed OpenSearch Service or any other elastic service (managed or hosted). With an external Elasticsearch instance configured for Astronomer Software, you can see the logs in your Elasticsearch instance and browse the logs from the Software UI.

If you use an existing Elasticsearch instance, make sure that the index template is configured to enable auto creation of new indices.

Create an Elastic Deployment and endpoint

  1. In your browser, go to https://cloud.elastic.co/ and create a new Elastic Cloud deployment. See Create a deployment.
  2. Copy and save your Elastic Cloud deployment credentials when the Save the deployment credentials screen appears.
  3. On the Elastic dashboard, click the Gear icon for your Deployment.
  4. Click Copy endpoint next to Elasticsearch.

  5. Optional. Test the Elastic Cloud deployment endpoint:

    • Open a new browser window, paste the endpoint you copied in step 4 in the Address bar, and then press Enter.
    • Enter the username and password you copied in step 2 and click Sign in. Output similar to the following appears:

        name "instance-0000000000"
        cluster_name "<cluster-name>"
        cluster_uuid "<cluster-uuid>"
        version
          number "8.3.2"
          build_type "docker"
          build_hash "8b0b1f23fbebecc3c88e4464319dea8989f374fd"
          build_date "2022-07-06T15:15:15.901688194Z"
          build_snapshot false
          lucene_version "9.2.0"
          minimum_wire_compatibility_version "7.17.0"
          minimum_index_compatibility_version "7.0.0"
        tagline "You Know, for Search"
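
The optional check in step 5 can also be scripted. The following is a minimal sketch using only the Python standard library, assuming the endpoint accepts HTTP Basic authentication and returns the JSON document shown above. The function names are hypothetical, and `check_endpoint` needs network access to your own endpoint.

```python
import base64
import json
import urllib.request


def build_request(endpoint: str, username: str, password: str) -> urllib.request.Request:
    """Build a GET request for the endpoint with an HTTP Basic auth header."""
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return urllib.request.Request(endpoint, headers={"Authorization": f"Basic {token}"})


def describe_cluster(body: str) -> str:
    """Summarize the JSON document returned by the endpoint."""
    info = json.loads(body)
    return f'{info["cluster_name"]} runs Elasticsearch {info["version"]["number"]}'


def check_endpoint(endpoint: str, username: str, password: str) -> str:
    """Call the endpoint (requires network access) and summarize the reply."""
    request = build_request(endpoint, username, password)
    with urllib.request.urlopen(request, timeout=10) as response:
        return describe_cluster(response.read().decode("utf-8"))


if __name__ == "__main__":
    # Offline demonstration with a trimmed copy of the sample output above.
    sample = '{"cluster_name": "<cluster-name>", "version": {"number": "8.3.2"}}'
    print(describe_cluster(sample))
```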

Save your Elastic Cloud deployment credentials

After you’ve created an Elastic deployment and endpoint, you can store your Elastic deployment credentials in one of two ways: in your Astronomer Software Helm values or, for greater security, as a secret in your Astronomer Software Kubernetes cluster. For additional information about adding an Astronomer Software configuration change, see Apply a config change.

  1. Run the following command to base64 encode your Elastic Cloud deployment credentials:

    $ echo -n "<username>:<password>" | base64

  2. Add the following entry to your values.yaml file:

    global:
      fluentdEnabled: true
      customLogging:
        enabled: true
        scheme: https
        # Host endpoint copied from the Elasticsearch console, with https
        # and the port number removed.
        host: "<host-URL>"
        port: "9243"
        # Encoded credentials from step 1.
        secret: "<encoded credentials>"

  3. Add the following entry to your values.yaml file to disable internal logging:

    tags:
      logging: false

  4. Run the following command to upgrade your Astronomer Software release with the updated values.yaml file:

    $ helm upgrade -f values.yaml --version=0.27 --namespace=<your-platform-namespace> <your-platform-release-name> astronomer/astronomer
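
If you prefer to derive the customLogging fields programmatically, the following sketch splits a copied endpoint into scheme, host, and port, and encodes the credentials as in step 1. The function name and the example endpoint are hypothetical. The fallback to port 9243 simply reuses the port from the example above and is an assumption, not a documented default.

```python
import base64
from urllib.parse import urlsplit


def custom_logging_values(endpoint: str, username: str, password: str) -> dict:
    """Derive the customLogging fields from an endpoint URL and credentials."""
    parts = urlsplit(endpoint)
    if not parts.hostname:
        raise ValueError(f"not a URL with a host: {endpoint!r}")
    # Intended to match `echo -n "<username>:<password>" | base64` from step 1.
    secret = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return {
        "enabled": True,
        "scheme": parts.scheme,
        "host": parts.hostname,  # endpoint with the scheme and port removed
        # Assumption: fall back to 9243, the port used in the example above.
        "port": str(parts.port or 9243),
        "secret": secret,
    }


if __name__ == "__main__":
    values = custom_logging_values(
        "https://my-deployment.es.example.com:9243", "elastic", "example-pass"
    )
    for key, value in values.items():
        print(f"{key}: {value}")
```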

View Airflow task logs in Elastic

  1. On the Elastic dashboard in the Elasticsearch Service area, click the Deployment name.

  2. Click Menu > Discover. The Create index pattern screen appears.

  3. In the Name field, enter fluentd.*, or vector.* if you use Vector sidecar logging. In the Timestamp field, enter @timestamp, and then click Create index pattern.

  4. Click Menu > Dashboard to view all of the Airflow task logs for your Deployment on Astronomer.
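
To see which indices a pattern such as vector.* would pick up, the following sketch applies shell-style wildcard matching to a list of index names. It only approximates index pattern matching, because `fnmatchcase` also gives special meaning to `?` and `[...]`. The index names are made up, and the fluentd name in particular is an assumed format.

```python
from fnmatch import fnmatchcase


def matching_indices(pattern: str, indices: list[str]) -> list[str]:
    """Return the index names that match a wildcard pattern, in input order."""
    return [name for name in indices if fnmatchcase(name, pattern)]


if __name__ == "__main__":
    # Hypothetical index names for illustration only.
    indices = [
        "vector.my-release.2024.01.05",
        "fluentd.my-release.2024.01.05",
        "other-index",
    ]
    print(matching_indices("vector.*", indices))
    print(matching_indices("fluentd.*", indices))
```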