Making Facebook Ad Publishing More Efficient

Remember Bart Simpson’s punishment for being bad? He had to write the same thing on the chalkboard over and over again, and he absolutely hated it! We as humans hate repetitive actions, and that’s why we invented computers – to help us optimize our time to do more interesting work.

At zulily, our Marketing Specialists previously published ads to Facebook individually. However, they quickly realized that creating ads manually limited the scale they could reach in their work: acquiring new customers and retaining existing shoppers. So we partnered with the marketing team to build a solution that would help the team use its resources efficiently.

At first, we focused on automating individual tasks. For instance, we wrote a tool that Marketing used to stitch images into a video ad. That was cool and saved some time but still didn’t necessarily allow us to operate at scale.

Now, we are finally at the point where the entire process runs end-to-end efficiently, and we are able to publish hundreds of ads per day, up from a handful.

Here’s how we engineered it.

The Architecture

Automated Ad Publishing Architecture

Sales Events

Sales Events is an internal system at zulily that stores the data about all the sales events we run; typically, we launch 100+ sales each day, which together can include 9,000 products, and each sale usually runs for three days. Each event includes links to the relevant products and product images. The system exposes the data through a REST API.

Evaluate an Event

This component holds the business logic that allows us to pick events that we want to advertise, using a rules-based system uniquely built for our high-velocity business. We implemented the component as an Airflow DAG that hits the Sales Events system multiple times a day for new events to evaluate. When a decision to advertise is made, the component triggers the next step.
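For illustration, here is a minimal sketch of what such an evaluation DAG could look like in Airflow 1.x; the endpoint URL, schedule, and the advertising rule below are placeholders, not our actual implementation.

from datetime import datetime, timedelta

import requests
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

SALES_EVENTS_API = "https://sales-events.internal/api/v1/events"  # placeholder URL


def should_advertise(event):
    # Stand-in for the real rules-based logic.
    return event.get("product_count", 0) >= 10


def evaluate_events(**context):
    """Fetch upcoming sales events and decide which ones to advertise."""
    events = requests.get(SALES_EVENTS_API, timeout=30).json()
    # Hand the selected events to the next step, e.g. trigger creative generation.
    return [e["event_id"] for e in events if should_advertise(e)]


dag = DAG(
    "evaluate_sales_events",
    start_date=datetime(2018, 1, 1),
    schedule_interval=timedelta(hours=4),  # run several times a day
    catchup=False,
)

evaluate = PythonOperator(
    task_id="evaluate_events",
    python_callable=evaluate_events,
    provide_context=True,
    dag=dag,
)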

Make Creatives

In this crucial next step, our zulily-built tool creates a video advertisement, which is uploaded to AWS S3 as an MP4 file. These creatives also include metadata used to match Creatives with Placements downstream.

Product Sort

A sales event at zulily could easily have dozens if not hundreds of products. We have a Machine Learning model that uses a proprietary algorithm to rank products for a given event. The Product Sort is available through a REST API, and we use it to optimize creative assets.

Match Creatives to Placements

A creative is a visual item that needs to be published so that a potential shopper on Facebook can see it. The resulting advertisement that the shopper sees is described by a Placement. A Placement defines where on Facebook the ad will go and who the audience for the ad should be. We match creatives with placements using Match Filters defined by Marketing Specialists.

Define Match Filters

Match Filters allow Marketing Specialists to define rules that will pick a Placement for a new Creative.


These rules are based on the metadata of Creatives: “If a Creative has a tag X with the value Y, match it to the Placement Z.”
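As a rough illustration of the matching logic (the field names here are hypothetical, not our actual schema), a filter can be evaluated by comparing a Creative's tags against the filter's conditions:

def matches(creative, match_filter):
    """True if every tag required by the filter is present on the creative with the right value."""
    tags = creative.get("tags", {})
    return all(tags.get(key) == value for key, value in match_filter["conditions"].items())


def placements_for(creative, match_filters):
    """Collect the Placements whose filters accept this Creative."""
    return [f["placement_id"] for f in match_filters if matches(creative, f)]


# A filter that sends creatives tagged category=shoes to the "fb-feed-shoes" Placement.
filters = [{"conditions": {"category": "shoes"}, "placement_id": "fb-feed-shoes"}]
creative = {"tags": {"category": "shoes", "event_id": "12345"}}
print(placements_for(creative, filters))  # ['fb-feed-shoes']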

MongoDB

Once we match a Creative with one or more Placements, we persist the result in MongoDB. We use a schemaless database rather than a SQL database because we want to be able to extend the schema of Creatives and Placements without having to update table definitions. MongoDB (version 3.6 and above) also gives us a change stream, which is essentially a log of changes happening to a collection. We rely on this feature to automatically kick off the next step.

Publish Ads to Facebook

Once the ad definition is ready and the new object is pushed to the MongoDB collection, we publish the ad to Facebook through a REST API. Along the way, the process automatically picks up videos from S3 and uploads them to Facebook. Upon a successful publish, the process marks the Ad as synced in the MongoDB collection.
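A minimal sketch of that trigger, assuming pymongo 3.6+; publish_to_facebook stands in for the real call to Facebook's API, and the connection string and collection names are placeholders.

from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")  # placeholder connection string
ads = client.marketing.ads                         # placeholder database/collection

# Watch for newly matched ads and publish them as they appear.
with ads.watch([{"$match": {"operationType": "insert"}}]) as stream:
    for change in stream:
        ad = change["fullDocument"]
        publish_to_facebook(ad)  # hypothetical helper: uploads the S3 video and creates the ad
        ads.update_one({"_id": ad["_id"]}, {"$set": {"synced": True}})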

Additional Technical Details

While this post is fairly high level, we want to share a few important technical details about the architecture that can be instructive for engineers interested in building something similar.

    1. Self-healing. We run our services on Kubernetes, which means that the service auto-recovers. This is key in an environment where we only have a limited time (in our case, typically three days) to advertise an event.
    2. Retry logic. Whenever you work with an external API, you want to have some retry logic to minimize downtime due to external issues. We use exponential backoff, but every use case is different. If the number of retries is exhausted, we write the event to a Dead Letter Queue so it can be processed later (see the sketch after this list).
    3. Event-driven architecture. In addition to MongoDB change streams, we also rely on message services such as AWS Kinesis and SQS (alternatives such as Kafka and RabbitMQ are readily available if you are not in AWS). This allows us to de-couple individual components of the system to achieve a stable and reliable design.
    4. Data for the users. While it’s not shown directly on the diagram, the system publishes business data it generates (Creatives, Placements, and Ads) to zulily’s analytics solution where it can be easily accessed by Marketing Specialists. If your users can access the data easily, it’ll make validations quicker, help build trust in the system and ultimately allow for more time to do more interesting work – not just troubleshooting.
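Here is a minimal sketch of the retry pattern from item 2, assuming SQS serves as the Dead Letter Queue; the queue URL and the external call are placeholders.

import json
import time

import boto3

sqs = boto3.client("sqs")
DLQ_URL = "https://sqs.us-west-2.amazonaws.com/123456789012/ads-dlq"  # placeholder


def send_with_retry(payload, call_external_api, max_attempts=5, base_delay=1.0):
    for attempt in range(max_attempts):
        try:
            return call_external_api(payload)
        except Exception:
            # Back off exponentially: 1s, 2s, 4s, 8s, ...
            time.sleep(base_delay * (2 ** attempt))
    # Retries exhausted: park the event in the Dead Letter Queue for later processing.
    sqs.send_message(QueueUrl=DLQ_URL, MessageBody=json.dumps(payload))
    return None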

In short, employing automated processes can help Marketing Tech teams scale and optimize work.

Realtime Archival of Mongo Collections to BigQuery

Here at zulily, we maintain a wide variety of database solutions. In this post, we’ll focus on two that serve differing needs in Ad Tech: MongoDB for our real-time needs and Google’s BigQuery for our long-term archival and analytics needs. In many cases we end up with data that needs to be represented and maintained in both databases, which causes issues due to their conflicting needs and abilities. Previously we solved this by creating manual batch jobs for whatever collections we wanted represented in BigQuery, but that resulted in delays before data appeared in BigQuery and left us maintaining multiple mostly-similar-but-slightly-different jobs. So we developed a new service, called mongo_bigquery_archiver, that automates the process of archiving data to BigQuery with only minimal configuration.

To configure the archiver, we maintain a collection, ‘bigquery_archiver_settings’, in a custom ‘config’ database on the Mongo server whose contents we want to back up. This collection holds one document for each collection that we want to back up to BigQuery, which serves both as configuration and as an up-to-date reference for the status of that backup process. Starting out, the configuration is simple:

{
    "mongo_database" : The Mongo database that houses the collection we want to archive.
    "mongo_collection" : The Mongo collection we want to archive.
    "bigquery_table" : The BigQuery table we want to create to store this data. Note that the table doesn't need to already exist! The archiver can create them on the fly.
    "backfill_full_collection" : Whether to upload existing documents in the Mongo collection, or only upload future ones. This field is useful for collections that may have some junk test data in them starting out, or for quickly testing it for onboarding purposes.
    "dryrun": Whether to actually begin uploading to BigQuery or just perform initial checks. This is mostly useful for the schema creation feature discussed below.
}

And that’s it! When that configuration is added to the collection, if the archiver process is active, it’ll automatically pick up the new configuration and create a subprocess to begin archiving that Mongo collection. The first step is determining the schema to use when creating the BigQuery table. If one already exists or is (optionally) specified in the configuration document manually, it’ll use that. Otherwise, it’ll begin the process of determining the maximum viable schema. Iterating through the entire collection, it analyzes each document to determine which fields exist across the superset of all present documents, keeping every field that maintains a consistent type across the documents it appears in and treating documents where a field is missing as having a null value. It analyzes subdocuments in the same way, creating RECORD field definitions that similarly load all the valid fields, recursing as necessary based on the depth of the collection’s documents. When complete, it stores the generated maximum viable schema in the configuration document for the user to review and modify as needed, in case there are extraneous fields that, while possible to upload to BigQuery, would just add useless overhead. It then creates a BigQuery table based on this schema and moves on to the next step.

Here’s an example of the generated schema:

{
    "columns" : [
        {
            "name" : "_id",
            "type" : "STRING",
        }
        {
            "name" : "tracking_code",
            "type" : "INTEGER",
            "mongo_field" : "tracking_code"
        },
        {
            "name" : "midnight_to_timestamp",
            "type" : "RECORD",
            "mongo_field" : "midnight_to_timestamp",
            "fields" : [
                {
                    "name" : "cpta",
                    "type" : "FLOAT"
                },
                {
                    "name" : "activations",
                    "type" : "INTEGER"
                },
                {
                    "name" : "spend",
                    "type" : "FLOAT"
                },
                {
                    "name" : "date_start",
                    "type" : "STRING"
                },
                {
                    "name" : "ad_spend_id",
                    "type" : "STRING"
                }
            ]
        },
        {
            "name" : "created_on_timestamp",
            "type" : "FLOAT",
            "mongo_field" : "created_on_timestamp"
        },
        {
            "name" : "lifetime_to_timestamp",
            "type" : "RECORD",
            "mongo_field" : "lifetime_to_timestamp",
            "fields" : [
                {
                    "name" : "cpta",
                    "type" : "FLOAT"
                },
                {
                    "name" : "activations",
                    "type" : "INTEGER"
                },
                {
                    "name" : "spend",
                    "type" : "FLOAT"
                }
            ]
        }
    ]
}
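To make the idea concrete, here is a heavily simplified sketch of that inference pass; the real archiver handles more types, builds nested RECORD definitions, and resolves conflicts more carefully.

# Map Python types observed in Mongo documents to BigQuery column types.
BQ_TYPES = {str: "STRING", int: "INTEGER", float: "FLOAT", bool: "BOOLEAN"}


def infer_schema(documents):
    """Keep every field whose type is consistent across the documents it appears in."""
    columns = {}
    for doc in documents:
        for field, value in doc.items():
            bq_type = BQ_TYPES.get(type(value))
            if bq_type is None:
                continue  # the real archiver also recurses into sub-documents for RECORD fields
            if field not in columns:
                columns[field] = bq_type
            elif columns[field] != bq_type:
                columns[field] = None  # inconsistent type: drop from the maximum viable schema
    return [
        {"name": field, "type": bq_type, "mongo_field": field}
        for field, bq_type in columns.items()
        if bq_type is not None
    ]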

Next, it uses a new feature added in Mongo 3.6: change streams. A change stream is like a subscription to the oplog on the Mongo server for the collection in question – whenever an operation comes in that modifies the collection, the subscriber is notified about it. We maintain a subscription for each watched collection, and whenever an update comes in, we process it to get the current state of the document in Mongo, without querying again, since the change stream can be configured to also hand us the latest version of the document. By filtering down using the generated schema from before, we can upload the change to the BigQuery table via the BigQuery streaming API, along with the kind of operation it is – create, replace, update, or delete. In the case of a delete, we blank all fields but the _id field and log it as a DELETE operation in the table. This table now represents the entire history of operations on the collection in question and can be used to track the state of the collection across time.
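A rough sketch of that loop, assuming pymongo 3.6+ and the google-cloud-bigquery client; the table, database, and field handling below are placeholders.

import time

from google.cloud import bigquery
from pymongo import MongoClient

bq = bigquery.Client()
TABLE_ID = "my-project.archive.ad_stats_history"  # placeholder
collection = MongoClient()["addata"]["ad_stats"]  # placeholder database/collection

# fullDocument='updateLookup' makes update events carry the current state of the document.
with collection.watch(full_document="updateLookup") as stream:
    for change in stream:
        doc = change.get("fullDocument") or {"_id": change["documentKey"]["_id"]}
        row = {
            "_id": str(doc["_id"]),
            "operationType": change["operationType"],
            "time_archived": time.time(),
            # ...remaining fields filtered down to the generated schema...
        }
        errors = bq.insert_rows_json(TABLE_ID, [row])  # BigQuery streaming insert
        if errors:
            raise RuntimeError(errors)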

Since a full-history table makes it somewhat cumbersome to get the most current version of the data, the archiver automatically creates views on top of all generated tables, using the following query:

SELECT * FROM (
    SELECT *, ROW_NUMBER() OVER(
        PARTITION BY _id ORDER BY time_archived DESC) rank FROM project.dataset_id.table_id
    )
WHERE rank=1 AND operationType != 'delete'

This query determines the latest operation performed on each document _id, which is guaranteed to be unique in the Mongo collection. If the latest operation is a DELETE, no entries for that _id appear in the final query results; otherwise, the query shows the status of the document as of its latest modification. This results in a view of each unique document exactly as of its latest update. With the minimal latency of the BigQuery streaming API, changes are reflected in BigQuery within seconds of their creation in Mongo, allowing for sophisticated real-time analytics via BigQuery. While BigQuery does not publish official SLAs for the performance of the streaming API, we consistently see results uploaded within 3-4 seconds at most via query results. The preview mechanism in the BigQuery UI does not accurately reflect this, but querying the table via a SELECT statement properly shows the results.


An example of the architecture. Note that normal user access
always hits Mongo and never goes to BigQuery, making the process more efficient.

Thanks to the Archiver, we’ve been able to leverage the strengths of both Mongo and BigQuery in our rapidly-modifying datasets while not having to actively maintain two disparate data loading processes for each system.

We’ve open-sourced the Archiver on Github under the Apache V2 license: https://github.com/zulily/mongo_bigquery_archiver. If you choose to use it for your own needs, check out Google’s documentation on streaming inserts into BigQuery.

Managing Your First Data Science or Machine Learning Project

Introduction

When I got started managing software development projects the standard methodology in use was the Waterfall, in which you attempted to understand and document the entire project lifecycle before anyone wrote a single line of code. While this may have offered some benefit for coordinating large multi-team projects, more often than not it resulted in missed deadlines, inflexibility, and delivering systems that didn’t fully achieve their business goals. Agile has since emerged as the dominant project methodology, addressing many of the Waterfall’s shortcomings. Agile recognizes that it’s unlikely we’ll know everything up front, and as such is built on an iterative foundation. This allows us to learn as we go, regularly incorporate stakeholder feedback, and to avoid failure.

For Data Science and Machine Learning (DS/ML) projects, I’d argue that an iterative approach is necessary, but not sufficient, for a successful project outcome. DS/ML projects are different. And these differences can fly below the traditional project manager’s radar until they pop up late in the schedule and deliver a nasty bite. In this blog post I’ll point out some of the key differences I’ve seen between a traditional software development project and a DS/ML project, and how you can protect your team and your stakeholders from these hidden dangers.

Project Goal

At a high level DS/ML projects typically seek to do one of three things: 1) Explain; 2) Predict; or 3) Find Hidden Structure. In the first two we are predicting something, but with different aims. When we are tasked with explanation we use ‘transparent’ models, meaning they show us directly how they arrive at a prediction. If our model is sufficiently accurate, we can make inferences about what drives the outcome we’re trying to predict. If our goal is simply getting the best possible prediction we can also use ‘black box’ models. These models are generally more complex and don’t provide an easy way to see how they make their predictions, but they can be more accurate than transparent models. When the goal is to find hidden structure, we are interested in creating groups of like entities such as customers, stores, or products, and then working with those groups rather than the individual entities. Regardless of the immediate goal, in all three cases we’re using DS/ML to help allocate scarce organizational resources more effectively.

When we want to explain or predict we need to explicitly define an outcome or behavior of interest. Most retail organizations, for example, are interested in retaining good customers. A common question is “How long will a given customer remain active?” One way to answer this is to build a churn model that attempts to estimate how likely a customer is to stop doing business with us. We now have a defined behavior of interest: customer churn, so we’re ready to start building a model, right? Wrong. A Data Scientist will need a much more precise definition of churn or risk building a model that won’t get used. Which group of customers are we targeting here? Those who just made their first purchase? Those who have been loyal customers for years? Those who haven’t bought anything from us in the last 6 months? High value customers whose activity seems to be tapering off recently? Each of these definitions of ‘at risk’ creates a different population of customers to model, leaving varying amounts of observed behavior at our disposal.

By definition we know less about new customers than long time loyal customers, so a model built for the former use case will probably not generalize well to the latter group. Once we’ve defined the population of customers under consideration, it’s really important to compute the ‘size of the prize’.

Size of the Prize

Say we build a churn model that makes predictions that are 100% accurate (if this were to happen, we’ve likely made a modeling mistake – more on that later), and we apply that model to the customer audience we defined to be at risk of churn. What’s the maximum potential ROI? Since our model can correctly distinguish those customers who will churn from those that won’t, we’d only consider offering retention incentives to those truly at risk of leaving. Maybe we decide to give them a discount on their next purchase. How effective have such interventions been in the past at retaining similar customers?

If a similar discount has historically resulted in 2% of the target audience making an incremental purchase, how many of today’s at-risk customers would we retain? If you assume that each retained customer’s new order will be comparable to their average historical order amount, and then back out the cost of the discount, how much is left? In some cases, even under ideal conditions you may find that you’re targeting a fairly narrow slice of customers to begin with, and the maximum ROI isn’t enough to move forward with model-based targeting for a retention campaign. It’s much better to know this before you’ve sunk time and effort into building a model that won’t get used. Instead, maybe you could build a model to explain why customers leave in the first place and try to address the root causes.
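As a purely illustrative back-of-the-envelope calculation, every number below is made up:

at_risk_customers = 50_000   # audience the (perfect) churn model would target
retention_rate = 0.02        # 2% historically make an incremental purchase after a discount
avg_order_value = 60.00      # assumed average historical order amount
discount_cost = 10.00        # cost of the incentive per retained customer

retained = at_risk_customers * retention_rate
max_upside = retained * (avg_order_value - discount_cost)
print(f"Retained customers: {retained:.0f}, maximum upside: ${max_upside:,.0f}")
# Retained customers: 1000, maximum upside: $50,000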

Definition of Success

Many times there is a difference in the way you’ll assess the accuracy of a model and how stakeholders will measure the success of the project. DS/ML practitioners use a variety of model performance metrics, many of which are quite technical in nature. These can be very different from the organizational KPIs that a stakeholder will use to judge a project’s success. You need to make sure that success from a model accuracy standpoint will also move the KPI needle in the right direction.

Models are built to help us make better decisions in a specific organizational context. If we’re tasked with improving the decision making in an existing process, we need to understand all the things that are taken into account when making that decision today. For example, if we build a model that makes recommendations for products based on previous purchases, but fail to consider current inventory, we may recommend something we can’t deliver. If our business is seasonal in nature, we may be spot on with our product recommendation and have plenty on hand, but suggest something that’s seasonally inappropriate.

Then there is the technical context to consider. If the goal will be making a recommendation in real time in an e-commerce environment, such as when an item is added to a shopping cart, you’ve got to deliver that recommendation quickly and without adding any friction to the checkout process. That means you’ll need to be ready to quickly supply this recommendation for any customer at any time. Models are usually built or ‘trained’ offline, and that process can take some time. A trained model is then fed new data and will output a prediction. This last step is commonly called ‘scoring’. Scoring can be orders of magnitude faster than training. But keep in mind that some algorithms require much more training time than others. Even if your scoring code can keep up with your busiest bursts of customer traffic, if you want to train frequently – perhaps daily so your recommendations take recent customer activity into account – the data acquisition, preparation, and training cycle may not be able to keep up.

Some algorithms need more than data; they also require supplying values for ‘tuning parameters’. Think of these as ‘knobs’ that have to be dialed in to get the best performance. The optimal settings for these knobs will differ from project to project, and can vary over time for the same model. A behavioral shift in your customer base, seasonality, or a change in your product portfolio can all require that a model be retrained and retuned. These are all factors that can affect the quality of your model’s recommendations.

Once you have clearly defined the desired outcome, the target audience, the definition of success from both the KPI and model accuracy perspectives, and how the model will be deployed, you’ve eliminated some of the major reasons models get built but don’t get used.

Model Inputs and Experimentation

In traditional software development, the inputs and outputs are usually very well defined. Whether the output is a report, an on-line shopping experience, an automatic inventory replenishment system, or an automobile’s cruise control, we usually enter into the project with a solid understanding of the inputs and outputs. The desired software just needs to consume the inputs and produce the outputs. That is certainly not to trivialize these types of projects; they can be far more difficult and complex than building a predictive model.

But DS/ML projects often differ in one key respect: while we’ll usually know (or will quickly define) what the desired output should be, many times the required inputs are unknown at the beginning of the project. It’s also possible that the data needed to make the desired predictions can’t be acquired quickly enough or is of such poor quality that the project is infeasible. Unfortunately, these outcomes are often not apparent until the Data Scientist has had a chance to explore the data and try some preliminary things out.

Our stakeholders can be of immense help when it comes to identifying candidate model inputs. Data that’s used to inform current decision making processes and execute existing business rules can be a rich source of predictive ‘signal’. Sometimes our current business processes rely on information that would be difficult to obtain (residing in multiple spreadsheets maintained by different groups) or almost impossible to access (tribal knowledge or intuition). Many algorithms can give us some notion of which data items they find useful to make predictions (strong signal), which play more of a supporting role (weak signal) or are otherwise uninformative (noise). Inputs containing strong signal are easy to identify, but the distinction between weak signal and noise is not always obvious.

Ghost Patterns

If we’re lucky we’ll have some good leads on possible model inputs. If we’re not so lucky we’ll have to start from scratch. There are real costs to including uninformative or redundant inputs. Besides the operational costs of acquiring, preparing, and managing inputs, not being selective about what goes into an algorithm can cause some models to learn ‘patterns’ in the training data that turn out to be spurious.

Say I asked you to predict a student’s math grade. Here’s your training data: Amy gets an ‘A’, Bobby gets a ‘B’, and Cindy gets a ‘C’. Now make a prediction: What grade does David get? If that’s all the information you had, you might be inclined to guess ‘D’, despite how shaky that seems. You’d probably be even less inclined to hazard a guess if I asked about Mary’s grade. The more data we put into a model the greater the chance that some data items completely unrelated to the outcome just happen to have a pattern in the training data that looks useful. When you try to make predictions with a data set that your model didn’t see during training, that spurious pattern won’t be there and model performance will suffer.

To figure out which candidate model inputs are useful to keep and which should be dropped from further consideration, the DS/ML practitioner must form hypotheses about and conduct experiments on the data. And there’s no guarantee that there will be reliable enough signal in the data to build a model that will meet your stakeholder’s definition of success.

Time Travel

We hope to form long, mutually beneficial relationships with our customers. If we earn our customers’ repeat business, we accumulate data and learn things about them over time. It’s common to use historical data to train a model on one period of time and then test its accuracy on data from a subsequent period. This reasonably simulates what would happen if I built a model on today’s data and used it to predict what will happen tomorrow. Looking into the past like this is not without risk, though. When we reach back into a historical data set like this, we need to be careful to avoid considering data that arrived after the point in the business process at which we want to make a prediction.

I once built a model to predict how likely it was that a customer would make their first purchase between two points in time in their tenure. To test how accurate my model was I needed to compare it to real outcomes, so that meant using historical data. When I went to gather the historical data I accidentally included a data item that captured information about the customer’s first purchase – something I wouldn’t know at the point in time at which I wanted to make the prediction. If a customer had already made their first purchase at the time I wanted to make the prediction, they wouldn’t be part of the target population to begin with.

The first indication that I had accidentally let my model cheat by peeking into the future was that it was 100% accurate when tested on a new data set. That generally doesn’t happen, at least to me, and least of all on the first version of a model I build. When I examined the model to see which inputs had a strong signal, the data item with information from the future stood out like a sore thumb. In this case my mistake was obvious, so I simply removed the data item that was ‘leaking’ information from the future and kept going. This particular information leak was easy to detect and correct, but that’s not always the case. And this issue is something that can bite even stakeholders during the conceptualization phases of projects, especially when trying to use DS/ML to improve decision making in longer running business processes.

Business processes that run over days, weeks, or longer typically fill in more and more data as they progress. When we’re reporting or doing other analysis on historical data, it can be easy to lose sight of the fact that not all of that data showed up at the same time. If you want to use a DS/ML capability to improve a long running business process, you need to be mindful of when the data actually becomes available. If not, there’s a real risk of proposing something that sounds awesome but is just not feasible.

Data availability and timing issues can also crop up in business processes that need to take quick action on new information. Just because data appears in an operational system in a timely fashion, that data still has to be readied for model scoring and fed to the scoring code. This pre-scoring data preparation process in some cases can be computationally intensive and may have its own input data requirements. Once the data is prepared and delivered to the scoring process, the scoring step itself is typically quick and efficient.

Unified Modeling Language (UML) Sequence & Timing Diagrams are useful tools for figuring out how long the end-to-end process might take. It’s wise to get a ballpark estimate on this before jumping into model building.

Deploying Models as a Service

Paraphrasing Josh Wills, a Data Scientist is a better programmer than most statisticians, and a better statistician than most programmers. That said, you probably still want software engineers building applications and machine learning engineers building modeling and scoring pipelines. There are two main strategies an application can use to obtain predictions from a model: the model can be directly integrated into the application’s code base, or the application can call a service (API) to get model predictions. This choice can have a huge impact on the architecture and success of a DS/ML project.

Integrating a predictive model directly into an application may seem tempting – no need to stand up and maintain a separate service, and the end-to-end implementation of making and acting on a prediction is in the same code base, which can simplify troubleshooting and debugging. But there are downsides. Application and model development are typically done on different cadences and by different teams. An integrated model means a more complicated deployment and testing process, and it can put application support engineers in the awkward position of having to troubleshoot code developed by another team with a different set of skills. Integrated models can’t easily be exposed to other applications or monitoring processes, and they can cause application feature bloat if there’s a desire to A/B test competing models.

Using a service to host the scoring code gets around these issues, but also impacts the overall system architecture. Model inputs need to be made available behind the API. At first blush, this may seem like a disadvantage – more work and more moving parts. But the process that collects and prepares data for scoring will often need to operate in the background anyway, independent of normal application flow.

Exposing model predictions as a service has a number of advantages. Most importantly, it allows teams to work more independently and focus on improving the parts of the system that best align with their skill sets. A/B testing of two or more models can be implemented behind the API without touching the application. Having the scoring code run in its own environment also makes it easier to scale. You’ll want to log some or all of the predictions, along with their inputs, for offline analysis and long-term prediction performance monitoring. Being able to identify the cases where the model’s predictions are the least accurate can be incredibly valuable when looking to improve model performance.

If a later revision of the model needs to incorporate additional data or prepare existing data in a different way, that work doesn’t need to be prioritized with the application development team. Imagine that you’ve integrated a model directly into an application, but the scoring code needs to be sped up to maintain a good user experience. Your options are much more limited than if you’d deployed the scoring code as a service. How about adding in that A/B testing capability? Or logging predictions for offline analysis? Even just deploying a retuned version of an existing model will require cross-team coordination.

Modeling within a Service Based Architecture

The model scoring API is the contract between the DS/ML and application development teams. Changing what’s passed into this service or what’s returned back to the application (the API’s ‘signature’) is a dead giveaway that the division of responsibilities on either side of the API was not completely thought through. That is a serious risk to project success. For teams to work independently on subsystems, that contract cannot change. A change to an API signature will require both service producers and service consumers to make changes, and it will often result in a tighter coupling between the systems – one of the problems we’re trying to avoid in the first place with a service-based approach. And always keep the number of things going into and coming out of that API to a bare minimum. The less the API client and server know about each other, the better.

Application development teams may be uneasy about relying on such an opaque service. The more opaque a service is, the less insight the application team has into a core component of their system. It may be tempting to include a lot of diagnostic information in the API’s response payload. Don’t do it. Instead, focus your efforts on persisting this diagnostic information behind the API. It’s fine to return a token or tracing id that can be used to retrieve this information through another channel at a later point in time; just keep your API signature as clean as possible.

As previously discussed, DS/ML projects are inherently iterative in nature and often require substantial experimentation. At the outset we don’t know exactly which data items will be useful as model inputs. This presents a problem for a service-based architecture. You want to encourage the Data Scientist to build the best model they can, so they’ll need to run a lot of little experiments, each of which could change what the model needs as inputs. So Machine Learning engineers will need to wait until model input requirements settle down to the point where they can start to build data acquisition and processing pipelines. But there’s a catch: waiting too long before building out the API unnecessarily extends timelines.

So how do we solve this? One idea is to work at the data source rather than the data item level. The Data Scientist should quickly be able to narrow down the possible source systems from which data will need to be acquired, and not long after that know which tables or data structures in those sources are of interest. One useful idiom from the Data Warehousing world is “Touch It, Take It”. This means that if today you know you’ll need a handful of data items from a given table, it’s better to grab everything in that table the first time rather than cherry-pick your first set and then have to open up the integration code each time you identify the need for an additional column. Sure, you’ll be acquiring some data prospectively, but you’ll also be maintaining a complete set of valuable candidate predictors. You’ll thank yourself when building the next version of the model, or a different model in that same domain, because the input data will already be available.

Once the Data Scientist has identified the desired tables or data structures, you’ll have a good idea of the universe of data that could potentially be made available to the scoring code behind the API. This is the time to nail down the leanest API signature you can. A customer id goes in, and a yes/no decision and a tracing token come out. That’s it. Once you’ve got a minimal signature defined, freeze it – at least until the first version of the model is in production.
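Here is a sketch of what that minimal contract could look like as a Flask endpoint; the route, field names, and the load_features, model_predict, and log_prediction helpers are all hypothetical.

import uuid

from flask import Flask, jsonify, request

app = Flask(__name__)


@app.route("/score", methods=["POST"])
def score():
    customer_id = request.get_json()["customer_id"]
    features = load_features(customer_id)      # data is prepared behind the API, not passed in
    decision = bool(model_predict(features))   # hypothetical scoring call
    trace_id = str(uuid.uuid4())
    log_prediction(trace_id, customer_id, features, decision)  # diagnostics persisted server-side
    # The response stays lean: a decision and a token for retrieving diagnostics later.
    return jsonify({"decision": decision, "trace_id": trace_id})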

Business Rules First

Predictive models are often used to improve on an existing business process built on business rules. If improving a business rule based process, consider the business rules as version zero of the model. They establish the baseline level of performance against which the model will be judged. Consider powering the first version of the API with these business rules. The earlier the end-to-end path can be exercised the better, and having a business rule based version of the model available as a fallback can provide a quick way to roll back the first model without rolling back the architecture.

In Closing

In this post I’ve tried to highlight some of the more important differences I’ve experienced between a traditional software development project and a DS/ML project. I was fortunate enough to be on the lookout for a few of these, but most only came to light in hindsight. DS/ML projects have enough inherent uncertainty; hopefully you’ll be able to use some of the information in this post to avoid some of these pitfalls in your next DS/ML project.

Two Million Mobile Push Notifications in Five Minutes

 

Authors: Arun Selvadhurai, Julian Kim, Sergey Podlazov, Tin Liu, Vinay Yerramilli

In Marketing Tech, one of our jobs is to tell customers about zulily offers.  These days everything and everyone goes mobile, and Mobile Push notifications are a great way to reach customers.

Our team faced a double-sided challenge.  Imagine that you have to ferry passengers across a river. There’ll be times when only one or two passengers show up every second or so, but they need to make it across as soon as possible. Under other circumstances, two million passengers will show up at once, all demanding an immediate transfer to the opposite bank.

One way to solve this is to build a big boat and a bunch of small boats and use them as appropriate. While this works, the big boat will sit idle most of the time. If we build only the big boat, we will be able to easily handle the crowds, but it will cost a fortune to service individual passengers. Two million small boats alone won’t work either, because they will probably take up the entire length of the river.

Fortunately, in the world of software we can solve this challenge by building a boat that scales. Unlike the Lambda architecture with two different code paths for real-time and batch processing, an auto-scaling system offers a single code path that can handle one or one million messages with equal ease.

Let’s take a look at the system architecture diagram.


Campaigns and one-offs are passengers. In the case of a campaign, we have to send potentially millions of notifications in a matter of minutes. One-offs arrive randomly, one at a time.

An AWS Kinesis Stream paired with a Lambda function makes a boat that scales. While we do need to provision both to have enough capacity to process the peak loads, we only pay for what we use with Lambda, and Kinesis is dirt-cheap.

We also ensure that the boat doesn’t ferry the same passenger multiple times, which would result in an awful customer experience (just imagine having your phone beep every few minutes). To solve this problem, we built a Frequency Cap service on top of Redis, which gives us a response time under 50ms per message. Before the code attempts to send a notification, it checks with the Frequency Cap service whether the send has already been attempted. If it has, the message is skipped. Otherwise, it is marked as “Send Attempted”. It’s important to note that the call to the Frequency Cap API is made before an actual send is attempted. Such a sequence prevents the scenario where we send the message and then fail to mark it accordingly due to a system failure.
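A minimal sketch of that check-before-send pattern with redis-py; the key format and TTL are placeholders.

import redis

r = redis.StrictRedis(host="localhost", port=6379)  # placeholder connection


def should_send(customer_id, message_id, ttl_seconds=7 * 24 * 3600):
    """Mark the send as attempted; return True only the first time for this customer/message pair."""
    key = f"push:{customer_id}:{message_id}"
    # SET with nx=True succeeds only if the key does not already exist, so marking the send
    # as attempted and checking for a duplicate happen in one atomic operation.
    return bool(r.set(key, "attempted", nx=True, ex=ttl_seconds))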

Another interesting challenge worth explaining is how we line up millions of passengers to board the boat efficiently. Imagine that they all arrive without a ticket, and the ticketing times vary. Yet the boat departs at an exact time that cannot be changed. We solve for this by ticketing in advance (the Payload Builder EMR service) and gathering passengers in a waiting area (files in S3). At an exact time, we open multiple doors from the waiting area (multithreading in the Kinesis Loader Java service), and the passengers make their way onto the boat (Kinesis Stream). The Step Functions AWS service connects the Payload Builder and the Kinesis Loader into a workflow.
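Our Kinesis Loader is a Java service, but the pattern is easy to sketch in Python with boto3; the bucket, stream name, and record format below are placeholders.

import json
from concurrent.futures import ThreadPoolExecutor

import boto3

s3 = boto3.client("s3")
kinesis = boto3.client("kinesis")
STREAM_NAME = "push-notifications"  # placeholder


def load_file(bucket, key):
    """Read one staged payload file from S3 and push its records onto the stream in batches."""
    lines = s3.get_object(Bucket=bucket, Key=key)["Body"].read().splitlines()
    for i in range(0, len(lines), 500):  # put_records accepts at most 500 records per call
        records = [
            {"Data": line, "PartitionKey": str(json.loads(line)["customer_id"])}
            for line in lines[i:i + 500]
        ]
        kinesis.put_records(StreamName=STREAM_NAME, Records=records)


def load_all(bucket, keys, threads=8):
    """Open multiple doors at once: each thread drains one waiting-area file."""
    with ThreadPoolExecutor(max_workers=threads) as pool:
        pool.map(lambda key: load_file(bucket, key), keys)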

In summary, we built a system that can handle one or one million Mobile Push notifications with equal ease. We achieved this by combining batch and streaming architecture patterns and adding a service to prevent duplicate sends. We also did some cool stuff in the Payload Builder service to personalize each notification so check back in a few weeks for a new post on that.

Practical A/B Testing

Introduction

A/B testing is essential to how we operate a data-driven business at zulily. We use it to assess the impact of new features and programs before we roll them out. This blog post focuses on some of the more practical aspects of A/B testing. It is divided into four parts. It begins with an introduction to A/B testing and how we measure long-term impact. Then, it moves into the A/B splitting mechanism. Next, it turns to Decima, our in-house A/B test analysis platform. Finally, it goes behind the scenes and describes the architecture of Decima.

A/B Testing

A/B Testing Basics

In A/B testing, the classic example is changing the color of a button. Say a button is blue, but a PM comes along with a great idea: What would happen if we make it green instead? The blue button is version A, the current version, the control. The green button is version B, the new version, the test. We want to know: Is the green button as awesome as we think? Is it a better experience for our users? Does it lead to better outcomes for our business? To find out, we run an A/B test. We randomly assign some users to see version A and some to see version B. Then we measure a few key outcome metrics for the users in each group. Finally, we use statistical analysis to compare those metrics between the two groups and determine whether the results are significant.

Statistical significance is a formal way of measuring whether a result is interesting. We know that there is natural variability in our users. Not everyone behaves exactly the same way. So, we want to check if the difference between A and B could just be due to chance. Pretend we ran an A/A test instead. We randomly split the users into two groups, but everyone gets the blue button. There is a range of differences (close to zero) that we could reasonably expect to see. When the results of the A/B test are statistically significant, it means they would be highly unusual to see under an A/A test. In that case, we would conclude that the green button did make a difference.


Figure 1. A/B testing – Split users and assign to version A or B. Measure behavior of each group. Use statistical analysis to compare.

Cumulative Outcome Metrics

To shop on zulily, users have to create an account. Requiring our users to be signed in is great for A/B testing, and for analytics in general. It means we can tie together all of a user’s actions via their account id, even if they switch browsers or devices. This makes it easy to measure long-term behaviors, well beyond a single session. And, since we can measure them, we can A/B test for them.

One of the common outcomes we measure at zulily is purchasing. A short-term outcome would be: How much did this user spend in the session when they saw the blue or green button? A long-term outcome would be: How much did the user spend during the A/B test? Whenever a user sees the control or test experience, we say they were exposed. A user can be exposed repeatedly over the course of a test. We accumulate outcome metrics from the first exposure through the end of the test. By measuring cumulative outcomes, we can better understand long-term impact and not be distracted by novelty effects.


Figure 2. Cumulative outcome metrics – Measure each user’s behaviors from their first exposure forward. Users can be exposed multiple times – focus on the first time. Do not count the user’s behaviors before their first exposure.

Lift

Usually, A/B test analysis measures the difference between version B and version A. For an outcome metric x, the difference between test and control is x_B – x_A. This difference, especially for cumulative outcomes, can increase over time. Consider the example of spend per exposed user. As the A/B test goes on, both groups keep purchasing and accumulating more spend. Version B is a success if the test group’s spend increases faster than the control’s.

Instead of difference, we measure the lift of B over A. Lift scales the difference by the baseline value. For an outcome metric x, the lift of test over control is (x_B – x_A) / x_A * 100%. We have found that lift for cumulative metrics tends to be stable over time.


Figure 3. Lift over time – Cumulative behaviors increase over time for both A and B, so the difference between them grows too. The lift tends to stay constant, making it a better summary of the results.

Power Analysis

Before starting an A/B test, it is good to ask two questions: What percent of users should get test versus control? and How long will the test need to run? The formal statistical way of answering these questions is a power analysis. First, we need to know what is the smallest difference (or lift) that would be meaningful to the business. This is called the effect size. Second, we need to know how much the outcome metric typically fluctuates. The power analysis calculates the sample size, the number of users needed to detect this size of effect with statistical significance.

There are two components to using the sample size. The split is the fraction of users in test versus control, and this impacts the sample size needed. The time for the test is however long it will take to expose that many users. Since users can come back and be exposed again, the cumulative number exposed will grow more slowly as time goes on. Purely mathematically, the more unbalanced the split (the further from 50-50 in either direction), the longer the test. Likewise, the smaller the effect size, the longer the test.
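As an illustration, a library such as statsmodels can back out the required sample size from an assumed effect size and split; all of the numbers below are made up.

from statsmodels.stats.power import TTestIndPower

baseline_spend = 20.0   # assumed average spend per exposed user
spend_std = 80.0        # assumed standard deviation of spend
min_lift = 0.02         # smallest lift worth detecting: 2%

# Convert the minimum detectable lift into a standardized effect size (Cohen's d).
effect_size = (baseline_spend * min_lift) / spend_std

# ratio is the test group size relative to control; 1.0 corresponds to a 50-50 split.
n_control = TTestIndPower().solve_power(
    effect_size=effect_size, alpha=0.05, power=0.8, ratio=1.0
)
print(f"Need roughly {n_control:,.0f} control users (and as many test users).")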

Size + Time – Practical Considerations

Often the power analysis doesn’t tell the whole story. For example, at zulily we have a strong weekly cycle – people shop differently on weekends from weekdays. We always recommend running A/B tests for at least one week, and ideally in multiples of seven days. Of course, if the results look dramatically negative after the first day or two, it is fine to turn off the test early.

The balance of the split affects the length of the test run, but we also consider the level of risk. If we have a big program with lots of moving parts, we might start with 90% control, 10% test. On the flip side, if we want to make sure an important feature keeps providing lift, we might maintain a holdout with 5% control, 95% test. But, if we have a low risk test, such as a small UI change, a split at 50% control, 50% test will mean shorter testing time.

A/B Split

Goals for the A/B Split

There are three key properties that any splitting strategy should have. First, the users should be randomly assigned to treatments. That way, all other characteristics of the users will be approximately the same for each of the treatment groups. The only difference going in is the treatment, so we can conclude that any differences coming out were caused by the treatment. Second, the treatments for each A/B test should be assigned independently from all other A/B tests. That way, we can run many A/B tests simultaneously and not worry about them interfering with each other’s results. Of course, it wouldn’t make sense to apply two conflicting tests to the same feature at the same time. Third, the split should be reproducible. The same user should always be assigned to the same treatment of a test. The treatment shouldn’t vary randomly from page to page or from visit to visit.

Our Strategy

At zulily, our splitting strategy is to combine the user id with the test name and apply a hash function. The result is a bucket number for that user in that test. We often set up more buckets than treatments. This provides the flexibility to start with a small test group and later increase it by moving some of the buckets from control to test.

Our splitting strategy has all three key properties. First, the hash produces pseudo-random bucketing. Second, by including the test name, the user will get independent buckets for different tests. Third, the bucket is reproducible because the hash function is deterministic.

The hash is very fast to compute, so developers don’t have to worry about the A/B split slowing down their code. To implement a test, at the decision point in the code the developer places a call to our standard test lookup function with the test name and user id. It returns the bucket number and treatment name, so the user can be directed to version A or version B. Behind the scenes, the test lookup function generates a clickstream log with the test name, user id, timestamp, and bucket. We on the Data Science team use the clickstream records to know exactly who was exposed to which test when and which treatment they were assigned.
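A minimal sketch of such a lookup function; the hash choice, bucket count, and cutoff are illustrative.

import hashlib


def assign_bucket(test_name, user_id, n_buckets=100):
    """Hash the test name and user id together: reproducible per user, independent across tests."""
    digest = hashlib.md5(f"{test_name}:{user_id}".encode("utf-8")).hexdigest()
    return int(digest, 16) % n_buckets


def assign_treatment(test_name, user_id, control_buckets=50):
    """Buckets below the cutoff get control; moving the cutoff later enlarges the test group."""
    bucket = assign_bucket(test_name, user_id)
    return bucket, ("control" if bucket < control_buckets else "test")


# The same user always lands in the same bucket for a given test.
assert assign_treatment("green_button", 123456) == assign_treatment("green_button", 123456)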

Audience v. Exposure

There are two main ways to assign users to an A/B test: using an audience or exposure. In an audience-based test, before the test launches we create an audience – a group of users who should be in the test – and randomly split them into control and test. Then we measure all of those users’ behavior for the entire test period. This is straightforward but imprecise. Not everyone in the audience will actually be touched by the A/B test. The results are statistically valid, but it will be more difficult to detect an effect due to the extra noise.

Instead, we prefer exposure-based testing. The user is only assigned to a treatment when they reach the feature being tested. The number of exposed users increases as the test runs. The only users in the analysis are those who could have been impacted by the A/B test, so it is easier to detect a lift. In addition, we only measure the cumulative outcomes starting from each user’s first exposure. This further refines the results by excluding anything a user might have done before they had a chance to be influenced by the test.


Figure 4. Audience v exposure – While both statistically valid, exposure-based tests avoid sources of noise and can detect smaller effects.

Decima UI

A Bit of Roman Mythology

The ancient Romans had a concept of the Three Fates. These were three women who control each mortal’s thread of life. First, Nona spins the thread, then Decima measures it, and finally Morta cuts it when the life is over. We named our A/B test analysis system Decima because it measures all of the live tests at zulily.


Figure 5. Three Fates – In ancient Roman mythology the Three Fates control the thread of life. Decima’s role is to measure it.

Decima UI

The Decima UI is the face of the system to internal users. These include PMs, analysts, developers, or anyone interested in the results of an A/B test. It has two main sections: the navigation and information panel and the results panel. Figure 6 shows a screenshot of Decima displaying a demo A/B test.


Figure 6. Decima UI – At zulily, Decima displays the results of A/B tests. The left panel is for navigation and information. The main panel shows the results for each outcome metric.

Navigation + Information

The navigation and information panel is on the left. A/B tests are organized by Namespace or area of the business. Within a namespace, the Experiment drop-down lists the names of all live tests. The Platform drills down to just exposures and outcomes that occurred on that platform or group of platforms (all-apps, mobile-web, etc). The Segmentation drills down to users in a particular segment (new vs existing, US vs international, etc).

The date information shows the analysis_start_date and analysis_end_date. The results are for exposures and outcomes that occurred in this date range, inclusive. The n_days shows the length of the date range. The analysis_run_date shows the timestamp when the results were computed. For live tests, the end date is always yesterday and the run date is always this morning.

Results

The main panel displays the results for each outcome metric. We analyze whether the lift is zero or statistically significantly different from zero. If a lift is significant and positive, it is colored green. If it is significant and negative, it is colored orange. If it is flat, it is left gray. The plot shows the estimated lift and its 95% confidence interval. It is easy to see whether or not the confidence interval contains zero.

The table shows the average (or proportion for a binary outcome), standard deviation, and sample size for each treatment group. Based on the statistical analysis, it shows the estimated lift, confidence interval bounds, and p-value for comparing each test group to the control.


Figure 7. Single metric results – Zoom in one metric in the Decima UI. The plot shows the 95% confidence interval for lift. The table shows summary numbers and statistical results.

Common Metrics

We use a variety of outcome metrics depending on the goal of the new feature being tested. Our core metrics include purchasing and visiting behaviors. Specifically, spend per exposed approximates the impact of the test on our top line. For each exposed user, we measure the cumulative spend (possibly zero) between their first exposure date and the analysis end date. Then we average this across all users for each treatment group. Spend per exposed can be broken down into two components: chance of purchase and spend per purchaser. Sometimes a test might cause more users to purchase but spend lower amounts, or vice versa. Spend per exposed combines the two to capture the overall impact. Revisit rate measures the impact of the test on repeat engagement. For each exposed user, we count the number of days they came back after their first exposure date. We have found that visit frequency is a strong predictor of future behaviors, months down the road.


Figure 8. Common outcome metrics. Spend per exposed can be broken into chance of purchase and demand per purchaser. Revisit rate is a proxy for long-term behavior.

Decima Architecture

Three Modules of Decima

Decima is comprised of three main modules. Each is named after a famous contributor to the field that corresponds to its role. Codd invented the relational database model, so the codd module assembles the user-level dataset from our data warehouse. Gauss was an influential statistician (the Gaussian or Normal distribution is named after him), so the gauss module performs the statistical analysis. Tufte is considered a pioneer in data visualization, so the tufte module displays the results in the Decima UI. Decima runs in Google Compute Engine (GCE), with a separate Docker container for each module.

Codd

The codd module is in charge of assembling the dataset. It is written in Python. It uses recursive formatting to compose the query out of parameterized query components, filling values for the dates, test name, etc. Then it submits the query to the data warehouse in Google BigQuery and exports the resulting dataset to Google Cloud Storage (GCS).
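A simplified sketch of that composition and export, assuming the google-cloud-bigquery client; the query piece, project, and destination URI are placeholders, and the real codd composes several pieces recursively.

from google.cloud import bigquery

# One reusable, parameterized query piece (placeholder table and fields).
EXPOSURE_QUERY = """
SELECT user_id, MIN(event_time) AS first_exposure
FROM `my-project.clickstream.ab_exposures`
WHERE test_name = '{test_name}'
  AND DATE(event_time) BETWEEN '{start_date}' AND '{end_date}'
GROUP BY user_id
"""

client = bigquery.Client()


def build_dataset(test_name, start_date, end_date, destination_uri):
    """Fill in the query parameters, run it in BigQuery, and export the result to GCS for gauss."""
    query = EXPOSURE_QUERY.format(test_name=test_name, start_date=start_date, end_date=end_date)
    job = client.query(query)
    job.result()  # wait for the query to finish; results land in a temporary destination table
    client.extract_table(job.destination, destination_uri).result()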


Figure 9. Codd – The codd module of Decima does data assembly.

Gauss

The gauss module takes care of the statistical analysis. It is written in R. It imports the dataset produced by codd from GCS into a data.table. It loops through the outcome metrics and performs the statistical test for lift for each one using speedglm. It also loops through platforms and segmentations to generate results for the drill downs. Finally, it gathers all the results and writes them out to a file in GCS.


Figure 10. Gauss – The gauss module of Decima does statistical analysis.

Tufte

The tufte module serves the result visualizations. It is also written in R. It imports the results file produced by gauss from GCS. It creates the tables and plots for each metric in the test using ggplot2. It displays them in an interactive UI using shiny. The UI is hosted in GCE and can be accessed by anyone at zulily.


Figure 11. Tufte – The tufte module of Decima does data visualization.

Decima Meta

The fourth module of Decima is decima-meta. It doesn’t contain any software, just queries and configuration files. The queries are broken down into reusable pieces. For example, the exposure query and outcome metrics query can be mixed and matched. Each query piece has parameters for frequently changed values, such as dates or test ids. The configuration files are written in JSON and there is one per A/B test. They specify all the query pieces and parameters for codd, as well as the outcome metrics for gauss. The idea is: running an A/B test analysis should be as easy as adding a configuration file for it.

About the Author

Julie Michelman is a Data Scientist at zulily. She designs and analyzes A/B tests, utilizing Decima, the in-house A/B test analysis tool she helped build. She also builds machine learning models that are used across the business, including marketing, merchandising, and the recommender system. Julie holds a Master’s in Statistics from the University of Washington.

Image Sources

Figure 1. https://www.freepik.com/free-icon/multiple-users-silhouette_736514.htm, https://en.wikipedia.org/wiki/Normal_distribution
Figure 5. http://bytesdaily.blogspot.com/2015/12/some-december-trivia.html
Figure 9. https://en.wikipedia.org/wiki/Edgar_F._Codd
Figure 10. https://en.wikipedia.org/wiki/Carl_Friedrich_Gauss
Figure 11. https://en.wikipedia.org/wiki/Edward_Tufte

Calculating Ad Performance Metrics in Real Time

Authors: Sergey Podlazov, Rahul Srivastava

zulily is a flash sales company. We post a product on the site, and poof… it's gone in 72 hours. Online ads for those products come and go just as fast, which doesn't leave us much time to manually evaluate the performance of the ads and take corrective actions if needed. To optimize our ad spend, we need to know in real time how each ad is doing, and this is exactly what we engineered.

While we track multiple metrics to measure impact of an ad, I am going to focus on one that provides a good representation of the system architecture.  This is an engineering blog after all!

The metric in question is Cost per Total Activation, or CpTA for short. The formula for the metric is simple: divide the total cost of the ad by the number of customer activations. We call the numerator in this formula "spend" and refer to the denominator as "activations". For example, if an ad costs zulily $100 between midnight and 15:45 PST on January 31 and results in 20 activations, the CpTA for this ad as of 15:45 PST is $100/20 = $5.

Here's how zulily collects this metric in real time. For the sake of simplicity, I will skip the archiving processes that are sprinkled on top of the architecture below.

Screen Shot 2018-01-30 at 6.22.22 PM

The source of the spend for the metric is an advertiser API, e.g. Facebook.  We’ve implemented a Spend Producer (in reference to the Producer-Consumer model) that queries the API every 15 minutes for live ads and pushes the spend into a MongoDB.  Each spend record has a tracking code that uniquely identifies the ad.
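
A minimal sketch of such a producer (hypothetical endpoint, field names, and collection; the real Spend Producer is more involved) might look like:

import time
import requests
from pymongo import MongoClient

spend_collection = MongoClient()["ads"]["spend"]  # connection details omitted

def poll_spend():
    # Query the advertiser API for live ads (endpoint and auth are placeholders).
    ads = requests.get("https://advertiser.example.com/api/live_ads").json()
    for ad in ads:
        # Upsert the latest cumulative spend, keyed by the ad's tracking code.
        spend_collection.update_one(
            {"tracking_code": ad["tracking_code"]},
            {"$set": {"spend": ad["spend"], "updated_at": time.time()}},
            upsert=True,
        )

while True:
    poll_spend()
    time.sleep(15 * 60)  # every 15 minutes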

The source for the activations is a Kafka stream of purchase orders that customers place with zulily. We consume these orders and push them into an AWS Kinesis stream. This gives us the ability to process and archive the orders without putting extra strain on Kafka. It's important to note that relevant orders also carry the ad's tracking code, just like the spend records. That's the link that glues spend and activations together.

The Activation Evaluator application examines each purchase and determines whether the purchase is an activation. To do that, it looks up the customer's previous purchase in a MongoDB collection, keyed by the customer ID on the purchase order. If the most recent transaction is non-existent or older than X days, the purchase is an activation. The Activation Evaluator then updates the customer record with the date of the new purchase. To make sure we don't drop any data if the Activation Evaluator runs into issues, we don't move the checkpoint in the Kinesis stream until the write to Mongo is confirmed.
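
In rough Python (with assumed collection and field names, and the activation window left as a placeholder for the real "X days"), the evaluation step looks something like:

from datetime import datetime, timedelta

ACTIVATION_WINDOW_DAYS = 90  # placeholder for "X days"; the real value is a business parameter

def is_activation(order, customers):
    """Return True if this purchase newly activates or re-activates the customer."""
    customer = customers.find_one({"customer_id": order["customer_id"]})
    cutoff = datetime.utcnow() - timedelta(days=ACTIVATION_WINDOW_DAYS)
    activation = customer is None or customer["last_purchase_at"] < cutoff
    # Record the new purchase date before the Kinesis checkpoint is advanced.
    customers.update_one(
        {"customer_id": order["customer_id"]},
        {"$set": {"last_purchase_at": order["purchased_at"]}},
        upsert=True,
    )
    return activation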

The Activation Evaluator sends evaluated purchases into another Kinesis stream. Chaining Kinesis streams is a pretty common pattern for AWS applications, as it allows for separation of concerns and makes the whole system more resilient to failures of individual components.

The Activation Calculator reads the evaluated purchases from the second Kinesis stream and captures them in Mongo.  We index the data by tracking code and timestamp, and voila, a simple count() will return the number of activations for a specified period.

The last step in the process is to take the Spend and divide it by the activations.  Done.
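
Sketching that last step with pymongo (again with assumed collection and field names):

def cpta(tracking_code, since, spend_collection, activations_collection):
    """Cost per Total Activation for one ad over a period starting at `since`."""
    spend = spend_collection.find_one({"tracking_code": tracking_code})["spend"]
    activations = activations_collection.count_documents(
        {"tracking_code": tracking_code, "timestamp": {"$gte": since}}
    )
    return spend / activations if activations else None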

With this architecture, zulily measures a key advertising performance metric every 15 minutes and uses it to pause poorly-performing ads.  The metric also serves as an input for various Machine Learning models, but more on those in a future blog post… Stay tuned!!

From Cart Pick to Put Walls

A critical part of any e-commerce company is getting product to its customers. While many discussions of a company's customer experience focus on its website and apps or on customer service and support, we often forget that delivering customers' products when the company said it would matters just as much. This part of the promise made (or implied) to customers is critical for building trust and providing a great end-to-end customer experience. Most large e-commerce companies operate — or pay someone else to operate — one or more "fulfillment centers", where products are stored and combined with other items that need to be sent to the customer. zulily's unique business model means we work with both big brands and smaller boutique vendors with a variety of different capabilities, so our products are inspected for quality, frequently bagged to keep clothing from getting dirty, and often need barcoding (as many smaller vendors may not provide barcodes). The quality of zulily's fulfillment processes drives our ability to deliver on our promises to customers, and zulily's software drives those fulfillment processes.

All fulfillment center systems start with a few basic needs: receive products in from vendors, store products so they can be retrieved later, and ship products to customers. Shipping product out, also known as "outbound", is the most expensive operation inside the fulfillment center, so we have invested heavily in making it efficient. The problem seems simple at first glance: gather product for a customer shipment, put the products in boxes, put labels on the boxes, and hand the boxes to UPS or USPS. The trick is making this process as efficient as possible. When zulily first started, each associate would walk the length of the warehouse, picking each item and sorting it into one of 20 shoebox-sized bins on their cart, with each bin representing a customer shipment. Once all of the shipments had been picked, the picker delivered the completed cart to a packing station. The job of collecting products to be shipped out is known as "picking", and when our warehouse was fairly small, this strategy of one person picking the whole order worked fine. As the company grew, our warehouses did too – some of our buildings have a million square feet of storage spread over multiple floors. Pickers were now walking quite a long way for just 20 shipments. We could have simply increased the size or quantity of the carts, but that solution costs more as the company grows. In addition, safety concerns about pulling more or larger carts and the complexities of taking one cart to multiple floors of a building make the idea impractical, to say the least.

PickCartImage

A pick cart. Each of the 20 slots on the cart represents a single customer shipment. The picker, guided by an app on a mobile device, walks the storage area until they’ve picked all of the items for the 20 shipments. We call this process “pick to shipment” because no further sorting is necessary to make sure each shipment is fully assembled.

We needed a solution that would allow pickers to spend less time walking between bins and more time picking items from those bins. We have developed a solution such that the picking software tries to keep a given picker within a zone of 10-20 storage aisles and invested in a conveyor system to carry the picked items out of the picking locations. The picker focuses on picking everything that can be picked within their zone and there’s no need for a picker to leave a zone unless they are needed in another zone. The biggest difference from the old model is that the picker is no longer assembling complete shipments. If you ordered a pair of shoes and a t-shirt from zulily, it’s unlikely that those two items would be found in the same zone due to storage considerations. Instead of an individual picker picking for 20 orders, we now have one picker picking for many orders at the same time, but staying within a certain physical area of the building. This is considerably more efficient for the pickers, but it means that we now needed a solution to assemble these zone picks into customer shipments.

PickToteImage

The picker picks for multiple shipments into a single container. Because the sorting into customer shipments happens later, this solution is called “pick to sort”.

In order to take the efficiently picked items and sort them into the right order to be sent to our customers, we have implemented a sorting solution built around a physical structure we call a "put wall". A put wall looks like a large shelf with no back, divided into sections (called "slots"), each about one cubic foot. Working at these put walls is an employee (called a "putter") whose job is to take products from the pick totes and sort them into slots in that put wall. Each slot in the wall is assigned to a shipment. Once all the products needed for a given shipment have been put into the slot, an indicator light on the other side of the wall lets a packer know that the shipment is ready to be placed into a box and shipped out to our customer. In larger warehouses, having just one put wall is not practical because putters would end up covering too much distance and all the efficiency gained in packing would be lost on the putting side, so defining an appropriate size for each put wall is critical. This creates an interesting technical challenge, as we have to make sure that the right products all end up in the put wall at the right time. Our picking system has to make sure that once we start picking a shipment to a wall, all the other products for that shipment also go to that wall as quickly as possible. This challenge is made more difficult by the physical capacity of the put walls: we need to limit how much is going to each wall to avoid a situation where there is no open slot for a new shipment. We also have to make sure that each of the walls has enough work so we don't have idle associates. When selecting shipments to be picked, we must include shipments that are due out today, but also include future work to keep the operation efficient. To do this, we have pickers rotate picking against different put walls so the walls get an even spread of work. A simple round-robin rotation would be naive, since the throughput of each put wall is determined by humans with a wide range of work rates. In order to solve this problem, we turn to control theory to help us select a put wall for a picker based on many of the above requirements. We also need to make sure that when the first product shows up for a shipment there is room in the wall for it.
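
The exact controller is beyond the scope of this post, but as a toy sketch of the balancing act (illustrative only; the field names and scoring below are assumptions, not our production logic), wall selection has to weigh each wall's free slots against how much work is already queued for its putter:

def pick_put_wall(walls):
    """Choose a put wall for the next pick assignment.

    `walls` is a list of dicts with assumed keys:
      free_slots    - empty shipment slots left in the wall
      pending_items - items already en route to the wall
      put_rate      - items per minute the putter has been sustaining
    """
    def score(wall):
        if wall["free_slots"] == 0:
            return float("-inf")  # no room for a new shipment
        # Minutes of work queued at the wall, relative to its putter's speed.
        backlog_minutes = wall["pending_items"] / max(wall["put_rate"], 0.1)
        # Prefer walls with room and with the smallest backlog (at risk of going idle).
        return wall["free_slots"] - backlog_minutes
    return max(walls, key=score)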

PutterImage

As totes full of picked items are conveyed to the put wall, a putter scans each item and puts it into a slot representing a customer shipment. The putter is guided by both a mobile device and flashing lights on the put wall that indicate the correct slot.

As we scaled up our operation, we initially saw that adding more pickers and put walls was not providing as much gain in throughput as we expected. In analyzing the data from the system, we determined that one of the problems was how we were selecting our put walls. Our initial implementation would select a wall for a shipment based on that wall having enough capacity and need. The problem with this approach is that we didn't consider the makeup of each of the shipments. If you imagine a shipment composed of multiple products spread throughout the warehouse, you get situations where a picker has to walk through their zone N times, where N is the number of put walls we are using at any given time. As we turn on more and more put walls, that picker has to walk through the zone that many more times. We realized that if we could create some affinity between zones and walls, we could limit the number of put walls a picker needs to pick for and make them more efficient. We did this by assigning each put wall a set of zones and trying to make the vast majority of shipments for that put wall come from those zones. While we sometimes need larger zone sets than normal to cover a given shipment, overall we can significantly improve pick performance and increase the overall throughput for putters and packers.
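
As a simplified illustration of that affinity (again with assumed data shapes, not our actual implementation), choosing the wall whose assigned zones best cover a shipment's pick zones might look like this:

def wall_for_shipment(shipment_zones, wall_zone_map):
    """shipment_zones: set of zones the shipment's items live in.
    wall_zone_map: dict of wall_id -> set of zones assigned to that wall."""
    # Favor the wall whose assigned zones cover the most of this shipment,
    # so pickers touch as few walls as possible per pass through their zone.
    return max(wall_zone_map, key=lambda wall: len(shipment_zones & wall_zone_map[wall]))

# Example: a shoes + t-shirt shipment spanning zones 3 and 12
wall = wall_for_shipment({3, 12}, {"wall_A": {1, 2, 3}, "wall_B": {3, 10, 11, 12}})
# -> "wall_B", which covers both zones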

And that’s really just the beginning of the story for a small part of our fulfillment center software suite. As the business grows, we continue to find new ways to further optimize these processes to make better use of our employees’ time and save literally millions of dollars while also increasing our total capacity using the same buildings and people! This is true of most of the software in the fulfillment space – improved algorithms are not just a fun and challenging part of the job, but also critical to the long-term success of our business.

zuFlow – Query Workflow and Scheduling for Google BigQuery

Authors: Matthew Kang, Shailu Mishra, Sudhir Hasbe

In 2014, we made the decision to build our core data platform on Google Cloud Platform, and one of the products critical to that decision was Google BigQuery. We knew the scale at which it let us perform analysis would be critical for our business in the long run. Today we have more than 200 unique users performing analysis on a monthly basis.

Once we started using Google BigQuery at scale, we soon realized our analysts needed better tooling around it. The key requests we started getting were:

  1. Ability to schedule jobs: Analysts needed the ability to run queries at regular intervals to generate data and metrics.
  2. Define workflows of queries: Analysts wanted to run multiple queries in a sequence and share data across them through temp tables.
  3. Simplified data sharing: It became clear teams needed to share the generated data with other systems – for example, download it for use in R programs or send it to another system for processing through Kafka.

zuFlow Overview

zuFlow is zulily's query workflow and scheduling solution for Google BigQuery. There are a few key concepts:

  • Job: An executable entity that encompasses multiple queries and a schedule.
  • Query: A SQL statement that can be executed on Google BigQuery.
  • Keyword: A variable defined for use in queries.
  • Looper: The ability to define loops, like foreach statements.

High Level Design

image

zuFlow is a web application that enables users to set up jobs and run them either on demand or on a schedule.

  • We use Django with NGINX for handling our web traffic.
  • We leverage Google Cloud SQL for storing the config DB and keeping track of runtime state.
  • We have integrated the system with an off-the-shelf open source scheduler called SOS. We plan to migrate this to Airflow in the future.
  • Flowrunner is the brain of the system, written in Python. It leverages data from the config DB, executes the queries, and stores the runtime details back in the DB. A few key capabilities it provides are:
    • Concurrency: We manage our concurrency to make sure we are not overloading the system.
    • Retry: In a few scenarios, based on error codes, we retry the queries.
    • Cleanup: It is responsible for cleaning up after jobs are run, including historical data cleanup.

zuFlow Experience

Job Viewer: Once logged in, you can see your own jobs or view all jobs in the system.

image

Creating a Job: You provide a name, a schedule to run on, and the email address of the owner.

image

Keywords/variables: You can create keywords that you can reuse in your queries. This enables analysts to define a parameter and use it in their queries instead of hardcoding values. We also have predefined system keywords for date/time handling and for making it easier to shard tables (see the example query after this list). Examples:

  • DateTime: CURRENT_TIMESTAMP_PT, CURRENT_DATE_PT, CURRENT_MONTH_PT, CURRENT_TIME_PT, LAST_RUN_TIMESTAMP_PT, LAST_RUN_TIMESTAMP, LAST_RUN_DATE_PT
    For example, LAST_RUN_DATE_PT is the BQ-format Pacific date of the last run of this job (it will be CURRENT_DATE_PT on the first run).
  • Sharding: *_BQ
    Provides a formatted version of the date strings for table shard references (without dashes – YYYYmmdd).
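
For example, a query in a job might use these keywords roughly like this (the table, columns, and exact substitution syntax here are invented for illustration):

SELECT order_id, gross_sales
FROM [analytics.orders_CURRENT_DATE_PT_BQ]  -- shard suffix expands to e.g. 20180926
WHERE order_date >= 'LAST_RUN_DATE_PT'      -- Pacific date of the previous run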

image

Looping: Very soon after our first release we got requests to add loops. This enables users to define a variable and loop through its values.

image

Query Definition: Now you are ready to write a Google BigQuery query and define where the output will be stored. There are 4 options:

  1. BQ Table: You provide a BQ table and decide whether to replace it or append to it. You can also mark the output table as a temp table, and the system will clean it up after the job completes.
  2. CSV: If you pick CSV, you need to provide a GCS location for the output.
  3. Cloud SQL (MySQL): You can also export to Cloud SQL.
  4. Kafka: You can provide a Kafka topic name to publish the results as messages.

You can define multiple queries and share data across them through temp tables in BQ.

image

Job Overview: This shows the full definition of the job.

image

We have thought about open sourcing the solution. Please let us know if you are interested in this system

Zome: Real-time Merchant Home Page with Spark Streaming & Kafka

Authors: Bapi Akula, Shailu Mishra, Sudhir Hasbe

zulily is a daily business: we launch our events every day at 6am PST, and most of our sales come in the early hours after the events launch. It is critical for our merchants to know what is happening and react to drive more sales. We have significantly improved merchants' ability to drive sales by providing them with a new real-time home page, so that every day when they come in they can take action based on the current situation.

Historical View:

Historically we had a dashboard for our merchants that was not very useful. It showed them upcoming events and some other info, but when you come in every day you want to know what is happening today, not tomorrow.

image

New View

We replaced this non-actionable home page with a new real-time version that shows merchants real-time sales for their events, conversion rates, real-time inventory, top-selling styles, and styles projected to sell out. This enables merchants to talk to vendors to get more inventory or add different products.

image

Technical Design

To build a real-time home page for merchants, we had to combine real-time clickstream data (unstructured) with real-time sales (structured) and historical event and product data. Bringing these very different types of datasets into a single pipeline and merging and processing them in real time was a challenge.

Real-time Clickstream & Orders

We have built a high-scale real-time collection service called ZIP. It peaks at around 18k to 20k transactions per second each day. Our clickstream and order data is collected through this service. One of the capabilities of ZIP is to publish this data in real time to a Kafka cluster. This enables other systems to access the data that is being collected in near real time.

We will describe other capabilities of this service in a future post.

Historical data:

Our data platform runs on Google Cloud Platform and includes Google Dataproc as our ETL processing platform, which stores processed data in Google BigQuery for analytics and in Google Cloud Storage. All our historical data, which includes our products, events, prices, and orders, is stored in Google BigQuery and Google Cloud Storage.

image

Spark Streaming Processing

We used Spark Streaming to connect the clickstream and order data collected in Kafka with historical data in GCS, using the GCS connector provided by Google. This allowed us to create derivative datasets like real-time sales, conversion rates, and top sellers, which were stored in AWS Aurora. Aurora is an amazing database for high-scale queries; in a future post we will write up why we chose Aurora over other options.

Data Access through Zata

We then used our ZATA API to access this data from our merch tools and build an amazing UI for our merchants.

Spark Streaming Details

Reading data from Kafka (KafkaUtils.createDirectStream)

KafkaUtils is the object with factory methods to create input DStreams and RDDs from records in Apache Kafka topics. createDirectStream skips receivers and ZooKeeper and uses Kafka's simple consumer API to consume messages, which means it needs to track offsets itself.

So at the beginning of each batch, the connector reads partition offsets for each topic from Kafka and uses them to ingest data. To ensure exactly-once semantics, it tracks offset information in Spark Streaming checkpoints.

/code:

import json
from pyspark.streaming.kafka import KafkaUtils

def getDstreamFromKafka(ssc, topic, kafka_servers):
    # Create a direct (receiver-less) stream for the topic; offsets are tracked via checkpoints.
    kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"bootstrap.servers": kafka_servers})
    # Each record is a (key, value) pair; serialize the value and split it into fields.
    parsed = kafkaStream.map(lambda v: json.dumps(v[1]).replace('",', '";').split(','))
    return parsed

 

Reading data from GCS (textFileStream)

This method monitors any Hadoop-compatible filesystem directory for new files and, when it detects a new file, reads it into Spark Streaming. In our case we use GCS, and the streaming job internally uses the GCS connector. We pass the GCS connector as a jar file when invoking the job.

/code:

ssc.textFileStream("gs://<bucket>/<path>")  # GCS directory monitored via the GCS connector

Merge values: combineByKey(createCombiner, mergeValue, mergeCombiners)

In Spark, groupByKey() doesn't do any local aggregation while computing on a partition's data; this is where combineByKey() comes in handy. In combineByKey, values are merged into one value at each partition, and then each partition's value is merged into a single value. So combineByKey is an optimization over groupByKey, as we end up sending fewer key-value pairs across the network.

We used combineByKey to calculate aggregations like total sales, average price, and demand. Three lambda functions are passed as arguments to this method:

combineByKey(createCombiner,mergeValue,mergeCombiners)

createCombiner: The first argument is a function used as the very first aggregation step for each key. It is invoked only once per key.

mergeValue: This function tells the combiner what to do when it is given a new value.

mergeCombiners: This function is called to combine the values of a key across multiple partitions.

/code:

creCmb = (lambda v:(v[0],float(v[1]),0.0 ,0.0,0.0,v[4],v[5],v[6],v[7],v[8],1) if v[3]==-1 else (v[0],float(v[1]),float(v[1])/float(v[2]) ,float(v[3]),((float(v[1])/float(v[2]))-float(v[3])),v[4],v[5],v[6],v[7],v[8],1))

mrgVal = (lambda x, v:(max(x[0],v[0]),float(x[1])+float(v[1]),(float(x[2]))+0.0,float(x[3])+0.0,float(x[4])+0.0,min(x[5],v[4]), min(x[6],v[5]),min(x[7],v[6]),min(x[8],v[7]), max(x[9],v[8]),int(x[10])+1) if v[3]==-1 else (max(x[0],v[0]),float(x[1])+ float(v[1]),(float(x[2]))+(float(v[1])/float(v[2])),float(x[3])+float(v[3]), float(x[4])+((float(v[1])/float(v[2]))-float(v[3])),min(x[5],v[4]), min(x[6],v[5]),min(x[7],v[6]),min(x[8],v[7]),max(x[9],v[8]), int(x[10])+1))

mrgCmb = (lambda x,y :(max(x[0],y[0]),x[1]+y[1],x[2]+y[2],x[3]+y[3],x[4]+y[4], min(x[5],y[5]),min(x[6],y[6]),min(x[7],y[7]),min(x[8],y[8]),max(x[9],y[9]), int(x[10])+int(y[10])))

combineByKey(creCmb, mrgVal, mrgCmb)

Stateful transformations (updateStateByKey())

We required a framework that supported building knowledge from both historical and real-time data, and Spark Streaming provided just that. Using stateful functions like updateStateByKey, which computes a running sum of all sales, we were able to achieve our requirement. For example, if you want to keep track of the number of times a customer visited the web page, and customer "123" visited twice in the first hour and once more in the next hour, the aggregated count at the end of the second hour should be 3 (the current batch count plus history); this history state lives in memory and is handled by updateStateByKey. The checkpoint mechanism of Spark Streaming takes care of preserving the state of the sales history in memory. As an additional recovery point, we stored the state in a database and recovered it from the database in case checkpoint files were cleared during new code deployments or configuration changes.

/code:

soi_batch_agg.updateStateByKey(updateSales)

import datetime
import sys
import pytz

# validate() and zome_aurora are internal zulily helpers (not shown here).

def updateSales(newstate, oldstate):
    # newstate: aggregated tuples from the current batch; oldstate: saved running state.
    try:
        # If the product insert timestamp is older than two days, remove the state from memory.
        if (oldstate is not None) and validate(oldstate[0][-4], 'updateSalesFn1') and \
           (oldstate[0][-4] < ((datetime.datetime.now(tz=pytz.utc) - datetime.timedelta(days=2))
                               .astimezone(pytz.timezone('US/Pacific')).strftime('%Y-%m-%d %H:%M:%S'))):
            oldstate = None

        # If the event end date is older than the current timestamp, remove the state from memory.
        if (oldstate is not None) and validate(oldstate[0][-3], 'updateSalesFn2') and \
           (oldstate[0][-3] < (datetime.datetime.now(tz=pytz.utc)
                               .astimezone(pytz.timezone('US/Pacific')).strftime('%Y-%m-%d %H:%M:%S'))):
            oldstate = None

        if newstate:  # the current batch contained records for this key
            if oldstate is None:
                # Cold start (or expired state): recover the last known state from Aurora.
                oldstate = zome_aurora.aurora_get(str(newstate[0][-6]), str(newstate[0][-5]))
            else:
                print('Getting records from memory')

            if oldstate:
                # Merge the running totals with the current batch's aggregates.
                return [(max(oldstate[0][0], newstate[0][0]),
                         float(oldstate[0][1]) + newstate[0][1],
                         float(oldstate[0][2]) + newstate[0][2],
                         float(oldstate[0][3]) + newstate[0][3],
                         float(oldstate[0][4]) + newstate[0][4],
                         max(oldstate[0][5], newstate[0][5]),
                         min(oldstate[0][6], newstate[0][6]),
                         max(oldstate[0][7], newstate[0][7]),
                         min(oldstate[0][8], newstate[0][8]),
                         max(oldstate[0][9], newstate[0][9]),
                         int(oldstate[0][10]) + newstate[0][10])]
            else:
                return newstate

    except Exception:
        sys.exit(1)

Writing to Aurora:

We are not using the JDBC methods provided by Spark, as we had some performance issues with connection creation, record insertion, and commits.

We went with an approach of creating a connection per partition, doing a bulk insert of all records in each partition, and inserting all partitions in parallel.

/code:

def sendPartition(iter):
    # Open one connection per partition and bulk-insert all of its records.
    connection = mc.connect(...)  # connect to the database (connection parameters omitted)
    cursor = connection.cursor()
    try:
        # Collect the partition's records for a bulk insert.
        data = []
        for record in iter:
            data.append(record)

        query = "INSERT INTO .... )"  # insert statement (columns/values omitted)
        # cursor.execute(transaction_isolation_lock)

        # Retry the bulk insert until it succeeds.
        successful = False
        while not successful:
            try:
                cursor.executemany(query, data)
                connection.commit()
                successful = True
            except Exception as e:
                pass  # handle / log the exception, then retry
    finally:
        cursor.close()
        connection.close()
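
A typical way to wire this up in PySpark (a sketch; the exact DStream we write out to Aurora may differ) is to apply sendPartition to each partition of each micro-batch:

/code:

state_stream = soi_batch_agg.updateStateByKey(updateSales)
state_stream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))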

ZATA: How we used Kubernetes and Google Cloud to expose our Big Data platform as a set of RESTful web services

Authors: Shailu Mishra, Sudhir Hasbe

In our initial blog post about the zulily big data platform, we briefly talked about ZATA (the zulily data access service). Today we want to deep dive into ZATA and explain our thought process and how we built it.

Goals

As a data platform team we had three goals:

  1. Rich data generated by our team shouldn't be limited to analysts. It should be available to systems and applications via a simple and consistent API.
  2. Have the flexibility to change our backend data storage solutions over time without impacting our clients.
  3. Zero development time for incremental data-driven APIs.

ZATA was our solution for achieving our above goals. We abstracted our data platform using a REST-based service layer that our clients could use to fetch datasets. We were able to swap out storage layers without any change for our client systems.

Selecting a Data Storage Solution

There are three different attributes you have to figure out before you pick a storage technology:

  1. Size of data: Is it big data or relatively small data? In short, do you need something that will fit in MySQL, or do you need to look at solutions like Google BigQuery or AWS Aurora?
  2. Query latency: How fast do you need to respond to queries? Is it milliseconds, or are a few seconds OK – especially for large datasets?
  3. Data type: Is it relational data, key-value pairs, complex JSON documents, or a search pattern?

As an enterprise, we need all combinations of these. The following are choices our team has made over time for different attributes:

  1. Google BigQuery: Great for large datasets (terabytes), but latency is in seconds; supports columnar storage.
  2. AWS Aurora: Great for large datasets (100s of gigabytes) with very low query latency.
  3. Postgres-XL: Great for large datasets (100s of gigabytes to terabytes) with amazing performance for aggregation queries, but very difficult to manage and still early in its maturity cycle. We eventually moved these datasets to AWS Aurora.
  4. Google Cloud SQL, MySQL, or SQL Server: For small datasets (GBs) with really low latency (milliseconds).
  5. MongoDB or Google Bigtable: Good for large-scale datasets with low-latency document lookup.
  6. Elasticsearch: We use Elasticsearch for search scenarios, both fuzzy and exact match.

Zata Architecture

clip_image001

The key runtime components of ZATA are:

Mapping Layer

This looks at incoming URLs and maps them to backend systems. For example, the request http://xxxxx.zulily.com/dataset/product-offering?eventStartDate=[2013-11-15,2013-12-01]&outputFields=eventId,vendorId,productId,grossUnits maps to:

  1. Google BigQuery (based on the config DB mapping for product-offering).
  2. The dataset used is product-offering, which is just a view in Google BigQuery.
  3. The filter eventStartDate=[2013-11-15,2013-12-01] is transformed to WHERE eventStartDate BETWEEN 2013-11-15 AND 2013-12-01.
  4. The output fields requested are eventId, vendorId, productId, grossUnits.
  5. The query for Google BigQuery is:

SELECT eventId, vendorId, productId, grossUnits FROM product-offering WHERE eventStartDate BETWEEN '2013-11-15' AND '2013-12-01'

The mapping layer decides which mappings to use and how to transform the HTTP request into something the backend will understand. This looks very different for MongoDB or Google Bigtable.

Execution Layer

The execution layer is responsible for generating queries using the protocol the storage engine will understand. It also executes the queries against the backend and fetches result sets efficiently. Our current implementation supports various protocols, such as MongoDB and standard JDBC, as well as HTTP requests for Google BigQuery, Bigtable, and Elasticsearch.

Transform Layer

This layer is responsible for transforming data coming from any of the backend sources and normalizing it. This allows our clients to be agnostic of the storage mechanism in our backend systems. We went with JSON as the schema format given how prevalent it is among services and application developers.

For the previous example from the Mapping Layer, the response will be the following:

[
  {"eventId": "12345", "vendorId": "123", "productId": "3456", "grossUnits": "10"},
  {"eventId": "23456", "vendorId": "123", "productId": "2343", "grossUnits": "234"},
  {"eventId": "33445", "vendorId": "456", "productId": "8990", "grossUnits": "23"},
  {"eventId": "45566", "vendorId": "456", "productId": "2343", "grossUnits": "88"}
]

API auto discovery

Our third goal was to have zero development time for incremental data-driven APIs. We achieved this by creating an auto discovery service. The job of this service is to regularly poll the backend storage services for changes and automatically add service definitions to the config DB. For example, in Google BigQuery or MySQL, once you add a view in a schema called "zata", we automatically add the API to the ZATA service. This way a data engineer can keep adding services for the datasets they create without anyone writing new code.
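
As a rough sketch of the idea for the MySQL case (the config table and column names below are hypothetical, not ZATA's actual schema), the poller just lists the views in the "zata" schema and registers any it hasn't seen before:

import mysql.connector

def discover_mysql_views(config_db, source_conn_params):
    source = mysql.connector.connect(**source_conn_params)
    cur = source.cursor()
    # All views exposed for ZATA live in a schema named "zata" by convention.
    cur.execute(
        "SELECT table_name FROM information_schema.views WHERE table_schema = 'zata'"
    )
    config_cur = config_db.cursor()
    for (view_name,) in cur.fetchall():
        # Register the dataset if the config DB doesn't know about it yet.
        config_cur.execute(
            "INSERT IGNORE INTO zata_datasets (dataset_name, backend) VALUES (%s, %s)",
            (view_name, "mysql"),
        )
    config_db.commit()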

API Schema Definition

The schema service enables users to look up all the APIs supported by ZATA and view each one's schema to understand what requests they can send. Clients can get the list of available datasets:

Dataset Request: http://xxxxx.zulily.com/dataset

[
  { "datasetName": "product-offering-daily", … },
  { "datasetName": "sales-hourly", … },
  { "datasetName": "product-offering", … }
]

Schema Request: Clients can then drill down to the schema of a selected dataset: http://xxxxx.zulily.com/dataset/product-offering/schema/

[
  { "fieldName": "eventId", "fieldType": "INTEGER" },
  { "fieldName": "eventStartDate", "fieldType": "DATETIME" },
  { "fieldName": "eventEndDate", "fieldType": "DATETIME" },
  { "fieldName": "vendorId", "fieldType": "INTEGER" },
  { "fieldName": "productStyle", "fieldType": "VARCHAR" },
  { "fieldName": "grossUnits", "fieldType": "INTEGER" },
  { "fieldName": "netUnits", "fieldType": "INTEGER" },
  { "fieldName": "grossSales", "fieldType": "NUMERIC" },
  { "fieldName": "netSales", "fieldType": "NUMERIC" }
]

Throughout all of this, the client is not aware of the location of the data and has no knowledge of the storage system, which makes the whole data story more agile. If a dataset is moved from one location to another, or its schema is altered, all downstream systems are fine, since the access points and the contracts are managed by ZATA.

Storage Service Isolation

As we rolled out ZATA over time, we realized the need for storage service isolation. Having a single service support multiple backend storage solutions with different latency requirements didn't work very well: the slowest backend tends to slow things down for everyone else.

This forced us to rethink our ZATA deployment strategy. Around the same time, we were experimenting with Docker and using Kubernetes as an orchestration mechanism.

We ended up creating a separate Docker container and Kubernetes service for each of the backend storage solutions. So we now have a zata-bigquery service that handles all BigQuery-specific calls. Similarly, we have zata-mongo, zata-jdbc, and zata-es services. Each of these Kubernetes services can be scaled individually based on anticipated load.

In addition to the individual Kubernetes services, we also created a zata-router service, which is essentially nginx hosted in Docker. The zata-router service accepts incoming HTTP requests for ZATA and, based on the nginx config, routes the traffic to the various Kubernetes services available in the cluster. The nginx config in the zata-router service is dynamically refreshed by a polling service to make new APIs discoverable.
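
Conceptually, the generated nginx config ends up with one routing rule per dataset inside the server block, pointing at the Kubernetes service for that dataset's backend, something like the following (illustrative only; real service names, ports, and paths will differ):

# product-offering lives in BigQuery, so route it to the zata-bigquery service
location /dataset/product-offering {
    proxy_pass http://zata-bigquery:8080;
}

# a low-latency MySQL-backed dataset goes to zata-jdbc instead
location /dataset/sales-hourly {
    proxy_pass http://zata-jdbc:8080;
}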

clip_image003

ZATA has enabled us to make our data more accessible across the organization while allowing us to move fast and change the storage layer as we scaled up.