Friday, September 30, 2016

JQ is a Kung fu Swiss Army knife!



If you need to do some basic reformatting of JSON data that's already been processed, try reaching for the Swiss Army knife of command line JSON Kung Fu,  JQ. There's a lot of different JSON parsing libraries out there, name your favorite language and there has to be at least a few you can use. Sometimes I just want to git 'R dun and not have to deal with writing a program, compiling, and then deploying it. For those times, the standard Unix toolkit and JQ can get you a long way quickly.


For instance, recently I was adding support for loading our Zendesk chat data into Google Big Query.  We currently load this data into Redshift, but we're moving towards using Big Query as our datastore for more and more datasets.  There were three things that I had to do with the existing data.  First was to transform all the JSON keys to snake case ,  second was to transform the timestamp field into a timestamp that I could load into Big Query, and third was to select out certain keys I wanted to load into the chat table.


So the JSON looks like this:




{"chat-date":"2016-03-01","ticket-id":"12345","chat-line-timestamp":"05:52:20 PM","chatter":"Mr. Magoo",
"chat-line":"Hello","created-at":"2016-03-01 05:52:20 PM"}
{"chat-date":"2016-03-01","ticket-id":"12345","chat-line-timestamp":"05:52:41 PM","chatter":"Daisy Duke",
"chat-line":"Hey Mr. Magoo","created-at":"2016-03-01 05:52:41 PM"}
{"chat-date":"2016-03-01","ticket-id":"12345","chat-line-timestamp":"05:52:44 PM","chatter":"Daisy Duke",
"chat-line":"How's it going?","created-at":"2016-03-01 05:52:44 PM"}


and once we run it thru the meat grinder we get what we want:




{"ticket_id":"12345","chatter":"Mr. Magoo","chat_line":"Hello","created_at":1456854740}
{"ticket_id":"12345","chatter":"Daisy Duke","chat_line":"Hey Mr. Magoo","created_at":1456854761}
{"ticket_id":"12345","chatter":"Daisy Duke","chat_line":"How's it going?","created_at":1456854764}



Dan-Young:~ dan$ gawk 'BEGIN { print strftime("%c", 1456854764); }'
Tue 01 Mar 2016 05:52:44 PM UTC


the easy JQ one-liner that made this all possible:



jq -r 'to_entries | 
map({"key": .key | gsub("-";"_")} + {"value": (if (.key =="created-at") then (.value | strptime("%Y-%m-%d %I:%M:%S %p")|mktime) 
else .value  end)}) | from_entries | with_entries(select(.key == ("chatter","created_at","ticket_id","chat_line"))) |tostring'

So next time you need to Kung fu some JSON, give the command line a try!  It's magic sauce....


So there you have it, Bob's your uncle !


Saturday, September 10, 2016

Parsing Zendesk Chat transcripts with Instaparse


One thing that really makes our software stand out is our world class analyst support. Time and time again we hear from clients how much they enjoy our real-time chat support.   Anytime you need help within Looker,  it's only a click away.  All of our support conversations feed through Zendesk, and eventually make their way through the process described below.
   

  
    The body of the chat event is stored in a character varying (65535) field within Salesforce.  The schema is pretty simple:  there's a timestamp, ticket id, and the body of the chat.  The chats have a certain signature/format to them, where the first row for a given ticket is the start event which contains metadata about the chat session.  After the start, we see n number of rows along with a concluding closing chat event.  If the chat extends over a longer period, we'll see multiple rows for a given ticket since the column size of the body column is limited. Typically though, we just see three rows.  The row(s) we're interested in are the ones where the actual dialog takes place, so anywhere from n+1 to n-1 rows. When we extract the chat events from the Salesforce tables, they are written as one line per chat.  For example, below is an example of a chat session minus the beginning/ending metadata records:


Chat started: 2016-08-22 08:09 AM UTC

(08:09:40 AM) Susan Smith: hey there!

(08:09:53 AM) *** Larry joined the chat ***

(08:09:54 AM) Larry: Hi Susan!

