Apache Airflow is a powerful tool in the data processing world. Below is the official Airflow introduction:

Airflow is a platform to programmatically author, schedule and monitor workflows.
Use airflow to author workflows as directed acyclic graphs (DAGs) of tasks. The airflow scheduler executes your tasks on an array of workers while following the specified dependencies. Rich command line utilities make performing complex surgeries on DAGs a snap. The rich user interface makes it easy to visualize pipelines running in production, monitor progress, and troubleshoot issues when needed.

Apache Airflow supports Celery as its task queue to distribute tasks across different worker nodes. It would be great if we could deploy Apache Airflow into a Kubernetes cluster and let Kubernetes manage the Celery cluster with a simple command such as kubectl scale --replicas=N deployment airflow-celery-worker -n airflow. That's why my project airflow-kubernetes was born.

airflow-on-kube

I'm re-using a popular Airflow Docker image (Puckel's), but I modified it to support deploying Airflow into Kubernetes with both LocalExecutor and CeleryExecutor. The LocalExecutor is useful for local development; the CeleryExecutor is useful for production deployments, with as many Celery workers as you want in the Kubernetes cluster. There are some differences between my Docker image and Puckel's Docker image:

  • We use MySQL as the backend database rather than PostgreSQL.
  • We use RabbitMQ as the Celery broker rather than Redis.
  • We've implemented a SystemV-style init script: if the user copies anything into /usr/local/airflow/config/init/ in the Docker container, it will be executed before the webserver starts. This is a perfect place to initialize Airflow variables, connections, etc.
  • We've implemented a SystemV-style super-init script: if the user copies anything into /usr/local/airflow/config/super-init/ in the Docker container, it will be executed as root before the webserver starts. This is a perfect place to do initialization that needs the root user, e.g. fixing the hostname and IP mapping issue in /etc/hosts.
  • For CeleryExecutor, we have Flower enabled to check the task stats.
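
To make the init hook more concrete, here is a minimal sketch of one way to ship an init script from Kubernetes instead of baking it into the image. The ConfigMap name airflow-init-scripts, the script name 10-variables.sh, and the variable it sets are all hypothetical. The sketch also assumes the entrypoint simply executes every file it finds in /usr/local/airflow/config/init/, which is why the file is mounted with the executable bit set. Note that mounting a volume over that directory hides any scripts already present there in the image.

```yaml
# Hypothetical ConfigMap holding one init script.
apiVersion: v1
kind: ConfigMap
metadata:
  name: airflow-init-scripts   # assumed name, not part of the original manifest
  namespace: airflow
data:
  10-variables.sh: |
    #!/bin/bash
    # Airflow 1.10 CLI syntax; "environment" is an example variable only.
    airflow variables --set environment production
---
# Fragment to merge into the webserver Deployment's pod template
# (spec.template.spec); it is not a complete manifest on its own.
volumes:
- name: init-scripts
  configMap:
    name: airflow-init-scripts
    defaultMode: 0755          # make the script executable
containers:
- name: airflow-celery-webserver
  volumeMounts:
  - name: init-scripts
    mountPath: /usr/local/airflow/config/init
```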

The whole Celery deployment is managed by just one YAML file. I will break it down piece by piece to explain each part.

  1. Have a dedicated namespace for Airflow.
apiVersion: v1
kind: Namespace
metadata:
  name: airflow
  1. Have a single-node MySQL StatefulSet and a MySQL Service to act as the Airflow backend. The MySQL data dir is claimed through a PVC, so you will need a default StorageClass in your Kubernetes cluster; otherwise the volume claim cannot be provisioned and the deployment will fail. In a normal production setup, it's common to have the MySQL database served by a dedicated service, e.g. AWS RDS.
apiVersion: v1
kind: Service
metadata:
  name: mysql
  namespace: airflow
spec:
  type: NodePort
  ports:
  - name: mysql
    port: 3306
    targetPort: 3306
    protocol: TCP
  selector:
    app: mysql

---

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: mysql
  namespace: airflow
spec:
  serviceName: "mysql"
  replicas: 1
  selector:
    matchLabels:
      app: mysql
  template:
    metadata:
      labels:
        app: mysql
    spec:
      containers:
      - name: mysql
        image: email2liyang/mysql-for-airflow:5.7.24
        volumeMounts:
        - name: datadir
          mountPath: /var/lib/mysql
        env:
        - name: MYSQL_ROOT_PASSWORD
          value: airflow
        - name: MYSQL_USER
          value: airflow
        - name: MYSQL_PASSWORD
          value: airflow
        - name: MYSQL_DATABASE
          value: airflow
        ports:
        - containerPort: 3306
      # No pre-stop hook is required, a SIGTERM plus some time is all that's
      # needed for graceful shutdown of a node.
      terminationGracePeriodSeconds: 60
  volumeClaimTemplates:
  - metadata:
      name: datadir
    spec:
      accessModes:
      - "ReadWriteOnce"
      resources:
        requests:
          storage: 1Gi
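
If the cluster has no default StorageClass, one option is to name a class explicitly in the claim template. The fragment below is a sketch of that change to the mysql StatefulSet above. The class name standard is an assumption and should be replaced with whatever kubectl get storageclass lists in your cluster.

```yaml
# Replacement for the volumeClaimTemplates section of the mysql StatefulSet.
volumeClaimTemplates:
- metadata:
    name: datadir
  spec:
    storageClassName: standard   # assumed class name; cluster-specific
    accessModes:
    - "ReadWriteOnce"
    resources:
      requests:
        storage: 1Gi
```

Since volumeClaimTemplates cannot be changed on an existing StatefulSet, this has to be in place when the StatefulSet is first created.
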
  1. Have a RabbitMQ StatefulSet to support Celery worker scheduling. At the time of writing (15th Sep 2019), a RabbitMQ cluster still has an issue when deployed into Kubernetes: the RabbitMQ data dir is based on the pod IP, but the pod IP changes every time the pod gets restarted, which conflicts with the data persisted in the PV. In our case, a single RabbitMQ node is fine, and we use the hostname for RabbitMQ's data dir.
apiVersion: v1
kind: ServiceAccount
metadata:
  name: rabbitmq 
  namespace: airflow
---
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: endpoint-reader
  namespace: airflow
rules:
- apiGroups: [""]
  resources: ["endpoints"]
  verbs: ["get"]
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: endpoint-reader
  namespace: airflow
subjects:
- kind: ServiceAccount
  name: rabbitmq
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: endpoint-reader
---
kind: Service
apiVersion: v1
metadata:
  namespace: airflow
  name: rabbitmq
  labels:
    app: rabbitmq
    type: LoadBalancer  
spec:
  type: NodePort
  ports:
   - name: http
     protocol: TCP
     port: 15672
     targetPort: 15672
     nodePort: 31672
   - name: amqp
     protocol: TCP
     port: 5672
     targetPort: 5672
     nodePort: 30672
  selector:
    app: rabbitmq
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: rabbitmq-config
  namespace: airflow
data:
  enabled_plugins: |
      [rabbitmq_management,rabbitmq_peer_discovery_k8s].
  rabbitmq.conf: |
      ## Cluster formation. See https://www.rabbitmq.com/cluster-formation.html to learn more.
      cluster_formation.peer_discovery_backend  = rabbit_peer_discovery_k8s
      cluster_formation.k8s.host = kubernetes.default.svc.cluster.local
      ## Should RabbitMQ node name be computed from the pod's hostname or IP address?
      ## IP addresses are not stable, so using [stable] hostnames is recommended when possible.
      ## Set to "hostname" to use pod hostnames.
      ## When this value is changed, so should the variable used to set the RABBITMQ_NODENAME
      ## environment variable.
      cluster_formation.k8s.address_type = hostname
      ## How often should node cleanup checks run?
      cluster_formation.node_cleanup.interval = 30
      ## Set to false if automatic removal of unknown/absent nodes
      ## is desired. This can be dangerous, see
      ##  * https://www.rabbitmq.com/cluster-formation.html#node-health-checks-and-cleanup
      ##  * https://groups.google.com/forum/#!msg/rabbitmq-users/wuOfzEywHXo/k8z_HWIkBgAJ
      cluster_formation.node_cleanup.only_log_warning = true
      cluster_partition_handling = autoheal
      ## See https://www.rabbitmq.com/ha.html#master-migration-data-locality
      queue_master_locator=min-masters
      ## See https://www.rabbitmq.com/access-control.html#loopback-users
      loopback_users.guest = false
   
---
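apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: rabbitmq
  namespace: airflow
spec:
  serviceName: rabbitmq
  replicas: 1
  # apps/v1 requires an explicit selector that matches the pod template labels.
  selector:
    matchLabels:
      app: rabbitmq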
  template:
    metadata:
      labels:
        app: rabbitmq
    spec:
      serviceAccountName: rabbitmq
      terminationGracePeriodSeconds: 10
      containers:        
      - name: rabbitmq-k8s
        image: rabbitmq:3.7
        volumeMounts:
          - name: config-volume
            mountPath: /etc/rabbitmq
          - name: datadir
            mountPath: /var/lib/rabbitmq  
        ports:
          - name: http
            protocol: TCP
            containerPort: 15672
          - name: amqp
            protocol: TCP
            containerPort: 5672
        livenessProbe:
          exec:
            command: ["rabbitmqctl", "status"]
          initialDelaySeconds: 60
          # See https://www.rabbitmq.com/monitoring.html for monitoring frequency recommendations.
          periodSeconds: 60
          timeoutSeconds: 15
        readinessProbe:
          exec:
            command: ["rabbitmqctl", "status"]
          initialDelaySeconds: 20
          periodSeconds: 60
          timeoutSeconds: 10
        imagePullPolicy: Always
        env:
          - name: MY_POD_IP
            valueFrom:
              fieldRef:
                fieldPath: status.podIP
          - name: MY_POD_NAME
            valueFrom:
              fieldRef:
                fieldPath: metadata.name           
          - name: RABBITMQ_USE_LONGNAME
            value: "true"
          # See a note on cluster_formation.k8s.address_type in the config file section
          - name: RABBITMQ_NODENAME
            value: "rabbit@$(MY_POD_NAME).rabbitmq.airflow.svc.cluster.local"
          - name: K8S_SERVICE_NAME
            value: "rabbitmq"
          - name: K8S_HOSTNAME_SUFFIX
            value: '.rabbitmq.airflow.svc.cluster.local'
          - name: RABBITMQ_ERLANG_COOKIE
            value: "mycookie" 
      volumes:
        - name: config-volume
          configMap:
            name: rabbitmq-config
            items:
            - key: rabbitmq.conf
              path: rabbitmq.conf
            - key: enabled_plugins
              path: enabled_plugins
  volumeClaimTemplates:
  - metadata:
      name: datadir
    spec:
      accessModes:
      - "ReadWriteOnce"
      resources:
        requests:
          storage: 5Gi 
  1. Have an Airflow webserver to provide the Airflow UI, where we can manage and track the DAGs. Kubernetes checks its liveness and readiness on path /admin/ and port 8080. We could also deploy ingress-nginx with HTTP basic auth to expose this web UI outside of the Kubernetes cluster.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: airflow-celery-webserver
  namespace: airflow
spec:
  replicas: 1
  selector:
    matchLabels:
      app: airflow-celery-webserver
  template:
    metadata:
      labels:
        name: airflow-celery-webserver
        app: airflow-celery-webserver
    spec:
      containers:
      - name: airflow-celery-webserver
        image: docker.io/email2liyang/docker-airflow:1.10.2
        imagePullPolicy: Always
        env:
        - name: LOAD_EX
          value: "y"
        - name: EXECUTOR
          value: Celery
        - name: AIRFLOW_MYSQL_DB_HOST
          value: mysql
        - name: AIRFLOW_MYSQL_DB_PORT
          value: "3306"
        - name: FERNET_KEY
          value: 46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho=
        - name: AIRFLOW__CELERY__BROKER_URL
          value: amqp://guest:guest@rabbitmq:5672
        - name: AIRFLOW__CELERY__RESULT_BACKEND
          value: db+mysql://airflow:airflow@$(AIRFLOW_MYSQL_DB_HOST):$(AIRFLOW_MYSQL_DB_PORT)/airflow
        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
          value: mysql://airflow:airflow@$(AIRFLOW_MYSQL_DB_HOST):$(AIRFLOW_MYSQL_DB_PORT)/airflow
        - name: AIRFLOW__CORE__EXECUTOR
          value: CeleryExecutor
        readinessProbe:
          httpGet:
            path: /admin/
            port: 8080
          initialDelaySeconds: 8
          timeoutSeconds: 10
        livenessProbe:
          httpGet:
            path: /admin/
            port: 8080
          initialDelaySeconds: 8
          timeoutSeconds: 10
        ports:
        - name: webserver
          containerPort: 8080
        command: ["/entrypoint.sh"]
        args: ["webserver"]
---
apiVersion: v1
kind: Service
metadata:
  name: airflow-celery-webserver
  namespace: airflow
spec:
  type: NodePort
  ports:
  - name: webserver
    port: 8080
    targetPort: webserver
  selector:
    app: airflow-celery-webserver        
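
As a sketch of the ingress-nginx option, the manifest below puts HTTP basic auth in front of the webserver Service. It assumes the ingress-nginx controller is already installed with an IngressClass named nginx, that a Secret called airflow-basic-auth holding an htpasswd file under the key auth exists in the airflow namespace, and that airflow.example.com is a placeholder host. None of these are part of the original deployment, and the networking.k8s.io/v1 Ingress API needs Kubernetes 1.19 or newer.

```yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: airflow-celery-webserver
  namespace: airflow
  annotations:
    # ingress-nginx basic auth; the Secret name is an assumption.
    nginx.ingress.kubernetes.io/auth-type: basic
    nginx.ingress.kubernetes.io/auth-secret: airflow-basic-auth
    nginx.ingress.kubernetes.io/auth-realm: "Authentication Required"
spec:
  ingressClassName: nginx          # assumed IngressClass name
  rules:
  - host: airflow.example.com      # placeholder host
    http:
      paths:
      - path: /
        pathType: Prefix
        backend:
          service:
            name: airflow-celery-webserver
            port:
              name: webserver      # the named port of the Service above
```
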
  1. Have an Airflow scheduler deployed in Kubernetes to schedule tasks onto the different worker nodes. Each cluster can only have 1 scheduler for now. We use the ps command for the liveness and readiness checks.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: airflow-celery-scheduler
  namespace: airflow
spec:
  replicas: 1
  selector:
    matchLabels:
      app: airflow-celery-scheduler
  template:
    metadata:
      labels:
        name: airflow-celery-scheduler
        app: airflow-celery-scheduler
    spec:
      containers:
      - name: airflow-celery-scheduler
        image: docker.io/email2liyang/docker-airflow:1.10.2
        imagePullPolicy: Always
        env:
        - name: LOAD_EX
          value: "y"
        - name: EXECUTOR
          value: Celery
        - name: AIRFLOW_MYSQL_DB_HOST
          value: mysql
        - name: AIRFLOW_MYSQL_DB_PORT
          value: "3306"
        - name: FERNET_KEY
          value: 46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho=
        - name: AIRFLOW__CELERY__BROKER_URL
          value: amqp://guest:guest@rabbitmq:5672
        - name: AIRFLOW__CELERY__RESULT_BACKEND
          value: db+mysql://airflow:airflow@$(AIRFLOW_MYSQL_DB_HOST):$(AIRFLOW_MYSQL_DB_PORT)/airflow
        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
          value: mysql://airflow:airflow@$(AIRFLOW_MYSQL_DB_HOST):$(AIRFLOW_MYSQL_DB_PORT)/airflow
        - name: AIRFLOW__CORE__EXECUTOR
          value: CeleryExecutor  
        readinessProbe:
          exec:
            command:
            - /bin/bash
            - -c
            - ps -ef | grep scheduler | grep -v "grep"
          initialDelaySeconds: 8
          timeoutSeconds: 10
        livenessProbe:
          exec:
            command:
            - /bin/bash
            - -c
            - ps -ef | grep scheduler | grep -v "grep"
          initialDelaySeconds: 8
          timeoutSeconds: 10
        command: ["/entrypoint.sh"]
        args: ["scheduler"]
  1. Have Airflow workers deployed into the Kubernetes cluster. We use the ps command for the liveness and readiness checks.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: airflow-celery-worker
  namespace: airflow
spec:
  replicas: 2
  selector:
    matchLabels:
      app: airflow-celery-worker
  template:
    metadata:
      labels:
        name: airflow-celery-worker
        app: airflow-celery-worker
    spec:
      containers:
      - name: airflow-celery-worker
        image: docker.io/email2liyang/docker-airflow:1.10.2
        imagePullPolicy: Always
        env:
        - name: LOAD_EX
          value: "y"
        - name: EXECUTOR
          value: Celery
        - name: AIRFLOW_MYSQL_DB_HOST
          value: mysql
        - name: AIRFLOW_MYSQL_DB_PORT
          value: "3306"
        - name: FERNET_KEY
          value: 46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho=
        - name: AIRFLOW__CELERY__BROKER_URL
          value: amqp://guest:guest@rabbitmq:5672
        - name: AIRFLOW__CELERY__RESULT_BACKEND
          value: db+mysql://airflow:airflow@$(AIRFLOW_MYSQL_DB_HOST):$(AIRFLOW_MYSQL_DB_PORT)/airflow
        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
          value: mysql://airflow:airflow@$(AIRFLOW_MYSQL_DB_HOST):$(AIRFLOW_MYSQL_DB_PORT)/airflow
        - name: AIRFLOW__CORE__EXECUTOR
          value: CeleryExecutor  
        readinessProbe:
          exec:
            command:
            - /bin/bash
            - -c
            - ps -ef | grep celeryd | grep -v "grep"
          initialDelaySeconds: 8
          timeoutSeconds: 10
        livenessProbe:
          exec:
            command:
            - /bin/bash
            - -c
            - ps -ef | grep celeryd | grep -v "grep"
          initialDelaySeconds: 8
          timeoutSeconds: 10
        command: ["/entrypoint.sh"]
        args: ["worker"]
  1. Optionally, have Airflow's Celery Flower deployed into the cluster to monitor the Celery workers' health. We use path / and port 5555 for the readiness and liveness checks. With Celery workers managed by Kubernetes, you may see some Celery workers come and go; this is natural in the Kubernetes world.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: airflow-celery-flower
  namespace: airflow
spec:
  replicas: 1
  selector:
    matchLabels:
      app: airflow-celery-flower
  template:
    metadata:
      labels:
        name: airflow-celery-flower
        app: airflow-celery-flower
    spec:
      containers:
      - name: airflow-celery-flower
        image: docker.io/email2liyang/docker-airflow:1.10.2
        imagePullPolicy: Always
        env:
        - name: LOAD_EX
          value: "y"
        - name: EXECUTOR
          value: Celery
        - name: AIRFLOW_MYSQL_DB_HOST
          value: mysql
        - name: AIRFLOW_MYSQL_DB_PORT
          value: "3306"
        - name: FERNET_KEY
          value: 46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho=
        - name: AIRFLOW__CELERY__BROKER_URL
          value: amqp://guest:guest@rabbitmq:5672
        - name: AIRFLOW__CELERY__RESULT_BACKEND
          value: db+mysql://airflow:airflow@$(AIRFLOW_MYSQL_DB_HOST):$(AIRFLOW_MYSQL_DB_PORT)/airflow
        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
          value: mysql://airflow:airflow@$(AIRFLOW_MYSQL_DB_HOST):$(AIRFLOW_MYSQL_DB_PORT)/airflow
        - name: AIRFLOW__CORE__EXECUTOR
          value: CeleryExecutor
        readinessProbe:
          httpGet:
            path: /
            port: 5555
          initialDelaySeconds: 8
          timeoutSeconds: 10
        livenessProbe:
          httpGet:
            path: /
            port: 5555
          initialDelaySeconds: 8
          timeoutSeconds: 10
        ports:
        - name: flower
          containerPort: 5555
        command: ["/entrypoint.sh"]
        args: ["flower"]
---
apiVersion: v1
kind: Service
metadata:
  name: airflow-celery-flower
  namespace: airflow
spec:
  type: NodePort
  ports:
  - name: flower
    port: 5555
    targetPort: flower
  selector:
    app: airflow-celery-flower    

Now we have achieved the goal of managing the Celery workers with just one command. If we want 20 Celery workers, we can simply execute kubectl scale --replicas=20 deployment airflow-celery-worker -n airflow, and we will get 20 Celery workers in the Airflow cluster. I will discuss more on how to organize an Airflow project in the next article.
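
The same change can also be kept in version control instead of being issued imperatively. As a sketch, editing the replicas field of the worker Deployment in the YAML file and re-applying the file with kubectl apply -f should have the same effect. Only the changed part of the manifest is shown here; every other field stays as it is.

```yaml
# Excerpt of the airflow-celery-worker Deployment; unchanged fields omitted.
metadata:
  name: airflow-celery-worker
  namespace: airflow
spec:
  replicas: 20   # was 2; same result as kubectl scale --replicas=20
```

Keep in mind that re-applying the file later sets the replica count back to whatever the file says, which would undo an earlier kubectl scale.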