Thursday, October 15, 2015

Ad hoc data analysis/discovery of event data...



Recently an opportunity arose at work to run some ad hoc data analysis/discovery on our event data. The required analysis was a bit more complicated than what our current reporting capabilities provided, primarily because of both the quality and the layout/structure of the existing data. We wanted to understand how much of the revenue was coming from an internal (machine learning supported) recommendation service, for given products, over various time boundaries. From these sales, we wanted to further understand which recommendation algorithm drove each purchase. Measuring the performance of these algorithms would allow us to fine-tune them in the future and ascertain whether those changes had a positive or negative impact on overall revenue.

It was determined early on that plain SQL wasn't a great fit for this, yet having the data stored in Amazon Redshift constrained the approach to just SQL. Luckily, I had been pulling the same event data off Kafka, converting it to Parquet, and storing it in S3 for some time. For the older data that wasn't yet in S3, I was able to bulk extract the data from Redshift and process it in Spark to convert it into the required Parquet format. Having this data stored in S3 opened the door to alternative compute engines, and Parquet provides a very performant columnar storage format that can be processed by multiple Big Data engines, including, but not limited to, Hive, Presto, Spark, and Hadoop. Separating the compute engine from the storage engine offers a number of advantages when it comes to scaling, compute flexibility, and the overall cost of data processing.
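As a rough illustration of that backfill step, here is a minimal sketch (using the same Flambo SQL wrapper that appears later in this post) of converting a Redshift extract on S3 into the same Parquet layout. It assumes Spark 1.4+, the spark-csv data source on the classpath, and a pipe-delimited UNLOAD at a hypothetical path; it is not the actual job.

(sql/with-sql-context c conf
  (let [df (-> (.read c)                                 ; c is the SQLContext
               (.format "com.databricks.spark.csv")      ; assumes the spark-csv package
               (.option "delimiter" "|")
               (.load "s3n://redshift-unload-bucket/events/"))] ; hypothetical source path
    ;; in practice the columns would also be renamed/typed to match the event schema
    (-> (.write df)
        (.parquet "s3n://bucket-to-data/dt=2015-10-07/"))))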


After reviewing the data in greater detail, it appeared to me that this fit nicely into a recursive, graph-oriented problem. Users may purchase multiple items within a given session, and each of these purchases may or may not be driven by a recommendation. All of these events are linked together via the event-id and parent-event-id. Below is an example of what that dataset looks like. It is a small subset of a user session, used for illustration purposes. A user session may consist of one or even thousands of interactions with the system, each generating a unique event.


step-id - event-id,parent-event-id,item-id,page,action,price,recommendation-algorithm
1 - A,,7234,SELL_PAGE,DISPLAY,19.99,NONE
2 - B,A,5431,RECO_SELL_PAGE,DISPLAY,14.99,algorithm-99
3 - C,B,5431,SELL_PAGE,DISPLAY,14.99,algorithm-99
4 - D,C,5431,SELL_PAGE,ADD_TO_CART,14.99,NONE
5 - E,D,5431,CART,PAID,14.99,

I've replaced the actual event-id and parent-event-id values with easier-to-follow ones, and a step-id has been added for easier reference. In this session subset, a user was initially presented with product 7234 for $19.99 (step 1), but we can see that they actually end up purchasing (step 5) an item that was presented to them via the recommendation panel on the website (step 2). We can also see that the recommendation-algorithm is dropped from the event data after step 3. This could represent a bug in the current system, a requirement that has not yet been implemented, or just an oversight. In any case, you need to work with the data you're given, and 80% of data science work lies in data wrangling: getting the data ready for further analysis.


All the events are linked: each event-id may have a parent-event-id. Getting the information we need can easily be done with a graph, traversing it to extract and keep the nodes of interest. Using Clojure and the graph library Loom turned out to be a winning approach. Clojure is just awesome for working with data interactively, and it runs on the JVM, so leveraging existing libraries written in Java/Scala is doable. Porting the standalone app code to Apache Spark, so it could run at scale on much larger datasets, turned out to be rather straightforward using the Flambo DSL. Additionally, the Clojure REPL lets one interact with the data easily and iterate quickly.
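To make the traversal idea concrete with the toy session above, here's a quick sketch using Loom directly (not code from the actual pipeline): build a directed graph with an edge from each event to its parent, then walk backwards from the PAID event.

;; Toy illustration with Loom on the example session:
;; each edge points from an event to its parent event.
(require '[loom.graph :refer [digraph]]
         '[loom.alg :refer [bf-traverse]])

(def toy-session
  (digraph [:E :D] [:D :C] [:C :B] [:B :A]))

;; Walking back from the PAID event (E) visits the recommendation
;; display event (B) on the way to the session root (A).
(bf-traverse toy-session :E)
;; => (:E :D :C :B :A)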


The first thing was to get the base data of interest out of Parquet and into Clojure. This was pretty straightforward using a combination of the Spark DataFrame API and resilient distributed datasets (RDDs). Converting a DataFrame to a JavaRDD is simple and allowed me to wrangle the data into a more Clojure-friendly data structure. Being able to mix SQL with the richer data structures provided by an RDD is a huge win and provides greater flexibility to shape and work with the data.

(sql/with-sql-context c conf
  (let [df (sql/load c "s3n://bucket-to-data/dt=2015-10-07/" "org.apache.spark.sql.parquet")
        _  (do
             (sql/register-data-frame-as-table c df "events")
             (sql/register-data-frame-as-table
              c
              (sql/sql c
                       "SELECT
                          session_id,
                          event_id,
                          parent_event_id,
                          location,
                          product_id,
                          event_type,
                          recommendation_source,
                          (quantity * price) AS total,
                          currency_code,
                          dt AS event_date
                        FROM events
                        WHERE location IN ('SELL_PAGE', 'RECO_SELL_PAGE', 'CART')
                          AND event_type IN ('DISPLAY', 'ADD_TO_CART', 'PAID')")
              "event_subset"))]
    (-> (sql/sql c
                 "SELECT session_id,
                         event_id,
                         parent_event_id,
                         location,
                         product_id,
                         event_type,
                         recommendation_source,
                         total,
                         currency_code,
                         event_date
                  FROM event_subset")
        (.toJavaRDD)
        ;; ... the transformations shown below continue this pipeline
        )))

Now I want to get the data into a Clojure map, which is done by applying the map-to-pair function to the RDD. This lets us convert each row into a Clojure map, using zipmap to map keys to values.

(f/map-to-pair (f/fn [x]
                 (let [[k & v] (sql/row->vec x)]
                   (ft/tuple k (event-map v)))))
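The event-map function isn't shown here; a minimal sketch of what it might look like, assuming the keys simply follow the SELECT column order from the previous step:

;; Hypothetical shape of event-map: zipmap the remaining row values
;; (everything after session_id) onto keyword keys in SELECT order.
(defn event-map
  [v]
  (zipmap [:event-id :parent-event-id :location :product-id :event-type
           :reco-source :total :currency-code :event-date]
          v))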


Note that each step we take is a data transformation and returns a new RDD. So in this case we'll have an RDD where the key (K) is the user's session-id and the value (V) is the Clojure map of that event. Since we need to work on the data at the session level, we call Spark's groupByKey function on the previous RDD, via the Flambo convenience function group-by-key. Note that this is an expensive operation, since groupByKey must be able to hold all the key-value pairs for any single key in memory; if a key has too many values, it can result in out-of-memory errors. But since we are running this across multiple machines and multiple JVMs, we stand a pretty good chance of that not happening.

(f/group-by-key :n 200)

Now we have all of the session values together in (K,V) pairs, where K is the session-id and V is now a Scala collection of our Clojure maps. For example:

#flambo/tuple [1444244343173:hs9o16ss #object[
scala.collection.convert.Wrappers$IterableWrapper 0x4228e9bd [{:reco-source algorithm-30, 
  :event-id 00ef52b1-59d4-408c-9cad-4ab205afa873, 
  :product-id 7124, 
  :currency-code USD, 
  :parent-event-id a1d90bbb-f66b-4537-94a2-c407552a4d49, 
  :total 49.99, 
  :event-date 2015-10-07, 
  :location SELL_PAGE, 
  :event-type DISPLAY}]]]

Now that we have all the data grouped by session-id, the next step is to use the Loom library to create a graph of the connected nodes. Once we have the graph of nodes, we can search them with relative ease. Below you can see that we unroll each Scala tuple via the untuple function. Since we no longer care about the session-id, we denote that by binding it to _; all the data of interest is in the V of the (K,V) tuple.


(f/flat-map-to-pair
 (f/fn [x]
   (let [[_ v] (f/untuple x)
         e (into [] (remove #(empty? (:recommendation-nodes %))
                            (traverse-session-events v)))]
     (map (fn [y] (ft/tuple (str (:cart-item-type y) "_" (:cart-id y)) y)) e))))


Within traverse-session-events, we create the session graph of all the connected events. Once we have this, a list comprehension is used to process each of the PAID event nodes within a session. Each PAID node is checked to see whether it has a connected parent event node of interest, where "parent nodes of interest" are events whose :label matches a defined set. These checks are done in the walk-nodes function, called in the let binding. If there are none, we just get back an empty :recommendation-nodes list, which is filtered out in the flat-map-to-pair step above.

(defn traverse-session-events
  "given a session's events, traverse all the end nodes and check to see if
   they were triggered by any recommendation-driven events"
  [events]
  (let [g (session-graph events)]
    (for [node (paid-cart-events g)
          :let [x (mapv #(attr g node %) [:total :event-date :currency-code :id])
                reco-nodes (walk-nodes g node)
                event-map {:cart-event-id node
                           :cart-total (nth x 0)
                           :event-date (nth x 1)
                           :currency-code (nth x 2)
                           :cart-id (nth x 3)
                           :triggering-event-reco-source (:reco-source (first reco-nodes))
                           :recommendation-nodes reco-nodes}]]
      event-map)))

(defn walk-nodes
  "given an end node and a graph, traverse the graph to see if the end node
   has a valid connected start node."
  [g n]
  (for [node (bf-traverse g n)
        :let [x (mapv #(attr g node %) [:label :reco-source :thing-id])]
        :when (valid-connected-nodes (nth x 0))]
    {:triggering-event-id node :label (nth x 0) :reco-source (nth x 1)}))
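The session-graph, paid-cart-events, and valid-connected-nodes helpers referenced above aren't shown in this post. Below is a rough sketch of how they might look with Loom, assuming :label maps to the event's location, :thing-id to its product-id, and so on; treat it as an illustration of the shape rather than the production code.

;; Sketch of the helpers used above (assumed, not from the original post).
(require '[loom.graph :refer [digraph add-edges add-nodes nodes]]
         '[loom.attr :refer [add-attr attr]])

(defn session-graph
  "Build a directed graph of one session's events, child -> parent,
   storing the fields read back out later as node attributes."
  [events]
  (reduce (fn [g {:keys [event-id parent-event-id location event-type
                         reco-source product-id total event-date
                         currency-code]}]
            (as-> g g
              (if (seq parent-event-id)
                (add-edges g [event-id parent-event-id])
                (add-nodes g event-id))
              (reduce (fn [g [k v]] (add-attr g event-id k v))
                      g
                      {:label         location     ; e.g. RECO_SELL_PAGE
                       :event-type    event-type   ; e.g. PAID
                       :reco-source   reco-source
                       :thing-id      product-id
                       :total         total
                       :event-date    event-date
                       :currency-code currency-code
                       :id            event-id})))
          (digraph)
          events))

(defn paid-cart-events
  "Nodes representing a completed purchase, i.e. a PAID event."
  [g]
  (filter #(= "PAID" (attr g % :event-type)) (nodes g)))

(def valid-connected-nodes
  "Labels that mark a recommendation-driven parent event."
  #{"RECO_SELL_PAGE"})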

Once this step/transformation is complete, we have all the information needed for the first draft of this program, and we're able to see the estimated total revenue for each recommendation source.


{:event-date "2015-10-07", 
:triggering-event-reco-source "algorithm-30", 
:estimated-reco-sales-channel-total "8089.56", 
:estimated-daily-sales-total "97789.76", 
:contribution-rate "8.27"}
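The final rollup that produces numbers like these isn't shown in the post; as a rough illustration, assuming the per-purchase maps have been collected back to the driver and the day's total sales figure (a double) is known, it could look something like this hypothetical function:

;; Hypothetical rollup: group the per-purchase maps by recommendation source,
;; sum their cart totals, and express each source's share of the daily total.
(defn rollup-by-reco-source
  [purchases daily-total]
  (for [[source rows] (group-by :triggering-event-reco-source purchases)
        :let [source-total (reduce + (map :cart-total rows))]]
    {:event-date (:event-date (first rows))
     :triggering-event-reco-source source
     :estimated-reco-sales-channel-total (format "%.2f" source-total)
     :estimated-daily-sales-total (format "%.2f" daily-total)
     :contribution-rate (format "%.2f" (* 100.0 (/ source-total daily-total)))}))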


Later on we added metadata, extracted from Redshift, to further categorize the data. The overall turnaround for this attribution report was rather quick - in less than a week and a half we had the data we needed. Separating the compute from the storage engine is very powerful, and having the capability to do this kind of quick exploration and data discovery is extremely valuable and cost-effective.


So there you have it, Bob's your uncle!
