At Zulily, our need to perform various dependent actions on different schedules has continued to grow. Sometimes, we need to communicate inventory changes to advertising platforms. At other times, we need to aggregate data and produce reports on the effectiveness of spend or the conversions of ads, or to run other tasks. Early on, we knew that we needed a reliable workflow management system. We started with Apache Airflow version 1.8 two years ago and have continued to add more hardware resources to it as the demands of our workloads increased.
Apache Airflow is a workflow management system that allows developers to describe workflow tasks and their dependency graph as code in Python. This allows us to keep a history of the changes and build solid pipelines step-by-step with proper monitoring.
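To make "tasks and their dependency graph as code" concrete, here is a minimal sketch that uses only the Python standard library (Python 3.9+). It is not Airflow's API, and the task names are hypothetical. It only shows the underlying idea: each task declares what it depends on, and a valid run order is derived from that declaration.

```python
from graphlib import TopologicalSorter

# Hypothetical task names. Each key maps a task to the set of tasks
# that must finish before it can start.
workflow = {
    "extract_inventory": set(),
    "aggregate_spend": {"extract_inventory"},
    "publish_to_ad_platform": {"extract_inventory"},
    "build_report": {"aggregate_spend", "publish_to_ad_platform"},
}


def execution_order(dependencies):
    """Return one valid order in which the tasks can run."""
    return list(TopologicalSorter(dependencies).static_order())


order = execution_order(workflow)
print(order)
```

In Airflow the same graph would be declared with operators inside a DAG object, and the scheduler, rather than a one-off sort, decides when each task runs. Because the definition is ordinary Python, it can be versioned and reviewed like any other code.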
Our Airflow deployment runs a large majority of our advertising management and reporting workflows. As our usage of Airflow increased, we made our deployment infrastructure more resilient to failures by leveraging the new KubernetesPodOperator. This post describes our journey with Airflow from the Celery executor to the KubernetesPodOperator.
Our First Airflow 1.8 Deployment using Celery Executor
With this version of Airflow we needed to maintain many separate services: the scheduler, RabbitMQ, and workers.
Components and Concepts
DAG is the code that defines one complete workflow: its tasks and the dependencies between them. The code for the DAGs lives on the AWS Elastic File System (EFS).
Git Syncer polls Zulily’s Gitlab every 5 minutes, pulls the latest DAG code and puts it on the AWS EFS.
AWS EFS is the file share that has all the DAG code. It is mounted on the Webserver pod and Scheduler pod.
Webserver pod hosts the Airflow UI, which shows running tasks and task history, and lets users start and stop tasks and view the logs of completed tasks.
Scheduler pod reads the DAG code from AWS EFS and the scheduling data from the Airflow Metadata DB, then schedules tasks on the Worker pods by pushing them onto RabbitMQ.
Airflow Metadata DB contains the scheduling information and history of DAG runs.
Workers dequeue tasks from RabbitMQ, execute them, and copy the logs to S3 when done.
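The queue-and-worker flow described above can be sketched with the standard library alone. This is only an illustration of the pattern, not Celery or RabbitMQ code: an in-memory queue stands in for RabbitMQ, a dictionary stands in for the S3 log bucket, and all names are hypothetical.

```python
import queue
import threading


def worker(task_queue, log_store, lock):
    """Dequeue tasks until a None sentinel arrives, recording one log line per task."""
    while True:
        item = task_queue.get()
        if item is None:  # sentinel: no more work for this worker
            task_queue.task_done()
            break
        task_id, fn = item
        try:
            line = f"{task_id} succeeded: {fn()}"
        except Exception as exc:  # a failing task must not kill the worker
            line = f"{task_id} failed: {exc}"
        with lock:
            log_store[task_id] = line  # stands in for copying logs to S3
        task_queue.task_done()


def run_tasks(tasks, n_workers=2):
    """Play the scheduler: push tasks onto the queue and wait for the workers."""
    task_queue = queue.Queue()  # stands in for RabbitMQ
    log_store = {}
    lock = threading.Lock()
    threads = [
        threading.Thread(target=worker, args=(task_queue, log_store, lock))
        for _ in range(n_workers)
    ]
    for t in threads:
        t.start()
    for item in tasks:
        task_queue.put(item)
    for _ in threads:
        task_queue.put(None)  # one sentinel per worker
    for t in threads:
        t.join()
    return log_store


logs = run_tasks([("aggregate", lambda: 42), ("broken", lambda: 1 / 0)])
print(logs)
```

Adding capacity in this model means starting more workers, but the queue itself remains a component that has to stay up for anything to run.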
On the plus side, we could add more worker nodes as load increased.
The Git Syncer let us sync code every 5 minutes. From a developer's point of view, code merged to the master branch was automatically pulled to the production machines within 5 minutes.
On the downside, there were multiple single points of failure: RabbitMQ and the Git Syncer.
All the DAGs and the Airflow scheduler made up a single application that shared packages across the board. This meant one gigantic Pipfile in which every package had to be compatible with all the others.
All the DAGs had to be written in Python, which restricted our ability to reuse existing components written in Java and other languages.
Our Current Airflow 1.10.4 Deployment using KubernetesPodOperator
In Airflow version 1.10.2 a new kind of operator called the KubernetesPodOperator was introduced. This allowed us to reduce setup steps and make the overall setup more robust and resilient by leveraging our existing Kubernetes cluster.
Differences and New Components
DAG continues to be a Python definition of dependencies. We define all our tasks with the KubernetesPodOperator, so each DAG becomes a graph of task containers. We use the LocalExecutor to run our DAGs from the scheduler.
Temporary Task Pods run Task Containers, which operate like any other container and contain the business logic needed for that task. The key benefit is that there is no need to bundle any Airflow-specific packages into the task container. This is a game changer: pretty much any code, written in any language, that can be packaged as a container can be used as a task inside an Airflow DAG. For our Python DAGs, it also breaks up the giant Pipfile into smaller Pipfiles, one per Python task container, making the package dependencies much more manageable.
Git Syncer goes away. It used to poll Zulily’s Gitlab every 5 minutes. We avoid that by using the kubectl cp command during CI/CD. After a developer merges code to master, a CI/CD step copies the updated DAG definition over to the AWS EFS. This push-based approach eliminates the need to poll every 5 minutes.
Webserver and Scheduler containers both run on the same pod. Starting from the Airflow Kubernetes deploy yaml, we removed the portions for setting up the git sync and created one pod with both Webserver and Scheduler containers. This simplified deployment. We used a minimal version of the Airflow Dockerfile for our Webserver and Scheduler containers.
Scheduling: When the scheduler needs to schedule a new task, it uses the Kubernetes API to create a temporary worker pod with the specified container image and starts it. After the task has completed, the logs are copied over to S3 and the worker pod ends. With this approach, the task worker containers are automatically distributed over the whole Kubernetes cluster. To increase the processing capacity of Airflow, we simply need to scale up Kubernetes, which we do using Kops.
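As a rough sketch of what a DAG built from task containers can look like, the example below separates a plain helper that assembles the operator arguments from the part that needs Airflow. The import path and parameter names are the ones we understand Airflow 1.10.x to use for the KubernetesPodOperator. The DAG id, namespace, image names and schedule are hypothetical placeholders, not the actual Zulily configuration.

```python
from datetime import datetime


def pod_task_kwargs(task_id, image, arguments=None, keep_pod=False):
    """Build keyword arguments for one KubernetesPodOperator task."""
    return {
        "task_id": task_id,
        "name": task_id.replace("_", "-"),  # Kubernetes names cannot contain "_"
        "namespace": "airflow",             # hypothetical namespace
        "image": image,                     # task container with the business logic
        "arguments": arguments or [],
        "in_cluster": True,                 # assumes the scheduler runs inside the cluster
        "get_logs": True,                   # stream container output into the task log
        "is_delete_operator_pod": not keep_pod,  # keep the pod to troubleshoot it later
    }


def build_dag():
    """Assemble the DAG. Airflow is imported here so the helper works without it."""
    from airflow import DAG
    from airflow.contrib.operators.kubernetes_pod_operator import (
        KubernetesPodOperator,
    )

    dag = DAG(
        dag_id="ad_spend_report",  # hypothetical DAG id
        start_date=datetime(2019, 1, 1),
        schedule_interval="@daily",
    )
    aggregate = KubernetesPodOperator(
        dag=dag,
        **pod_task_kwargs("aggregate_spend", "registry.example/aggregate-spend:1.0"),
    )
    report = KubernetesPodOperator(
        dag=dag,
        **pod_task_kwargs(
            "build_report", "registry.example/build-report:1.0", keep_pod=True
        ),
    )
    aggregate >> report  # build_report runs only after aggregate_spend succeeds
    return dag


print(pod_task_kwargs("build_report", "registry.example/build-report:1.0"))
```

In a real DAG file the result would be assigned at module level, for example `dag = build_dag()`, so that the scheduler can discover it. Note that neither task image needs Airflow installed; only the DAG definition does.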
High Level Sequence of Events
Developer pushes or merges DAG definition and Task container code to master.
The CI/CD pipeline is set up to:
Build and push the task container images to AWS ECR.
Push the new DAG definition to the AWS EFS.
When the Scheduler needs to schedule a task:
It reads the DAG definition from the AWS EFS.
It creates the short-lived Task pod with the Task container image specified.
Task executes and finishes.
Logs are copied to S3.
Task pod exits.
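The push of the DAG definition in the CI/CD step above relies on kubectl cp. A minimal sketch of how such a step could assemble the command is shown below. The namespace, pod name, container name and target path are hypothetical, and the sketch assumes the EFS volume is mounted at that path inside the pod.

```python
import shlex


def kubectl_cp_command(local_dir, namespace, pod, remote_dir, container=None):
    """Build the argument list for copying DAG files into a running pod."""
    cmd = ["kubectl", "cp", local_dir, f"{namespace}/{pod}:{remote_dir}"]
    if container:
        cmd += ["-c", container]  # needed when the pod runs more than one container
    return cmd


cmd = kubectl_cp_command(
    "dags/",                    # DAG definitions checked out by the CI job
    "airflow",                  # hypothetical namespace
    "airflow-0",                # hypothetical pod name
    "/usr/local/airflow/dags",  # hypothetical mount point of the EFS volume
    container="scheduler",      # hypothetical container name
)
print(shlex.join(cmd))
```

A CI job could run the result with `subprocess.run(cmd, check=True)` so that a failed copy fails the pipeline instead of silently leaving stale DAGs on the file share.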
Resiliency & Troubleshooting
Using a single pod for the Airflow webserver and scheduler containers simplifies things; Kubernetes will bring this pod back up if it goes down. We do need to handle orphaned task pods.
Worker pods are temporary. If they error out, we investigate, fix and re-run. The KubernetesPodOperator provides the option to keep older task containers around for troubleshooting purposes.
As before, Airflow Metadata DB is a managed AWS RDS instance for us.
The DAGs volume is also on AWS EFS. If we were to lose it in a catastrophic, although unlikely, event, we could always restore it from our source repository.
Our Kubernetes cluster has been working without any major resiliency issues. We have it deployed on AWS EC2 instances and use Kops for cluster management. We are currently exploring AWS EKS.
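One possible way to handle leftover task pods, including those kept around for troubleshooting, is a periodic cleanup that removes finished pods after a retention window. The sketch below shows only the selection logic, under stated assumptions: the pod records are plain dictionaries that some other code has already fetched from the Kubernetes API, and the 24-hour retention window is a hypothetical policy, not something this post prescribes.

```python
from datetime import datetime, timedelta, timezone

# Pod phases that Kubernetes reports for pods whose containers have all terminated.
FINISHED_PHASES = {"Succeeded", "Failed"}


def pods_to_clean_up(pods, now, max_age=timedelta(hours=24)):
    """Return the names of finished task pods older than max_age."""
    return [
        pod["name"]
        for pod in pods
        if pod["phase"] in FINISHED_PHASES and now - pod["finished_at"] > max_age
    ]


now = datetime(2019, 9, 1, tzinfo=timezone.utc)
pods = [  # hypothetical pod records
    {"name": "report-old", "phase": "Succeeded", "finished_at": now - timedelta(hours=30)},
    {"name": "report-recent", "phase": "Failed", "finished_at": now - timedelta(hours=1)},
    {"name": "report-running", "phase": "Running", "finished_at": None},
]
print(pods_to_clean_up(pods, now))
```

Recently failed pods stay available for investigation, running pods are never touched, and only pods past the retention window are selected for deletion.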
Airflow continues to be a great tool helping us achieve our business goals. Using the KubernetesPodOperator and the LocalExecutor with Airflow version 1.10.4, we have streamlined our infrastructure and made it more resilient in the face of machine failures. Our package dependencies have become more manageable and our tasks have become more flexible. We are piggy-backing on our existing Kubernetes infrastructure instead of maintaining another queuing and worker mechanism. This has enabled our developers to devote more time to improving the customer experience and to worry less about infrastructure. We are excited about future developments in Airflow and how they will enable us to drive Zulily's business!