(08:10:15 AM) Susan Smith: I love Looker!
(08:10:30 AM) Larry: Me too!
(08:12:19 AM) *** Larry left the chat ***



    The final output of the ETL process will result in a series of rows with an event_id, ticket_id, created_at, participant, and the body of the chat. Using the above as an example, we would have four rows, throwing out the joined/left chat, and the Chat started/ended records.


event_id  | ticket_id |     created_at      |     participant     |                   body                   
--------+-----------+---------------------+---------------------+----------------------------
 1         | 51234     | 2016-08-22 08:09:40 | Susan Smith         | hey there!
 2         | 51234     | 2016-08-22 08:09:54 | Larry               | Hi Susan!
 3         | 51234     | 2016-08-22 08:10:15 | Susan Smith         | I Love Looker!
 4         | 51234     | 2016-08-22 08:10:30 | Larry               | Me too!

    The format of the chat event body can pretty much be anything someone can cut and paste or type into the chat dialog window.  For example, we often see people pasting in snippets of code and LookML



(10:06:02 PM) Bob Jones: dimension: MKTG_Qualified_Lead
 type: yesno                                                                                                          
   sql:

     ${TABLE}.test_lead_c = 'false'                                                                                    

      AND ${TABLE}.Email NOT LIKE '@test.com'                                                                                   


    So the high level approach would be pretty straight forward and follow a workflow like this:

  1. Unload the Salesforce data to S3
  2. Process/parse the chats in S3
  3. Upload the resulting JSON data to s3
  4. Load into Redshift via the COPY command

    All the above steps were bundled up into a Drake workflow. I'll go over the more interesting parts of the workflow below.  Things like sync'ing back and forth, loading into Redshift, etc... is grunt work and doesn't really need to be covered.

Unload Salesforce data to s3

    The Salesforce data is unloaded into S3 as a CSV file on a scheduled cadence. I used the control character ETX/^C to represent the field delimiter instead of say a | (pipe) delimiter. Since there could be, and are, pipes in the chat messages, the safest approach would be to use a non-printing character.   When extracting out the body from Salesforce, I decided to replace all the \n in the body with SOH/^A.  More than likely we will not experience control characters in the text that a user enters. The other thing this does is to get all the rows of text on one line, making it easier to read and process. Below is a trimmed down version of what the data in the CSV looks like after the export.


51119^C2016-08-22 16:58:49^CChat started: 2016-08-22 04:49 PM UTC^A^A(04:49:11 PM) Jen Smith: hi, i have a question on how to add a look to a dashboard^A(04:49:23 PM) *** Steve Jones joined the chat ***^A(04:49:34 PM) Steve Jones: Hi Jen! How's it going today?^A


Process/parse the chats in S3

    This by and far was the most interesting part of the whole ETL workflow for a number of reasons. For one, a chat message is defined as being between two timestamps, with the timestamp being in the format of  (08:09:40 AM) . The chat body/message is everything after that timestamp up to but not including the next timestamp. The message could be as simple as some text like "Hello Joe" or more complicated like someone pasting code and/or LookML which will have embedded newlines and who-knows-what characters.  
    The "chatter" is defined as anything from the timestamp, up-to but not including the : . Also, we don't know if people would go by just their first name, first and last, or other sorts of crazy permutations.
    Since a chat can span longer than the allowed 65535 character limit of a row, messages do occasionally span across n+1 rows in the database.  The starting event is pretty straight forward, they are defined by the text "Chat started" followed by the date and time. Below is an example chat session from the Salesforce database.  Note that chat session started and continued for an extended period of time, so we had multiple row entries that would need to be parsed.


48962 | 2016-08-01 21:34:18 | 
Chat started: 2016-08-01 09:18 PM UTC                                                         

(09:18:11 PM) Bob Smith: hi

(09:18:16 PM) *** Larry joined the chat ***                                                                                                                                
                                                                                                                                       

48962 | 2016-08-02 00:36:07 | 

(10:05:00 PM) Bob Smith: hi, I'm back

