From Cart Pick to Put Walls

A critical part of any e-commerce company is getting product to its customers. While many of the customer experience discussions that you hear about companies focus on their website and apps or customer service and support, we often forget to think about those companies delivering their customers’ products when the company said they would. This part of the promise made (or implied) for customers is critical for building trust and providing a great end-to-end customer experience. Most large e-commerce companies operate — or pay someone else to operate — one or more “fulfillment centers”, which is where products are stored and combined with other items that need to be sent to the customer. zulily’s unique business model means we work with both big brands and boutique, smaller vendors with a variety of different capabilities, and so our products are inspected for quality, frequently bagged to keep clothing from getting dirty and often need barcoding (as many smaller vendors may not have them). The quality of zulily’s fulfillment processes drives our ability to deliver on our promises to customers and zulily’s software drives those fulfillment processes.

All fulfillment center systems start with a few basic needs: receive products from vendors, store products so that they can be retrieved later, and ship products to customers. “Shipping product out,” also known as “outbound,” is the most expensive operation inside the fulfillment center, so we have invested heavily in making it efficient. The problem seems simple at first glance: you gather products for a customer shipment, put them in boxes, put labels on the boxes, and hand the boxes to UPS, USPS, or another carrier. The trick is making this process as efficient as possible.

When zulily first started, each associate would walk the length of the warehouse picking each item and sorting it into 1 of 20 shoebox-sized bins on their cart, with each bin representing a customer shipment. Once all of the shipments had been picked, the picker delivered the completed cart to a packing station. The job of collecting products to be shipped out is known as “picking,” and when our warehouse was fairly small, this strategy of one person picking the whole order worked fine. As the company grew, so did our warehouses; some of our buildings have a million square feet of storage spread over multiple floors. Pickers were now walking quite a long way for just 20 shipments. We could have increased the size or quantity of the carts, but that solution costs more as the company grows. In addition, safety concerns about pulling more or larger carts and the complexity of taking one cart to multiple floors of a building made this idea impractical, to say the least.


A pick cart. Each of the 20 slots on the cart represents a single customer shipment. The picker, guided by an app on a mobile device, walks the storage area until they’ve picked all of the items for the 20 shipments. We call this process “pick to shipment” because no further sorting is necessary to make sure each shipment is fully assembled.

We needed a solution that would allow pickers to spend less time walking between bins and more time picking items from those bins. We have developed a solution such that the picking software tries to keep a given picker within a zone of 10-20 storage aisles and invested in a conveyor system to carry the picked items out of the picking locations. The picker focuses on picking everything that can be picked within their zone and there’s no need for a picker to leave a zone unless they are needed in another zone. The biggest difference from the old model is that the picker is no longer assembling complete shipments. If you ordered a pair of shoes and a t-shirt from zulily, it’s unlikely that those two items would be found in the same zone due to storage considerations. Instead of an individual picker picking for 20 orders, we now have one picker picking for many orders at the same time, but staying within a certain physical area of the building. This is considerably more efficient for the pickers, but it means that we now needed a solution to assemble these zone picks into customer shipments.


The picker picks for multiple shipments into a single container. Because the sorting into customer shipments happens later, this solution is called “pick to sort”.

To take the efficiently picked items and sort them into the right shipments for our customers, we implemented a sorting solution built around a physical structure we call a “put wall”. A put wall looks like a large shelf with no back, divided into sections (called “slots”), each about one cubic foot. Working at these put walls is an employee (called a “putter”) whose job is to take products from the pick totes and sort them into slots in the put wall. Each slot in the wall is assigned to a shipment. Once all the products needed for a given shipment have been put into the slot, an indicator light on the other side of the wall lets a packer know that the shipment is ready to be placed into a box and shipped out to our customer. In larger warehouses, having just one put wall is not practical because putters would have to cover too much distance and all the efficiency gained in packing would be lost on the putting side, so defining an appropriate size for each put wall is critical.

This creates an interesting technical challenge, as we have to make sure that the right products all end up at the right put wall at the right time. Our picking system has to make sure that once we start picking a shipment to a wall, all the other products for that shipment also go to that wall as quickly as possible. This challenge is made more difficult by the physical capacity of the put walls. We need to limit how much is going to each wall to avoid a situation where there is no slot for a new shipment. We also have to make sure that each wall has enough work so we don’t have idle associates. When selecting shipments to be picked, we must include shipments that are due out today, but also include future work to keep the operation efficient. To do this, we have pickers rotate picking against different put walls to make sure that they get an even spread of work. A simple round-robin rotation would be naive, since the throughput of the put walls is determined by humans with a wide range of work rates. To solve this problem, we turn to control theory to help us select a put wall for a picker based on many of the above requirements. We also need to make sure that when the first product shows up for a shipment there is room in the wall for it.
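
To make the idea concrete, here is a minimal sketch of one feedback-style selection rule. It is an illustration only, not the algorithm used in production: the PutWall fields, the target_backlog_minutes parameter and all the numbers are hypothetical. The rule measures how many minutes of work are already queued at each wall, using that putter's recent put rate, and sends the next pick to the wall that is furthest below the target backlog. Walls with no free slot are skipped.

```python
from dataclasses import dataclass


@dataclass
class PutWall:
    name: str
    total_slots: int
    open_shipments: int      # slots already assigned to a shipment
    items_in_flight: int     # picked items that have not been put yet
    put_rate_per_min: float  # recent measured throughput of the putter


def backlog_minutes(wall):
    """Minutes of work already queued for this wall's putter."""
    return wall.items_in_flight / max(wall.put_rate_per_min, 0.1)


def choose_wall(walls, target_backlog_minutes=10.0):
    """Pick the wall whose backlog is furthest below the target.

    The error term (target - backlog) acts like the input of a
    proportional controller: fast putters drain their backlog sooner,
    so they are chosen more often than round-robin would choose them.
    """
    candidates = [w for w in walls if w.open_shipments < w.total_slots]
    if not candidates:
        return None  # every wall is full; hold the shipment back
    return max(candidates,
               key=lambda w: target_backlog_minutes - backlog_minutes(w))


walls = [
    PutWall("A", 40, 35, 120, 6.0),   # 20 minutes of backlog
    PutWall("B", 40, 20, 90, 15.0),   # 6 minutes of backlog
    PutWall("C", 40, 40, 10, 10.0),   # no free slot, skipped
]
print(choose_wall(walls).name)
```

A real controller would also account for shipment due dates and for work that has been released but not yet picked; the sketch only shows why measured throughput, rather than rotation order, should drive the choice.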


As totes full of picked items are conveyed to the put wall, a putter scans each item and puts it into a slot representing a customer shipment. The putter is guided by both a mobile device and flashing lights on the put wall, which point to the correct slot.

As we scaled up our operation, we initially saw that adding more pickers and put walls was not providing as much gain in throughput as we expected. In analyzing the data from the system, we determined that one of the problems was how we were selecting our put walls. Our initial implementation would select a wall for a shipment based on that wall having enough capacity and need. The problem with this approach is that we didn’t consider the makeup of each of the shipments. If you imagine a shipment that is composed of multiple products spread throughout the warehouse, you have situations where a picker has to walk through their zone N times, where N is the number of put walls we are using at any given time. As we turn on more and more put walls, that picker has to walk through the zone that many more times. We realized that if we could create some affinity between zones and walls, we could limit the number of put walls that a picker needs to pick for and make pickers more efficient. We did this by assigning each put wall a set of zones and trying to make the vast majority of shipments for that put wall come from those zones. While we sometimes need larger sets than normal to cover a given shipment, we can significantly improve pick performance and increase the overall throughput for putters and packers.
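
The zone-to-wall affinity described above can be sketched as a small scoring function. This is an assumption-laden illustration rather than the production logic: the function name, the zone and wall identifiers and the tie-breaking rule are all hypothetical. For each wall that still has capacity, it counts how many of the shipment's zones fall outside the wall's assigned zones and picks the wall with the fewest.

```python
def choose_wall_by_affinity(shipment_zones, wall_zones, has_capacity):
    """Return the wall whose assigned zones cover the shipment best.

    shipment_zones: set of zones holding the shipment's items
    wall_zones:     dict of wall name -> set of zones assigned to it
    has_capacity:   dict of wall name -> True if a slot is free
    """
    best = None
    for wall, zones in sorted(wall_zones.items()):
        if not has_capacity.get(wall, False):
            continue
        # Zones a picker would have to visit outside the wall's usual set
        uncovered = len(set(shipment_zones) - zones)
        if best is None or uncovered < best[0]:
            best = (uncovered, wall)
    return best[1] if best else None


wall_zones = {
    "wall-1": {"Z1", "Z2", "Z3"},
    "wall-2": {"Z3", "Z4", "Z5"},
}
has_capacity = {"wall-1": True, "wall-2": True}

print(choose_wall_by_affinity({"Z4", "Z5"}, wall_zones, has_capacity))
print(choose_wall_by_affinity({"Z1", "Z3"}, wall_zones, has_capacity))
```

With this rule a picker in zone Z1 only ever picks for wall-1 unless a shipment forces an exception, which is the effect the affinity is meant to have.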

And that’s really just the beginning of the story for a small part of our fulfillment center software suite. As the business grows, we continue to find new ways to further optimize these processes to make better use of our employees’ time and save literally millions of dollars while also increasing our total capacity using the same buildings and people! This is true of most of the software in the fulfillment space – improved algorithms are not just a fun and challenging part of the job, but also critical to the long-term success of our business.

zuFlow– Query Workflow and Scheduling for Google BigQuery

Authors: Matthew Kang, Shailu Mishra, Sudhir Hasbe

In 2014, we decided to build our core data platform on Google Cloud Platform, and one of the products critical to that decision was Google BigQuery. We knew that the scale of analysis it enabled would be critical to our business in the long run. Today we have more than 200 unique users performing analysis on a monthly basis.

Once we started using Google BigQuery at scale, we soon realized our analysts needed better tooling around it. The key requests we started getting were:

  1. Ability to schedule jobs: Analysts needed the ability to run queries at regular intervals to generate data and metrics.
  2. Define workflow of queries: Analysts wanted to run multiple queries in sequence and share data across them through temp tables.
  3. Simplified data sharing: It became clear that teams needed to share the generated data with other systems, for example downloading it for use in R programs or sending it to another system for processing through Kafka.

zuFlow Overview

zuFlow is zulily’s query workflow and scheduling solution for Google BigQuery. There are a few key concepts:

  • Job: An executable entity that encompasses multiple queries with a schedule.
  • Query: A SQL statement that can be executed on Google BigQuery.
  • Keyword: A variable defined to be used in the queries.
  • Looper: The ability to define loops, like foreach statements.

High Level Design


zuFlow is a web application that enables users to set up jobs and run them either on demand or on a schedule.

  • We use Django with NGINX for handling our web traffic.
  • We use Google Cloud SQL to store the config db and keep track of runtime state.
  • We have integrated the system with an off-the-shelf open source scheduler called SOS. We plan to migrate this to Airflow in the future.
  • Flowrunner, written in Python, is the brain of the system. It reads data from the config db, executes the queries, and stores the runtime details back in the db. A few key capabilities it provides are:
    • Concurrency: We have to manage our concurrency to make sure we are not overloading the system.
    • Retry: In a few scenarios, based on error codes, we retry the queries.
    • Cleanup: It is responsible for cleaning up after the jobs are run, including historical data cleanup.
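
As a rough illustration of the concurrency and retry capabilities listed above, the sketch below runs independent statements through a bounded thread pool and retries only on selected error codes. It is not Flowrunner's code: QueryError, run_with_retry, run_all, the set of retryable codes and the fake_run_query stand-in are all hypothetical, and a real runner would call the BigQuery API where the stand-in is used.

```python
import time
from concurrent.futures import ThreadPoolExecutor

# Error codes treated as transient in this sketch (an assumption)
RETRYABLE = {"rateLimitExceeded", "backendError"}


class QueryError(Exception):
    def __init__(self, code):
        super().__init__(code)
        self.code = code


def run_with_retry(run_query, sql, max_attempts=3, backoff_seconds=0.0):
    """Run one statement, retrying only when the error code is transient."""
    for attempt in range(1, max_attempts + 1):
        try:
            return run_query(sql)
        except QueryError as err:
            if err.code not in RETRYABLE or attempt == max_attempts:
                raise
            time.sleep(backoff_seconds * attempt)


def run_all(run_query, statements, max_concurrency=2):
    """Run independent statements with a cap on concurrent queries."""
    with ThreadPoolExecutor(max_workers=max_concurrency) as pool:
        return list(pool.map(lambda s: run_with_retry(run_query, s),
                             statements))


# Stand-in for a real query call: "SELECT 2" fails once, then succeeds
attempts = {}

def fake_run_query(sql):
    attempts[sql] = attempts.get(sql, 0) + 1
    if sql == "SELECT 2" and attempts[sql] == 1:
        raise QueryError("backendError")
    return "ok: " + sql


results = run_all(fake_run_query, ["SELECT 1", "SELECT 2"])
print(results)
```

Capping the pool size is the simplest way to avoid overloading the backend; queries inside a single job that depend on each other's temp tables would still need to run in sequence.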

zuFlow Experience

Job Viewer: Once logged in, you can see your jobs or view all jobs in the system.


Creating Job: You can give the job a name, a schedule to run on, and the email address of the owner.


Keywords/variables: You can create keywords which you can reuse in your queries. This enables analysts to define a parameter and use it in their queries instead of hardcoding values. We also have predefined system keywords for date and time handling and for making it easier for users to shard tables. Examples:

    BQ Format Pacific date of the last run of this job (will be CURRENT_DATE_PT on first run)
  • Sharding: *_BQ
    Will provide a formatted version of date strings for table shard references (without dashes: YYYYmmdd)
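
A minimal sketch of how keyword substitution could work is shown below. CURRENT_DATE_PT and the *_BQ suffix come from the examples above; everything else is an assumption made for illustration, including the {NAME} placeholder syntax, the REGION keyword, the table name and the function names.

```python
import datetime


def system_keywords(today_pacific):
    """Build date keywords, including the *_BQ shard-friendly variant."""
    return {
        "CURRENT_DATE_PT": today_pacific.strftime("%Y-%m-%d"),
        # *_BQ variant: same date without dashes, for table shard suffixes
        "CURRENT_DATE_PT_BQ": today_pacific.strftime("%Y%m%d"),
    }


def render_query(template, keywords):
    """Replace every {KEYWORD} placeholder with its value."""
    sql = template
    for name, value in keywords.items():
        sql = sql.replace("{" + name + "}", str(value))
    return sql


keywords = system_keywords(datetime.date(2016, 3, 1))
keywords["REGION"] = "US"  # a user-defined keyword

template = ("SELECT * FROM dataset.orders_{CURRENT_DATE_PT_BQ} "
            "WHERE order_date = '{CURRENT_DATE_PT}' AND region = '{REGION}'")
rendered = render_query(template, keywords)
print(rendered)
```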


Looping: Very soon after our first release we got requests to add loops. This enables users to define a variable and loop through its values.
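
Conceptually, a looper expands one query template into one query per value. The sketch below assumes a {NAME} placeholder syntax and a hypothetical expand_loop helper; neither is taken from zuFlow itself.

```python
def expand_loop(template, variable, values):
    """Return one rendered query per value of the loop variable."""
    placeholder = "{" + variable + "}"
    return [template.replace(placeholder, str(value)) for value in values]


queries = expand_loop(
    "SELECT COUNT(*) FROM dataset.orders WHERE region = '{REGION}'",
    "REGION",
    ["US", "CA"],
)
for query in queries:
    print(query)
```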


Query Definition: Now you are ready to write a Google BigQuery query and define where the output will be stored. There are 4 options:

  1. BQ Table: You provide a BQ table and decide whether to replace it or append to it. You can also define the output table as a temp table, and the system will clean it up after the job completes.
  2. CSV: If you pick CSV, you need to provide a GCS location for the output.
  3. Cloud SQL (MySQL): You can also export to Cloud SQL.
  4. Kafka: You can provide a Kafka topic name to publish results as messages.

You can define multiple queries and share data across them through temp tables in BQ.


Job Overview: This shows the full definition of the job.


We have thought about open sourcing the solution. Please let us know if you are interested in this system.

Zome: Real-time Merchant Home Page with Spark Streaming & Kafka

Authors: Bapi Akula, Shailu Mishra, Sudhir Hasbe

zulily is a daily business: we launch our events every day at 6am PST, and most of our sales come in the early hours after the events launch. It is critical for our merchants to know what is happening and react to drive more sales. We have significantly improved merchants’ ability to drive sales by providing a new real-time home page, so that every day when they come in they can take action based on the situation.

Historical View:

Historically we had a dashboard for our merchants which was not very useful. It showed them upcoming events and some other info, but when you come in every day you want to know what is happening today, not tomorrow.


New View

We replaced this non-actionable home page with a new real-time version that shows merchants real-time sales for their events, conversion rates, real-time inventory, top-selling styles, and styles projected to sell out. This enables merchants to talk to vendors to get more inventory or add different products.


Technical Design

To build a real-time home page for merchants we had to combine real-time clickstream data (unstructured) with real-time sales (structured) and historical event and product data. Bringing these very different types of data-sets into a single pipeline and in real-time merging/processing them was a challenge.

Real-time Clickstream & Orders

We have built a high-scale real-time collection service called ZIP. It peaks every day at around 18k to 20k transactions per second. Our clickstream and order data is collected through this service. One of the capabilities of ZIP is to publish this data in real time to a Kafka cluster, which enables other systems to access the data in near real time as it is collected.

We will describe other capabilities of this service in a future post.

Historical data:

Our data platform runs on Google Cloud Platform and uses Google Dataproc as the ETL processing platform, which stores processed data in Google BigQuery for analytics and in Google Cloud Storage. All our historical data, including products, events, prices and orders, is stored in Google BigQuery and Google Cloud Storage.


Spark Streaming Processing

We used Spark Streaming to connect the clickstream and order data collected in Kafka with historical data in GCS, using the GCS connector provided by Google. This allowed us to create derivative datasets such as real-time sales, conversion rates and top sellers, which were stored in AWS Aurora. AWS Aurora is an amazing database for high-scale queries. In a future post we will write up why we chose Aurora over other options.

Data Access through Zata

We then used our ZATA API to access this data from our Merch tools and build an amazing UI for our merchants.

Spark Streaming Details

Reading the data from Kafka (KafkaUtils.createDirectStream)

KafkaUtils is the object with the factory methods to create input DStreams and RDDs from records in Apache Kafka topics. createDirectStream skips receivers and ZooKeeper and uses the simple consumer API to consume messages, which means it needs to track offsets internally.

At the beginning of each batch, the connector reads partition offsets for each topic from Kafka and uses them to ingest data. To ensure exactly-once semantics, it tracks offset information in Spark Streaming checkpoints.


import json
from pyspark.streaming.kafka import KafkaUtils

def getDstreamFromKafka(ssc, topic, kafka_servers):
    # Create a direct (receiver-less) stream for a single topic
    kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"bootstrap.servers": kafka_servers})
    # Each record is a (key, value) pair; serialize the value and split it into fields
    parsed = kafkaStream.map(lambda v: json.dumps(v[1]).replace('",', '";').split(','))
    return parsed


Reading data from GCS (textFileStream)

This method monitors any Hadoop-compatible filesystem directory for new files and, when it detects a new file, reads it into Spark Streaming. In our case we use GCS, and the streaming job uses the GCS connector internally. We pass the GCS connector as a jar file when invoking the job.


ssc.textFileStream(GCS bucket path)

Merge Values: combineByKey(createCombiner, mergeValue, mergeCombiners)

In Spark, groupByKey() doesn’t do any local aggregation while computing on a partition’s data; this is where combineByKey() comes in handy. With combineByKey, values are merged into one value per key within each partition, and then the per-partition values are merged into a single value. So combineByKey is an optimization over groupByKey, as we end up sending fewer key-value pairs across the network.

We used combineByKey to calculate aggregations like total sales, average price and demand. Three lambda functions were passed as arguments to this method:


createCombiner: The first required argument is a function used as the very first aggregation step for each key. It is invoked the first time a key is seen in each partition.

mergeValue: This function tells what to do when a combiner is given a new value.

mergeCombiners: This function is called to combine the values of a key across multiple partitions.


creCmb = (lambda v:(v[0],float(v[1]),0.0 ,0.0,0.0,v[4],v[5],v[6],v[7],v[8],1) if v[3]==-1 else (v[0],float(v[1]),float(v[1])/float(v[2]) ,float(v[3]),((float(v[1])/float(v[2]))-float(v[3])),v[4],v[5],v[6],v[7],v[8],1))

mrgVal = (lambda x, v:(max(x[0],v[0]),float(x[1])+float(v[1]),(float(x[2]))+0.0,float(x[3])+0.0,float(x[4])+0.0,min(x[5],v[4]), min(x[6],v[5]),min(x[7],v[6]),min(x[8],v[7]), max(x[9],v[8]),int(x[10])+1) if v[3]==-1 else (max(x[0],v[0]),float(x[1])+ float(v[1]),(float(x[2]))+(float(v[1])/float(v[2])),float(x[3])+float(v[3]), float(x[4])+((float(v[1])/float(v[2]))-float(v[3])),min(x[5],v[4]), min(x[6],v[5]),min(x[7],v[6]),min(x[8],v[7]),max(x[9],v[8]), int(x[10])+1))

mrgCmb = (lambda x,y :(max(x[0],y[0]),x[1]+y[1],x[2]+y[2],x[3]+y[3],x[4]+y[4], min(x[5],y[5]),min(x[6],y[6]),min(x[7],y[7]),min(x[8],y[8]),max(x[9],y[9]), int(x[10])+int(y[10])))

combineByKey(creCmb, mrgVal, mrgCmb)
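
The roles of the three functions are easier to see on a smaller aggregation. The sketch below does not use Spark: combine_by_key is a plain-Python stand-in that mimics the two phases (local aggregation inside each partition, then merging across partitions), and the style names and prices are made up. It computes a (total, count) pair per key, from which an average price follows.

```python
def combine_by_key(partitions, create_combiner, merge_value, merge_combiners):
    """Plain-Python imitation of the two phases of combineByKey."""
    # Phase 1: aggregate locally inside each partition
    per_partition = []
    for partition in partitions:
        local = {}
        for key, value in partition:
            if key in local:
                local[key] = merge_value(local[key], value)
            else:
                local[key] = create_combiner(value)
        per_partition.append(local)

    # Phase 2: merge the per-partition combiners, one small record per key
    merged = {}
    for local in per_partition:
        for key, combiner in local.items():
            if key in merged:
                merged[key] = merge_combiners(merged[key], combiner)
            else:
                merged[key] = combiner
    return merged


partitions = [
    [("style-1", 10.0), ("style-2", 4.0), ("style-1", 20.0)],
    [("style-1", 30.0)],
]

totals = combine_by_key(
    partitions,
    lambda price: (price, 1),                        # createCombiner
    lambda acc, price: (acc[0] + price, acc[1] + 1),  # mergeValue
    lambda a, b: (a[0] + b[0], a[1] + b[1]),          # mergeCombiners
)
averages = {key: total / count for key, (total, count) in totals.items()}
print(totals)
print(averages)
```

Only one (total, count) pair per key leaves each partition, which is the network saving over groupByKey described above.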

Stateful transformations (updateStateByKey())

We required a framework that supported building knowledge based on both historical and real-time data, and Spark Streaming provided just that. Using stateful functions like updateStateByKey, which computes a running sum of all the sales, we were able to meet our requirement.

We used updateStateByKey(func) for stateful transformation. For example, suppose you want to keep track of the number of times a customer visited the web page. If customer “123” visited twice in the first hour and visits again in the next hour, the aggregated count at the end of the second hour should be 3 (the current batch count plus history). This history state is kept in memory and handled by updateStateByKey.

The checkpoint mechanism of Spark Streaming takes care of preserving the in-memory state of the sales history. As an additional recovery point, we stored the state in a database and recovered from the database in case files were cleared from the checkpoint during new code deployments or configuration changes.
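
The customer-visit example above can be reproduced without Spark. In the sketch below, run_batches is a hypothetical stand-in for the streaming engine: for each batch it groups the new values by key and calls the update function with the new values and the previous state, which is the contract an updateStateByKey function follows. Returning None drops the key from the state.

```python
def update_visit_count(new_values, previous_count):
    """Update function: add this batch's visits to the running count."""
    return sum(new_values) + (previous_count or 0)


def run_batches(batches, update_func):
    """Apply the update function batch by batch, carrying state forward."""
    state = {}
    for batch in batches:
        grouped = {}
        for key, value in batch:
            grouped.setdefault(key, []).append(value)
        # Every key with new values or existing state gets updated
        for key in set(state) | set(grouped):
            new_state = update_func(grouped.get(key, []), state.get(key))
            if new_state is None:
                state.pop(key, None)
            else:
                state[key] = new_state
    return state


hour_1 = [("123", 1), ("123", 1)]  # customer 123 visits twice
hour_2 = [("123", 1)]              # and once more in the next hour
final_state = run_batches([hour_1, hour_2], update_visit_count)
print(final_state)
```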



import datetime
import pytz

def updateSales(newstate, oldstate):
    try:
        # NOTE: the expression for the current time was lost from the original
        # listing; datetime.datetime.now(pytz.utc) is an assumed reconstruction.
        pacific = pytz.timezone('US/Pacific')
        now_utc = datetime.datetime.now(pytz.utc)
        fmt = '%Y-%m-%d %H:%M:%S'

        # If the event product insert timestamp is older than two days, remove it from memory
        if (oldstate is not None) and validate(oldstate[0][-4], 'updateSalesFn1') and (oldstate[0][-4] < (now_utc - datetime.timedelta(days=2)).astimezone(pacific).strftime(fmt)):
            oldstate = None

        # If the event product end date is older than the current timestamp, remove it from memory
        if (oldstate is not None) and validate(oldstate[0][-3], 'updateSalesFn2') and (oldstate[0][-3] < now_utc.astimezone(pacific).strftime(fmt)):
            oldstate = None

        # In case of an empty RDD, keep the existing state (assumed; not shown in the original)
        if not newstate:
            return oldstate

        if oldstate is None:
            # Nothing in memory: recover the saved state from Aurora
            oldstate = zome_aurora.aurora_get(str(newstate[0][-6]), str(newstate[0][-5]))
        else:
            print('Getting Records from Memory')

        if oldstate:
            return [(max(oldstate[0][0],newstate[0][0]),float(oldstate[0][1])+newstate[0][1],float(oldstate[0][2])+newstate[0][2],float(oldstate[0][3])+newstate[0][3],float(oldstate[0][4])+newstate[0][4],max(oldstate[0][5],newstate[0][5]),min(oldstate[0][6],newstate[0][6]),max(oldstate[0][7],newstate[0][7]),min(oldstate[0][8],newstate[0][8]),max(oldstate[0][9],newstate[0][9]),int(oldstate[0][10])+newstate[0][10])]

        return newstate
    except Exception as e:
        # Exception handling was not included in the original listing
        raise


Writing to Aurora :

We are not using the JDBC methods provided by Spark, as we had some performance issues with connection creation, record insertion and commit.

We went with an approach of creating one connection per partition, doing a bulk insert of all records in each partition, and inserting all partitions in parallel.


def sendPartition(iter):
    try:
        connection = mc.connect(…)  # Connect to database
        cursor = connection.cursor()
        data = []
        for record in iter:  # Loop through the records
            data.append(record)  # assumed loop body; elided in the original
        query = "INSERT INTO …. )"  # Insert into database
        # while not successful … (retry loop elided in the original)
        cursor.executemany(query, data)
    except Exception as e:  # Handle exception
        …




ZATA: How we used Kubernetes and Google Cloud to expose our Big Data platform as a set of RESTful web services

Authors: Shailu Mishra, Sudhir Hasbe

In our initial blog post about the zulily big data platform, we briefly talked about ZATA (the zulily data access service). Today we want to take a deep dive into ZATA and explain our thought process and how we built it.


As a data platform team we had three goals:

  1. Rich data generated by our team shouldn’t be limited to analysts. It should be available to systems and applications via a simple and consistent API.
  2. Have the flexibility to change our backend data storage solutions over time without impacting our clients.
  3. Zero development time for incremental data-driven APIs.

ZATA was our solution for achieving these goals. We abstracted our data platform behind a REST-based service layer that our clients could use to fetch datasets. We were able to swap out storage layers without any change to our client systems.

Selecting Data Storage solution

There are three different attributes you have to figure out before you pick a storage technology:

  1. Size of Data: Is it big data or relatively small data? In short, do you need something that will fit in MySQL, or do you need to look at solutions like Google BigQuery or AWS Aurora?
  2. Query Latency: How fast do you need to respond to queries? Is it milliseconds, or are a few seconds OK, especially for large datasets?
  3. Data Type: Is it relational data, key-value pairs, complex JSON documents, or a search pattern?

As an enterprise, we need all combinations of these. The following are choices our team has made over time for different attributes:

  1. Google BigQuery: Great for large datasets (terabytes), but latency is in seconds. It uses columnar storage.
  2. AWS Aurora: Great for large datasets (100s of gigabytes) with very low query latency.
  3. Postgres-XL: Great for large datasets (100s of gigabytes to terabytes) with amazing performance for aggregation queries. It is very difficult to manage and still early in its maturity cycle. We eventually moved our datasets to AWS Aurora.
  4. Google Cloud SQL, MySQL or SQL Server: For small datasets (GBs) with really low latency (milliseconds).
  5. MongoDB or Google Bigtable: Good for large-scale datasets with low-latency document lookup.
  6. Elasticsearch: We use Elasticsearch for search scenarios, both fuzzy and exact match.

Zata Architecture


Key runtime components for ZATA are

Mapping Layer

This looks at the incoming URLs and maps them to backend systems. For example: Request:[2013-11-15,2013-12-01]&outputFields=eventId,vendorId,productId,grossUnits maps to

  1. Google BigQuery (based on the config db mapping for product-offering).
  2. The dataset used is product-offering, which is just a view in the Google BigQuery system.
  3. Where eventStartDate=[2013-11-15,2013-12-01] is transformed to where eventStartDate between 2013-11-15 and 2013-12-01.
  4. The requested output fields are eventId, vendorId, productId, grossUnits.
  5. The query for Google BigQuery is:

Select eventId,vendorId,productId,grossUnits from product-offering where eventStartDate between '2013-11-15' and '2013-12-01'

The mapping layer decides what mappings to use and how to transform the http request to something that backend will understand. This will be very different for MongoDB or Google Big Table.
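
The mapping step above can be sketched as a small function that takes the already-parsed request parameters and produces a backend name plus a query string. This is an illustration under stated assumptions, not ZATA's code: the DATASET_BACKENDS dictionary stands in for the config db, build_query is a hypothetical name, and the string formatting is for readability only (a real service would validate field names and bind values as parameters rather than interpolating them).

```python
import re

# Stand-in for the config db mapping of dataset -> backend
DATASET_BACKENDS = {"product-offering": "bigquery"}


def build_query(dataset, filters, output_fields):
    """Map a parsed request to (backend, SQL text)."""
    backend = DATASET_BACKENDS[dataset]
    clauses = []
    for field, value in filters.items():
        # A value like [low,high] becomes a BETWEEN range
        match = re.fullmatch(r"\[(.+),(.+)\]", value)
        if match:
            clauses.append("%s between '%s' and '%s'"
                           % (field, match.group(1), match.group(2)))
        else:
            clauses.append("%s = '%s'" % (field, value))
    sql = "select %s from %s" % (", ".join(output_fields), dataset)
    if clauses:
        sql += " where " + " and ".join(clauses)
    return backend, sql


backend, sql = build_query(
    "product-offering",
    {"eventStartDate": "[2013-11-15,2013-12-01]"},
    ["eventId", "vendorId", "productId", "grossUnits"],
)
print(backend)
print(sql)
```

For a document store or a key-value backend the same request would be translated into a lookup or filter document instead of SQL, which is why the mapping is kept separate from execution.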

Execution Layer

The execution layer is responsible for generating queries using the protocol that the storage engine understands. It also executes the queries against the backend and fetches result sets efficiently. Our current implementation supports various protocols, such as MongoDB, standard JDBC, and HTTP requests for Google BigQuery, Bigtable and Elasticsearch.

Transform Layer

This layer is responsible for transforming data coming from any of the backend sources and normalizing it. This allows our clients to be agnostic of the storage mechanism in our backend systems. We went with JSON as the schema format, given how prevalent it is among service and application developers.

For the previous example from the mapping layer, the response will be the following:


{"eventId": "12345", "vendorId": "123", "productId": "3456", "grossUnits": "10"},

{"eventId": "23456", "vendorId": "123", "productId": "2343", "grossUnits": "234"},

{"eventId": "33445", "vendorId": "456", "productId": "8990", "grossUnits": "23"},

{"eventId": "45566", "vendorId": "456", "productId": "2343", "grossUnits": "88"}
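
To show what normalizing means here, the sketch below turns two differently shaped backend results into the same list of JSON objects. It is an illustration only: the function names and the sample rows are hypothetical, and values are rendered as strings simply to match the sample response above.

```python
import json


def normalize_rows(column_names, rows):
    """Tabular results (for example from JDBC): tuples plus column names."""
    return [dict(zip(column_names, (str(value) for value in row)))
            for row in rows]


def normalize_documents(documents, output_fields):
    """Document results (for example from MongoDB): keep requested fields."""
    return [{field: str(doc[field]) for field in output_fields}
            for doc in documents]


fields = ["eventId", "vendorId", "productId", "grossUnits"]

from_sql = normalize_rows(fields, [(12345, 123, 3456, 10)])
from_docs = normalize_documents(
    [{"_id": "abc", "eventId": 12345, "vendorId": 123,
      "productId": 3456, "grossUnits": 10}],
    fields,
)
print(json.dumps(from_sql))
print(from_sql == from_docs)
```

Because both paths produce the same structure, a client cannot tell which storage engine answered the request.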


API auto discovery

Our third goal was to have zero development time for incremental data-driven APIs. We achieved this by creating an auto discovery service. The job of this service is to regularly poll the backend storage services for changes and automatically add service definitions to the config db. For example, in Google BigQuery or MySQL, once you add a view in a schema called “zata”, we automatically add the API to the ZATA service. This way data engineers can keep adding services for the datasets they create without anyone writing new code.
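
One polling pass of such a discovery service might look like the sketch below. The function name, the shape of the inputs and the vendor-contacts view are assumptions made for illustration; listing the views in the “zata” schema of each backend is left out and represented by a plain dictionary.

```python
def discover_new_apis(backend_views, registered):
    """Register views that are not yet known and return the new entries.

    backend_views: dict of backend name -> view names in the "zata" schema
    registered:    set of (backend, dataset) pairs already in the config db
    """
    added = []
    for backend, views in sorted(backend_views.items()):
        for view in sorted(views):
            entry = (backend, view)
            if entry not in registered:
                registered.add(entry)
                added.append({"backend": backend, "dataset": view})
    return added


registered = {("bigquery", "product-offering")}
views_now = {
    "bigquery": ["product-offering", "sales-hourly"],
    "mysql": ["vendor-contacts"],
}

first_poll = discover_new_apis(views_now, registered)
second_poll = discover_new_apis(views_now, registered)
print(first_poll)
print(second_poll)
```

The second poll finds nothing new, so running the pass on a timer is safe.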

API Schema Definition

The schema service enables users to look up all the APIs supported by ZATA and view each one’s schema to understand what requests they can send. Clients can get the list of available datasets:

Dataset Request:

{ "datasetName": "product-offering-daily",….},
{ "datasetName": "sales-hourly",…………………},
{ "datasetName": "product-offering",………….}

Schema Request: Then they can drill down to the schema of a selected dataset:

{ "fieldName": "eventId", "fieldType": "INTEGER" },
{ "fieldName": "eventStartDate", "fieldType": "DATETIME" },
{ "fieldName": "eventEndDate", "fieldType": "DATETIME" },
{ "fieldName": "vendorId", "fieldType": "INTEGER" },
{ "fieldName": "productStyle", "fieldType": "VARCHAR" },
{ "fieldName": "grossUnits", "fieldType": "INTEGER" },
{ "fieldName": "netUnits", "fieldType": "INTEGER" },
{ "fieldName": "grossSales", "fieldType": "NUMERIC" },
{ "fieldName": "netSales", "fieldType": "NUMERIC" }

So far, the client is not aware of the location and has no knowledge of the storage system, which makes the whole data story more agile. If a dataset is moved from one location to another, or its schema is altered, all downstream systems will be fine, since the access points and the contracts are managed by ZATA.

Storage Service Isolation

As we rolled out ZATA over time, we realized the need for storage service isolation. Having a single service support multiple backend storage solutions with different latency requirements didn’t work very well. The slowest backend tends to slow things down for everyone else.

This forced us to rethink the ZATA deployment strategy. Around the same time, we were experimenting with Docker and using Kubernetes as an orchestration mechanism.

We ended up creating a separate Docker container and Kubernetes service for each of the backend storage solutions. So we now have a zata-bigquery service which handles all BigQuery-specific calls. Similarly, we have zata-mongo, zata-jdbc and zata-es services. Each of these Kubernetes services can be scaled individually based on anticipated load.

In addition to the individual Kubernetes services, we also created a zata-router service, which is essentially nginx hosted in Docker. The zata-router service accepts incoming HTTP requests for ZATA and, based on the nginx config, routes HTTP traffic to the various Kubernetes services available in the cluster. The nginx config in the zata-router service is dynamically refreshed by a polling service to make new APIs discoverable.
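
The routing decision itself is simple enough to sketch in a few lines. The real router is nginx with a generated config, so the code below only models the lookup: the four service names come from the text above, while the backend keys, the function names and the assignment of datasets to backends are hypothetical.

```python
# Service names as described above; the backend keys are assumptions
BACKEND_SERVICES = {
    "bigquery": "zata-bigquery",
    "mongo": "zata-mongo",
    "jdbc": "zata-jdbc",
    "es": "zata-es",
}


def build_routes(config_entries):
    """Rebuild the dataset -> service table from config db entries."""
    return {entry["dataset"]: BACKEND_SERVICES[entry["backend"]]
            for entry in config_entries}


def route(routes, dataset):
    """Return the service that should handle a dataset, or None."""
    return routes.get(dataset)


config = [{"dataset": "product-offering", "backend": "bigquery"}]
routes = build_routes(config)
before = route(routes, "sales-hourly")

# A later poll sees a newly registered dataset and refreshes the table
config.append({"dataset": "sales-hourly", "backend": "jdbc"})
routes = build_routes(config)
after = route(routes, "sales-hourly")

print(route(routes, "product-offering"), before, after)
```

Rebuilding the whole table on each poll keeps the router stateless, at the cost of a short delay before a new API becomes reachable.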


ZATA has enabled us to make our data more accessible across the organization while letting us move fast and change the storage layer as we scaled up.