Calculating Ad Performance Metrics in Real Time

Authors: Sergey Podlazov, Rahul Srivastava

zulily is a flash sales company.  We post a product on the site, and poof… it's gone in 72 hours.  Online ads for those products come and go just as fast, which doesn't leave us much time to manually evaluate the performance of the ads and take corrective action if needed.  To optimize our ad spend, we need to know in real time how each ad is doing, and that is exactly what we engineered.

While we track multiple metrics to measure the impact of an ad, I am going to focus on one that provides a good representation of the system architecture.  This is an engineering blog, after all!

The metric in question is Cost per Total Activation, or CpTA for short.  The formula for the metric is simple: divide the total cost of the ad by the number of customer activations.  We call the numerator in this formula "spend" and refer to the denominator as an "activation".  For example, if an ad costs zulily $100 between midnight and 15:45 PST on January 31 and results in 20 activations, the CpTA for this ad as of 15:45 PST is $100/20 = $5.

Here's how zulily collects this metric in real time.  For the sake of simplicity, I will skip the archiving processes that are sprinkled on top of the architecture below.

[Architecture diagram]

The source of the spend for the metric is an advertiser API, e.g. Facebook.  We’ve implemented a Spend Producer (in reference to the Producer-Consumer model) that queries the API every 15 minutes for live ads and pushes the spend into a MongoDB.  Each spend record has a tracking code that uniquely identifies the ad.
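
A minimal sketch of that producer loop, assuming pymongo and a hypothetical fetch_live_ad_spend helper standing in for the advertiser API call; the database and collection names are illustrative, not our actual code:

/code:

import time
from datetime import datetime, timezone
from pymongo import MongoClient

spend_collection = MongoClient("mongodb://localhost:27017")["ads_db"]["spend"]  # illustrative names

def fetch_live_ad_spend():
    # Placeholder for the advertiser API call (e.g. the Facebook Marketing API);
    # should return one record per live ad: {"tracking_code": ..., "spend": ...}
    return []

def run_spend_producer(poll_interval_seconds=900):
    while True:
        for ad in fetch_live_ad_spend():
            spend_collection.insert_one({
                "tracking_code": ad["tracking_code"],   # uniquely identifies the ad
                "spend": ad["spend"],
                "collected_at": datetime.now(timezone.utc),
            })
        time.sleep(poll_interval_seconds)               # poll every 15 minutes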

The source for the activations is a Kafka stream of purchase orders that customers place with zulily.  We consume these orders and forward them into an AWS Kinesis stream.  This gives us the ability to process and archive the orders without putting extra strain on Kafka.  It's important to note that relevant orders also carry the ad's tracking code, just like the spend records.  That's the link that glues spend and activations together.
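
A rough sketch of that Kafka-to-Kinesis bridge, assuming the kafka-python and boto3 clients; the topic, broker, and stream names are illustrative:

/code:

import json
import boto3
from kafka import KafkaConsumer

kinesis = boto3.client("kinesis")
consumer = KafkaConsumer("orders", bootstrap_servers=["kafka:9092"])  # illustrative topic/brokers

for message in consumer:
    order = json.loads(message.value)
    # Forward the purchase order to Kinesis, keyed by the ad's tracking code
    kinesis.put_record(
        StreamName="order-events",               # illustrative stream name
        Data=json.dumps(order).encode("utf-8"),
        PartitionKey=order.get("tracking_code", "unknown"),
    )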

The Activation Evaluator application examines each purchase and determines whether the purchase is an activation.  To do that, it looks up the previous purchase in a MongoDB collection for the customer ID on the purchase order.  If the most recent transaction is non-existent or older than X days, the purchase is an activation.  The Activation Evaluator then updates the customer record with the date of the new purchase.  To make sure that we don't drop any data if the Activation Evaluator runs into issues, we don't advance the checkpoint in the Kinesis stream until the write to Mongo is confirmed.
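
A sketch of the evaluation logic, assuming a pymongo collection of customer records; the field names and the window parameter (the "X days" above) are illustrative:

/code:

from datetime import timedelta

def is_activation(order, customers, window_days):
    # customers: MongoDB collection keyed by customer ID, storing each customer's
    # most recent purchase date; window_days corresponds to the "X days" above
    cutoff = order["purchased_at"] - timedelta(days=window_days)
    previous = customers.find_one({"_id": order["customer_id"]})
    activation = previous is None or previous["last_purchase_at"] < cutoff

    # Record the new purchase; the Kinesis checkpoint is only advanced after
    # this write is confirmed, so a failure here just means the order is reprocessed
    customers.update_one(
        {"_id": order["customer_id"]},
        {"$set": {"last_purchase_at": order["purchased_at"]}},
        upsert=True,
    )
    return activation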

The Activation Evaluator sends evaluated purchases into another Kinesis stream.  Chaining Kinesis streams is a pretty common pattern in AWS applications, as it allows for separation of concerns and makes the whole system more resilient to failures of individual components.

The Activation Calculator reads the evaluated purchases from the second Kinesis stream and captures them in Mongo.  We index the data by tracking code and timestamp, and voila, a simple count() will return the number of activations for a specified period.
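
With that index in place, counting activations for an ad over a window is a single query; a sketch with illustrative database, collection, and field names:

/code:

from pymongo import MongoClient

activations = MongoClient("mongodb://localhost:27017")["ads_db"]["activations"]  # illustrative names
activations.create_index([("tracking_code", 1), ("timestamp", 1)])

def count_activations(tracking_code, start, end):
    # Number of activations recorded for one ad within [start, end)
    return activations.count_documents({
        "tracking_code": tracking_code,
        "timestamp": {"$gte": start, "$lt": end},
    })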

The last step in the process is to take the spend and divide it by the activations.  Done.
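
The division itself, reusing the illustrative spend collection and count_activations helper from the sketches above:

/code:

def cost_per_total_activation(tracking_code, start, end):
    # Total spend recorded for the ad in the window
    total_spend = sum(doc["spend"] for doc in spend_collection.find(
        {"tracking_code": tracking_code, "collected_at": {"$gte": start, "$lt": end}}))
    total_activations = count_activations(tracking_code, start, end)
    # e.g. $100 of spend and 20 activations as of 15:45 PST -> CpTA of $5
    return total_spend / total_activations if total_activations else None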

With this architecture, zulily measures a key advertising performance metric every 15 minutes and uses it to pause poorly-performing ads.  The metric also serves as an input for various Machine Learning models, but more on those in a future blog post… Stay tuned!!


Zome: Real-time Merchant Home Page with Spark Streaming & Kafka

Authors: Bapi Akula, Shailu Mishra, Sudhir Hasbe

zulily is a daily business: we launch our events every day at 6 a.m. PST, and most of our sales happen in the first hours after the events launch. It is critical for our merchants to know what is happening and react quickly to drive more sales. We have significantly improved merchants' ability to do that by giving them a new real-time home page, so every day when they come in they can take action based on the current situation.

Historical View:

Historically, we had a dashboard for our merchants that was not very useful. It showed them upcoming events and some other info, but when you come in every day you want to know what is happening today, not tomorrow.


New View

We replaced this non-actionable home page with a new real-time version that shows merchants real-time sales for their events, conversion rates, real-time inventory, top-selling styles, and styles projected to sell out. This enables merchants to talk to vendors to get more inventory or add different products.


Technical Design

To build a real-time home page for merchants, we had to combine real-time clickstream data (unstructured) with real-time sales (structured) and historical event and product data. Bringing these very different data sets into a single pipeline and merging and processing them in real time was a challenge.

Real-time Clickstream & Orders

We have built a high-scale real-time collection service called ZIP. It peaks at around 18,000 to 20,000 transactions per second every day. Our clickstream and order data are collected through this service. One of ZIP's capabilities is publishing this data in real time to a Kafka cluster, which lets other systems access the collected data in near real time.
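
As a rough illustration of that publish step (not ZIP's actual implementation; broker and topic names are made up), each collected event is also sent to Kafka:

/code:

import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=["kafka:9092"],                        # illustrative broker list
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

def publish_event(event):
    # Every collected clickstream/order event is also written to Kafka in near real time
    topic = "orders" if event.get("type") == "order" else "clickstream"
    producer.send(topic, value=event)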

We will describe other capabilities of this service in a future post.

Historical data:

Our data platform runs on Google Cloud Platform and includes Google Dataproc as our ETL processing platform; after processing, data is stored in Google BigQuery for analytics and in Google Cloud Storage. All our historical data, including products, events, prices, and orders, is stored in Google BigQuery and Google Cloud Storage.


Spark Streaming Processing

We used Spark Streaming to connect the clickstream and order data collected in Kafka with historical data in GCS, using the GCS connector provided by Google. This allowed us to create derivative data sets like real-time sales, conversion rates, and top sellers, which are stored in AWS Aurora. Aurora is an amazing database for high-scale queries; in a future post we will write about why we chose Aurora over other options.
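
At a high level, the streaming job wires these sources together roughly like this (a simplified skeleton with illustrative topic names, paths, and batch interval; the real aggregation and Aurora writes are covered in the sections below):

/code:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="zome-realtime")
ssc = StreamingContext(sc, 60)   # illustrative 60-second batch interval

# Real-time clickstream and orders from Kafka (see getDstreamFromKafka below)
orders = KafkaUtils.createDirectStream(ssc, ["orders"], {"bootstrap.servers": "kafka:9092"})

# Historical/reference data landing in GCS (see textFileStream below)
reference = ssc.textFileStream("gs://<bucket>/reference/")

# ... derive real-time sales, conversion rates and top sellers, then write them to Aurora ...

ssc.start()
ssc.awaitTermination()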

Data Access through Zata

We then used our ZATA API to access this data from our merch tools and build an amazing UI for our merchants.

Spark Streaming Details

Reading the data from Kafka (KafkaUtils.createDirectStream)

KafkaUtils is the object with factory methods for creating input DStreams and RDDs from records in Apache Kafka topics. createDirectStream skips receivers and ZooKeeper and uses Kafka's simple consumer API to read messages, which means it needs to track offsets itself.

So at the beginning of each batch, the connector reads the partition offsets for each topic from Kafka and uses them to ingest data. To ensure exactly-once semantics, it tracks offset information in Spark Streaming checkpoints.

/code:

def getDstreamFromKafka(ssc, topic, kafka_servers):
    # Receiver-less direct stream against the Kafka topic
    kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"bootstrap.servers": kafka_servers})
    # Each record is a (key, value) tuple; normalize the JSON value and split it into fields
    parsed = kafkaStream.map(lambda v: json.dumps(v[1]).replace('",', '";').split(','))
    return parsed
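
Because the offsets live in the checkpoint, the job is typically started through StreamingContext.getOrCreate, so a restart resumes from the stored offsets rather than reprocessing or skipping data. A sketch with an illustrative checkpoint path and batch interval:

/code:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

CHECKPOINT_DIR = "gs://<bucket>/checkpoints"   # placeholder checkpoint location

def create_context():
    sc = SparkContext(appName="zome-realtime")
    ssc = StreamingContext(sc, 60)             # illustrative 60-second batches
    ssc.checkpoint(CHECKPOINT_DIR)             # offsets and state are checkpointed here
    # ... build the DStream graph here, e.g. getDstreamFromKafka(ssc, "orders", "kafka:9092") ...
    return ssc

# On a clean start this calls create_context(); on restart it rebuilds from the checkpoint
ssc = StreamingContext.getOrCreate(CHECKPOINT_DIR, create_context)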

 

Reading data from GCS (textFileStream)

This method monitors any Hadoop-compatible filesystem directory for new files and, when it detects a new file, reads it into Spark Streaming. In our case we use GCS, and the streaming job internally uses the GCS connector. We pass the GCS connector as a jar file when invoking the job.

/code:

# Monitor a GCS path for new files (placeholder path; the GCS connector jar is supplied via spark-submit --jars)
gcs_stream = ssc.textFileStream("gs://<bucket>/<path>")

Merge values: combineByKey(createCombiner, mergeValue, mergeCombiners)

In Spark, groupByKey() doesn't do any local aggregation while computing on a partition's data; this is where combineByKey() comes in handy.
With combineByKey, values are first merged into one value within each partition, and then the per-partition values are merged into a single value. combineByKey is therefore an optimization over groupByKey, because fewer key-value pairs are sent across the network.

We used combineByKey to calculate aggregations like total sales, average price, and demand. Three lambda functions were passed as arguments to this method:

combineByKey(createCombiner, mergeValue, mergeCombiners)

createCombiner: the first required argument is a function used as the very first aggregation step for each key. It is invoked only once per key.

mergeValue: this function tells the combiner what to do when it is given a new value for a key it has already seen within a partition.

mergeCombiners: this function combines the values of a key across multiple partitions.
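
Before the production lambdas in the next block, here is a minimal, self-contained illustration of the three functions, computing the average price per key (it assumes an existing SparkContext sc):

/code:

pairs = sc.parallelize([("event_a", 10.0), ("event_a", 20.0), ("event_b", 5.0)])

sum_count = pairs.combineByKey(
    lambda price: (price, 1),                          # createCombiner: first value seen for a key
    lambda acc, price: (acc[0] + price, acc[1] + 1),   # mergeValue: fold in another value within a partition
    lambda a, b: (a[0] + b[0], a[1] + b[1]),           # mergeCombiners: merge per-partition aggregates
)

avg_price = sum_count.mapValues(lambda t: t[0] / t[1])
print(avg_price.collect())   # e.g. [('event_a', 15.0), ('event_b', 5.0)]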

/code:

# createCombiner: build the initial aggregate tuple from the first record seen for a key
# (the aggregate carries running sums, min/max values, and a record count)
creCmb = (lambda v: (v[0], float(v[1]), 0.0, 0.0, 0.0, v[4], v[5], v[6], v[7], v[8], 1)
          if v[3] == -1
          else (v[0], float(v[1]), float(v[1]) / float(v[2]), float(v[3]),
                (float(v[1]) / float(v[2])) - float(v[3]), v[4], v[5], v[6], v[7], v[8], 1))

# mergeValue: fold another raw record v into the running aggregate x within a partition
# (note that the raw record v and the combined tuple x have different layouts)
mrgVal = (lambda x, v: (max(x[0], v[0]), float(x[1]) + float(v[1]), float(x[2]) + 0.0,
                        float(x[3]) + 0.0, float(x[4]) + 0.0, min(x[5], v[4]), min(x[6], v[5]),
                        min(x[7], v[6]), min(x[8], v[7]), max(x[9], v[8]), int(x[10]) + 1)
          if v[3] == -1
          else (max(x[0], v[0]), float(x[1]) + float(v[1]), float(x[2]) + (float(v[1]) / float(v[2])),
                float(x[3]) + float(v[3]), float(x[4]) + ((float(v[1]) / float(v[2])) - float(v[3])),
                min(x[5], v[4]), min(x[6], v[5]), min(x[7], v[6]), min(x[8], v[7]), max(x[9], v[8]),
                int(x[10]) + 1))

# mergeCombiners: merge the per-partition aggregates for a key into a single value
mrgCmb = (lambda x, y: (max(x[0], y[0]), x[1] + y[1], x[2] + y[2], x[3] + y[3], x[4] + y[4],
                        min(x[5], y[5]), min(x[6], y[6]), min(x[7], y[7]), min(x[8], y[8]),
                        max(x[9], y[9]), int(x[10]) + int(y[10])))

combineByKey(creCmb, mrgVal, mrgCmb)

Stateful transformations (updateStateByKey())

We needed a framework that supported building knowledge from both historical and real-time data, and Spark Streaming provided just that. Using stateful functions like updateStateByKey, which computes a running sum of all the sales, we were able to meet that requirement.

For example, say you want to keep track of the number of times a customer visited the web page: if customer "123" visited twice in the first hour and visits again in the next hour, the aggregated count at the end of the second hour should be 3 (the current batch count plus the history). That history state lives in memory and is managed by updateStateByKey. Spark Streaming's checkpoint mechanism takes care of preserving the state of the sales history. As an additional recovery point, we stored the state in a database and recovered it from there in case the checkpoint files were cleared during new code deployments or configuration changes.
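
The visit-count example above comes down to a few lines. This is a sketch that assumes a DStream page_views of customer IDs and a StreamingContext with checkpointing enabled; the production update function follows below.

/code:

def update_visits(new_visits, running_count):
    # new_visits: counts from the current batch; running_count: state from previous batches
    return sum(new_visits) + (running_count or 0)

visit_counts = page_views.map(lambda customer_id: (customer_id, 1)) \
                         .updateStateByKey(update_visits)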

/code:

soi_batch_agg.updateStateByKey(updateSales)

def updateSales(newstate, oldstate):
    # newstate: list of aggregates from the current batch for a key (empty for an empty RDD)
    # oldstate: running aggregate kept in memory (None for a key with no prior state)
    try:
        # If the product insert timestamp is older than two days, remove the state from memory
        if (oldstate is not None) and validate(oldstate[0][-4], 'updateSalesFn1') and \
                (oldstate[0][-4] < ((datetime.datetime.now(tz=pytz.utc) - datetime.timedelta(days=2))
                                    .astimezone(pytz.timezone('US/Pacific'))
                                    .strftime('%Y-%m-%d %H:%M:%S'))):
            oldstate = None

        # If the event end date is older than the current timestamp, remove the state from memory
        if (oldstate is not None) and validate(oldstate[0][-3], 'updateSalesFn2') and \
                (oldstate[0][-3] < (datetime.datetime.now(tz=pytz.utc)
                                    .astimezone(pytz.timezone('US/Pacific'))
                                    .strftime('%Y-%m-%d %H:%M:%S'))):
            oldstate = None

        if newstate:
            if oldstate is None:
                # Recover the previous state from Aurora if it is no longer in the checkpoint
                oldstate = zome_aurora.aurora_get(str(newstate[0][-6]), str(newstate[0][-5]))
            else:
                print('Getting records from memory')

            if oldstate:
                # Merge the running state with the current batch aggregate
                return [(max(oldstate[0][0], newstate[0][0]),
                         float(oldstate[0][1]) + newstate[0][1],
                         float(oldstate[0][2]) + newstate[0][2],
                         float(oldstate[0][3]) + newstate[0][3],
                         float(oldstate[0][4]) + newstate[0][4],
                         max(oldstate[0][5], newstate[0][5]),
                         min(oldstate[0][6], newstate[0][6]),
                         max(oldstate[0][7], newstate[0][7]),
                         min(oldstate[0][8], newstate[0][8]),
                         max(oldstate[0][9], newstate[0][9]),
                         int(oldstate[0][10]) + newstate[0][10])]
            else:
                return newstate

    except Exception as e:
        sys.exit(1)

Writing to Aurora:

We are not using the JDBC methods provided by Spark because we ran into performance issues with connection creation, record insertion, and commits.

Instead, we create a connection for each partition, do a bulk insert of all the records in that partition, and write all partitions in parallel.

/code:

def sendPartition(iter):
    # One connection per partition; mc is the MySQL connector module
    # (the import and connection parameters are elided in the original)
    connection = mc.connect(...)
    cursor = connection.cursor()
    try:
        # Loop through the partition's records and collect them for one bulk insert
        data = []
        for record in iter:
            data.append(record)

        # Full INSERT statement elided in the original
        query = "INSERT INTO .... )"

        # cursor.execute(transaction_isolation_lock)

        # Retry the bulk insert until it succeeds
        successful = False
        while not successful:
            try:
                cursor.executemany(query, data)
                connection.commit()
                successful = True
            except Exception as e:
                # Handle the exception (retry/back-off details elided in the original)
                pass
    finally:
        cursor.close()
        connection.close()
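
For completeness, this is roughly how a function like sendPartition gets wired into the streaming job (the DStream name is illustrative):

/code:

# Each micro-batch RDD is written to Aurora partition by partition, in parallel
derived_metrics.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))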