Realtime Archival of Mongo Collections to BigQuery

Here at zulily, we maintain a wide variety of database solutions. In this post, we’ll focus on two that serve very different needs in Ad Tech: MongoDB for our real-time workloads and Google’s BigQuery for our long-term archival and analytics needs. In many cases the same data needs to be represented and maintained in both databases, which is awkward because the two systems have conflicting strengths and access patterns. Previously we solved this by writing a manual batch job for each collection we wanted represented in BigQuery, which meant delays before data appeared in BigQuery and the burden of maintaining multiple mostly-similar-but-slightly-different jobs. To fix this, we’ve developed a new service, called mongo_bigquery_archiver, that automates the process of archiving data to BigQuery with only minimal configuration.

To configure the archiver, we keep a collection, ‘bigquery_archiver_settings,’ on the Mongo server whose contents we want to back up, under a custom ‘config’ database on that server. This collection holds one document for each collection that we want to back up to BigQuery; that document serves both as the configuration and as an up-to-date reference for the status of that backup process. Starting out, the configuration is simple:

{
    "mongo_database" : The Mongo database that houses the collection we want to archive.
    "mongo_collection" : The Mongo collection we want to archive.
    "bigquery_table" : The BigQuery table we want to create to store this data. Note that the table doesn't need to already exist! The archiver can create them on the fly.
    "backfill_full_collection" : Whether to upload existing documents in the Mongo collection, or only upload future ones. This field is useful for collections that may have some junk test data in them starting out, or for quickly testing it for onboarding purposes.
    "dryrun": Whether to actually begin uploading to BigQuery or just perform initial checks. This is mostly useful for the schema creation feature discussed below.
}
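
For instance, onboarding a new collection is just an insert into that settings collection. Here’s a hypothetical example using pymongo; the connection string, database, collection, and table names below are made up purely for illustration:

from pymongo import MongoClient

client = MongoClient("mongodb://mongo-host:27017")  # placeholder connection string

# Register a hypothetical 'campaign_stats' collection for archival.
client.config.bigquery_archiver_settings.insert_one({
    "mongo_database": "ad_tech",
    "mongo_collection": "campaign_stats",
    "bigquery_table": "ad_tech_archive.campaign_stats_history",
    "backfill_full_collection": True,
    "dryrun": False,
})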

And that’s it! When that configuration document is added to the collection, the archiver process (if active) automatically picks it up and spawns a subprocess to begin archiving that Mongo collection. The first step is determining the schema to use when creating the BigQuery table. If a schema already exists, or is (optionally) specified manually in the configuration document, the archiver uses that. Otherwise, it begins the process of determining the maximum viable schema. It iterates through the entire collection and analyzes each document to find the superset of fields present across all documents, keeping every field that has a consistent type in each document where it appears; documents missing a field are simply treated as having a null value for it. It analyzes subdocuments in the same way, creating RECORD field definitions that load all of their valid fields, recursing as deep as the collection’s documents require. When complete, it stores the generated maximum viable schema in the configuration document for the user to review and modify as needed, in case there are extraneous fields that, while possible to upload to BigQuery, would just add useless overhead. It then creates a BigQuery table based on this generated schema and moves on to the next step.

Here’s an example of the generated schema:

{
    "columns" : [
        {
            "name" : "_id",
            "type" : "STRING",
        }
        {
            "name" : "tracking_code",
            "type" : "INTEGER",
            "mongo_field" : "tracking_code"
        },
        {
            "name" : "midnight_to_timestamp",
            "type" : "RECORD",
            "mongo_field" : "midnight_to_timestamp",
            "fields" : [
                {
                    "name" : "cpta",
                    "type" : "FLOAT"
                },
                {
                    "name" : "activations",
                    "type" : "INTEGER"
                },
                {
                    "name" : "spend",
                    "type" : "FLOAT"
                },
                {
                    "name" : "date_start",
                    "type" : "STRING"
                },
                {
                    "name" : "ad_spend_id",
                    "type" : "STRING"
                }
            ]
        },
        {
            "name" : "created_on_timestamp",
            "type" : "FLOAT",
            "mongo_field" : "created_on_timestamp"
        },
        {
            "name" : "lifetime_to_timestamp",
            "type" : "RECORD",
            "mongo_field" : "lifetime_to_timestamp",
            "fields" : [
                {
                    "name" : "cpta",
                    "type" : "FLOAT"
                },
                {
                    "name" : "activations",
                    "type" : "INTEGER"
                },
                {
                    "name" : "spend",
                    "type" : "FLOAT"
                }
            ]
        }
    ]
}
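
As a rough illustration of how such a schema can be inferred, here is a simplified sketch. This is not the archiver’s actual code; the type mapping covers only a handful of BSON types, and real documents would need more cases (dates, arrays, and so on).

from collections import defaultdict
from bson import ObjectId

# Simplified mapping from Python/BSON types to BigQuery column types.
TYPE_MAP = {str: "STRING", int: "INTEGER", float: "FLOAT", bool: "BOOLEAN",
            ObjectId: "STRING", dict: "RECORD"}

def infer_schema(documents):
    # Collect every type seen for every field across all documents.
    seen_types = defaultdict(set)
    subdocuments = defaultdict(list)
    for doc in documents:
        for field, value in doc.items():
            if value is None:
                continue  # a missing or null field just becomes a NULL column value
            seen_types[field].add(TYPE_MAP.get(type(value)))
            if isinstance(value, dict):
                subdocuments[field].append(value)
    columns = []
    for field, types in seen_types.items():
        if len(types) != 1 or None in types:
            continue  # unmapped or inconsistent types: leave the field out of the schema
        column = {"name": field, "type": types.pop(), "mongo_field": field}
        if column["type"] == "RECORD":
            # Recurse into subdocuments to build the nested RECORD definition.
            column["fields"] = infer_schema(subdocuments[field])["columns"]
        columns.append(column)
    return {"columns": columns}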

Next, the archiver uses a feature added in Mongo 3.6: change streams. A change stream is essentially a subscription to the Mongo server’s oplog for the collection in question – whenever an operation modifies the collection, the subscriber is notified about it. The archiver maintains a change stream for each watched collection, and whenever an update comes in, it processes that event to get the current state of the document; no extra query is needed, because the change stream can be configured to include the latest version of the document in the event itself. After filtering the document down using the generated schema from before, it uploads the change to the BigQuery table via the BigQuery streaming API, along with the kind of operation it was – create, replace, update, or delete. For a delete, it blanks all fields except _id and logs that row as a delete operation in the table. The resulting table represents the entire history of operations on the collection and can be used to track the state of the collection across time.
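
Here is a minimal sketch of what such a tailing loop can look like with pymongo and the google-cloud-bigquery client. This is not the archiver’s implementation; the connection string, database, collection, and table names are placeholders, and filter_to_schema is a toy stand-in for the step that drops fields not present in the generated schema.

import time

from google.cloud import bigquery
from pymongo import MongoClient

bq = bigquery.Client()
mongo = MongoClient("mongodb://mongo-host:27017")       # placeholder connection string
collection = mongo.ad_tech.campaign_stats               # placeholder database/collection
table_id = "my-project.ad_tech_archive.campaign_stats_history"  # placeholder table

SCHEMA_FIELDS = {"_id", "tracking_code", "created_on_timestamp"}  # for illustration only

def filter_to_schema(doc):
    # Keep only fields that appear in the generated schema; stringify the ObjectId.
    return {k: (str(v) if k == "_id" else v) for k, v in doc.items() if k in SCHEMA_FIELDS}

# full_document='updateLookup' makes Mongo attach the current version of the
# document to update events, so we never have to query the collection again.
with collection.watch(full_document="updateLookup") as stream:
    for change in stream:
        operation = change["operationType"]              # insert, update, replace, or delete
        if operation == "delete":
            # Only the _id survives a delete; every other column is left null.
            row = {"_id": str(change["documentKey"]["_id"])}
        else:
            row = filter_to_schema(change["fullDocument"])
        row["operationType"] = operation
        row["time_archived"] = time.time()
        errors = bq.insert_rows_json(table_id, [row])    # BigQuery streaming insert
        if errors:
            raise RuntimeError(f"Streaming insert failed: {errors}")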

Since a full-history table makes it somewhat cumbersome to get the most current version of the data, the archiver automatically creates a view on top of each generated table, using the following query:

SELECT * FROM (
    SELECT *, ROW_NUMBER() OVER(
        PARTITION BY _id ORDER BY time_archived DESC) rank
    FROM `project.dataset_id.table_id`
)
WHERE rank = 1 AND operationType != 'delete'

This query finds the latest operation performed on each document _id, which is guaranteed to be unique in the Mongo collection. If the latest operation is a delete, no row for that _id appears in the results; otherwise, the row shows the document as of its latest modification. The result is a view of each document exactly as it currently exists in Mongo. Because the BigQuery streaming API has minimal latency, changes are reflected in BigQuery within seconds of their creation in Mongo, allowing for sophisticated real-time analytics via BigQuery. While BigQuery has no official SLA for streaming API performance, we consistently see rows become queryable within 3-4 seconds at most. The table preview in the BigQuery UI does not reflect streamed rows right away, but querying the table with a SELECT statement shows them properly.
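
The archiver sets these views up automatically, but if you wanted to create one by hand with the BigQuery Python client, it would look roughly like the sketch below; the project, dataset, and table names are placeholders.

from google.cloud import bigquery

client = bigquery.Client()

# Placeholder names; the archiver derives these from the configuration document.
view = bigquery.Table("my-project.ad_tech_archive.campaign_stats_latest")
view.view_query = """
SELECT * FROM (
    SELECT *, ROW_NUMBER() OVER(
        PARTITION BY _id ORDER BY time_archived DESC) rank
    FROM `my-project.ad_tech_archive.campaign_stats_history`
)
WHERE rank = 1 AND operationType != 'delete'
"""
client.create_table(view)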

[Architecture diagram]

An example of the architecture. Note that normal user access always hits Mongo and never goes to BigQuery, making the process more efficient.

Thanks to the Archiver, we’ve been able to leverage the strengths of both Mongo and BigQuery for our rapidly changing datasets without having to actively maintain a separate data loading process for each system.

We’ve open-sourced the Archiver on GitHub under the Apache V2 license: https://github.com/zulily/mongo_bigquery_archiver. If you choose to use it for your own needs, check out Google’s documentation on streaming inserts into BigQuery.