(10:05:09 PM) *** Larry joined the chat                                                                                        

(10:05:14 PM) Larry: Hi Bob!


    The light bulb in my head turned on!  "Hey I have a great idea on how to do this! I'll use Regular Expressions..." but as the saying goes, and probably famously coined by Jamie Zawinski (an early Netscape engineer),


Some people, when confronted with a problem, think  “I know, I'll use regular expressions.”   
Now they have two problems.

    From my own experience, and having lived thru the pain and suffering of the above quote, I knew it would be prudent to use Regular Expressions gingerly.  After noodling this for a bit,  I remember playing around with LPeg to parse log files with Lua and Hadoop Streaming.  I know, this sounds bat shit crazy, and it is and was, but Hadoop Streaming allowed me to easily and quickly parallelize my Lua scripts, and LPeg allowed for powerful parsing. So I knew that there was/is a more powerful way to parse text and decided to dust off the grammer cobwebs and try using Parsing Expression Grammars (PEG) and/or Context-free grammers (CFG) again.  A CFG grammar is non-deterministic, which means that some input could result in two or more possible parse-trees, where as PEG is deterministic, meaning that any input can only be parsed one way. I remember seeing a presentation by Mark Engelderg introducing InstaParse at either Clojure West or the Conj a few years back. I won't go into all the details of using InstaParse, you can read the details on the github page, but what I can say is that Instaparse along with working in the Clojure REPL allowed me to easily and quickly iterate while developing a "rule" that would parse the logs.


    After about a few hours of tinkering in the REPL with Instaparse, I had a prototype that would parse the Zendesk chats.  For example the following rule:



