Realtime Archival of Mongo Collections to BigQuery

Here at zulily, we maintain a wide variety of database solutions.  In this post, we’ll focus on two that serve differing needs in Ad Tech: MongoDB for our real-time needs and Google’s BigQuery for our long-term archival and analytics needs. In many cases the same data needs to be represented and maintained in both databases, which is awkward given their conflicting strengths and constraints. Previously we solved this by writing a manual batch job for each collection we wanted represented in BigQuery, but that approach meant delays before data appeared in BigQuery and the burden of maintaining multiple mostly-similar-but-slightly-different jobs. So we developed a new service, called mongo_bigquery_archiver, that automates the process of archiving data to BigQuery with only minimal configuration.

The archiver is configured through a collection, ‘bigquery_archiver_settings,’ that lives in a custom ‘config’ database on the Mongo server whose contents we want to back up. This collection holds one document for each collection we want to back up to BigQuery; each document serves both as configuration and as an up-to-date reference for the status of that backup process. Starting out, the configuration is simple:

{
    "mongo_database" : The Mongo database that houses the collection we want to archive.
    "mongo_collection" : The Mongo collection we want to archive.
    "bigquery_table" : The BigQuery table we want to create to store this data. Note that the table doesn't need to already exist! The archiver can create them on the fly.
    "backfill_full_collection" : Whether to upload existing documents in the Mongo collection, or only upload future ones. This field is useful for collections that may have some junk test data in them starting out, or for quickly testing it for onboarding purposes.
    "dryrun": Whether to actually begin uploading to BigQuery or just perform initial checks. This is mostly useful for the schema creation feature discussed below.
}
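As a concrete sketch, registering a collection for archiving is just an insert into that settings collection; the host, database, collection, and table names below are hypothetical:

from pymongo import MongoClient

# Connect to the Mongo server whose collections we want to archive (hypothetical host).
client = MongoClient("mongodb://mongo-host:27017")

client["config"]["bigquery_archiver_settings"].insert_one({
    "mongo_database": "ad_tech",                 # hypothetical database name
    "mongo_collection": "ad_performance",        # hypothetical collection name
    "bigquery_table": "ad_performance_history",  # created on the fly if it doesn't exist
    "backfill_full_collection": True,            # also upload the documents already present
    "dryrun": False                              # set True to run only the initial checks
})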

And that’s it! When that configuration is added to the collection, the archiver process (if it’s running) automatically picks up the new configuration and spawns a subprocess to begin archiving that Mongo collection. The first step is determining the schema to use when creating the BigQuery table. If a schema already exists or is (optionally) specified manually in the configuration document, the archiver uses it. Otherwise, it determines the maximum viable schema: it iterates through the entire collection, analyzing each document to find the superset of fields across all documents. It keeps every field whose type is consistent across the documents that contain it, including fields that exist in some documents and not others, treating documents that lack a field as holding a null value. Subdocuments are analyzed the same way, producing RECORD field definitions that in turn contain all of their valid fields, recursing as deeply as the collection’s documents require. When the pass is complete, the archiver stores the generated maximum viable schema in the configuration document for the user to review and trim as needed, in case there are extraneous fields that, while possible to upload to BigQuery, would only add useless overhead. It then creates a BigQuery table based on this schema and moves on to the next step.

Here’s an example of the generated schema:

{
    "columns" : [
        {
            "name" : "_id",
            "type" : "STRING",
        }
        {
            "name" : "tracking_code",
            "type" : "INTEGER",
            "mongo_field" : "tracking_code"
        },
        {
            "name" : "midnight_to_timestamp",
            "type" : "RECORD",
            "mongo_field" : "midnight_to_timestamp",
            "fields" : [
                {
                    "name" : "cpta",
                    "type" : "FLOAT"
                },
                {
                    "name" : "activations",
                    "type" : "INTEGER"
                },
                {
                    "name" : "spend",
                    "type" : "FLOAT"
                },
                {
                    "name" : "date_start",
                    "type" : "STRING"
                },
                {
                    "name" : "ad_spend_id",
                    "type" : "STRING"
                }
            ]
        },
        {
            "name" : "created_on_timestamp",
            "type" : "FLOAT",
            "mongo_field" : "created_on_timestamp"
        },
        {
            "name" : "lifetime_to_timestamp",
            "type" : "RECORD",
            "mongo_field" : "lifetime_to_timestamp",
            "fields" : [
                {
                    "name" : "cpta",
                    "type" : "FLOAT"
                },
                {
                    "name" : "activations",
                    "type" : "INTEGER"
                },
                {
                    "name" : "spend",
                    "type" : "FLOAT"
                }
            ]
        }
    ]
}
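For illustration, a heavily simplified version of the type-inference pass that produces a schema like the one above might look like this sketch (it is not the archiver’s actual code; it only handles a few scalar types and silently drops fields with conflicting types):

# Simplified sketch of "maximum viable schema" inference; not the archiver's actual code.
TYPE_MAP = {str: "STRING", int: "INTEGER", float: "FLOAT", bool: "BOOLEAN", dict: "RECORD"}

def infer_schema(documents):
    """Return column definitions for every field whose type is consistent
    across all documents that contain it; missing fields simply become nulls."""
    seen = {}     # field name -> set of BigQuery types observed
    subdocs = {}  # field name -> nested documents, for RECORD recursion
    for doc in documents:
        for field, value in doc.items():
            bq_type = TYPE_MAP.get(type(value))
            if bq_type is None:
                continue  # unsupported type; the real implementation handles more cases
            seen.setdefault(field, set()).add(bq_type)
            if bq_type == "RECORD":
                subdocs.setdefault(field, []).append(value)

    columns = []
    for field, types in seen.items():
        if len(types) != 1:
            continue  # inconsistent types across documents; left out of the schema
        column = {"name": field, "type": types.pop(), "mongo_field": field}
        if field in subdocs:
            column["fields"] = infer_schema(subdocs[field])  # recurse into subdocuments
        columns.append(column)
    return columns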

Next, the archiver uses change streams, a feature added in Mongo 3.6. A change stream is essentially a subscription to the oplog on the Mongo server for the collection in question: whenever an operation modifies the collection, the subscriber is notified. The archiver keeps a subscription open for each watched collection, and whenever an update comes in, it processes that update to get the current state of the document without querying Mongo again, since the change stream can be configured to also deliver the latest version of the document. After filtering the document down to the generated schema from before, it uploads the change to the BigQuery table via the BigQuery streaming API, along with the kind of operation it was – create, replace, update, or delete. For a delete, it blanks all fields except _id and logs the row as a DELETE operation. The resulting table represents the entire history of operations on the collection and can be used to track the state of the collection over time.
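A minimal sketch of that loop, assuming pymongo 3.6+ and the google-cloud-bigquery client (the host, database, and collection names are hypothetical, and only a couple of the schema’s fields are shown):

import time
from pymongo import MongoClient
from google.cloud import bigquery

mongo = MongoClient("mongodb://mongo-host:27017")    # hypothetical host
bq = bigquery.Client()
table = bq.get_table("project.dataset_id.table_id")  # placeholder table reference

# full_document='updateLookup' asks Mongo to attach the full, current document to
# update events, so we never have to query the collection again for its latest state.
with mongo["ad_tech"]["ad_performance"].watch(full_document="updateLookup") as stream:
    for change in stream:
        op = change["operationType"]              # insert, replace, update, or delete
        doc_id = str(change["documentKey"]["_id"])
        if op == "delete":
            row = {"_id": doc_id}                 # blank everything but _id on a delete
        else:
            doc = change["fullDocument"]
            row = {"_id": doc_id, "tracking_code": doc.get("tracking_code")}
            # ...remaining fields filtered down by the generated schema...
        row["operationType"] = op
        row["time_archived"] = time.time()
        errors = bq.insert_rows_json(table, [row])  # BigQuery streaming API
        if errors:
            raise RuntimeError(errors)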

Since a full-history table makes it somewhat cumbersome to get the most current version of the data, the archiver automatically creates views on top of all generated tables, using the following query:

SELECT * FROM (
    SELECT *, ROW_NUMBER() OVER(
        PARTITION BY _id ORDER BY time_archived DESC) rank
    FROM `project.dataset_id.table_id`
)
WHERE rank = 1 AND operationType != 'delete'

What this query does is find the latest operation performed on each _id, which is guaranteed to be unique in the Mongo collection. If the latest operation is a DELETE, no row for that _id appears in the view’s results; otherwise the view shows the document as of its latest modification. The result is each document exactly as of its latest update. With the minimal latency of the BigQuery streaming API, changes are reflected in BigQuery within seconds of their creation in Mongo, allowing for sophisticated real-time analytics via BigQuery. While BigQuery publishes no official SLA for streaming-insert performance, we consistently see rows become queryable within 3-4 seconds at most. The preview pane in the BigQuery UI does not always reflect freshly streamed rows, but querying the table with a SELECT statement shows them.
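Creating such a view programmatically is a one-time step per table; with the google-cloud-bigquery client it might look roughly like this (the view name is a placeholder):

from google.cloud import bigquery

bq = bigquery.Client()
view = bigquery.Table("project.dataset_id.table_id_latest")  # placeholder view name
view.view_query = """
SELECT * FROM (
    SELECT *, ROW_NUMBER() OVER(
        PARTITION BY _id ORDER BY time_archived DESC) rank
    FROM `project.dataset_id.table_id`)
WHERE rank = 1 AND operationType != 'delete'"""
bq.create_table(view)  # a view is just a table with view_query set instead of a schema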

[Architecture diagram] An example of the architecture. Note that normal user access always hits Mongo and never goes to BigQuery, making the process more efficient.

Thanks to the Archiver, we’ve been able to leverage the strengths of both Mongo and BigQuery for our rapidly changing datasets without having to actively maintain a separate, mostly-duplicate data loading process for each system.

We’ve open-sourced the Archiver on GitHub under the Apache V2 license: https://github.com/zulily/mongo_bigquery_archiver. If you choose to use it for your own needs, check out Google’s documentation for streaming inserts into BigQuery.

Calculating Ad Performance Metrics in Real Time

Authors: Sergey Podlazov, Rahul Srivastava

zulily is a flash sales company.  We post a product on the site, and poof… it’s gone in 72 hours.  Online ads for those products come and go just as fast, which doesn’t leave us much time to manually evaluate the performance of the ads and take corrective action if needed.  To optimize our ad spend, we need to know in real time how each ad is doing, and this is exactly what we engineered.

While we track multiple metrics to measure the impact of an ad, I am going to focus on one that provides a good representation of the system architecture.  This is an engineering blog after all!

The metric in question is Cost per Total Activation, or CpTA for short.  The formula for the metric is simple: divide the total cost of the ad by the number of customer activations.  We call the numerator in this formula “spend” and refer to the denominator as “activations”.  For example, if an ad costs zulily $100 between midnight and 15:45 PST on January 31 and results in 20 activations, the CpTA for this ad as of 15:45 PST is $100/20 = $5.
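As a trivial reference implementation of that formula (the function name is ours, not part of the production system):

def cost_per_total_activation(spend, activations):
    """CpTA = total ad spend / number of activations."""
    return spend / activations if activations else None

# The example above: $100 of spend and 20 activations as of 15:45 PST.
assert cost_per_total_activation(100.0, 20) == 5.0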

Here’s how zulily collects this metric in real time.  For the sake of simplicity, I will skip the archiving processes that are sprinkled on top of the architecture below.

[Architecture diagram]

The source of the spend for the metric is an advertiser API, e.g. Facebook’s.  We’ve implemented a Spend Producer (in reference to the Producer-Consumer model) that queries the API every 15 minutes for live ads and pushes the spend into MongoDB.  Each spend record has a tracking code that uniquely identifies the ad.
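A stripped-down Spend Producer is just a polling loop; the API call below is a stand-in for whatever client the ad platform provides, and the connection and collection names are hypothetical:

import time
from datetime import datetime
from pymongo import MongoClient

spend_collection = MongoClient("mongodb://mongo-host:27017")["ad_tech"]["ad_spend"]

def fetch_live_ad_spend():
    """Stand-in for the advertiser API call (e.g. Facebook's Marketing API).
    Expected to return records like {"tracking_code": 1234, "spend": 41.50}."""
    return []  # replace with the real API client

while True:
    for record in fetch_live_ad_spend():
        record["recorded_at"] = datetime.utcnow()
        spend_collection.insert_one(record)  # every record carries the ad's tracking code
    time.sleep(15 * 60)  # the producer polls the advertiser API every 15 minutes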

The source for the activations is a Kafka stream of purchase orders that customers place with zulily.  We consume these orders and push them into an AWS Kinesis stream.  This gives us the ability to process and archive the orders without putting extra strain on Kafka.  It’s important to note that relevant orders also carry the ad’s tracking code, just like the spend.  That’s the link that glues spend and activations together.
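The Kafka-to-Kinesis hop is a thin bridge.  Assuming kafka-python and boto3, it could be as small as the sketch below; the topic, stream, and field names are hypothetical, and partitioning by customer Id is just one reasonable choice (it keeps each customer’s orders in sequence for the evaluator):

import json
import boto3
from kafka import KafkaConsumer

kinesis = boto3.client("kinesis")
consumer = KafkaConsumer("purchase-orders",                    # hypothetical topic name
                         bootstrap_servers="kafka-host:9092",  # hypothetical broker
                         value_deserializer=lambda raw: json.loads(raw))

for message in consumer:
    order = message.value
    kinesis.put_record(StreamName="purchase-orders-stream",    # hypothetical stream name
                       Data=json.dumps(order).encode("utf-8"),
                       PartitionKey=str(order["customer_id"]))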

The Activation Evaluator application examines each purchase and determines whether it is an activation.  To do that, it looks up the customer Id from the purchase order in a MongoDB collection of previous purchases.  If there is no previous transaction, or the most recent one is older than X days, the purchase is an activation.  The Activation Evaluator then updates the customer record with the date of the new purchase.  To make sure we don’t drop any data if the Activation Evaluator runs into issues, we don’t move the checkpoint in the Kinesis stream until the write to Mongo is confirmed.
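A condensed version of that check might look like this; the field names are illustrative, and the window length stands in for the “X days” above:

from datetime import datetime, timedelta

ACTIVATION_WINDOW_DAYS = 90  # stands in for the "X days" above; the real value is a business setting

def is_activation(customers, order):
    """Decide whether a purchase order counts as an activation and record the purchase.
    `customers` is the MongoDB collection holding one document per customer."""
    previous = customers.find_one({"customer_id": order["customer_id"]})
    cutoff = datetime.utcnow() - timedelta(days=ACTIVATION_WINDOW_DAYS)
    activation = previous is None or previous["last_purchase_at"] < cutoff

    # Record this purchase as the customer's most recent one.  Only after this write
    # is confirmed does the consumer advance its checkpoint in the Kinesis stream.
    customers.update_one({"customer_id": order["customer_id"]},
                         {"$set": {"last_purchase_at": order["purchased_at"]}},
                         upsert=True)
    return activation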

The Activation Evaluator sends evaluated purchases into another Kinesis stream.  Chaining Kinesis streams is a pretty common pattern for AWS applications, as it allows for separation of concerns and makes the whole system more resilient to failures of individual components.

The Activation Calculator reads the evaluated purchases from the second Kinesis stream and captures them in Mongo.  We index the data by tracking code and timestamp, and voila, a simple count() will return the number of activations for a specified period.

The last step in the process is to take the Spend and divide it by the activations.  Done.
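Putting the last two steps together, a sketch of the calculation looks like this.  The collection and field names are illustrative, and it assumes that only purchases judged to be activations land in the activations collection and that each spend record holds the spend reported by one 15-minute poll:

def cpta_for_ad(db, tracking_code, since):
    """Spend divided by activations for one ad since `since` (e.g. midnight)."""
    # The activations collection is indexed by tracking code and timestamp,
    # so this count is a cheap index lookup.
    activations = db["activations"].count_documents({
        "tracking_code": tracking_code,
        "purchased_at": {"$gte": since},
    })
    spend = sum(doc["spend"] for doc in db["ad_spend"].find({
        "tracking_code": tracking_code,
        "recorded_at": {"$gte": since},
    }))
    return spend / activations if activations else None

# e.g. cpta_for_ad(MongoClient()["ad_tech"], tracking_code=1234, since=midnight_pst)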

With this architecture, zulily measures a key advertising performance metric every 15 minutes and uses it to pause poorly-performing ads.  The metric also serves as an input for various Machine Learning models, but more on those in a future blog post… Stay tuned!!