Wednesday, October 21, 2015

Simple stuff made easy...



My last post ended with the result set providing the initially needed information in a nicely formatted map, broken down by day and triggering-event-reco-source:



{:event-date "2015-10-07", 
:triggering-event-reco-source "algorithm-30", 
:estimated-reco-sales-channel-total "8089.56", 
:estimated-daily-sales-total "97789.76", 
:contribution-rate "8.27"}

So the next logical question was: Is the product an online video, downloadable material, or a physically shipped good? Since we only had the product-id in the event stream, answering this would require joining the event data against metadata that we store in Redshift. This would allow us to classify/categorize the event and answer our burning question. The metadata we needed was spread across three separate tables in Redshift. The approach taken was to merge all three tables into a single RDD at the start of the Spark program. Since we're iteratively processing multiple days, it made sense to cache the data in memory across the cluster. This allows for quicker joining of the data within Spark and only requires reading the data from Redshift once.


The first step was to get the data out of Redshift. With really large datasets we would want to bulk unload the data from Redshift into S3, and then source it into Spark via an RDD. Since what we need here is "smaller" metadata, getting it out of Redshift turned out to be rather straightforward using the low-level Clojure wrapper for JDBC. The Clojure JDBC library allows you to control how the ResultSet is transformed and returned by specifying a :result-set-fn within your query specification. The example below applies the metadata-tuples function to the entire result set (a lazy sequence) before it is returned:

(sql/query db-spec ["SELECT <QUERY TO GET REQUIRED METADATA>"] :result-set-fn metadata-tuples)
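For reference, db-spec here is just a standard clojure.java.jdbc connection map pointed at the cluster's PostgreSQL-compatible endpoint; something along these lines, with placeholder host, database, and credentials:

(def db-spec
  {:subprotocol "postgresql"
   :subname     "//example-cluster.redshift.amazonaws.com:5439/warehouse"
   :user        "example_user"
   :password    "example_password"})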

The metadata-tuples function creates a list of tuples where the key (K) of each is a concatenation of the product-type and the product-id. The need for an artificial composite key is primarily due to the data being split across three different tables, where we might have overlapping identifiers. The value (V) of each tuple is a map containing the :sales-channel, which indicates whether the product is a physically shipped good, an online video, or downloadable material:

(defn metadata-tuples [rs]
  ;; Build a list of (K,V) tuples from the result set: K is the composite
  ;; "product-type_product-id" key, V is a map holding the sales channel.
  (reduce (fn [tuples t]
            (into tuples [(ft/tuple (str (:product-type t) "_" (:product-id t))
                                    {:sales-channel (:sales-channel t)})]))
          ()
          rs))

Once we have the three result sets, they are merged into one via Clojure's into function, turned into an RDD of (K,V) pairs, and cached in Spark.
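As a rough sketch (the SQL strings below are placeholders for the actual queries), the merge looks something like this:

(def products-for-sale
  ;; Sketch only: each query returns a list of (K,V) tuples via
  ;; metadata-tuples, and the three lists are merged with into.
  (-> (sql/query db-spec ["SELECT <PHYSICAL GOODS METADATA>"] :result-set-fn metadata-tuples)
      (into (sql/query db-spec ["SELECT <ONLINE VIDEO METADATA>"] :result-set-fn metadata-tuples))
      (into (sql/query db-spec ["SELECT <DOWNLOADABLE MATERIAL METADATA>"] :result-set-fn metadata-tuples))))

The merged sequence is then parallelized into a pair RDD and cached, with x being the SparkContext: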

(let [products (->> products-for-sale
                    (f/parallelize-pairs x)
                    (f/cache))]
     ...
     ...)

The "processing code" above was omitted for clarity's sake. Once the RDD of metadata was cached in the cluster, we're able to join it with the processed event data via a left outer join:

(f/flat-map-to-pair (f/fn [x]
  ;; Key each qualifying event by the same composite "type_id" key used
  ;; for the metadata tuples so the two RDDs can be joined.
  (let [[_ v] (f/untuple x)
        e (into [] (remove #(empty? (:recommendation-nodes %))
                           (traverse-session-events v)))]
    (map (fn [y] (ft/tuple (str (:cart-item-type y) "_" (:cart-id y)) y)) e))))

(f/left-outer-join products)
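Since this is a left outer join, each joined value comes back as a pair of the event map and an Optional of the metadata map. Pulling the sales-channel out looks roughly like the sketch below (not the exact production code):

(f/map-to-pair (f/fn [x]
  ;; Sketch only: unwrap the joined value, take the sales-channel from
  ;; the metadata when it was found, and leave it nil otherwise.
  (let [[k v] (f/untuple x)
        [event opt-meta] (f/untuple v)
        sales-channel (when (.isPresent opt-meta)
                        (:sales-channel (.get opt-meta)))]
    (ft/tuple k (assoc event :sales-channel sales-channel)))))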

and with a little more processing and formatting, we now have all the required information:



{:event-date "2015-10-07", 
:triggering-event-reco-source "algorithm-30", 
:sales-channel "VIDEO",
:estimated-reco-sales-channel-total "123.99", 
:estimated-daily-sales-total "97789.76", 
:contribution-rate "0.12"}


One point of clarification with the above example: each record represents one grouping of products by recommendation source and sales-channel. We have multiple sales records for algorithm-30 that are categorized under the "VIDEO" sales-channel. The estimated daily sales total is across all sales for that given day, and there's a reason for using "estimated", but that's for another day. A few more result records might clarify things a bit:


{:event-date "2015-10-12",
:triggering-event-reco-source "algorithm-30",
:sales-channel "VIDEO",
:estimated-reco-sales-channel-total "1349.46",
:estimated-sales-total "165201.41",
:contribution-rate "0.82"}

{:event-date "2015-10-12",
:triggering-event-reco-source "algorithm-32",
:sales-channel "PRODUCT",
:estimated-reco-sales-channel-total "942.09",
:estimated-sales-total "165201.41",
:contribution-rate "0.57"}

{:event-date "2015-10-12",
:triggering-event-reco-source "algorithm-33",
:sales-channel "DOWNLOADABLE_MATERIAL",
:estimated-reco-sales-channel-total "65.51",
:estimated-sales-total "165201.41",
:contribution-rate "0.04"}
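The contribution-rate is simply the channel total expressed as a percentage of the day's estimated sales total; for the VIDEO row above that works out to 1349.46 / 165201.41 * 100 ≈ 0.82.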

Once again, the turnaround to provide the additional information was only a day. We were able to materialize this data rather quickly into an external Hive table and layer some quick visualizations on top with Looker. With Clojure and REPL-driven development aiding agility, you can iterate fast, even with Big Data.

So there you have it, Bob's your uncle!