(insta/parser
    "S ::=  chat-header? chat-line (<line-delimiter> chat-line)*
     <chat-banner> ::= (<'Chat'> <space> <'started'> <[colon]> <[space]> <['on']> <space> <chat-session-start-date> <space> <'UTC'> <line-delimiter>+ )
     <lparen> ::= <'('>
     <rparen> ::= <')'>
     <colon> ::= ':'
     <line-delimiter> ::= #'\\u0001'
     <chat-line-delimiter> ::= #'\\(\\d{2}:\\d{2}:\\d{2}\\s(AM|PM)\\)'
     <new-line> ::= '\n' | '\r\n'
     <space>  ::= #'\\s+'
     <number> ::= #'[0-9]+'
     <dash> ::= '-'
     created-at-date ::= date
     chat-session-start-date ::= date <space> <hour-and-min> <space> <am-or-pm>
     hour-and-min ::= number colon number
     hour-min-sec ::= number colon number colon number
     chat-line-timestamp ::=  lparen number colon number colon number space #'AM|PM' rparen
     date ::= number dash number dash number
     line ::= #'.*?(?=\\u0001\\(\\d{2}:\\d{2}:\\d{2}\\s(AM|PM)\\))' / #'.*'
     join-or-left-chat ::= #'(joined|left)'
     is-now ::= 'is now known as'
     am-or-pm ::= #'(AM|PM)'
     <chat-header> ::= chat-banner
     chat-line ::=  chat-line-timestamp <space> chatter line?
     a-chatter ::= #'[\\p{L}-\\'\\.\\@\\(\\)\\+[0-9]]+'
     chatter ::= <'*** '> a-chatter (<space?> a-chatter?)* <join-or-left-chat> <space> <'the chat ***'> <new-line>? / a-chatter (<space?> a-chatter?)* <is-now> <space> <a-chatter?>* / a-chatter (<space?> a-chatter?)* <colon> <space>
     ")

would parse each row of text into an Array Map  of chats:


{:tag :chat-line,
 :content ({:tag :chat-line-timestamp, :content ("12" ":" "05" ":" "35" " " "AM")}
            {:tag :chatter,
             :content ({:tag :a-chatter, :content ("Bob")} {:tag :a-chatter, :content ("Jones")})}
            {:tag :line, :content ("want to basically see if a student is active between those dates")})}

    From here, a couple more transformations were done to add some additional information and finally format the output in JSON.  


(defn process-chat-line [l]
  (let [v (clojure.string/split l #"\u0003")
        [ticket-id created-at chat-line] v
        chats (-> (cfg/parse-chat chat-line)
                  (cfg/xform-chat-line-timestamp)
                  (cfg/xform-chatter)
                  (cfg/xform-chat-line)
                  (cfg/xform-chat-lines)
                  :content)

        xformed-chats (-> (cfg/add-ticket-id-and-chat-date ticket-id created-at chats)
                          (cfg/filter-empty-chats)
                          (cfg/replace-characters-in-chat-lines)
                          (cfg/created-at))]
    xformed-chats))

    So in the end, we had newline JSON files that we load into Redshift and Big Query.  Example:



{"chat-date":"51877","chat-line-timestamp":"06:03:06 PM","chatter":"Bob Jones","chat-line":"Opening up the look...","created-at":"2016-08-29 06:03:06 PM"}
{"chat-date":"2016-08-29","ticket-id":"51877","chat-line-timestamp":"06:03:09 PM","chatter":"Susan Smith","chat-line":"ok","created-at":"2016-08-29 06:03:09 PM"}





  

From here, we give it a little Looker Luv and we're in business. For example, so you want to know the average wait time for one of our top notch analyst to jump on chat?





Or who was the most active analyst in the last week?





So there you have it, Bob's your uncle !



Monday, June 27, 2016

Migrating OpenTSDB to a different HBase cluster




For the past few months, we've been running OpenTSDB here at Looker to collect all sorts system level metrics along with JMX metrics on our test/development Hadoop clusters.  We run hundreds of tests on hourly intervals across Spark, Presto and Hive and having performance telemetry data that we can review is extremely valuable when troubleshooting any sort of performance related issue.  OpenTSBD is a time series database that allows us to store terabytes of performance metrics at millisecond precision.  It uses HBase as the datastore, and can scale to millions of writes per second.  

Initially when I first setup our HBase cluster, I didn't configure the cluster nodes to use AWS IAM Roles.  As our infrastructure evolves, we're moving more and more to IAM roles.  To the best of my knowledge, you're not able to attach/assign instance roles after an instance is running. As a result I'll need to bring up a new Hbase cluster that has the IAM Roles assigned upon instance creation.  Once I had all the various Ansible roles built, moving the HBase/OpenTSDB data over to a new cluster was relatively straightforward and easy.  In this post, I'll go over the steps I took to snapshot and copy over existing OpenTSDB tables from one cluster to another.  The basic steps involved in this migration are as follows:


  1. Bring up a new HBase cluster.  For this I wrote two different Ansible play books with a number of roles.  This was the most time consuming part since there's a number of Hadoop XML config files that come into play across HDFS, Yarn, and HBase, along with Zookeeper configuration. The first play book was the boot-cluster.yml, and the second was config-cluster.yml


(looker)[ec2-user@ip-10-x-x-x hadoop-hbase]$ ansible-playbook boot-cluster.yml --extra-vars="cluster_id=2"                                                                                                  
 [WARNING]: provided hosts list is empty, only localhost is available

PLAY ***************************************************************************

TASK [provision master node] ***************************************************
changed: [localhost]

TASK [add master to host group] ************************************************
changed: [localhost] 

TASK [wait for SSH to come up on master] ***************************************
ok: [localhost]

TASK [provision slave nodes] ***************************************************
changed: [localhost]

TASK [add slaves to host group] ************************************************
changed: [localhost]

  Once the nodes were/are running, I now install/configure all the necessary software; i.e HDFS, HBase, Yarn, Zookeeper, etc...these were broken into a number of roles with tags which allow for quickly changing/testing configuration across the cluster.

(looker)[ec2-user@ip-10-x-x-x hadoop-hbase]$ ansible-playbook -i /mnt/looker/inventory/ec2.py config-cluster.yml --extra-vars="cluster_id=2"                                                            

PLAY ***************************************************************************

TASK [setup] *******************************************************************
ok: [localhost]

TASK [get cluster master] ******************************************************
changed: [localhost]

TASK [get cluster slaves] ******************************************************
changed: [localhost]

TASK [set_fact] ****************************************************************
ok: [localhost]

TASK [set_fact] ****************************************************************
ok: [localhost]

.....
.....
.....

TASK [mapred-site-config : check mapred-site.xml diff] *************************
changed: [10.x.x.x]
changed: [10.x.x.x]
changed: [10.x.x.x]

TASK [mapred-site-config : overwrite with merged mapred-site.xml] **************
changed: [10.x.x.x]
changed: [10.x.x.x]
changed: [10.x.x.x]

PLAY RECAP *********************************************************************
10.x.x.x               : ok=9    changed=5    unreachable=0    failed=0   
10.x.x.x               : ok=9    changed=5    unreachable=0    failed=0   
10.x.x.x               : ok=9    changed=5    unreachable=0    failed=0   
localhost              : ok=7    changed=2    unreachable=0    failed=0   

 
2. Next we need to shutdown collectd which is acting as the TCollector on each of the nodes. This will stop all the metric data from being sent to the OpenTSDB daemon(s)

(looker)[ec2-user@ip-10-x-x-x hadoop-hbase]$ ansible -i /mnt/looker/inventory/ec2.py "tag_Name_looker_emr:tag_looker_app_presto" -m shell -a '/etc/init.d/collectd stop' -u ec2-user -b

  3.  With all the collectd/TCollectors stopped, the TSD daemons can be shutdown and reconfigured. We'll need to update the tsd.storage.hbase.zk_quorum in the opentsdb.conf across all the servers running a TSD daemon. This value should point to the zookeeper nodes for the new HBase cluster. In the example below, I'm changing one of the TSD nodes identified by the tsd_node_id=1 to point to the HBase cluster with the cluster id of 2.

(looker)[ec2-user@ip-10-x-x-x opentsdb]$ ansible-playbook -i /mnt/looker/inventory/ec2.py config-opentsdb.yml  --tags get-zookeeper-nodes,opentsdb-site-config --extra-vars="cluster_id=2 tsd_node_id=1"

PLAY ***************************************************************************

TASK [setup] *******************************************************************
ok: [localhost]

TASK [get cluster 2 zookeeper nodes] *******************************************
changed: [localhost]

TASK [set_fact] ****************************************************************
ok: [localhost]

PLAY ***************************************************************************

TASK [setup] *******************************************************************
ok: [10.x.x.x]

TASK [opentsdb-site-config : copy over opentsdb.conf] **************************
ok: [10.x.x.x]

PLAY RECAP *********************************************************************
10.x.x.x               : ok=2    changed=0    unreachable=0    failed=0   
localhost              : ok=4    changed=1    unreachable=0    failed=0   

  4. Now that nothing is being written to the HBase cluster, we can disable the OpenTSDB tables, which flushes table memstore,  and create the snapshots.

hbase(main):056:0> disable 'tsdb'
0 row(s) in 2.3480 seconds
hbase(main):057:0> disable 'tsdb-meta'
0 row(s) in 2.2400 seconds
hbase(main):058:0> disable 'tsdb-tree'
0 row(s) in 2.2300 seconds
hbase(main):059:0> disable 'tsdb-uid'
0 row(s) in 2.2270 seconds

hbase(main):060:0> snapshot 'tsdb','tsdb-snap'
0 row(s) in 0.1240 seconds
hbase(main):061:0> snapshot 'tsdb-meta','tsdb-meta-snap'
0 row(s) in 0.1190 seconds
hbase(main):062:0> snapshot 'tsdb-tree','tsdb-tree-snap'
0 row(s) in 0.1200 seconds
hbase(main):063:0> snapshot 'tsdb-uid','tsdb-uid-snap'
0 row(s) in 0.1200 seconds

hbase(main):064:0> list_snapshots
SNAPSHOT                                TABLE + CREATION TIME
 tsdb-meta-snap                         tsdb-meta (Mon Jun 27 15:52:59 +0000 2016)
 tsdb-snap                              tsdb (Mon Jun 27 15:52:56 +0000 2016)
 tsdb-tree-snap                         tsdb-tree (Mon Jun 27 15:53:02 +0000 2016)
 tsdb-uid-snap                          tsdb-uid (Mon Jun 27 15:53:04 +0000 2016)
4 row(s) in 0.0240 seconds

=> ["tsdb-meta-snap", "tsdb-snap", "tsdb-tree-snap", "tsdb-uid-snap"]


  5. For each of the tables, I run the ExportSnapshot which creates a MapReduce job which copies the data over from one node to another node. Note that HBase doesn't need to be running since this is a HDFS operation. I'm running this operation on the old HBase HMaster node.

[hadoop@ip-10-x-x-x ~]$ /usr/lib/hbase/bin/hbase org.apache.hadoop.hbase.snapshot.ExportSnapshot -snapshot 'tsdb-snap' -copy-to hdfs://10.x.x.x:8020/hbase -mappers 5 -overwrite
2016-06-27 15:54:19,778 INFO  [main] mapreduce.Job: Job job_1466784934481_0003 running in uber mode : false
2016-06-27 15:54:19,780 INFO  [main] mapreduce.Job:  map 0% reduce 0%
2016-06-27 15:54:29,081 INFO  [main] mapreduce.Job:  map 50% reduce 0%
2016-06-27 15:54:30,090 INFO  [main] mapreduce.Job:  map 100% reduce 0%
2016-06-27 15:54:31,107 INFO  [main] mapreduce.Job: Job job_1466784934481_0003 completed successfully
2016-06-27 15:54:31,260 INFO  [main] snapshot.ExportSnapshot: Finalize the Snapshot Export
2016-06-27 15:54:31,269 INFO  [main] snapshot.ExportSnapshot: Verify snapshot integrity
2016-06-27 15:54:31,307 INFO  [main] snapshot.ExportSnapshot: Export Completed: tsdb-snap

Do the same thing for the tsdb-meta, tsdb-tree, and tsdb-uid tables...


  6. Once the snapshots are created and copied over to the new HBase cluster, check to make sure the snapshots are available and restore them.

hbase(main):005:0> list_snapshots
SNAPSHOT                               TABLE + CREATION TIME
 tsdb-meta-snap                        tsdb-meta (Mon Jun 27 15:52:59 +0000 2016)
 tsdb-snap                             tsdb (Mon Jun 27 15:52:56 +0000 2016)
 tsdb-tree-snap                        tsdb-tree (Mon Jun 27 15:53:02 +0000 2016)
 tsdb-uid-snap                         tsdb-uid (Mon Jun 27 15:53:04 +0000 2016)
4 row(s) in 0.0470 seconds

=> ["tsdb-meta-snap", "tsdb-snap", "tsdb-tree-snap", "tsdb-uid-snap"]
hbase(main):006:0> restore_snapshot "tsdb-snap"
0 row(s) in 0.8840 seconds

hbase(main):007:0> restore_snapshot "tsdb-meta-snap"
0 row(s) in 0.4170 seconds

hbase(main):008:0> restore_snapshot "tsdb-tree-snap"
0 row(s) in 0.3910 seconds

hbase(main):009:0> restore_snapshot "tsdb-uid-snap"
0 row(s) in 0.7370 seconds

hbase(main):010:0>

  You can export/copy the snapshots to s3 using:

[hadoop@ip-10-x-x-x~]$ /usr/lib/hbase/bin/hbase org.apache.hadoop.hbase.snapshot.ExportSnapshot -snapshot 'tsdb-snap' -copy-to s3a://<my-bucket>/hbase/ -mappers 5 -overwrite
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/hbase-1.1.5/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/hadoop-2.7.2/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2016-06-27 20:11:13,298 INFO  [main] snapshot.ExportSnapshot: Copy Snapshot Manifest
2016-06-27 20:11:14,682 WARN  [main] mapreduce.TableMapReduceUtil: The hbase-prefix-tree module jar containing PrefixTreeCodec is not present.  Continuing without it.
2016-06-27 20:11:15,245 INFO  [main] impl.TimelineClientImpl: Timeline service address: http://10.x.x.x:8188/ws/v1/timeline/
2016-06-27 20:11:15,444 INFO  [main] client.RMProxy: Connecting to ResourceManager at /10.x.x.x:8032
2016-06-27 20:11:15,553 INFO  [main] client.AHSProxy: Connecting to Application History server at /10.x.x.x:10200
2016-06-27 20:11:16,669 INFO  [main] snapshot.ExportSnapshot: Loading Snapshot 'tsdb-06272016' hfile list
2016-06-27 20:11:17,037 INFO  [main] mapreduce.JobSubmitter: number of splits:2
2016-06-27 20:11:17,188 INFO  [main] mapreduce.JobSubmitter: Submitting tokens for job: job_1467039665192_0004
2016-06-27 20:11:17,518 INFO  [main] impl.YarnClientImpl: Submitted application application_1467039665192_0004
2016-06-27 20:11:17,573 INFO  [main] mapreduce.Job: The url to track the job: http://ip-10-x-x-x:8088/proxy/application_1467039665192_0004/
2016-06-27 20:11:17,573 INFO  [main] mapreduce.Job: Running job: job_1467039665192_0004
2016-06-27 20:11:26,808 INFO  [main] mapreduce.Job: Job job_1467039665192_0004 running in uber mode : false
2016-06-27 20:11:26,810 INFO  [main] mapreduce.Job:  map 0% reduce 0%
2016-06-27 20:11:44,955 INFO  [main] mapreduce.Job:  map 50% reduce 0%
...
...
...
2016-06-27 20:12:00,186 INFO  [main] snapshot.ExportSnapshot: Finalize the Snapshot Export
2016-06-27 20:12:00,993 INFO  [main] snapshot.ExportSnapshot: Verify snapshot integrity
2016-06-27 20:12:02,501 INFO  [main] snapshot.ExportSnapshot: Export Completed: tsdb-snap

  7. Once the tables are restored, we can restart TSD daemon(s) and the collectd/TCollectors on the various nodes.


Without a deployment/orchestration tool like Ansible, complex tasks like the one above would be very cumbersome and error prone.  You might as well jam pencils in you eyes, it would feel better than doing this sort of stuff manually. 

So there you have it, Bob's your uncle !



Thursday, February 11, 2016

More fun with Clojure and Specter!


I was approached with a fun data wrangling exercise the other day by our Data Science folks.  They wanted to know if we could parse and extract a certain set of fields from a query string within our raw clickstream request logs. Of course the answer was yes.  The logs are written to Kafka and persisted into S3 in JSON format.  Later these logs are converted in batch with Spark into Parquet format to allow for faster querying with technologies like Hive and Presto.  We get around 10-15M such events a day.  Below is a sample event:
{
    "sessionId": "1455128662234:6yx5i731",
    "sessionOrdinal": 1,
    "protocol": "HTTP/1.1",
    "queryString": "ext=FB_LP_DPA_Registrations_Courses_quilting-5001&maid=88152&utm_source=Facebook&utm_medium=Social%2520Performance&fbc_id=Quilting%7CPaid_Class%7CActivity%7CCPC&fbads_id=Online+Classes+%3E+Quilting+US%2CGB%2CAU%2CNZ%2CCA%2CIE+18-65+Mobilefeed%2C+Desktopfeed+Link+clicks+Link+clicks+Cu&fbbid_type&fbg_name=male%2Cfemale&fbd_id=desktop%2Candroid_phone%2Candroid_tablet%2Ciphone%2Cipad&fbage_name=18-65&fbcountry_name=US%2CGB%2CAU%2CNZ%2CCA%2CIE",
    "remoteUser": null,
    "requestUri""/class/the-ultimate-t-shirt-quilt/5001",
    "serverName": "www.cool-server.com",
    "serverPort": 80,
    "ajaxRequest": false,
    "ajaxRequestType": null,
    "messageDate": 1455128662236
}







Since the raw logs have no additional ETL performed on them before the conversion to Parquet takes place, and doing crazy regex type stuff tends to cause people to pull there hair out, jam pencils into their eyes, and throw things, I knew that we would need to do some additional transformation on the data. But hey, as the saying goes, 80% of Data Science is data wrangling and getting it in a form that is conducive to running the required analysis.




The desired outcome is to have the queryString represented as a single row/element per key.  From here, these events can be materialized into Presto allowing for easier querying and analysis.  For example, if we are interested in the fbads_id, fbg_name, and fbcountry_name query string keys, the above event would end up being turned into the following three events:


{
  "session_id": "1455128662234:6yx5i731",
  "request_uri": "/class/the-ultimate-t-shirt-quilt/5001",
  "message_date": 1455128662236,
  "fbads_id": "Online Classes > Quilting US,GB,AU,NZ,CA,IE 18-65 Mobilefeed, Desktopfeed Link clicks Link clicks Cu"
}

{
  "session_id": "1455128662234:6yx5i731",
  "request_uri": "/class/the-ultimate-t-shirt-quilt/5001",
  "message_date": 1455128662236,
  "fbg_name": "male,female"
}

{
  "session_id": "1455128662234:6yx5i731",
  "request_uri": "/class/the-ultimate-t-shirt-quilt/5001",
  "message_date": 1455128662236,
  "fbcountry_name": "US,GB,AU,NZ,CA,IE"
}







It's not a complicated problem/task at all. Essentially the steps are as follows:
  • Parse the query string
  • Mapcat/Flatmap over the query string map
  • Filter out anything we're not interested in
  • Format and output these results
Since we need to do a series of lookups and transformations, I thought this would be a great opportunity to learn more about, and use a Clojure library that caught my eye called Specter.  It's written by Nathan Marz, the same guy that wrote Storm and Cascalog. The first time I read about it on github, I was in love.  And hey, learning more coolio Clojure stuff is always on my  radar! As it turns out, the solution boiled down to just a few functions.  I'll spare you from the   details and briefly cover the one that processes the :queryString.

(defn extract-query-string [parameter-logs]
  (->> parameter-logs
    (transform [ALL :queryString] (fn [x]
                                     ( -> x
                                          cemerick.url/query->map
                                          clojure.walk/keywordize-keys
                                          (select-keys parameter-log-types)))
    (map #(select-keys % [:requestUri :SessionId :messageDate :queryString]))
    (filter #(seq (:queryString %)))
    (transform-keys ->snake_case)))

This function takes a sequence of parameter log records (the parsing into a Clojure data structure   is handled by Cheshire), and then we call Specter function transform on the sequence. The "ALL" is called a selector which navigates to every map in the sequence, and for each map, the :queryString navigates to that key within every map. Once we're at the :queryString, we run it thru the thread macro where it's parsed and keys of interest are selected. The call to (select-keys ...) does a JDBC query to Redshift to get the keys we're interested in at runtime.  I'm pretty sure that we can accomplish the last three fn calls within the transform, but this was a first stab and it seems to work fine.  From here it's just the domino effect in play.  The end result looks something like this (from a different dataset):


/patterns/knitting,1449359995890:xn4qha9i,1449359999886,adwad_id,45472460017
/patterns/knitting,1449359995890:xn4qha9i,1449359999886,adwadgr_id,8538149617
/patterns/knitting,1449359995890:xn4qha9i,1449359999886,adwd_id,t
/patterns/knitting,1449359995890:xn4qha9i,1449359999886,adwmtch_id,b


Nathan's post does an excellent job of explaining in more detail the why and how of Specter. It's a great read and I recommend it.



So there you have it, Bob's your uncle !


Keeping in-sync with Salesforce

Keeping two or more data stores in-sync is always a fun task. With the help of Apache NiFi and some decently tooling, I've been able to...