Apache Airflow is a powerful tool in the data processing world. Below is the official Airflow introduction:
Airflow is a platform to programmatically author, schedule and monitor workflows.
Use airflow to author workflows as directed acyclic graphs (DAGs) of tasks. The airflow scheduler executes your tasks on an array of workers while following the specified dependencies. Rich command line utilities make performing complex surgeries on DAGs a snap. The rich user interface makes it easy to visualize pipelines running in production, monitor progress, and troubleshoot issues when needed.
Apache Airflow supports Celery as its task queue to distribute tasks across different worker nodes. It would be great if we could deploy Apache Airflow into a Kubernetes cluster and let Kubernetes manage the Celery cluster with a simple command like kubectl scale --replicas=N airflow-worker; that's why my project airflow-kubernetes was born.
I'm reusing a popular Airflow Docker image (Puckel's docker-airflow), modified to support deploying Airflow into Kubernetes with both the LocalExecutor and the CeleryExecutor. The LocalExecutor is useful for local development; the CeleryExecutor is useful for production deployments with as many Celery workers as you want in the Kubernetes cluster. There are some differences between my Docker image and Puckel's:
- we use MySQL as the metadata backend rather than PostgreSQL
- we use RabbitMQ as the Celery broker rather than Redis
- we've implemented a SysV-style init hook: anything a user copies into /usr/local/airflow/config/init/ in the Docker container will be executed before the webserver starts. This is a perfect place to initialize Airflow variables, connections, etc. (see the example script after this list)
- we've implemented a SysV-style super-init hook: anything a user copies into /usr/local/airflow/config/super-init/ in the Docker container will be executed as root before the webserver starts. This is a perfect place to initialize things that require root, e.g. fixing the hostname/IP mapping issue in /etc/hosts
- for the CeleryExecutor, we have Flower enabled to check task stats
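To make the init hook concrete, here is a minimal sketch of what such a script could look like. The file name, the variable, and the connection are all hypothetical, and the commands assume the Airflow 1.10.x CLI syntax.

#!/usr/bin/env bash
# Hypothetical example: /usr/local/airflow/config/init/10-seed.sh
# The init hook runs this before the webserver starts.

# Seed an Airflow variable (Airflow 1.10.x CLI syntax).
airflow variables --set deploy_env production

# Register a connection to a hypothetical external database.
airflow connections --add \
  --conn_id my_warehouse \
  --conn_uri mysql://user:pass@warehouse-host:3306/analytics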
The whole Celery deployment is managed by just one YAML file. I'll break it down into pieces and explain each one:
- have a dedicated namespace for Airflow
apiVersion: v1
kind: Namespace
metadata:
name: airflow
- have a single-node MySQL StatefulSet plus a MySQL Service to act as the Airflow backend. The MySQL data directory is claimed from a PVC, so you will need a default StorageClass in your Kubernetes cluster, otherwise the claim will fail (a quick check for this follows the manifest). In normal production it's common to have the MySQL database served as a dedicated service, e.g. AWS RDS.
apiVersion: v1
kind: Service
metadata:
name: mysql
namespace: airflow
spec:
type: NodePort
ports:
- name: mysql
port: 3306
targetPort: 3306
protocol: TCP
selector:
app: mysql
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: mysql
namespace: airflow
spec:
serviceName: "mysql"
replicas: 1
selector:
matchLabels:
app: mysql
template:
metadata:
labels:
app: mysql
spec:
containers:
- name: mysql
image: email2liyang/mysql-for-airflow:5.7.24
volumeMounts:
- name: datadir
mountPath: /var/lib/mysql
env:
- name: MYSQL_ROOT_PASSWORD
value: airflow
- name: MYSQL_USER
value: airflow
- name: MYSQL_PASSWORD
value: airflow
- name: MYSQL_DATABASE
value: airflow
ports:
- containerPort: 3306
# No pre-stop hook is required, a SIGTERM plus some time is all that's
# needed for graceful shutdown of a node.
terminationGracePeriodSeconds: 60
volumeClaimTemplates:
- metadata:
name: datadir
spec:
accessModes:
- "ReadWriteOnce"
resources:
requests:
storage: 1Gi
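Before applying the manifest, it is worth a quick sanity check that the cluster actually has a default StorageClass; this is only a verification step, not part of the deployment:

# The volumeClaimTemplates above omit storageClassName, so the PVC binds
# only if one StorageClass is marked as the default.
kubectl get storageclass
# One entry should carry the "(default)" marker; if none does, the PVC
# (and therefore the mysql pod) will stay in Pending.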
- have a RabbitMQ StatefulSet to support Celery worker scheduling. As of this writing (15 Sep 2019), RabbitMQ clusters still have an issue when deployed into Kubernetes (the RabbitMQ data directory is derived from the pod IP, but the pod IP changes every time the pod restarts, which conflicts with the data persisted in the PV). In our case a single RabbitMQ node is fine, and we use the hostname as RabbitMQ's data directory. A quick health check follows the manifest.
apiVersion: v1
kind: ServiceAccount
metadata:
name: rabbitmq
namespace: airflow
---
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: endpoint-reader
namespace: airflow
rules:
- apiGroups: [""]
resources: ["endpoints"]
verbs: ["get"]
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: endpoint-reader
namespace: airflow
subjects:
- kind: ServiceAccount
  name: rabbitmq
  namespace: airflow
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: Role
name: endpoint-reader
---
kind: Service
apiVersion: v1
metadata:
namespace: airflow
name: rabbitmq
labels:
app: rabbitmq
spec:
type: NodePort
ports:
- name: http
protocol: TCP
port: 15672
targetPort: 15672
nodePort: 31672
- name: amqp
protocol: TCP
port: 5672
targetPort: 5672
nodePort: 30672
selector:
app: rabbitmq
---
apiVersion: v1
kind: ConfigMap
metadata:
name: rabbitmq-config
namespace: airflow
data:
enabled_plugins: |
[rabbitmq_management,rabbitmq_peer_discovery_k8s].
rabbitmq.conf: |
## Cluster formation. See https://www.rabbitmq.com/cluster-formation.html to learn more.
cluster_formation.peer_discovery_backend = rabbit_peer_discovery_k8s
cluster_formation.k8s.host = kubernetes.default.svc.cluster.local
## Should RabbitMQ node name be computed from the pod's hostname or IP address?
## IP addresses are not stable, so using [stable] hostnames is recommended when possible.
## Set to "hostname" to use pod hostnames.
## When this value is changed, so should the variable used to set the RABBITMQ_NODENAME
## environment variable.
cluster_formation.k8s.address_type = hostname
## How often should node cleanup checks run?
cluster_formation.node_cleanup.interval = 30
## Set to false if automatic removal of unknown/absent nodes
## is desired. This can be dangerous, see
## * https://www.rabbitmq.com/cluster-formation.html#node-health-checks-and-cleanup
## * https://groups.google.com/forum/#!msg/rabbitmq-users/wuOfzEywHXo/k8z_HWIkBgAJ
cluster_formation.node_cleanup.only_log_warning = true
cluster_partition_handling = autoheal
## See https://www.rabbitmq.com/ha.html#master-migration-data-locality
queue_master_locator=min-masters
## See https://www.rabbitmq.com/access-control.html#loopback-users
loopback_users.guest = false
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: rabbitmq
namespace: airflow
spec:
  serviceName: rabbitmq
  replicas: 1
  selector:
    matchLabels:
      app: rabbitmq
template:
metadata:
labels:
app: rabbitmq
spec:
serviceAccountName: rabbitmq
terminationGracePeriodSeconds: 10
containers:
- name: rabbitmq-k8s
image: rabbitmq:3.7
volumeMounts:
- name: config-volume
mountPath: /etc/rabbitmq
- name: datadir
mountPath: /var/lib/rabbitmq
ports:
- name: http
protocol: TCP
containerPort: 15672
- name: amqp
protocol: TCP
containerPort: 5672
livenessProbe:
exec:
command: ["rabbitmqctl", "status"]
initialDelaySeconds: 60
# See https://www.rabbitmq.com/monitoring.html for monitoring frequency recommendations.
periodSeconds: 60
timeoutSeconds: 15
readinessProbe:
exec:
command: ["rabbitmqctl", "status"]
initialDelaySeconds: 20
periodSeconds: 60
timeoutSeconds: 10
imagePullPolicy: Always
env:
- name: MY_POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
- name: MY_POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: RABBITMQ_USE_LONGNAME
value: "true"
# See a note on cluster_formation.k8s.address_type in the config file section
- name: RABBITMQ_NODENAME
value: "rabbit@$(MY_POD_NAME).rabbitmq.airflow.svc.cluster.local"
- name: K8S_SERVICE_NAME
value: "rabbitmq"
- name: K8S_HOSTNAME_SUFFIX
value: '.rabbitmq.airflow.svc.cluster.local'
- name: RABBITMQ_ERLANG_COOKIE
value: "mycookie"
volumes:
- name: config-volume
configMap:
name: rabbitmq-config
items:
- key: rabbitmq.conf
path: rabbitmq.conf
- key: enabled_plugins
path: enabled_plugins
volumeClaimTemplates:
- metadata:
name: datadir
spec:
accessModes:
- "ReadWriteOnce"
resources:
requests:
storage: 5Gi
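Once the pod is running, a quick way to confirm RabbitMQ is healthy is to run the same command the probes use; this assumes the single-replica pod rabbitmq-0 created by the StatefulSet above:

# rabbitmq-0 is the only replica of the StatefulSet above.
kubectl exec -n airflow rabbitmq-0 -- rabbitmqctl status
# The management UI is also reachable on NodePort 31672; the stock
# rabbitmq:3.7 image ships with the guest/guest account, which the
# loopback_users.guest = false setting above allows to log in remotely.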
- have an Airflow webserver to provide UI support for Airflow; we can manage and track the DAGs via the UI. Kubernetes checks its liveness and readiness on path /admin/ and port 8080. We could also deploy ingress-nginx with HTTP basic auth to expose this web UI outside the Kubernetes cluster (a couple of practical commands follow the manifest).
apiVersion: apps/v1
kind: Deployment
metadata:
name: airflow-celery-webserver
namespace: airflow
spec:
replicas: 1
selector:
matchLabels:
app: airflow-celery-webserver
template:
metadata:
labels:
name: airflow-celery-webserver
app: airflow-celery-webserver
spec:
containers:
- name: airflow-celery-webserver
image: docker.io/email2liyang/docker-airflow:1.10.2
imagePullPolicy: Always
env:
- name: LOAD_EX
value: "y"
- name: EXECUTOR
value: Celery
- name: AIRFLOW_MYSQL_DB_HOST
value: mysql
- name: AIRFLOW_MYSQL_DB_PORT
value: "3306"
- name: FERNET_KEY
value: 46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho=
- name: AIRFLOW__CELERY__BROKER_URL
value: amqp://guest:guest@rabbitmq:5672
- name: AIRFLOW__CELERY__RESULT_BACKEND
value: db+mysql://airflow:airflow@$(AIRFLOW_MYSQL_DB_HOST):$(AIRFLOW_MYSQL_DB_PORT)/airflow
- name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
value: mysql://airflow:airflow@$(AIRFLOW_MYSQL_DB_HOST):$(AIRFLOW_MYSQL_DB_PORT)/airflow
- name: AIRFLOW__CORE__EXECUTOR
value: CeleryExecutor
readinessProbe:
httpGet:
path: /admin/
port: 8080
initialDelaySeconds: 8
timeoutSeconds: 10
livenessProbe:
httpGet:
path: /admin/
port: 8080
initialDelaySeconds: 8
timeoutSeconds: 10
ports:
- name: webserver
containerPort: 8080
command: ["/entrypoint.sh"]
args: ["webserver"]
---
apiVersion: v1
kind: Service
metadata:
name: airflow-celery-webserver
namespace: airflow
spec:
type: NodePort
ports:
- name: webserver
port: 8080
targetPort: webserver
selector:
app: airflow-celery-webserver
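Two practical notes on the manifest above, sketched as commands you would run from a workstation with kubectl access to the cluster. The FERNET_KEY in the manifest is only a sample value; generate your own before any real deployment:

# Generate a fresh Fernet key (the "cryptography" package is an
# Airflow dependency, so this works inside the image too).
python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"

# Quickest way to reach the web UI without an Ingress; then open
# http://localhost:8080/admin/ in a browser.
kubectl port-forward -n airflow svc/airflow-celery-webserver 8080:8080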
- have an Airflow scheduler deployed in Kubernetes to schedule tasks onto the different worker nodes. Each cluster can only have one scheduler for now, and we use a ps command for the liveness and readiness checks (a log-tailing tip follows the manifest).
apiVersion: apps/v1
kind: Deployment
metadata:
name: airflow-celery-scheduler
namespace: airflow
spec:
replicas: 1
selector:
matchLabels:
app: airflow-celery-scheduler
template:
metadata:
labels:
name: airflow-celery-scheduler
app: airflow-celery-scheduler
spec:
containers:
- name: airflow-celery-scheduler
image: docker.io/email2liyang/docker-airflow:1.10.2
imagePullPolicy: Always
env:
- name: LOAD_EX
value: "y"
- name: EXECUTOR
value: Celery
- name: AIRFLOW_MYSQL_DB_HOST
value: mysql
- name: AIRFLOW_MYSQL_DB_PORT
value: "3306"
- name: FERNET_KEY
value: 46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho=
- name: AIRFLOW__CELERY__BROKER_URL
value: amqp://guest:guest@rabbitmq:5672
- name: AIRFLOW__CELERY__RESULT_BACKEND
value: db+mysql://airflow:airflow@$(AIRFLOW_MYSQL_DB_HOST):$(AIRFLOW_MYSQL_DB_PORT)/airflow
- name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
value: mysql://airflow:airflow@$(AIRFLOW_MYSQL_DB_HOST):$(AIRFLOW_MYSQL_DB_PORT)/airflow
- name: AIRFLOW__CORE__EXECUTOR
value: CeleryExecutor
readinessProbe:
exec:
command:
- /bin/bash
- -c
- ps -ef | grep scheduler | grep -v "grep"
initialDelaySeconds: 8
timeoutSeconds: 10
livenessProbe:
exec:
command:
- /bin/bash
- -c
- ps -ef | grep scheduler | grep -v "grep"
initialDelaySeconds: 8
timeoutSeconds: 10
command: ["/entrypoint.sh"]
args: ["scheduler"]
- have the Airflow workers deployed into the Kubernetes cluster; we use a ps command for the liveness and readiness checks.
apiVersion: apps/v1
kind: Deployment
metadata:
name: airflow-celery-worker
namespace: airflow
spec:
replicas: 2
selector:
matchLabels:
app: airflow-celery-worker
template:
metadata:
labels:
name: airflow-celery-worker
app: airflow-celery-worker
spec:
containers:
- name: airflow-celery-worker
image: docker.io/email2liyang/docker-airflow:1.10.2
imagePullPolicy: Always
env:
- name: LOAD_EX
value: "y"
- name: EXECUTOR
value: Celery
- name: AIRFLOW_MYSQL_DB_HOST
value: mysql
- name: AIRFLOW_MYSQL_DB_PORT
value: "3306"
- name: FERNET_KEY
value: 46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho=
- name: AIRFLOW__CELERY__BROKER_URL
value: amqp://guest:guest@rabbitmq:5672
- name: AIRFLOW__CELERY__RESULT_BACKEND
value: db+mysql://airflow:airflow@$(AIRFLOW_MYSQL_DB_HOST):$(AIRFLOW_MYSQL_DB_PORT)/airflow
- name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
value: mysql://airflow:airflow@$(AIRFLOW_MYSQL_DB_HOST):$(AIRFLOW_MYSQL_DB_PORT)/airflow
- name: AIRFLOW__CORE__EXECUTOR
value: CeleryExecutor
readinessProbe:
exec:
command:
- /bin/bash
- -c
- ps -ef | grep celeryd | grep -v "grep"
initialDelaySeconds: 8
timeoutSeconds: 10
livenessProbe:
exec:
command:
- /bin/bash
- -c
- ps -ef | grep celeryd | grep -v "grep"
initialDelaySeconds: 8
timeoutSeconds: 10
command: ["/entrypoint.sh"]
args: ["worker"]
- optionally, have Airflow's Celery Flower deployed into the cluster to monitor the Celery workers' health; we use path / and port 5555 for the readiness and liveness checks (a port-forward shortcut follows the manifest). With Celery workers managed by Kubernetes, you may see some workers come and go; this is natural in the Kubernetes world.
apiVersion: apps/v1
kind: Deployment
metadata:
name: airflow-celery-flower
namespace: airflow
spec:
replicas: 1
selector:
matchLabels:
app: airflow-celery-flower
template:
metadata:
labels:
name: airflow-celery-flower
app: airflow-celery-flower
spec:
containers:
- name: airflow-celery-flower
image: docker.io/email2liyang/docker-airflow:1.10.2
imagePullPolicy: Always
env:
- name: LOAD_EX
value: "y"
- name: EXECUTOR
value: Celery
- name: AIRFLOW_MYSQL_DB_HOST
value: mysql
- name: AIRFLOW_MYSQL_DB_PORT
value: "3306"
- name: FERNET_KEY
value: 46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho=
- name: AIRFLOW__CELERY__BROKER_URL
value: amqp://guest:guest@rabbitmq:5672
- name: AIRFLOW__CELERY__RESULT_BACKEND
value: db+mysql://airflow:airflow@$(AIRFLOW_MYSQL_DB_HOST):$(AIRFLOW_MYSQL_DB_PORT)/airflow
- name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
value: mysql://airflow:airflow@$(AIRFLOW_MYSQL_DB_HOST):$(AIRFLOW_MYSQL_DB_PORT)/airflow
- name: AIRFLOW__CORE__EXECUTOR
value: CeleryExecutor
readinessProbe:
httpGet:
path: /
port: 5555
initialDelaySeconds: 8
timeoutSeconds: 10
livenessProbe:
httpGet:
path: /
port: 5555
initialDelaySeconds: 8
timeoutSeconds: 10
ports:
- name: flower
containerPort: 5555
command: ["/entrypoint.sh"]
args: ["flower"]
---
apiVersion: v1
kind: Service
metadata:
name: airflow-celery-flower
namespace: airflow
spec:
type: NodePort
ports:
- name: flower
port: 5555
targetPort: flower
selector:
app: airflow-celery-flower
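The Flower Service is a NodePort like the webserver's; if you only want a quick look from a workstation, a port-forward works just as well:

# Open http://localhost:5555/ to watch workers register and tasks
# flow through the queues.
kubectl port-forward -n airflow svc/airflow-celery-flower 5555:5555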
Now we can achieve the goal of managing the Celery workers with just one command: if we want 20 Celery workers, we simply run kubectl scale --replicas=20 deployment airflow-celery-worker -n airflow and we get 20 Celery workers in the Airflow cluster. I will discuss how to organize an Airflow project in the next article.
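Putting it all together, the whole lifecycle looks roughly like this; airflow.yaml is a placeholder name for wherever you saved the combined manifest:

# Create everything: namespace, mysql, rabbitmq, webserver, scheduler,
# workers and flower, from the single combined manifest.
kubectl apply -f airflow.yaml

# Scale the Celery workers to 20 and watch the pods appear.
kubectl scale --replicas=20 deployment airflow-celery-worker -n airflow
kubectl get pods -n airflow -w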