This article covers configuring Airflow with the KubernetesExecutor and deploying it to Kubernetes. With this configuration Airflow takes advantage of Kubernetes scalability by scheduling an individual container for each task.

Requirements

Airflow is well known as a great scheduler for parallel tasks. Below is the set of requirements for running Airflow at scale:

  • tasks should run in parallel in different containers, one temporary container per task
  • all the logs should be centralised and available at all times
  • variables should be imported automatically from AWS Secrets Manager
  • Google authentication should be enabled in Airflow

The solution

Luckily, other people have had the exact same requirements, so there is existing tooling we can take advantage of.

Temporary containers

This can be achieved quite easily by configuring Airflow to use the KubernetesExecutor:

---
apiVersion: v1
kind: ConfigMap
metadata:
  name: airflow-env
  labels:
    app: airflow
data:
  TZ: Etc/UTC
  POSTGRES_HOST: "MY_PSQL"
  POSTGRES_PORT: "5432"
  POSTGRES_DB: "airflow"
  POSTGRES_USER: "airflow"
  POSTGRES_PASSWORD: "MY_PASS"
  REDIS_HOST: "MY_REDIS_HOST"
  REDIS_PORT: "6379"
  REDIS_PASSWORD: ""
  FLOWER_PORT: "5555"
  AIRFLOW__CORE__EXECUTOR: "KubernetesExecutor"
  FERNET_KEY: "oniqx7yno09xmpe9umpqxR390U-0="
  AIRFLOW__CORE__FERNET_KEY: "oniqx7yno09xmpe9umpqxR390U-0="
  DO_WAIT_INITDB: "true"
  AIRFLOW__CORE__SQL_ALCHEMY_CONN: "postgresql+psycopg2://airflow:MY_PASS@MY_PSQL:5432/airflow"
  AIRFLOW__CELERY__RESULT_BACKEND: "db+postgresql://airflow:MY_PASS@MY_PSQL:5432/airflow"
  AIRFLOW__CORE__DONOT_PICKLE: "false"
  AIRFLOW__CELERY__FLOWER_URL_PREFIX: ""
  AIRFLOW__CELERY__WORKER_CONCURRENCY: "10"
  AIRFLOW__CORE__DAGS_FOLDER: "/usr/local/airflow/dags"
  AIRFLOW__WEBSERVER__BASE_URL: "https://MY_AIRFLOW_URL"
  AIRFLOW__CORE__ENABLE_XCOM_PICKLING: "false"
  AIRFLOW__KUBERNETES__POD_TEMPLATE_FILE: "/repo-sync/dags-repo/kubernetes/pod_templates/pod.yaml"
  AIRFLOW__KUBERNETES__NAMESPACE: "default"
  AIRFLOW__KUBERNETES__DELETE_WORKER_PODS: "True"
  AIRFLOW__KUBERNETES__DELETE_WORKER_PODS_ON_FAILURE: "True"
  AIRFLOW__CORE__STORE_SERIALIZED_DAGS: "False"
  AIRFLOW__CORE__MIN_SERIALIZED_DAG_UPDATE_INTERVAL: "30"
  AIRFLOW__CORE__STORE_DAG_CODE: "False"
  AIRFLOW__WEBSERVER__AUTHENTICATE: "True"
  AIRFLOW__WEBSERVER__AUTH_BACKEND: "airflow.contrib.auth.backends.google_auth"
  AIRFLOW__GOOGLE__CLIENT_ID: "MY_CLIENT_ID"
  AIRFLOW__GOOGLE__CLIENT_SECRET: "MY_SECRET"
  AIRFLOW__GOOGLE__OAUTH_CALLBACK_ROUTE: "/oauth2callback"
  AIRFLOW__GOOGLE__DOMAIN: "MY_DOMAIN"

The pod template will usually be the same Airflow container image with some extra packages added, depending on what the DAGs need to do. git-sync is used for the initial sync of the DAGs into the temporary pod. The example below also streams the logs to STDOUT while the task is running; at the end the logs are pushed to S3. Note that the scheduler needs the git-synced repository available at /repo-sync as well, since that is where the pod template configured above lives.

---
apiVersion: v1
kind: Pod
metadata:
  name: airflow-worker
  labels:
    app: airflow
    component: worker
  annotations:
    iam.amazonaws.com/role: Airflow
  namespace: default
spec:
  restartPolicy: Never
  securityContext:
    runAsUser: 50000
  serviceAccountName: airflow
  hostNetwork: false
  initContainers:
    - name: "git-sync"
      env:
        - name: "GIT_SYNC_REPO"
          value: "dags-repo.git"
        - name: "GIT_SYNC_BRANCH"
          value: "master"
        - name: "GIT_SYNC_DEST"
          value: "dags-repo"
        - name: "GIT_SYNC_SSH"
          value: "true"
        - name: "GIT_SYNC_WAIT"
          value: "60"
        - name: "GIT_SYNC_ROOT"
          value: "/tmp/git"
        - name: "GIT_KNOWN_HOSTS"
          value: "false"
        - name: "GIT_SYNC_ADD_USER"
          value: "true"
        - name: GIT_SYNC_ONE_TIME
          value: "true"
        - name: "GIT_SSH_KEY_FILE"
          value: "/etc/git-secret/ssh"
      image: "k8s.gcr.io/git-sync/git-sync:v3.1.7"
      volumeMounts:
        - name: dags-data
          mountPath: /tmp/git
        - name: ssh-key
          mountPath: /etc/git-secret/
  containers:
    - name: airflow-worker-base
      imagePullPolicy: IfNotPresent
      image: airflow/airflow:1.10.12
      lifecycle:
        postStart:
          exec:
            command: ["/bin/sh", "-c", "ln -sfn /repo-sync/dags-repo/dags /usr/local/airflow/dags && ln -sfn /repo-sync/dags-repo/libs /usr/local/airflow/libs && ln -sfn /home/airflow/.local /root/.local; mkdir -p ${AIRFLOW_HOME}/logs && (sleep 30; tail -f ${AIRFLOW_HOME}/logs/**/**/**/*.log >> /proc/1/fd/1 ) & "]
      envFrom:
        - configMapRef:
            name: "airflow-env"
      volumeMounts:
        - name: dags-data
          mountPath: /repo-sync
  volumes:
    - name: dags-data
      emptyDir: {}
    - name: "ssh-key"
      secret:
        secretName: "ssh-key"
        defaultMode: 0755
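
The ssh-key secret referenced above is expected to hold the deploy key under a single key named ssh, because git-sync is pointed at /etc/git-secret/ssh via GIT_SSH_KEY_FILE. A minimal sketch of that secret, with a placeholder instead of a real deploy key:

---
apiVersion: v1
kind: Secret
metadata:
  name: ssh-key
  labels:
    app: airflow
type: Opaque
stringData:
  # read-only deploy key for dags-repo.git; replace the placeholder with the real key
  ssh: |
    -----BEGIN OPENSSH PRIVATE KEY-----
    ...
    -----END OPENSSH PRIVATE KEY-----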

Centralize logs

Airflow has support for streaming the logs to an S3 bucket. The following configuration is required for Airflow:

AIRFLOW__CORE__REMOTE_LOGGING: "True"
AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER: "s3://my-bucket-for-airflow-logs"
AIRFLOW__CORE__REMOTE_LOG_CONN_ID: "" # the IAM role will have read/write access to this bucket
AIRFLOW__CORE__ENCRYPT_S3_LOGS: "False"
AIRFLOW__WEBSERVER__WORKER_CLASS: "sync"
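
Keep in mind that the webserver reads these logs back from the same bucket when displaying them in the UI, so it needs the same S3 access as the worker pods. In this setup that comes from the kube2iam-style iam.amazonaws.com/role annotation already used on the worker pod; below is a minimal sketch of a webserver Deployment carrying it (the Deployment name and the reuse of the same Airflow role are assumptions):

---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: airflow-web
  labels:
    app: airflow
    component: web
spec:
  replicas: 1
  selector:
    matchLabels:
      app: airflow
      component: web
  template:
    metadata:
      labels:
        app: airflow
        component: web
      annotations:
        # same IAM role as the worker pod, so the UI can read the logs from S3
        iam.amazonaws.com/role: Airflow
    spec:
      serviceAccountName: airflow
      containers:
        - name: airflow-web
          image: airflow/airflow:1.10.12
          # assumes the image entrypoint accepts the Airflow subcommand as argument
          args: ["webserver"]
          envFrom:
            - configMapRef:
                name: airflow-env
          ports:
            - containerPort: 8080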

AWS Secrets manager for variables

For this we will store the variables.json file as a secret in AWS Secrets Manager and use the Kubernetes External Secrets addon to create a matching secret in Kubernetes. The Kubernetes secret is created automatically and kept in sync by the addon.

Below is the External Secret configuration:

---
apiVersion: 'kubernetes-client.io/v1'
kind: ExternalSecret
metadata:
  name: airflow-variables
secretDescriptor:
  backendType: secretsManager
  data:
    - key: "arn:aws:secretsmanager:SECRET_REGION:MY_AWS_ACCOUNT:secret:devops/airflow/variables.json"
      name: "variables.json"

Below is a custom script, attached to the Airflow scheduler, which imports the variables automatically whenever they are updated in Secrets Manager.

---
apiVersion: v1
kind: ConfigMap
metadata:
  name: airflow-scripts
  labels:
    app: airflow
data:
  import-variables.sh: |
    #!/bin/sh -e
    while sleep 30
    do
     if [ -f ${AIRFLOW_HOME}/vars_md5sum ]
     then
       md5sum_sm=`md5sum /airflow-variables/variables.json | awk '{print $1}'`
       md5sum_imported=`cat ${AIRFLOW_HOME}/vars_md5sum`
       if [ "$md5sum_sm" != "$md5sum_imported" ]
       then
         echo "[INFO]: Change detected, importing the variables"
         airflow variables -i /airflow-variables/variables.json || true
         md5sum /airflow-variables/variables.json | awk '{print $1}' > ${AIRFLOW_HOME}/vars_md5sum
       fi
     else
       airflow variables -i /airflow-variables/variables.json || true
       md5sum /airflow-variables/variables.json | awk '{print $1}' > ${AIRFLOW_HOME}/vars_md5sum
     fi
    done
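
The snippets above do not show how the script is attached to the scheduler. One way is to mount the airflow-scripts ConfigMap and the airflow-variables secret into the scheduler pod and run the script in a sidecar container next to the scheduler; the sketch below assumes a Deployment named airflow-scheduler, the same image as the workers, and the mount paths used by the script (/airflow-variables and /scripts):

---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: airflow-scheduler
  labels:
    app: airflow
    component: scheduler
spec:
  replicas: 1
  selector:
    matchLabels:
      app: airflow
      component: scheduler
  template:
    metadata:
      labels:
        app: airflow
        component: scheduler
      annotations:
        iam.amazonaws.com/role: Airflow
    spec:
      serviceAccountName: airflow
      containers:
        - name: airflow-scheduler
          # git-sync init/sidecar for the DAGs and pod template omitted for brevity
          image: airflow/airflow:1.10.12
          args: ["scheduler"]
          envFrom:
            - configMapRef:
                name: airflow-env
        - name: import-variables
          # sidecar that re-imports variables.json whenever its checksum changes
          image: airflow/airflow:1.10.12
          command: ["/bin/sh", "/scripts/import-variables.sh"]
          envFrom:
            - configMapRef:
                name: airflow-env
          volumeMounts:
            - name: airflow-scripts
              mountPath: /scripts
            - name: airflow-variables
              mountPath: /airflow-variables
      volumes:
        - name: airflow-scripts
          configMap:
            name: airflow-scripts
        - name: airflow-variables
          secret:
            secretName: airflow-variables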

Conclusion

Using the above snippets together with the configuration recommended by Airflow results in a scalable solution that will reduce the AWS bill. It requires minimal permanent infrastructure: only the Airflow scheduler and the Airflow webserver run all the time, while worker pods are created on demand and removed when their tasks finish.

This configuration has been tested with Airflow 1.10.12.