Tuesday, November 3, 2015

Spark + Clojure + Snowflake DB = Fun!



Recently I had the pleasure of working on a small project that required converting event data from one of our sources into a new format. This new format would be JSON and would require a little Extract, Transform, and Load (ETL) work on the data before further analysis and storage. Once the data was transformed, we planned on loading it into Snowflake DB and, from there, layering Looker on top for some visual analysis and eye candy.

The first thing I needed to do was convert the data into the new format. The volume of data was rather small by "Big Data" standards, averaging ~300M events a month. I decided to use a combination of Apache Spark and Clojure to do the "heavy" ETL work. Why these? Because Spark is awesome... Clojure is awesome... so awesome + awesome = more awesome. The core event data was already in S3 as Parquet files, and the other metadata we needed was in Redshift.

The data we were converting needed to be transformed ten different ways. We had "base/core" key:value pairs that every JSON record shared (name, visitorId, created, etc.), and then each record type had variable contents in its "details" payload. The details payload is kept rather simple and doesn't allow for any nested objects. The two records below illustrate this:

{
    "name": "checkout:payment:item:purchase",
    "visitorId": 4227262,
    "created": 1441098500022,
    "type": "U",
    "version": 1,
    "details": {
        "ext": 1236295,
        "listingId": 1004870,
        "quantity": 1
    }
}
{
    "name": "filteredGallery:gallery:item:impression",
    "visitorId": 6499643,
    "created": 1441073457564,
    "type": "U",
    "version": 1,
    "details": {
        "ext": 1245372,
        "galleryItemType": "product-for-sale-group"
    }
}


Loading this core dataset into Spark was relatively easy: the event data was loaded into a DataFrame via the sqlContext.read.load method, and Flambo has a nice wrapper function around it.

(sql/load c input-path "org.apache.spark.sql.parquet")
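For context, here is roughly how the SQLContext c and the event-df DataFrame used below could be wired up with Flambo. The namespace aliases, master setting, app name, and S3 path are assumptions for illustration, not taken from the real project:

(require '[flambo.conf :as conf]
         '[flambo.api  :as f]
         '[flambo.sql  :as sql])

;; Build the Spark and SQL contexts (all settings here are placeholders).
(def c
  (-> (conf/spark-conf)
      (conf/master "local[*]")
      (conf/app-name "event-etl")
      f/spark-context
      sql/sql-context))

;; Load the day's Parquet event data from S3 into a DataFrame.
(def input-path "s3n://some-bucket/events/2015-09-01/")
(def event-df (sql/load c input-path "org.apache.spark.sql.parquet"))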

Once the event data is loaded as a DataFrame, we register it as a temp table, which allows us to run SQL queries against it. We also cache the table if more than one event type needs to be processed for a given day, since we'll be querying the same dataset over and over again.

(let [...
      ...
      _ (do
          (sql/register-temp-table event-df "events")
          (when (> (count (:events options)) 1)
            (sql/sql c "CACHE TABLE events")))]

A number of the events being converted need to be joined against metadata tables in Redshift. This was pretty straightforward, since Spark provides a nice way to pull data in via JDBC. For instance, the function below pulls the course table in over JDBC and registers it as a temporary table we can query.

(defn get-course-df [c]
  ;; Register the Redshift course table with the SQL context via Spark's JDBC
  ;; data source so it can be joined against the events table.
  (let [ddl (str "CREATE TEMPORARY TABLE course "
                 "USING org.apache.spark.sql.jdbc "
                 "OPTIONS (url \"jdbc:" db-uri "\", "
                 "dbtable \"(select course_id,base_product_id from course)\")")]
    (sql/sql c ddl)))
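With the events temp table and the course table both registered, a join is just another SparkSQL query. The query below is purely illustrative (the joining column and event name are made up), but it shows the general shape of the predefined queries:

(sql/sql c "SELECT e.*, co.base_product_id
            FROM events e
            JOIN course co ON e.courseId = co.course_id
            WHERE e.name = 'course:item:purchase'")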

Once we have the events and all the necessary "join tables" loaded into Spark as DataFrames, we can get to work on the queries and transformations. The way I approached this was to have a Clojure map with all the necessary queries predefined in it. This way I could easily look up the query that needed to be run on the DataFrame based on the event type. Since keywords can act as functions in Clojure, you can simply call keyword on the incoming event string and use the resulting keyword to look the query up in the map. We have two nested loops: one over the days being converted, and one over the event types being converted. Once we have the query for a given event, we can execute it against the SparkSQL engine and thread the resulting DataFrame through a series of transformations using the thread-first macro.


(loop ...
  (let [event       (first events-to-process)
        event-query (get-in q/query-map [(keyword event) :query])]
    ...
    ...
    (-> (sql/sql c event-query)
        ...
        ...)))

Now that we have all the data needed, we must extract and format the final JSON record. The Clojure map that defines the queries also stores the event-specific information that ends up in the :details key of the final JSON record; the :details entry in the map is a vector of keywords. Looking back at the JSON records I mentioned earlier, you can see that the checkout:payment:item:purchase record requires ext, listingId, and quantity, while the filteredGallery:gallery:item:impression record requires ext and galleryItemType.

{:checkout:payment:item:purchase:ecomm 
   {:query "select foo from bar" :details [:ext :listingId :quantity]}}

From here, we can build a simple function that takes the vector of row data, along with the event type we're working on, and builds the necessary JSON record.


(defn get-event [event v]
  (let [e      (-> (zipmap base-events (subvec v 0 5))      ; base/core fields
                   (dissoc :sessionOrdinal :sessionId)      ; not part of the final record
                   (assoc :type "U" :version 1))
        detail (zipmap (get-in query-map [event :details]) (subvec v 5))]
    (json/generate-string (merge e {:details detail}))))
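As a purely hypothetical call, if base-events were something like [:name :visitorId :sessionId :sessionOrdinal :created] (the real ordering lives in the project; only :sessionId and :sessionOrdinal get dropped), the first JSON record shown earlier would come out of a row vector like this:

(get-event :checkout:payment:item:purchase:ecomm
           ["checkout:payment:item:purchase" 4227262 "sess-id" 2 1441098500022
            1236295 1004870 1])
;; => "{\"name\":\"checkout:payment:item:purchase\",\"visitorId\":4227262,...}"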


From there, all we need to do is run a couple of map operations over the RDD and then save the results to a file.


...
(f/map (f/fn [x] (sql/row->vec x)))
(f/map (f/fn [x] (q/get-event (keyword event) x)))
...
...
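Filled in a bit more, the pipeline for a single event type looks roughly like the sketch below; the .toJavaRDD conversion, the save-as-text-file call, and the output path are glue added for illustration, not lifted from the real job:

(-> (sql/sql c event-query)
    .toJavaRDD
    (f/map (f/fn [x] (sql/row->vec x)))
    (f/map (f/fn [x] (q/get-event (keyword event) x)))
    (f/save-as-text-file "s3n://some-bucket/converted/2015-09-01/"))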

From here, we easily loaded the JSON data into Snowflake DB for further querying and analysis. A finished record looks like this:


{
    "name": "checkout:payment:item:purchase",
    "visitorId": 8402509,
    "created": 1433149846291,
    "type": "U",
    "version": 1,
    "details": {
        "ext": 1089212,
        "listingId": 1000471,
        "quantity": 1
    }
}
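As a rough sketch of what that load can look like from Clojure (not necessarily how we did it), assuming clojure.java.jdbc and the Snowflake JDBC driver are on the classpath, with placeholder account, warehouse, stage, and table names:

(require '[clojure.java.jdbc :as jdbc])

;; All connection parameters below are placeholders.
(def snowflake-db
  {:connection-uri (str "jdbc:snowflake://myaccount.snowflakecomputing.com/"
                        "?db=analytics&schema=public&warehouse=etl_wh"
                        "&user=etl_user&password=changeme")})

;; COPY the JSON files from a stage pointing at the converted output into a
;; table with a single VARIANT column, so each record stays queryable as JSON.
(jdbc/execute! snowflake-db
               ["COPY INTO raw_events FROM @event_stage FILE_FORMAT = (TYPE = 'JSON')"])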

The next step is to write the data to Kinesis instead of a text file, but that's still a work in progress.

So there you have it, Bob's your uncle!



