Realtime Archival of Mongo Collections to BigQuery

Here at zulily, we maintain a wide variety of database solutions. In this post, we'll focus on two that serve differing needs in Ad Tech: MongoDB for our real-time needs and Google's BigQuery for our long-term archival and analytics needs. In many cases we end up with data that needs to be represented and maintained in both databases, which causes friction because the two systems have very different strengths and constraints. Previously we solved this by writing manual batch jobs for whatever collections we wanted represented in BigQuery, but that meant delays before data appeared in BigQuery and the overhead of maintaining multiple mostly-similar-but-slightly-different jobs. As such, we've developed a new service, called mongo_bigquery_archiver, that automates the process of archiving data to BigQuery with only minimal configuration.

The archiver is configured through a collection, 'bigquery_archiver_settings', that lives in a custom 'config' database on the Mongo server whose contents we want to back up. This collection maintains one document for each collection we want to back up to BigQuery; each document serves both as configuration and as an up-to-date reference for the status of that backup process. Starting out, the configuration is simple:

{
    "mongo_database" : The Mongo database that houses the collection we want to archive.
    "mongo_collection" : The Mongo collection we want to archive.
    "bigquery_table" : The BigQuery table we want to create to store this data. Note that the table doesn't need to already exist! The archiver can create them on the fly.
    "backfill_full_collection" : Whether to upload existing documents in the Mongo collection, or only upload future ones. This field is useful for collections that may have some junk test data in them starting out, or for quickly testing it for onboarding purposes.
    "dryrun": Whether to actually begin uploading to BigQuery or just perform initial checks. This is mostly useful for the schema creation feature discussed below.
}
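
For instance, registering a hypothetical Ad Tech collection for archival is a single document insert. Here is a minimal sketch assuming pymongo; the database, collection and table names are made up for illustration:

# Minimal sketch, assuming pymongo; all names below are illustrative.
from pymongo import MongoClient

client = MongoClient("mongodb://our-mongo-server:27017")

client["config"]["bigquery_archiver_settings"].insert_one({
    "mongo_database": "adtech",
    "mongo_collection": "ad_performance",
    "bigquery_table": "adtech_archive.ad_performance_history",
    "backfill_full_collection": True,
    "dryrun": False
})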

And that's it! When that configuration is added to the collection, if the archiver process is active, it automatically picks up the new configuration and spawns a subprocess to begin archiving that Mongo collection. The first step is determining the schema to use when creating the BigQuery table. If a schema already exists or is (optionally) specified manually in the configuration document, the archiver uses that. Otherwise, it begins determining the maximum viable schema. Iterating through the entire collection, it analyzes each document to find every field that appears across the superset of all documents, keeping fields that maintain a consistent type in every document in which they appear; fields missing from some documents are treated as null values. It analyzes subdocuments the same way, creating RECORD field definitions that similarly load all the valid fields, recursing as necessary based on the depth of the collection's documents. When complete, it stores the generated maximum viable schema in the configuration document for the user to review and modify as needed, in case there are extraneous fields that, while possible to upload to BigQuery, would just add useless overhead. It then creates a BigQuery table based on this schema and moves on to the next step.
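
A stripped-down sketch of that inference pass is below. The real archiver handles more BigQuery types and edge cases; the helper names here are our own, for illustration only.

def bigquery_type(value):
    # Map a Mongo value onto a (simplified) BigQuery type.
    if isinstance(value, bool):
        return "BOOLEAN"
    if isinstance(value, int):
        return "INTEGER"
    if isinstance(value, float):
        return "FLOAT"
    if isinstance(value, dict):
        return "RECORD"
    return "STRING"

def merge_documents(documents, columns):
    # Fold every document's fields into one set of columns. Fields missing from
    # some documents simply upload as NULL; fields with conflicting types are dropped.
    for doc in documents:
        for name, value in doc.items():
            field_type = bigquery_type(value)
            column = columns.setdefault(
                name, {"name": name, "type": field_type, "mongo_field": name, "fields": {}})
            if column["type"] != field_type:
                column["type"] = None                       # inconsistent across documents
            elif field_type == "RECORD":
                merge_documents([value], column["fields"])  # recurse into subdocuments
    return columns

def to_schema(columns):
    schema = []
    for column in columns.values():
        if column["type"] is None:
            continue
        entry = {"name": column["name"], "type": column["type"], "mongo_field": column["mongo_field"]}
        if column["type"] == "RECORD":
            entry["fields"] = to_schema(column["fields"])
        schema.append(entry)
    return schema

# maximum_viable_schema = {"columns": to_schema(merge_documents(collection.find(), {}))}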

Here’s an example of the generated schema:

{
    "columns" : [
        {
            "name" : "_id",
            "type" : "STRING"
        },
        {
            "name" : "tracking_code",
            "type" : "INTEGER",
            "mongo_field" : "tracking_code"
        },
        {
            "name" : "midnight_to_timestamp",
            "type" : "RECORD",
            "mongo_field" : "midnight_to_timestamp",
            "fields" : [
                {
                    "name" : "cpta",
                    "type" : "FLOAT"
                },
                {
                    "name" : "activations",
                    "type" : "INTEGER"
                },
                {
                    "name" : "spend",
                    "type" : "FLOAT"
                },
                {
                    "name" : "date_start",
                    "type" : "STRING"
                },
                {
                    "name" : "ad_spend_id",
                    "type" : "STRING"
                }
            ]
        },
        {
            "name" : "created_on_timestamp",
            "type" : "FLOAT",
            "mongo_field" : "created_on_timestamp"
        },
        {
            "name" : "lifetime_to_timestamp",
            "type" : "RECORD",
            "mongo_field" : "lifetime_to_timestamp",
            "fields" : [
                {
                    "name" : "cpta",
                    "type" : "FLOAT"
                },
                {
                    "name" : "activations",
                    "type" : "INTEGER"
                },
                {
                    "name" : "spend",
                    "type" : "FLOAT"
                }
            ]
        }
    ]
}

Next, it uses change streams, a feature added in Mongo 3.6. A change stream is like a subscription to the oplog on the Mongo server for the collection in question: whenever an operation that modifies the collection comes in, the subscriber is notified about it. We maintain a subscription for each watched collection, and whenever an update comes in we process it to get the current state of the document in Mongo, without querying again, since the change stream can be configured to also hand us the latest version of the document. After filtering the document down to the generated schema from before, we upload the change to the BigQuery table via the BigQuery streaming API, along with the kind of operation it was: create, replace, update, or delete. In the case of a delete, we blank all fields but the _id field and log the row as a DELETE operation in the table. This table now represents the entire history of operations on the collection in question and can be used to track the state of the collection across time.
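
A rough sketch of the per-collection watcher is below, assuming pymongo 3.6+ and the google-cloud-bigquery client. The real archiver also handles nested RECORD fields, retries and reconnects, which are omitted here.

# Simplified per-collection watcher; error handling and nested fields omitted.
import time
from google.cloud import bigquery

def archive_changes(collection, table_id, schema_fields):
    bq = bigquery.Client()
    table = bq.get_table(table_id)
    # full_document='updateLookup' asks Mongo to attach the latest version of the
    # document to update notifications, so we never have to query it again.
    with collection.watch(full_document="updateLookup") as stream:
        for change in stream:
            operation = change["operationType"]          # insert, replace, update, delete
            row = {"operationType": operation, "time_archived": time.time()}
            if operation == "delete":
                row["_id"] = str(change["documentKey"]["_id"])   # blank everything but _id
            else:
                doc = change["fullDocument"]
                for field in schema_fields:              # filter down to the reviewed schema
                    row[field] = doc.get(field)
                row["_id"] = str(doc["_id"])
            errors = bq.insert_rows(table, [row])        # BigQuery streaming API
            if errors:
                raise RuntimeError(errors)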

Since a full-history table makes it somewhat cumbersome to get the most current version of the data, the archiver automatically creates views on top of all generated tables, using the following query:

SELECT * FROM (
    SELECT *, ROW_NUMBER() OVER(
        PARTITION BY _id ORDER BY time_archived DESC) rank FROM project.dataset_id.table_id
    )
WHERE rank=1 AND operationType != 'delete'

What this query does is determine the latest operation performed on each unique document _id, which is guaranteed to be unique in the Mongo collection. If the latest operation is a DELETE, no entry for that particular _id appears in the final results; otherwise the row shows the state of the document as of its latest modification. The result is a view of each unique document exactly as of its latest update. With the minimal latency of the BigQuery streaming API, changes are reflected in BigQuery within seconds of their creation in Mongo, allowing for sophisticated real-time analytics via BigQuery. While BigQuery does not publish official SLAs for streaming-API performance, we consistently see rows become queryable within 3-4 seconds at most. The preview mechanism in the BigQuery UI does not reflect them immediately, but querying the table with a SELECT statement shows the results.

[Architecture diagram] An example of the architecture. Note that normal user access always hits Mongo and never goes to BigQuery, making the process more efficient.

Thanks to the Archiver, we’ve been able to leverage the strengths of both Mongo and BigQuery in our rapidly-modifying datasets while not having to actively maintain two disparate data loading processes for each system.

We've open-sourced the Archiver on GitHub under the Apache 2.0 license: https://github.com/zulily/mongo_bigquery_archiver. If you choose to use it for your own needs, check out Google's documentation for streaming inserts into BigQuery.

Two Million Mobile Push Notifications in Five Minutes

 

Authors: Arun Selvadhurai, Julian Kim, Sergey Podlazov, Tin Liu, Vinay Yerramilli

In Marketing Tech, one of our jobs is to tell customers about zulily offers.  These days everything and everyone goes mobile, and Mobile Push notifications are a great way to reach customers.

Our team faced a double-sided challenge.  Imagine that you have to ferry passengers across a river. There’ll be times when only one or two passengers show up every second or so, but they need to make it across as soon as possible. Under other circumstances, two million passengers will show up at once, all demanding an immediate transfer to the opposite bank.

One way to solve this is to build a big boat and a bunch of small boats and use them as appropriate. While this works, the big boat will sit idle most of the time. If we build the big boat only, we will be able to easily handle the crowds, but it will cost a fortune to service individual passengers. Two million small boats alone won't work either, because they would probably take up the entire length of the river.

Fortunately, in the world of software we can solve this challenge by building a boat that scales. Unlike the Lambda architecture with two different code paths for real-time and batch processing, an auto-scaling system offers a single code path that can handle one or one million messages with equal ease.

Let’s take a look at the system architecture diagram.

[System architecture diagram]

Campaigns and one-offs are passengers. In the case of a campaign, we have to send potentially millions of notifications in a matter of minutes. One-offs arrive randomly, one at a time.

An AWS Kinesis Stream paired with a Lambda function makes a boat that scales. While we do need to provision both to have enough capacity to process peak loads, with Lambda we only pay for what we use, and Kinesis is dirt-cheap.

We also ensure that the boat doesn't ferry the same passenger multiple times, which would result in an awful customer experience (just imagine having your phone beep every few minutes). To solve this problem, we built a Frequency Cap service on top of Redis, which gives us a response time under 50ms per message. Before the code attempts to send a notification, it checks with the Frequency Cap service whether the send has already been attempted. If it has, the message is skipped; otherwise, it is marked as "Send Attempted". It's important to note that the call to the Frequency Cap API is made before an actual send is attempted. Such a sequence prevents the scenario where we send the message and then fail to mark it accordingly due to a system failure.
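
The check itself can stay very small. Here is a minimal sketch of the idea, assuming the redis-py client; the key format and expiry window are illustrative, not our production values.

import redis

r = redis.StrictRedis(host="frequency-cap-redis", port=6379)

def should_send(customer_id, campaign_id, ttl_seconds=24 * 3600):
    key = "sent:%s:%s" % (campaign_id, customer_id)
    # SET ... NX succeeds only for the first caller, so checking for a previous
    # attempt and marking "Send Attempted" happen in one atomic step, and it
    # happens before the actual push is attempted.
    return bool(r.set(key, "attempted", nx=True, ex=ttl_seconds))

# if should_send(customer_id, campaign_id):
#     push_notification(customer_id, payload)   # hypothetical send call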

Another interesting challenge worth explaining is how we line up millions of passengers to board the boat efficiently. Imagine that they all arrive without a ticket, and the ticketing times vary. Yet the boat departs at an exact time that cannot be changed. We solve for this by ticketing in advance (the Payload Builder EMR service) and gathering passengers in a waiting area (files in S3). At an exact time, we open multiple doors from the waiting area (multithreading in the Kinesis Loader Java service), and the passengers make their way onto the boat (the Kinesis Stream). The Step Functions AWS service connects the Payload Builder and the Kinesis Loader into a workflow.
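
The actual loader is a Java service, but the idea looks roughly like this Python sketch using boto3; the bucket, stream name, batch size and payload fields are illustrative. (A production loader would also retry any records that PutRecords reports back as failed.)

import json
import boto3
from concurrent.futures import ThreadPoolExecutor

s3 = boto3.client("s3")
kinesis = boto3.client("kinesis")

def load_file(bucket, key, stream_name="push-notifications"):
    # Each S3 file holds pre-built notification payloads, one JSON object per line.
    body = s3.get_object(Bucket=bucket, Key=key)["Body"].read().decode("utf-8")
    payloads = [json.loads(line) for line in body.splitlines() if line]
    # PutRecords accepts at most 500 records per call.
    for i in range(0, len(payloads), 500):
        records = [{"Data": json.dumps(p).encode("utf-8"),
                    "PartitionKey": str(p["customer_id"])}
                   for p in payloads[i:i + 500]]
        kinesis.put_records(StreamName=stream_name, Records=records)

def load_campaign(bucket, keys):
    # Open several "doors" at once: push many S3 files onto the stream in parallel.
    with ThreadPoolExecutor(max_workers=8) as pool:
        list(pool.map(lambda key: load_file(bucket, key), keys))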

In summary, we built a system that can handle one or one million Mobile Push notifications with equal ease. We achieved this by combining batch and streaming architecture patterns and adding a service to prevent duplicate sends. We also did some cool stuff in the Payload Builder service to personalize each notification so check back in a few weeks for a new post on that.

Calculating Ad Performance Metrics in Real Time

Authors: Sergey Podlazov, Rahul Srivastava

zulily is a flash sales company. We post a product on the site, and poof… it's gone in 72 hours. Online ads for those products come and go just as fast, which doesn't leave us much time to manually evaluate the performance of the ads and take corrective action if needed. To optimize our ad spend, we need to know in real time how each ad is doing, and this is exactly what we engineered.

While we track multiple metrics to measure the impact of an ad, I am going to focus on one that provides a good representation of the system architecture. This is an engineering blog after all!

The metric in question is Cost per Total Activation, or CpTA in short.  The formula for the metric is this:  divide the total cost of the ad by the number of customer activations.  We call the numerator in this formula “spend” and refer to the denominator as an “activation”.  For example, if an ad costs zulily $100 between midnight and 15:45 PST on January 31 and results in 20 activations, the CpTA for this ad as of 15:45 PST is $100/20 = $5.

Here's how zulily collects this metric in real time. For the sake of simplicity, I will skip the archiving processes that are sprinkled on top of the architecture below.

[System architecture diagram]

The source of the spend for the metric is an advertiser API, e.g. Facebook.  We’ve implemented a Spend Producer (in reference to the Producer-Consumer model) that queries the API every 15 minutes for live ads and pushes the spend into a MongoDB.  Each spend record has a tracking code that uniquely identifies the ad.

The source for the activations is a Kafka stream of purchase orders that customers place with zulily.  We consume these orders and throw them into an AWS Kinesis stream.  This gives us the ability to process and archive the orders without causing an extra strain on Kafka.  It’s important to note that relevant orders also have the ad’s tracking code, just like the spend.  That’s the link that glues spend and activations together.

The Activation Evaluator application examines each purchase and determines if the purchase is an activation. To do that, it looks up the previous purchase in a MongoDB collection for the customer ID on the purchase order. If the most recent transaction is non-existent or older than X days, the purchase is an activation. The Activation Evaluator then updates the customer record with the date of the new purchase. To make sure that we don't drop any data if the Activation Evaluator runs into issues, we don't move the checkpoint in the Kinesis stream until the write to Mongo is confirmed.
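
In code, the check amounts to a lookup plus an upsert. Here is a sketch assuming pymongo; the collection and field names are illustrative, and window_days stands in for X.

import datetime

def evaluate_purchase(customers, order, window_days):
    customer = customers.find_one({"customer_id": order["customer_id"]})
    cutoff = order["purchase_date"] - datetime.timedelta(days=window_days)
    # A purchase is an activation if the customer has no prior purchase,
    # or the most recent one is older than the window.
    is_activation = customer is None or customer["last_purchase_date"] < cutoff
    customers.update_one(
        {"customer_id": order["customer_id"]},
        {"$set": {"last_purchase_date": order["purchase_date"]}},
        upsert=True)
    # Only after this write is confirmed do we advance the Kinesis checkpoint.
    return is_activation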

The Activation Evaluator sends evaluated purchases into another Kinesis stream.  Chaining Kinesis streams is a pretty common pattern for AWS applications, as it allows for separation of concerns and makes the whole system more resilient to failures of individual components.

The Activation Calculator reads the evaluated purchases from the second Kinesis stream and captures them in Mongo.  We index the data by tracking code and timestamp, and voila, a simple count() will return the number of activations for a specified period.

The last step in the process is to take the Spend and divide it by the activations.  Done.
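
Put together, the calculation amounts to something like the sketch below, assuming pymongo; the collection and field names are illustrative.

def cpta(spend_collection, activations_collection, tracking_code, start, end):
    window = {"$gte": start, "$lte": end}
    spend = sum(doc["spend"] for doc in
                spend_collection.find({"tracking_code": tracking_code, "timestamp": window}))
    activations = activations_collection.count_documents(
        {"tracking_code": tracking_code, "timestamp": window, "is_activation": True})
    # e.g. $100 of spend and 20 activations -> CpTA of $5
    return spend / activations if activations else None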

With this architecture, zulily measures a key advertising performance metric every 15 minutes and uses it to pause poorly-performing ads.  The metric also serves as an input for various Machine Learning models, but more on those in a future blog post… Stay tuned!!

 

 

 

zuFlow: Query Workflow and Scheduling for Google BigQuery

Authors: Matthew Kang, Shailu Mishra, Sudhir Hasbe

In 2014, we made a decision to build our core data platform on Google Cloud Platform, and one of the products that was critical to that decision was Google BigQuery. We knew the scale at which it enabled us to perform analysis would be critical for our business in the long run. Today we have more than 200 unique users performing analysis on a monthly basis.

Once we started using Google BigQuery at scale, we soon realized our analysts needed better tooling around it. The key requests we started getting were:

  1. Ability to schedule jobs: Analysts needed the ability to run queries at regular intervals to generate data and metrics.
  2. Define workflows of queries: Analysts wanted to run multiple queries in a sequence and share data across them through temp tables.
  3. Simplified data sharing: It became clear teams needed to share the generated data with other systems, for example downloading it for use in R programs or sending it to another system for processing through Kafka.

zuFlow Overview

zuFlow is zulily's query workflow and scheduling solution for Google BigQuery. There are a few key concepts:

  • Job: An executable entity that encompasses multiple queries with a schedule.
  • Query: SQL statement that can be executed on Google BigQuery
  • Keyword: Variable defined to be used in the queries
  • Looper: Ability to define loops like foreach statements.

High Level Design


zuFlow is a web application that enables users to set up jobs and run them either on demand or based on a schedule.

  • We use Django with NGINX for handling our web traffic.
  • We leverage Google Cloud SQL for storing the config DB and keeping track of runtime state.
  • We have integrated the system with an off-the-shelf open-source scheduler called SOS. We do plan to migrate this to Airflow in the future.
  • Flowrunner is the brain of the system, written in Python. It reads configuration from the config DB, executes the queries, and stores the runtime details back in the DB. A few key capabilities it provides are listed below (a query-with-retry sketch follows the list):
    • Concurrency: We have to manage our concurrency to make sure we are not overloading the system.
    • Retry: In a few scenarios, based on error codes, we retry the queries.
    • Cleanup: It is responsible for cleaning up after jobs are run, including historical data cleanup.
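
For a sense of what one flow step looks like, here is a simplified sketch of running a single query with retries using today's google-cloud-bigquery client. The actual Flowrunner predates this client, and its retry rules are driven by configuration, so treat the error handling below as illustrative.

import time
from google.cloud import bigquery

RETRYABLE = ("backendError", "rateLimitExceeded", "internalError")   # illustrative error codes

def run_query_with_retry(client, sql, max_attempts=3):
    for attempt in range(1, max_attempts + 1):
        try:
            return client.query(sql).result()      # block until the query finishes
        except Exception as error:
            retryable = any(code in str(error) for code in RETRYABLE)
            if attempt == max_attempts or not retryable:
                raise
            time.sleep(2 ** attempt)               # back off before retrying

# client = bigquery.Client()
# rows = run_query_with_retry(client, "SELECT ...")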

zuFlow Experience

Job Viewer: Once logged in, you can see your own jobs, or you can view all jobs in the system.


Creating a Job: You give it a name, a schedule to run on, and the email address of the owner.


Keywords/variables: You can create keywords which you can reuse in your queries. This enables analysts to define a parameter and use it in their queries instead of hardcoding values. We also have predefined system keywords for date/time handling and for making it easier for users to shard tables. Examples:

  • DateTime: CURRENT_TIMESTAMP_PT, CURRENT_DATE_PT, CURRENT_MONTH_PT, CURRENT_TIME_PT, LAST_RUN_TIMESTAMP_PT, LAST_RUN_TIMESTAMP, LAST_RUN_DATE_PT
    LAST_RUN_DATE_PT is the BQ-format Pacific date of the last run of this job (it will be CURRENT_DATE_PT on the first run).
  • Sharding: *_BQ
    Provides formatted versions of date strings for table shard references (without dashes, i.e. YYYYmmdd).


Looping: Very soon after our first release we got requests to add loops. This enables users to define a variable and loop through its values.


Query Definition: Now you are ready to write a Google BigQuery query and define where the output will be stored. There are 4 options:

  1. BQ Table: You provide a BQ table and decide whether to replace it or append to it. You can also mark the output table as a temp table, and the system will clean it up after the job completes.
  2. CSV: If you pick CSV, you need to provide a GCS location for the output.
  3. Cloud SQL (MySQL): You can also export to Cloud SQL.
  4. Kafka: You can provide a Kafka topic name to publish results as messages.

You can define multiple queries and share data across them through temp tables in BQ.
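
As a sketch of what the first two output options translate to against the BigQuery API (shown with today's google-cloud-bigquery client; the table name and GCS path are illustrative):

from google.cloud import bigquery

client = bigquery.Client()

def run_to_table(sql, destination_table, replace=True):
    # "Replace" vs. "append" maps onto BigQuery's write disposition.
    config = bigquery.QueryJobConfig(
        destination=destination_table,
        write_disposition=(bigquery.WriteDisposition.WRITE_TRUNCATE if replace
                           else bigquery.WriteDisposition.WRITE_APPEND))
    client.query(sql, job_config=config).result()

def export_to_csv(table, gcs_uri):
    # gcs_uri is the GCS location the user provides, e.g. "gs://some-bucket/output/part-*.csv"
    client.extract_table(table, gcs_uri).result()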


Job Overview: This shows the full definition of the job.


We have thought about open sourcing the solution. Please let us know if you are interested in this system.

Zome: Real-time Merchant Home Page with Spark Streaming & Kafka

Authors: Bapi Akula, Shailu Mishra, Sudhir Hasbe

zulily is a daily business: we launch our events every day at 6am PST, and most of our sales come in the early hours after the events launch. It is critical for our merchants to know what is happening and react to drive more sales. We have significantly improved merchants' ability to drive sales by providing them a new real-time home page, so every day when they come in they can take action based on the current situation.

Historical View:

Historically we had a dashboard for our merchants which was not very useful. It showed them upcoming events and some other info, but when you come in every day you want to know what is happening today, not tomorrow.


New View

We replaced this non-actionable home page with a new real-time version that shows merchants real-time sales for their events, conversion rates, real-time inventory, top-selling styles, and styles projected to sell out. This enables merchants to talk to vendors to get more inventory or add different products.


Technical Design

To build a real-time home page for merchants we had to combine real-time clickstream data (unstructured) with real-time sales (structured) and historical event and product data. Bringing these very different types of datasets into a single pipeline and merging/processing them in real time was a challenge.

Real-time Clickstream & Orders

We have built a high-scale real-time collection service called ZIP. It peaks at around 18k to 20k transactions per second every day. Our clickstream data and order data are collected through this service. One of the capabilities of ZIP is to publish this data in real time to a Kafka cluster. This enables other systems to access the data being collected in near-real-time.

We will describe other capabilities of this service in a future post.

Historical data:

Our data platform runs on Google Cloud Platform and includes Google Dataproc as our ETL processing platform, which, after processing the data, stores it in Google BigQuery for analytics and in Google Cloud Storage. All our historical data, which includes our products, events, prices and orders, is stored in Google BigQuery and Google Cloud Storage.


Spark Streaming Processing

We used Spark Streaming to connect the clickstream and order data collected in Kafka with historical data in GCS, using the GCS connector provided by Google. This allowed us to create derivative datasets like real-time sales, conversion rates, and top sellers, which were stored in AWS Aurora. AWS Aurora is an amazing database for high-scale queries; in a future post we will write up why we chose Aurora over other options.

Data Access through Zata

We then used our ZATA API to access this data from our merch tools to build an amazing UI for our merchants.

Spark Streaming Details

Reading the data from Kafka (KafkaUtils.createDirectStream)

KafkaUtils is the object with the factory methods to create input DStreams and RDDs from records in Apache Kafka topics. createDirectStream skips receivers and ZooKeeper and uses the simple API to consume messages. This means it needs to track offsets internally.

So at the beginning of each batch, the connector reads partition offsets for each topic from Kafka and uses them to ingest data. To ensure exactly-once semantics, it tracks offset information in Spark Streaming checkpoints.

from pyspark.streaming.kafka import KafkaUtils
import json

def getDstreamFromKafka(ssc, topic, kafka_servers):
    # Receiver-less direct stream against the Kafka topic.
    kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"bootstrap.servers": kafka_servers})
    # Each record is a (key, value) pair; parse the value into a list of fields.
    parsed = kafkaStream.map(lambda v: json.dumps(v[1]).replace('",', '";').split(','))
    return parsed

 

Reading data from GCS (textFileStream)

This method monitors a Hadoop-compatible filesystem directory for new files and, when it detects a new file, reads it into Spark Streaming. In our case we use GCS, and the streaming job internally uses the GCS connector. We pass the GCS connector as a jar file when invoking the job.

# The GCS bucket path is a placeholder; the GCS connector makes gs:// paths look
# like a Hadoop-compatible filesystem to Spark.
lines = ssc.textFileStream(gcs_bucket_path)

Merging values: combineByKey(createCombiner, mergeValue, mergeCombiners)

In Spark, groupByKey() doesn't do any local aggregation while computing on each partition's data, which is where combineByKey() comes in handy. With combineByKey, values are merged into one value per key within each partition, and then the per-partition values are merged into a single value. So combineByKey is an optimization over groupByKey, since we end up sending fewer key-value pairs across the network.

We used combineByKey to calculate aggregations like total sales, average price, and demand. Three lambda functions were passed as arguments to this method:

combineByKey(createCombiner,mergeValue,mergeCombiners)

createCombiner: The first required argument is a function used as the very first aggregation step for each key; it is invoked when a key is seen for the first time.

mergeValue: This function tells what to do when a combiner is given a new value.

mergeCombiners: This function is called to combine the values of a key across multiple partitions.


creCmb = (lambda v:(v[0],float(v[1]),0.0 ,0.0,0.0,v[4],v[5],v[6],v[7],v[8],1) if v[3]==-1 else (v[0],float(v[1]),float(v[1])/float(v[2]) ,float(v[3]),((float(v[1])/float(v[2]))-float(v[3])),v[4],v[5],v[6],v[7],v[8],1))

mrgVal = (lambda x, v:(max(x[0],v[0]),float(x[1])+float(v[1]),(float(x[2]))+0.0,float(x[3])+0.0,float(x[4])+0.0,min(x[5],v[4]), min(x[6],v[5]),min(x[7],v[6]),min(x[8],v[7]), max(x[9],v[8]),int(x[10])+1) if v[3]==-1 else (max(x[0],v[0]),float(x[1])+ float(v[1]),(float(x[2]))+(float(v[1])/float(v[2])),float(x[3])+float(v[3]), float(x[4])+((float(v[1])/float(v[2]))-float(v[3])),min(x[5],v[4]), min(x[6],v[5]),min(x[7],v[6]),min(x[8],v[7]),max(x[9],v[8]), int(x[10])+1))

mrgCmb = (lambda x,y :(max(x[0],y[0]),x[1]+y[1],x[2]+y[2],x[3]+y[3],x[4]+y[4], min(x[5],y[5]),min(x[6],y[6]),min(x[7],y[7]),min(x[8],y[8]),max(x[9],y[9]), int(x[10])+int(y[10])))

combineByKey(creCmb, mrgVal, mrgCmb)

Stateful transformations (updateStateByKey())

We required a framework that supported building knowledge from both historical and real-time data, and Spark Streaming provided just that.
Using stateful functions like updateStateByKey, which computes a running sum of all the sales, we were able to achieve our requirement.
We used updateStateByKey(func) for the stateful transformation. For example, say you want to keep track of the number of times a customer visited the web page: if customer "123" visited twice in the first hour and visits again in the next hour, the aggregated count at the end of the second hour should be 3 (the current batch count plus history). This history state lives in memory and is handled by updateStateByKey.
Spark Streaming's checkpoint mechanism takes care of preserving the state of the sales history in memory.
As an additional recovery point, we stored the state in a database and recovered from the database in case checkpoint files were cleared during new code deployments or configuration changes.

import sys
import datetime
import pytz

# validate() and zome_aurora are helpers from the original job (not shown here).

def updateSales(newstate, oldstate):
    try:
        pacific = pytz.timezone('US/Pacific')
        two_days_ago = (datetime.datetime.now(tz=pytz.utc) -
                        datetime.timedelta(days=2)).astimezone(pacific).strftime('%Y-%m-%d %H:%M:%S')
        now_pt = datetime.datetime.now(tz=pytz.utc).astimezone(pacific).strftime('%Y-%m-%d %H:%M:%S')

        # If the product insert timestamp is older than two days, drop the state from memory.
        if oldstate is not None and validate(oldstate[0][-4], 'updateSalesFn1') and oldstate[0][-4] < two_days_ago:
            oldstate = None

        # If the event end date is older than the current timestamp, drop the state from memory.
        if oldstate is not None and validate(oldstate[0][-3], 'updateSalesFn2') and oldstate[0][-3] < now_pt:
            oldstate = None

        if newstate:                             # skip empty RDDs
            if oldstate is None:
                # Nothing in memory: fall back to the copy of the state stored in Aurora.
                oldstate = zome_aurora.aurora_get(str(newstate[0][-6]), str(newstate[0][-5]))
            else:
                print('Getting Records from Memory')

            if oldstate:
                # Merge the new batch into the running totals, field by field.
                return [(max(oldstate[0][0], newstate[0][0]),
                         float(oldstate[0][1]) + newstate[0][1], float(oldstate[0][2]) + newstate[0][2],
                         float(oldstate[0][3]) + newstate[0][3], float(oldstate[0][4]) + newstate[0][4],
                         max(oldstate[0][5], newstate[0][5]), min(oldstate[0][6], newstate[0][6]),
                         max(oldstate[0][7], newstate[0][7]), min(oldstate[0][8], newstate[0][8]),
                         max(oldstate[0][9], newstate[0][9]), int(oldstate[0][10]) + newstate[0][10])]
            else:
                return newstate
    except Exception as e:
        sys.exit(1)

# Apply the stateful update to the aggregated batch stream.
soi_batch_agg.updateStateByKey(updateSales)

Writing to Aurora:

We are not using the JDBC methods provided by Spark, as we had performance issues with connection creation, record insertion and commits.

We went with an approach of creating a connection for each partition, doing a bulk insert of all records under each partition, and inserting all partitions in parallel.

# Applied per partition, e.g. rdd.foreachPartition(sendPartition); the connection
# parameters and the INSERT statement are elided here just as in the original.
def sendPartition(iter):
    try:
        connection = mc.connect(...)            # connect to the Aurora database
        cursor = connection.cursor()

        data = []
        for record in iter:                     # gather every record in this partition
            data.append(record)

        query = "INSERT INTO ...."              # bulk insert statement
        # cursor.execute(transaction_isolation_lock)
        try:
            # The original wraps this in a "while not successful" retry loop.
            cursor.executemany(query, data)     # one bulk insert per partition
            connection.commit()
        except Exception as e:
            pass                                # handle / retry the failed insert
    finally:
        cursor.close()
        connection.close()

ZATA: How we used Kubernetes and Google Cloud to expose our Big Data platform as a set of RESTful web services

Authors: Shailu Mishra, Sudhir Hasbe

In our initial blog post about the zulily big data platform, we briefly talked about ZATA (the zulily data access service). Today we want to deep dive into ZATA and explain our thought process and how we built it.

Goals

As a data platform team we had three goals:

  1. Rich data generated by our team shouldn't be limited to analysts. It should be available to systems & applications via a simple and consistent API.
  2. Have the flexibility to change our backend data storage solutions over time without impacting our clients
  3. Zero development time for incremental data driven APIs

ZATA was our solution for achieving our above goals. We abstracted our data platform using a REST-based service layer that our clients could use to fetch datasets. We were able to swap out storage layers without any change for our client systems.

Selecting Data Storage solution

There are three different attributes you have to figure out before you pick a storage technology:

  1. Size of Data: Is it big data or relatively small data? In short, do you need something that will fit in MySQL, or do you need to look at solutions like Google BigQuery or AWS Aurora?
  2. Query Latency: How fast do you need to respond to queries? Is it milliseconds, or are a few seconds OK, especially for large datasets?
  3. Data Type: Is it relational data, key-value pairs, complex JSON documents, or a search pattern?

As an enterprise, we need all combinations of these. The following are choices our team has made over time for different attributes:

  1. Google BigQuery: Great for large datasets (terabytes), but latency is in seconds; supports columnar storage.
  2. AWS Aurora: Great for large datasets (100s of gigabytes) with very low query latency.
  3. PostgresXL: Great for large datasets (100s of gigabytes to terabytes) with amazing performance for aggregation queries, but it is very difficult to manage and still early in its maturity cycle. We eventually moved our datasets to AWS Aurora.
  4. Google Cloud SQL, MySQL or SQL Server: For small datasets (GBs) with really low latency (milliseconds).
  5. MongoDB or Google Bigtable: Good for large-scale datasets with low-latency document lookup.
  6. Elasticsearch: We use Elasticsearch for search scenarios, both fuzzy and exact match.

Zata Architecture


Key runtime components for ZATA are

Mapping Layer

This looks at the incoming URLs and maps them to backend systems. For example: Request: http://xxxxx.zulily.com/dataset/product-offering?eventStartDate=[2013-11-15,2013-12-01]&outputFields=eventId,vendorId,productId,grossUnits maps to

  1. Google BigQuery (based on the config db mapping for product-offering)
  2. The dataset used is product-offering, which is just a view in the Google BigQuery system
  3. The filter eventStartDate=[2013-11-15,2013-12-01] is transformed to WHERE eventStartDate BETWEEN '2013-11-15' AND '2013-12-01'
  4. The output fields requested are eventId, vendorId, productId, grossUnits
  5. The query for Google BigQuery is:

SELECT eventId, vendorId, productId, grossUnits FROM product-offering WHERE eventStartDate BETWEEN '2013-11-15' AND '2013-12-01'

The mapping layer decides what mappings to use and how to transform the http request to something that backend will understand. This will be very different for MongoDB or Google Big Table.
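
For the SQL-style backends, that transformation boils down to something like the sketch below. This is a simplification: the real mapping is driven by the config DB and handles more operators, type handling and proper escaping.

# Illustrative only; a real implementation must escape/parameterize values.
def build_query(dataset, params, output_fields):
    clauses = []
    for field, value in params.items():
        if value.startswith("[") and value.endswith("]"):
            low, high = value[1:-1].split(",")
            clauses.append("%s BETWEEN '%s' AND '%s'" % (field, low, high))
        else:
            clauses.append("%s = '%s'" % (field, value))
    return "SELECT %s FROM %s WHERE %s" % (
        ", ".join(output_fields), dataset, " AND ".join(clauses))

# build_query("product-offering",
#             {"eventStartDate": "[2013-11-15,2013-12-01]"},
#             ["eventId", "vendorId", "productId", "grossUnits"])
# -> SELECT eventId, vendorId, productId, grossUnits FROM product-offering
#    WHERE eventStartDate BETWEEN '2013-11-15' AND '2013-12-01'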

Execution Layer

The execution layer is responsible for generating queries using the protocol that the storage engine understands. It also executes the queries against the backend and fetches result sets efficiently. Our current implementation supports various protocols such as MongoDB and standard JDBC, as well as HTTP requests for Google BigQuery, Bigtable and Elasticsearch.

Transform Layer

This layer is responsible for transforming data coming from any of the backend sources and normalizing it. This allows our clients to be agnostic of the storage mechanism in our backend systems. We went with JSON as the schema format given how prevalent it is amongst services and application developers.

In the previous example from the Mapping layer, the response will be the following:

[
    {"eventId": "12345", "vendorId": "123", "productId": "3456", "grossUnits": "10"},
    {"eventId": "23456", "vendorId": "123", "productId": "2343", "grossUnits": "234"},
    {"eventId": "33445", "vendorId": "456", "productId": "8990", "grossUnits": "23"},
    {"eventId": "45566", "vendorId": "456", "productId": "2343", "grossUnits": "88"}
]

API auto discovery

Our third goal was to have zero development time for incremental data-driven APIs. We achieved this by creating an auto-discovery service. The job of this service is to regularly poll the backend storage services for changes and automatically add service definitions to the config DB. For example, in Google BigQuery or MySQL, once you add a view in a schema called "zata", we automatically add the API to the ZATA service. This way data engineers can keep adding services for the datasets they create without anyone writing new code.
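
Against BigQuery, for example, that poll can be as simple as listing the views in the "zata" dataset and registering anything new. Here is a sketch using today's google-cloud-bigquery client; the config-db helpers are hypothetical stand-ins.

from google.cloud import bigquery

def discover_bigquery_views(config_db, dataset="zata"):
    client = bigquery.Client()
    for table in client.list_tables(dataset):
        if table.table_type != "VIEW":
            continue
        if not config_db.has_dataset(table.table_id):        # hypothetical config-db helper
            schema = client.get_table(table.reference).schema
            config_db.register_dataset(                      # hypothetical config-db helper
                name=table.table_id,
                backend="bigquery",
                fields=[(field.name, field.field_type) for field in schema])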

API Schema Definition

The schema service enables users to look up all the APIs supported by ZATA and to view each dataset's schema to understand what requests they can send. Clients can get the list of available datasets:

Dataset Request: http://xxxxx.zulily.com/dataset

[
{ "datasetName": "product-offering-daily", ... },
{ "datasetName": "sales-hourly", ... },
{ "datasetName": "product-offering", ... }
]

Schema Request: Then they can drill down to the schema of a selected dataset: http://xxxxx.zulily.com/dataset/product-offering/schema/

[
{ "fieldName": "eventId", "fieldType": "INTEGER" },
{ "fieldName": "eventStartDate", "fieldType": "DATETIME" },
{ "fieldName": "eventEndDate", "fieldType": "DATETIME" },
{ "fieldName": "vendorId", "fieldType": "INTEGER" },
{ "fieldName": "productStyle", "fieldType": "VARCHAR" },
{ "fieldName": "grossUnits", "fieldType": "INTEGER" },
{ "fieldName": "netUnits", "fieldType": "INTEGER" },
{ "fieldName": "grossSales", "fieldType": "NUMERIC" },
{ "fieldName": "netSales", "fieldType": "NUMERIC" }
]

So far, the client is not aware of the location of the data and has no knowledge of the storage system, which makes the whole data story more agile. If a dataset is moved from one location to another, or its schema is altered, all downstream systems will be fine, since the access points and the contracts are managed by ZATA.

Storage Service Isolation

As we rolled out ZATA over time, we realized the need for storage service isolation. Having a single service support multiple backend storage solutions with different latency requirements didn’t work very well. The slowest backend tends to slow things down for everyone else.

This forced us to rethink our ZATA deployment strategy. Around the same time, we were experimenting with Docker and using Kubernetes as an orchestration mechanism.

We ended up creating separate Docker containers and Kubernetes services for each of the backend storage solutions. So we now have a zata-bigquery service which handles all BigQuery-specific calls. Similarly, we have zata-mongo, zata-jdbc and zata-es services. Each of these Kubernetes services can be individually scaled based on anticipated load.

In addition to the individual Kubernetes services, we also created a zata-router service, which is essentially NGINX hosted in Docker. The zata-router service accepts incoming HTTP requests for ZATA and, based on the NGINX config, routes the traffic to the various Kubernetes services available in the cluster. The NGINX config in the zata-router service is dynamically refreshed by the polling service to make new APIs discoverable.


ZATA has enabled us to make our data more accessible across the organization while enabling us to move fast and change storage layer as we scaled up.

Helping Moms make purchase decisions

zulily’s unique needs

“Something special every day”  is our motto at zulily. Thousands of new products go live every morning. Most of these products are available for 72 hours and a good number of them often sell out before the sales event ends! Many of our engaged customers visit us every single day to discover what’s new. We want to make sure our customers don’t miss out on any items or events that may be special to them, while also giving them more confidence in their purchase decisions. A traditional eCommerce ratings and reviews model could help, but is not the best fit for zulily’s unique business model which offers customers a daily assortment of new products, for a limited amount of time. We needed a different approach.

Our solution

Our specific business requirements demanded a more real time and community-oriented approach. We came up with a set of signals that highlighted the social proof and scarcity. Signals like “Only X left” and “Almost Gone” were designed to encourage users to act on a product that they are interested in before it is gone. Signals like “Popular”, “X people viewing” and “Y just sold” were intended to give users more confidence in their purchase decision. We were quickly able to bring these signals to life, thanks to our real time big data platform. These signals were shown on our product pages and the shopping cart.

Product page

Shopping cart

Results

We tested the feature out on our web and m-web experiences. The results turned out to be better than our most optimistic expectations! It was interesting to note that the feature was almost twice as effective on the mobile device compared to desktop. In hindsight it made a lot of sense as our customers have a lot of small sessions on the mobile devices during the day and this information helped them make timely decisions. The social and scarcity signals turned out to be a perfect complement to zulily’s unique business model.

Leveraging Google Cloud Dataflow for clickstream processing

Clickstream is the largest dataset at zulily. As mentioned in the zulily big data platform post, we store all our data in Google Cloud Storage and use a Hadoop cluster on Google Compute Engine to process it. Our data size has been growing at a significant pace, which required us to reevaluate our design. The dataset growing fastest was our clickstream dataset.

We had to make a decision to either increase the size of the cluster drastically or try something new. We came across Cloud Dataflow and started evaluating it as an alternative.

Why did we move clickstream processing to Cloud Dataflow?

  1. Instead of increasing the size of the cluster to manage peak clickstream loads, Cloud Dataflow gave us the ability to have an on-demand cluster that could dynamically grow or shrink, reducing costs.
  2. Clickstream data was growing very fast, and the ability to isolate it from other operational data processing would help the whole system in the long run.
  3. Clickstream processing did not require the complex correlations and joins that are a must for some of our other datasets, which made the transition easy.
  4. The programming model was extremely simple, and the transition from a development perspective was easy.
  5. Most importantly, we have always thought of our data processing clusters as transient entities, so dropping in another entity (Cloud Dataflow) that was different from Hadoop was not a big deal. Our data would still reside in GCS, and we could still use clickstream data from the Hadoop cluster where required.

High Level Flow


One of the core principles of our clickstream system is to be self-serviceable. In short, teams can start recording new types of events anytime by just registering a schema with our service. Clickstream events are pushed to our data platform through our real-time system (more on that later) and through log shipping to GCS.

In the new process using Cloud Dataflow we:

  1. Create the pipeline object and read data from GCS into a PCollectionTuple object.
  2. Create multiple outputs from the data in the logs based on event types and schema, using the PCollectionTuple.
  3. Apply ParDo transformations on the data to make it optimized for querying by business users (e.g. transform all dates to PST).
  4. Store the data back into GCS, which is our central data store, but also for use from other Hadoop processes and for loading into BigQuery.
  5. Load into BigQuery. This is not done directly from Dataflow; we use the bq load command, which we found to be much more performant. (A rough sketch of the pipeline shape follows this list.)
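
Our original pipelines were written against the Dataflow SDK of the time; the overall shape, expressed in today's Apache Beam Python API, looks roughly like this. The event types, paths and parsing are illustrative.

import json
import apache_beam as beam

EVENT_TYPES = ("page_view", "add_to_cart")     # stands in for the registered schemas

class SplitByEventType(beam.DoFn):
    def process(self, line):
        event = json.loads(line)               # assume one JSON event per log line
        # ParDo transformations (e.g. normalizing all dates to PST) would go here.
        if event.get("type") in EVENT_TYPES:
            yield beam.pvalue.TaggedOutput(event["type"], event)

with beam.Pipeline() as pipeline:
    outputs = (pipeline
               | "ReadLogs" >> beam.io.ReadFromText("gs://example-bucket/clickstream/*.log")
               | "Split" >> beam.ParDo(SplitByEventType()).with_outputs(*EVENT_TYPES))
    for event_type in EVENT_TYPES:
        (outputs[event_type]
         | "Format_" + event_type >> beam.Map(json.dumps)
         | "Write_" + event_type >> beam.io.WriteToText("gs://example-bucket/processed/" + event_type))
# Loading into BigQuery happens afterwards with `bq load`, not from the pipeline itself.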

The next step for us is to identify other scenarios which are a good fit to transition to Cloud Dataflow, which have similar characteristics to clickstream – high volume, unstructured data not requiring heavy correlation with other data sets.

Building zulily’s big data platform

In July 2014 we started our journey to building a new data platform that would allow us to use big data to drive business decisions. I would like to start with a quote from our 2015 Q2 earnings that was highlighted in various press outlets and share how we built a data platform that allows zulily to make decisions which were near impossible to do before.

“We compared two sets of customers from 2012 that both came in through the same channel, a display ad on a major homepage, but through two different types of ads,” [Darrell] Cavens said. “The first ad featured a globally distributed well-known shoe brand, and the second was a set of uniquely styled but unbranded women’s dresses. When we analyze customers coming in through the two different ad types, the shoe ad had more than twice the rate of customer activations on day one. But 2.5 years later, the spend from customers that came in through the women dresses ad was significantly higher than the shoe ad with the difference increasing over time.” – www.bizjournals.com

Our vision is for every decision, at every level in the organization, to be driven by data. In early 2014 we realized that the data platform we had, a combination of a SQL Server database for warehousing primarily structured operational data plus a Hadoop cluster for unstructured data, was too limiting. We started by defining core principles for our new data platform (v3).

Principles for new platform

  1. Break silos of unstructured and structured data: One of the key issues with our old infrastructure was that we had unstructured data in a Hadoop cluster and structured data in SQL Server, which was extremely limiting. Almost every major decision required analytics that correlated usage patterns on the site (unstructured) with demand, shipments or other transactions (structured). We had to come up with a solution that allowed us to correlate these different types of datasets.
  2. High scale at the lowest possible cost: We already had a Hadoop cluster, but our data was growing exponentially, and as storage needs grew we had to keep adding nodes to the cluster, which increased cost even when compute was not growing at the same pace. In short, we were paying at the rate of compute (which is way higher) at the growth rate of storage. We had to break this. Additionally, we wanted to build a highly scalable system which would support our growth.
  3. High availability or extremely low recovery time, with an eye on cost: Simply put, we wanted high availability but didn't want to pay a premium for it.
  4. Enable real-time analytics: The existing data platform had no concept of real-time data processing, so anytime users needed to analyze things in real time they had to be given access to production operational systems, which in turn increased risk. This had to change.
  5. Self service: Our tools and systems/services need to be easy for our users to use with no involvement from the data team.
  6. Protect the platform from users: Another challenge in the existing system was the lack of isolation between core data platform components and user-facing services. We wanted to make sure that in the new platform individual users could not bring down the entire system.
  7. Service (Data) Level Agreements (SLAs): We wanted to be able to give SLAs for our datasets to our users, covering data accuracy and freshness. In the past this was a huge issue due to the design of the system and the lack of tools to monitor and report SLAs.

zulily data platform (v3)

The zulily data platform includes multiple tools, platforms and products. The combination is what makes it possible to solve complex problems at scale.

There are 6 key broad components in the stack:

  1. Centralized big data storage built on Google Cloud Storage (GCS). This allows us to have all our data in one place (principle 1) and share it across all systems and layers of the stack.
  2. The Big Data Platform is primarily for batch processing of data at scale. We use Hadoop on Google Compute Engine for our big data processing.
    • All our data is stored in GCS, not HDFS. This enables us to decouple storage from compute and manage cost (principle 2).
    • Additionally, the Hadoop cluster is transient for us: it holds no data, so if the cluster completely goes away we can build a new one on the fly. We think about our cluster as a transient entity (principle 3).
    • We have started looking at Google Dataflow for specific scenarios, but more on that soon. Our goal is to make all data available for analytics anywhere from 30 minutes to a few hours, based on the SLA.
  3. The real-time data platform is a zulily-built stack for high-scale data collection and processing (principle 4). We now collect more than 10k events per second, and that number is growing fast as we see value through our decision portal and other scenarios.
  4. The business data warehouse built on Google BigQuery enables us to provide a highly scalable analytics service to hundreds of users in the company.
    • We push all our data, structured and unstructured, real-time and batch, into Google BigQuery, breaking all data silos (principle 1).
    • It also enables us to keep our big data platform and data warehouse completely isolated (principle 6), making sure issues in one environment don't impact the other system.
    • Keeping the interface (SQL) the same for analysts between the old and new platforms enabled us to lower the barrier to adoption (principle 5).
    • We also have a high speed low latency data store that hosts part of our business data warehouse data for access through APIs.
  5. Data Access and Visualization component enables business users to make decisions based on information available.
    1. Real-time decision portal or zuPulse enables business to get insights into the business in real-time.
    2. Tableau is our reporting and analytics visualization tool. Business users use the tool to create reports for their daily execution. This enables our engineering team to focus on core data and high scale predictive scenarios and makes reporting self service (principle 5).
    3. ZATA our data access service enables us to expose all our data through an API. Any data in the data warehouse or low latency data store can be automatically exposed through ZATA. This enables various internal applications at zulily to show critical information to users including our vendors who can see their historical sales on vendor portal.
  6. Core data services are the backbone of everything we do, from moving data to the cloud and monitoring to making sure we abide by our SLAs. We continuously add new services and enhance existing ones. The goal of these services is to make developers and analysts across other areas more efficient.
    • zuSync is our data synchronization service, which enables us to get data from any source system and move it to any destination. The source or destination can be an operational database (SQL Server, MySQL etc.), a queue (RabbitMQ), Google Cloud Storage (GCS), Google BigQuery, services (http/tcp/udp), an ftp location…
    • zuflow is our workflow service built on top of Google BigQuery; it allows analysts to define & schedule query chains for batch analytics. It also has the ability to notify and deliver data in different formats.
    • zuDataMon is our data monitoring service, which allows us to make sure data is consistent across all systems (principle 7). This helps us catch issues with our services or with the data early. Today more than 70% of issues are identified by our monitoring tool instead of being reported by users; our goal is to get this number to 90%+.

This is a very high level view of what our data@zulily team does. Our goal is to share more often, with a deeper dive on our various pieces of technology, in the coming weeks. Stay tuned…

Seattle Scalability Meetup @ zulily: Google, Hortonworks, zulily

We are looking forward to meeting everyone attending the scalability meetup at our office. It is going to be a great event with a good overview of how zulily leverages big data and a deep dive into Google Big Query & Apache Optiq in Hive.

Agenda

Topic: Building zulily's Data Platform using Hadoop and Google BigQuery

Speakers: Sudhir Hasbe is Director of big data, data services and BI at zulily. (https://www.linkedin.com/in/shasbe). Also Paul Newson (https://www.linkedin.com/profile/view?id=971812 )

Abstract: zulily, with 4.1 million customers and projected 2014 revenues of over 1 billion dollars, is one of the largest e-commerce companies in the U.S. “Data-driven decision making” is part of our DNA. Growth in the business has triggered exponential growth in data, which required us to redesign our data platform. The zulily data platform is the backbone for all analytics and reporting, along with being the backbone of our data service APIs consumed by various teams in the organization. This session provides a technical deep dive into our data platform and shares key learnings, including our decision to build a Hadoop cluster in the cloud.

Topic: Delivering personalization and recommendations using Hadoop in cloud

Speakers: Steve Reed is a principal engineer at zulily, the author of dropship, and former Geek of the Week. Dylan Carney is a senior software engineer at zulily. They both work on personalization, recommendations and improving your shopping experience.

Abstract: Working on personalization and recommendations at zulily, we have come to lean heavily on on-premise Hadoop clusters to get real work done. Hadoop is a robust and fascinating system, with a myriad of knobs to turn and settings to tune. Knowing the ins and outs of obscure Hadoop properties is crucial for the health and performance of your Hadoop cluster. (To wit: How big is your fsimage? Is your secondary namenode daemon running? Did you know it's not really a secondary namenode at all?)

But what if it didn’t have to be this way? Google Compute Engine (GCE) and other cloud platforms make promises of easier, faster and easier-to-maintain Hadoop installations. Join us as we describe learning from our years of Hadoop use, and give an overview of what we’ve been able to adapt, learn and unlearn while moving to GCE.

Topic: Apache Optiq in Hive

Speaker: Julian Hyde, Principal, Hortonworks

Abstract: Tez is making Hive faster, and now cost-based optimization (CBO) is making it smarter. A new initiative in Hive introduces cost-based optimization for the first time, based on the Optiq framework. Optiq’s lead developer Julian Hyde shows the improvements that CBO is bringing to Hive. For those interested in Hive internals, he gives an overview of the Optiq framework and shows some of the improvements that are coming to future versions of Hive.

Our format is flexible: We usually have 2 speakers who talk for ~30 minutes each and then do Q+A plus discussion (about 45 minutes per talk), finishing by 8:45.

There will be beer afterwards, of course!

After-beer Location:

Paddy Coyne’s:  http://www.paddycoynes.com/

Doors open 30 minutes ahead of show-time.