Thursday, February 11, 2016

More fun with Clojure and Specter!


The other day, our Data Science folks approached me with a fun data-wrangling exercise. They wanted to know if we could parse and extract a certain set of fields from a query string within our raw clickstream request logs. Of course the answer was yes. The logs are written to Kafka and persisted into S3 in JSON format. Later these logs are converted in batch with Spark into Parquet format to allow for faster querying with technologies like Hive and Presto. We get around 10-15M such events a day. Below is a sample event:
{
    "sessionId": "1455128662234:6yx5i731",
    "sessionOrdinal": 1,
    "protocol": "HTTP/1.1",
    "queryString": "ext=FB_LP_DPA_Registrations_Courses_quilting-5001&maid=88152&utm_source=Facebook&utm_medium=Social%2520Performance&fbc_id=Quilting%7CPaid_Class%7CActivity%7CCPC&fbads_id=Online+Classes+%3E+Quilting+US%2CGB%2CAU%2CNZ%2CCA%2CIE+18-65+Mobilefeed%2C+Desktopfeed+Link+clicks+Link+clicks+Cu&fbbid_type&fbg_name=male%2Cfemale&fbd_id=desktop%2Candroid_phone%2Candroid_tablet%2Ciphone%2Cipad&fbage_name=18-65&fbcountry_name=US%2CGB%2CAU%2CNZ%2CCA%2CIE",
    "remoteUser": null,
    "requestUri": "/class/the-ultimate-t-shirt-quilt/5001",
    "serverName": "www.cool-server.com",
    "serverPort": 80,
    "ajaxRequest": false,
    "ajaxRequestType": null,
    "messageDate": 1455128662236
}
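The heavy lifting of parsing that queryString is handled by the `com.cemerick/url` library. A quick sketch of what it gives you (the example values here are my own, not from the logs):

```clojure
(require '[cemerick.url :as url]
         '[clojure.walk :as walk])

;; Parse a query string into a map of string keys/values,
;; then keywordize the keys for easier lookups downstream.
(-> "utm_source=Facebook&maid=88152"
    url/query->map
    walk/keywordize-keys)
;; => {:utm_source "Facebook", :maid "88152"}
```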

Since the raw logs have no additional ETL performed on them before the conversion to Parquet takes place, and doing crazy regex-type stuff tends to make people pull their hair out, jam pencils into their eyes, and throw things, I knew that we would need to do some additional transformation on the data. But hey, as the saying goes, 80% of Data Science is data wrangling: getting the data into a form that is conducive to running the required analysis.




The desired outcome is to have the queryString represented as a single row/element per key.  From here, these events can be materialized into Presto allowing for easier querying and analysis.  For example, if we are interested in the fbads_id, fbg_name, and fbcountry_name query string keys, the above event would end up being turned into the following three events:


{
  "session_id": "1455128662234:6yx5i731",
  "request_uri": "/class/the-ultimate-t-shirt-quilt/5001",
  "message_date": 1455128662236,
  "fbads_id": "Online Classes > Quilting US,GB,AU,NZ,CA,IE 18-65 Mobilefeed, Desktopfeed Link clicks Link clicks Cu"
}

{
  "session_id": "1455128662234:6yx5i731",
  "request_uri": "/class/the-ultimate-t-shirt-quilt/5001",
  "message_date": 1455128662236,
  "fbg_name": "male,female"
}

{
  "session_id": "1455128662234:6yx5i731",
  "request_uri": "/class/the-ultimate-t-shirt-quilt/5001",
  "message_date": 1455128662236,
  "fbcountry_name": "US,GB,AU,NZ,CA,IE"
}
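The explode step that turns one raw event into one output event per matching key could be sketched like this (explode-event and keys-of-interest are hypothetical names of my own, not the production code; it assumes the queryString has already been parsed into a keywordized map):

```clojure
(def keys-of-interest #{:fbads_id :fbg_name :fbcountry_name})

(defn explode-event
  "Emit one event per query-string key of interest, carrying
   along the identifying fields from the raw event."
  [{:keys [sessionId requestUri messageDate queryString]}]
  (for [[k v] (select-keys queryString keys-of-interest)]
    {:session_id   sessionId
     :request_uri  requestUri
     :message_date messageDate
     k             v}))

;; mapcat flattens the per-event sequences into one stream of rows;
;; keys we don't care about (:utm_source here) simply drop out.
(def rows
  (mapcat explode-event
          [{:sessionId   "1455128662234:6yx5i731"
            :requestUri  "/class/the-ultimate-t-shirt-quilt/5001"
            :messageDate 1455128662236
            :queryString {:fbg_name       "male,female"
                          :fbcountry_name "US,GB,AU,NZ,CA,IE"
                          :utm_source     "Facebook"}}]))
;; => two events: one for :fbg_name, one for :fbcountry_name
```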

It's not a complicated problem/task at all. Essentially the steps are as follows:
  • Parse the query string
  • Mapcat/Flatmap over the query string map
  • Filter out anything we're not interested in
  • Format and output these results
Since we need to do a series of lookups and transformations, I thought this would be a great opportunity to learn more about, and use, a Clojure library that caught my eye called Specter. It's written by Nathan Marz, the same guy that wrote Storm and Cascalog. The first time I read about it on GitHub, I was in love. And hey, learning more coolio Clojure stuff is always on my radar! As it turns out, the solution boiled down to just a few functions. I'll spare you the details and briefly cover the one that processes the :queryString.

(defn extract-query-string [parameter-logs]
  (->> parameter-logs
       (transform [ALL :queryString] (fn [x]
                                       (-> x
                                           cemerick.url/query->map
                                           clojure.walk/keywordize-keys
                                           (select-keys parameter-log-types))))
       (map #(select-keys % [:requestUri :sessionId :messageDate :queryString]))
       (filter #(seq (:queryString %)))
       (transform-keys ->snake_case)))

This function takes a sequence of parameter log records (the parsing of the raw JSON into Clojure data structures is handled by Cheshire) and calls Specter's transform function on the sequence. ALL is a selector (navigator) that navigates to every map in the sequence, and :queryString then navigates to that key within each map. Once we're at the :queryString, we run it through a threading macro where it's parsed and the keys of interest are selected. The parameter-log-types passed to select-keys come from a JDBC query to Redshift, so the set of keys we're interested in is determined at runtime. I'm pretty sure the last three function calls could be folded into the transform, but this was a first stab and it seems to work fine. From here it's just the domino effect in play. The end result looks something like this (from a different dataset):
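To see the ALL navigator in isolation, here's a toy example (my own, using the standard com.rpl.specter namespace):

```clojure
(require '[com.rpl.specter :refer [transform ALL]]
         '[clojure.string :as str])

;; Navigate to every map in the vector, then to :queryString in each,
;; apply the function at that spot, and rebuild the whole structure.
(transform [ALL :queryString]
           str/upper-case
           [{:id 1 :queryString "a=1"}
            {:id 2 :queryString "b=2"}])
;; => [{:id 1, :queryString "A=1"} {:id 2, :queryString "B=2"}]
```

Everything outside the navigated path (the :id entries here) is left untouched, which is exactly what makes this nicer than hand-rolled nested map/update calls.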


/patterns/knitting,1449359995890:xn4qha9i,1449359999886,adwad_id,45472460017
/patterns/knitting,1449359995890:xn4qha9i,1449359999886,adwadgr_id,8538149617
/patterns/knitting,1449359995890:xn4qha9i,1449359999886,adwd_id,t
/patterns/knitting,1449359995890:xn4qha9i,1449359999886,adwmtch_id,b
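Those flattened rows are just the per-key event maps joined with commas. A minimal sketch of that formatting step (the helper name and field order are my own assumptions):

```clojure
(require '[clojure.string :as str])

(defn event->row
  "Render one exploded event as a CSV line:
   request_uri, session_id, message_date, key, value."
  [{:keys [request_uri session_id message_date] :as event}]
  (let [[k v] (first (dissoc event :request_uri :session_id :message_date))]
    (str/join "," [request_uri session_id message_date (name k) v])))

(event->row {:request_uri  "/patterns/knitting"
             :session_id   "1449359995890:xn4qha9i"
             :message_date 1449359999886
             :adwad_id     "45472460017"})
;; => "/patterns/knitting,1449359995890:xn4qha9i,1449359999886,adwad_id,45472460017"
```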


Nathan's post does an excellent job of explaining in more detail the why and how of Specter. It's a great read and I recommend it.



So there you have it, Bob's your uncle!

