Monday, October 26, 2015

Presto


I've been using Presto more frequently for a number of quick ad hoc reports against various event sources in S3. It's fast, easy to deploy, and did I mention very fast? So what is Presto? Presto is "an open source distributed SQL query engine for running interactive analytic queries against data sources of all sizes ranging from gigabytes to petabytes," open sourced by Facebook. Presto allows you to query the data where it resides via a number of connectors. For us, this is primarily data stored in S3 and Redshift.


Getting Presto running on Elastic MapReduce (EMR) is pretty straightforward. Presto is a very active project and the release cycle can be very quick. Although EMR currently supports Presto 0.119 on EMR 4.1, I like to have the ability to try out the latest release as soon as it comes out. For this reason, Ansible is a perfect fit for installing and configuring Presto, along with any other configuration management and orchestration tasks.


Since we run both Spark and Hive, I opt for using Amazon's EMR service with spot pricing. This allows us to quickly spin up, resize, and shut down clusters in a very cost-effective way. Once we're done with our computing, we shut down the cluster and only pay for what we use, while still persisting the processed data in S3. The first thing to do is to get a cluster running. This can be easily accomplished via the AWS Command Line Interface (CLI). Below is an example of provisioning an EMR cluster with Spark and Hive running:


aws emr create-cluster \
--release-label emr-4.1.0 \
--applications Name=Spark Name=Hive \
--name "(Spark 1.5.0 + Hive + Presto) $( date '+%Y%m%d%H%M' )" \
--service-role EMR_DefaultRole \
--tags Name=spark-cluster enviro=datascience app="Data Science" \
--ec2-attributes KeyName="<my-key>",\
InstanceProfile=EMR_EC2_DefaultRole,\
SubnetId="subnet-ABC" \
--instance-groups file://./large-instance-setup.json.SPOT \
--log-uri 's3://<my-bucket>/emr/logs/' \
--no-auto-terminate --visible-to-all-users

This will return a cluster ID that can be used to further monitor and interact with the cluster and, once we're done, shut it down. When the cluster is running and in the "WAITING" state, we can run the Ansible playbook to do the necessary configuration and setup. I've omitted a number of the PLAY output steps for brevity's sake, but essentially it downloads, installs, and configures Presto along with the various connectors and JVM settings.

ansible-playbook -i <inventory-file> configure-emr-cluster-presto.yml

PLAY [Configure EMR/Presto cluster...] **************************************** 

TASK: [Gather EC2 facts] ****************************************************** 
ok: [10.88.2.212]
ok: [10.88.2.40]
ok: [10.88.2.89]
ok: [10.88.2.244]
ok: [10.88.2.191]

TASK: [Download Presto Installer script] ************************************** 
changed: [10.88.2.212]
changed: [10.88.2.40]
changed: [10.88.2.191]
changed: [10.88.2.89]
changed: [10.88.2.244]

TASK: [file path=~hadoop/install-presto-emr-4.0 state=file mode=0755] ********* 
changed: [10.88.2.191]
changed: [10.88.2.89]
changed: [10.88.2.40]
changed: [10.88.2.244]
changed: [10.88.2.212]

TASK: [Install Presto Server] ************************************************* 
changed: [10.88.2.89]
changed: [10.88.2.212]
changed: [10.88.2.244]
changed: [10.88.2.191]
changed: [10.88.2.40]
....
....
....
PLAY RECAP ******************************************************************** 
10.88.2.191                : ok=12   changed=11   unreachable=0    failed=0   
10.88.2.212                : ok=15   changed=14   unreachable=0    failed=0   
10.88.2.244                : ok=12   changed=11   unreachable=0    failed=0   
10.88.2.40                 : ok=12   changed=11   unreachable=0    failed=0   
10.88.2.89                 : ok=12   changed=11   unreachable=0    failed=0  


Now we can start running queries! Presto has a command line client that you can use to run queries, and you can also go through the JDBC driver or even raw REST calls. The JDBC driver is just a thin wrapper over the REST interface, which means you can query Presto from virtually any language that supports HTTP calls. For example, if I wanted to get the top 5 products that were sold on a given day, I can easily run the following curl command to submit the query to Presto for execution.



curl -v -H "Content-Type: text/plain" \
> -H "X-Presto-User: hadoop" \
> -H "X-Presto-Catalog: hive" \
> -H "X-Presto-Schema: default" \
> -X POST \
> -d "select item_id,
>            count(*),
>            dt from conversion_event 
>            where event_type='PAID' 
>            and item_type IN ('PRODUCT_FOR_SALE') 
>            and dt='2015-10-20' 
>            group by item_id,dt
>            order by count(*) desc limit 5" \
> http://10.88.2.115:8080/v1/statement | jq '.nextUri'
*   Trying 10.88.2.115...
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0* Connected to 10.88.2.115 (10.88.2.115) port 8080 (#0)
> POST /v1/statement HTTP/1.1
> Host: 10.88.2.115:8080
> User-Agent: curl/7.43.0
> Accept: */*
> Content-Type: text/plain
> X-Presto-User: hadoop
> X-Presto-Catalog: hive
> X-Presto-Schema: default
> Content-Length: 311
> 
} [311 bytes data]
* upload completely sent off: 311 out of 311 bytes
< HTTP/1.1 200 OK
< Date: Mon, 26 Oct 2015 14:18:44 GMT
< Content-Type: application/json
< X-Content-Type-Options: nosniff
< Content-Length: 408
< 
{ [408 bytes data]
100   719  100   408  100   311   1418   1081 --:--:-- --:--:-- --:--:--  1421
* Connection #0 to host 10.88.2.115 left intact
"http://10.88.2.115:8080/v1/statement/20151026_141844_00013_amvv6/1"

The response contains a query identifier which can be used to gather detailed information about the query. I'm using jq to extract the nextUri query identifier. This URI can be called via a GET on /v1/statement/{query-id}/{token}, which will either provide a status update for the query in progress or deliver the final results to the client.



curl -v -H "X-Presto-User: hadoop" -H "X-Presto-Catalog: hive" -X GET http://10.88.2.115:8080/v1/statement/20151026_141844_00013_amvv6/1 | jq -c '.data'
*   Trying 10.88.2.115...
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0* Connected to 10.88.2.115 (10.88.2.115) port 8080 (#0)
> GET /v1/statement/20151026_141844_00013_amvv6/1 HTTP/1.1
> Host: 10.88.2.115:8080
> User-Agent: curl/7.43.0
> Accept: */*
> X-Presto-User: hadoop
> X-Presto-Catalog: hive
> 
< HTTP/1.1 200 OK
< Date: Mon, 26 Oct 2015 16:26:47 GMT
< Content-Type: application/json
< X-Content-Type-Options: nosniff
< Vary: Accept-Encoding, User-Agent
< Content-Length: 1896
< 
{ [1170 bytes data]
100  1896  100  1896    0     0  10018      0 --:--:-- --:--:-- --:--:--  9978
* Connection #0 to host 10.88.2.115 left intact
[[5441,155,"2015-10-20"],[7458,154,"2015-10-20"],[5527,144,"2015-10-20"],[5393,141,"2015-10-20"],[5762,135,"2015-10-20"]]
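
Since the protocol is just HTTP and JSON, the same submit-and-poll flow can be scripted from pretty much anywhere. Below is a minimal sketch in Clojure, assuming clj-http (with cheshire on the classpath for the :as :json coercion); the coordinator address and headers mirror the curl example above.

(require '[clj-http.client :as http])

(def presto-headers
  {"X-Presto-User"    "hadoop"
   "X-Presto-Catalog" "hive"
   "X-Presto-Schema"  "default"})

(defn run-presto-query
  "POST a SQL statement to the Presto coordinator, then follow nextUri
   until the query finishes, accumulating any returned data rows."
  [coordinator sql]
  (let [submit (:body (http/post (str coordinator "/v1/statement")
                                 {:headers (assoc presto-headers "Content-Type" "text/plain")
                                  :body    sql
                                  :as      :json}))]
    (loop [next-uri (:nextUri submit)
           rows     (vec (:data submit))]
      (if-not next-uri
        rows
        (let [{:keys [nextUri data]} (:body (http/get next-uri {:headers presto-headers
                                                                :as      :json}))]
          (Thread/sleep 100) ;; small pause between polls
          (recur nextUri (into rows data)))))))

;; e.g. (run-presto-query "http://10.88.2.115:8080" "SELECT 1")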

You can also navigate to the master node web UI to check on the status of submitted and running jobs.

As I mentioned earlier, Presto has a number of connectors that can be used to query the data where it resides. This eliminates the need to take processed data out of, say, S3 and load it into Redshift just to run SQL queries against it. For example, the query below joins one of our "larger" event data sources stored in S3 to "smaller" metadata tables that are stored in Redshift. This time I'm using the presto-cli, but you can do the same thing via the REST endpoint.


presto:default> WITH
c1 AS (SELECT session_id,
              user_id,
              pre_user_id,
              COALESCE(NULLIF(user_id,0),pre_user_id) AS calculated_user_id,
              external_link_id,
              percent_rank() OVER (PARTITION BY session_id ORDER BY message_date,session_step) AS r,
              device_type,
              dt
       FROM clickstream WHERE dt = '2015-10-25')
,c2 AS (SELECT DISTINCT session_id,calculated_user_id,external_link_id,dt FROM c1 WHERE r=1.0)
,t1 AS (SELECT a.session_id,a.calculated_user_id,a.external_link_id,d.summary AS summary,a.dt FROM c2 AS a
        LEFT JOIN redshift.public.external_link AS b ON b.external_link_id = a.external_link_id
        LEFT JOIN redshift.public.marketing_action AS c ON c.marketing_action_id = b.marketing_action_id
        LEFT JOIN redshift.public.marketing_channel AS d ON d.marketing_channel_id = c.marketing_channel_id)
SELECT summary,count(*) AS cnt,dt FROM t1 GROUP BY summary,dt ORDER BY 1,3;

    summary     |   cnt   |     dt     
----------------+---------+------------
 Organic        | 1337036 | 2015-10-25 
 Internal Email |   76592 | 2015-10-25 
 Paid Search    |   49842 | 2015-10-25 
(3 rows)

Query 20151027_051606_00008_m3y74, FINISHED, 4 nodes
Splits: 86 total, 86 done (100.00%)

0:10 [15.7M rows, 652MB] [1.56M rows/s, 64.9MB/s]

Go ahead and kick the tires and have some fun with Presto. It's incredibly fast.

So there you have it, Bob's your uncle!




Wednesday, October 21, 2015

Simple stuff made easy...



My last post ended with the result set providing the initially needed information in a nicely formatted map, broken down by day and the triggering-event-reco-source:



{:event-date "2015-10-07", 
:triggering-event-reco-source "algorithm-30", 
:estimated-reco-sales-channel-total "8089.56", 
:estimated-daily-sales-total "97789.76", 
:contribution-rate "8.27"}

So the next logical question was: is the product an online video, downloadable material, or physically shipped good? Since we only had the product-id in the event stream, answering these questions would require joining the event data against metadata that we store in Redshift. This would allow us to "classify/categorize" the event and provide the answer to our burning questions. The metadata we needed was contained in three separate tables in Redshift. The approach taken was to merge all these tables into a single RDD at the start of the Spark program. Since we're iteratively processing multiple days, it made sense to cache the data in memory across the cluster. This would allow for quicker joining of the data within Spark and would only require us to read the data from Redshift once.


The first step was to get the data out of Redshift. With really large datasets we would want to bulk unload the data from Redshift into S3 and then source it into Spark via an RDD. Since what we need is "smaller" metadata, getting it out of Redshift turned out to be rather straightforward with the low-level Clojure wrapper for JDBC (clojure.java.jdbc). The Clojure JDBC library allows you to control how the ResultSet is transformed and returned by specifying the :result-set-fn within your query specification. The example below will apply the metadata-tuples function to the entire result set (a lazy sequence) before it is returned:

(sql/query db-spec ["SELECT <QUERY TO GET REQUIRED METADATA>"] :result-set-fn metadata-tuples)
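
For reference, db-spec here is just a standard clojure.java.jdbc connection map. A minimal sketch, assuming the Amazon Redshift JDBC 4.1 driver is on the classpath; the host, database, and credentials below are placeholders:

(require '[clojure.java.jdbc :as sql])

;; Placeholder connection details for the Redshift cluster.
(def db-spec
  {:classname   "com.amazon.redshift.jdbc41.Driver"
   :subprotocol "redshift"
   :subname     "//my-cluster.abc123.us-east-1.redshift.amazonaws.com:5439/analytics"
   :user        "metadata_ro"
   :password    "********"})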

The metadata-tuples function creates a list of tuples where the key (K) of each is a concatenation of the product-type and the product-id. The need to create an artificial composite key is primarily due to the data being split across three different tables, where we might have overlapping identifiers. The value (V) of each tuple is a map containing the sales-channel, where the :sales-channel is either a physically shipped good, online video, or downloadable material:

(defn metadata-tuples [rs]
  (reduce (fn [tuples t]
            (into tuples [(ft/tuple (str (:product-type t) "_" (:product-id t))
                                    {:sales-channel (:sales-channel t)})]))
          ()
          rs))

Once we have the three result sets, they are merged into one via Clojure's into function, turned into an RDD of (K,V) pairs, and cached in Spark:

(let [products (->> products-for-sale        ;; the merged metadata tuples
                    (f/parallelize-pairs x)  ;; x is the SparkContext
                    (f/cache))]
     ...
     ...)
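
For concreteness, the merge itself is just into; a quick sketch, where the three collection names are hypothetical stand-ins for the three metadata-tuples result sets:

(def products-for-sale
  ;; hypothetical names for the three Redshift result sets, each already
  ;; run through metadata-tuples
  (-> physical-goods
      (into video-products)
      (into downloadable-materials)))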

The "processing code" above was omitted for clarity's sake. Once the RDD of metadata was cached in the cluster, we're able to join it with the processed event data via a left outer join:

(f/flat-map-to-pair (f/fn [x]
  (let [[_ v] (f/untuple x)
        e (into [] (remove #(empty? (:recommendation-nodes %))
                     (traverse-session-events v)))]
    (map (fn [y] (ft/tuple (str (:cart-item-type y) "_" (:cart-id y))(identity y))) e))))

(f/left-outer-join products)

and with a little more processing and formatting, we now have all the required information:



{:event-date "2015-10-07", 
:triggering-event-reco-source "algorithm-30", 
:sales-channel "VIDEO",
:estimated-reco-sales-channel-total "123.99", 
:estimated-daily-sales-total "97789.76", 
:contribution-rate "0.12"}


One point of clarification about the above example: the records represent one grouping of products by recommendation source and sales-channel. We have multiple sales records for algorithm-30 that are categorized under the "VIDEO" sales-channel. The estimated-daily-sales is across all sales for that given day, and there's a reason for using "estimated," but that's for another day. Having multiple result sets might clarify things a bit:


{:event-date "2015-10-12",
:triggering-event-reco-source "algorithm-30",
:sales-channel "VIDEO",
:estimated-reco-sales-channel-total "1349.46",
:estimated-sales-total "165201.41",
:contribution-rate "0.82"}

{:event-date "2015-10-12",
:triggering-event-reco-source "algorithm-32",
:sales-channel "PRODUCT",
:estimated-reco-sales-channel-total "942.09",
:estimated-sales-total "165201.41",
:contribution-rate "0.57"}

{:event-date "2015-10-12",
:triggering-event-reco-source "algorithm-33",
:sales-channel "DOWNLOADABLE_MATERIAL",
:estimated-reco-sales-channel-total "65.51",
:estimated-sales-total "165201.41",
:contribution-rate "0.04"}

Once again, the turnaround to provide the additional information was only a day. We were able to materialize this data rather quickly into an external Hive table and layer some quick visualizations on top with Looker. With Clojure and REPL-driven development aiding in agility, you can iterate fast, even with Big Data.

So there you have it, Bob's your uncle!


Thursday, October 15, 2015

Ad hoc data analysis/discovery of event data...



Recently an opportunity at work arose to run some ad hoc data analysis/discovery on our event data. The required analysis was a bit more complicated than what was provided by our current reporting capabilities. This complication was primarily due to both the quality and the layout/structure of the existing data. We wanted to understand how much of the revenue was coming from an internal (machine learning supported) recommendation service, for given products over various time boundaries. From these sales, we wanted to further understand which recommendation algorithm was used to drive the purchase. Measuring the performance of these algorithms would allow us to fine tune them in the future and have the ability to ascertain whether or not those changes made a positive or negative impact on the overall revenue.

It was determined early on that trying to do this in plain SQL wasn't a great fit, yet having this data stored only in Amazon Redshift constrained the approach to just SQL. Luckily, I had been pulling the same event data off Kafka, converting it to Parquet, and storing it in S3 for some time. For the older data that wasn't currently in S3, I was able to bulk extract the data from Redshift and process it in Spark to convert it into the required Parquet format. Having this data stored in S3 opened up the door for alternative compute engines, and Parquet provided a very performant columnar data storage format that can be processed by multiple Big Data engines. These included, but were not limited to, Hive, Presto, Spark, and Hadoop. Separating the compute engine from the storage engine offers a number of advantages when it comes to scaling, compute flexibility, and overall cost of data processing.
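
As a side note, the Spark piece of that backfill is tiny. A minimal sketch, using Spark 1.5's DataFrameWriter via Java interop; the helper name and S3 path are placeholders of my own:

(defn write-parquet!
  "Write a Spark DataFrame out as Parquet under the given path,
   e.g. \"s3n://my-bucket/events/dt=2015-09-01/\" (placeholder)."
  [^org.apache.spark.sql.DataFrame df path]
  (-> df
      .write            ;; returns a DataFrameWriter
      (.parquet path)))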


After reviewing the data in greater detail, it appeared to me that this fit nicely into a recursive, graph-oriented problem. Users may purchase multiple items within a given session, and each of these purchases may or may not be driven by a recommendation. All of these events are linked together via the event-id and parent-event-id. Below is an example of what that dataset looks like. It is a small subset of the user session data and is used for illustration purposes. User sessions may consist of one or even thousands of interactions with the system, each generating a unique event.


step-id - event-id,parent-event-id,item-id,page,action,price,recommendation-algorithm
1 - A,,7234,SELL_PAGE,DISPLAY,19.99,NONE
2 - B,A,5431,RECO_SELL_PAGE,DISPLAY,14.99,algorithm-99
3 - C,B,5431,SELL_PAGE,DISPLAY,14.99,algorithm-99
4 - D,C,5431,SELL_PAGE,ADD_TO_CART,14.99,NONE
5 - E,D,5431,CART,PAID,14.99,

I've replaced the actual event-id and parent-event-id with easier-to-follow values. Also, a step-id is added for easier reference. In this session subset, a user was initially presented with product 7234 for $19.99 (step-id 1), but we see that they actually end up purchasing an item that was presented to them via the recommendation panel (step-id 2) on the website (step-id 5). We can also see that the recommendation-algorithm is dropped from the event data after step 3. This could represent a bug in the current system, a requirement that has not yet been implemented, or just an oversight. In any case, you need to work with the data that's given to you, and 80% of the Data Science work lies in Data Wrangling and getting the data ready for further analysis.


All the events are linked, where each event-id may have a parent-event-id. Getting the information we need can easily be done using a graph, traversing it to extract and keep the nodes of interest. Using Clojure and the graph library Loom turned out to be a winning approach. Clojure is just awesome for working with data interactively and runs on the JVM, so leveraging existing libraries written in Java/Scala is doable. Also, porting the standalone app code to Apache Spark to run at scale on much larger datasets turned out to be rather straightforward using the Flambo DSL. Additionally, the Clojure REPL allows one to easily interact with the data and iterate fast.


The first thing was to get the base data of interest out of Parquet and into Clojure. This was pretty straightforward using a combination of the Spark DataFrame API and resilient distributed datasets (RDDs). Converting a DataFrame to a JavaRDD is simple and allowed me to wrangle the data into a more Clojure-friendly data structure. Being able to mix the usage of SQL and the richer data structures provided via an RDD is a huge win and provides greater flexibility to shape and work with the data.

(sql/with-sql-context c conf
  (let [df (sql/load c "s3n://bucket-to-data/dt=2015-10-07/" "org.apache.spark.sql.parquet")
        _  (do
             (sql/register-data-frame-as-table c df "events")
             (sql/register-data-frame-as-table c (sql/sql c
               "SELECT
                  session_id,
                  event_id,
                  parent_event_id,
                  location,
                  product_id,
                  event_type,
                  recommendation_source,
                  (quantity*price) as total,
                  currency_code,
                  dt as event_date
                FROM events
                WHERE location IN ('SELL_PAGE','RECO_SELL_PAGE','CART')
                AND event_type IN ('DISPLAY','ADD_TO_CART','PAID')") "event_subset"))]
    (->
      (sql/sql c "SELECT session_id,
                         event_id,
                         parent_event_id,
                         location,
                         product_id,
                         event_type,
                         recommendation_source,
                         total,
                         currency_code,
                         event_date
                  FROM event_subset")
      (.toJavaRDD))))
Now I want to get the data into a Clojure map, which is done by passing a function to map-to-pair over the RDD. Each row is converted into a Clojure map by using zipmap to map column keys to values.

(f/map-to-pair (f/fn [x] (let [[k & v] (sql/row->vec x)]
                 (ft/tuple k (event-map v)))))
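
The event-map helper isn't shown above; a minimal sketch, assuming it simply zipmaps the remaining row columns in the same order as the event_subset SELECT (session_id has already been pulled out as the key):

(defn event-map
  "Zip the remaining row values onto keyword keys, in the same column
   order as the event_subset query above."
  [v]
  (zipmap [:event-id :parent-event-id :location :product-id :event-type
           :reco-source :total :currency-code :event-date]
          v))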


Note that each step we take is a data transformation and returns a new RDD. So in this case we'll have an RDD where the key (K) is the user's session-id and the value (V) is the Clojure map of that event. Since we need to work on the data at the session level, we call the Spark groupByKey function on the previous RDD. This is done via the Flambo convenience function group-by-key. Note this is an expensive operation, since groupByKey must be able to hold all the key-value pairs for any key in memory. If a key has too many values, it can result in out-of-memory errors. But since we are running this across multiple machines and multiple JVMs, we stand a pretty good chance of that not happening.

(f/group-by-key :n 200)

Now we have all of the session values together in (K,V) pairs, where the K is the session-id and the V is now a Scala collection of our Clojure maps. Example:

#flambo/tuple [1444244343173:hs9o16ss #object[
scala.collection.convert.Wrappers$IterableWrapper 0x4228e9bd [{:reco-source algorithm-30, 
  :event-id 00ef52b1-59d4-408c-9cad-4ab205afa873, 
  :product-id 7124, 
  :currency-code USD, 
  :parent-event-id a1d90bbb-f66b-4537-94a2-c407552a4d49, 
  :total 49.99, 
  :event-date 2015-10-07, 
  :location SELL_PAGE, 
  :event-type DISPLAY}]]]

Now that we have all the data grouped by session-id, the next step is to use the Loom library to create a graph of the connected nodes. Once we have the graph of nodes, we can then search them with relative ease. Below you can see that we are unrolling each Scala Tuple via the untuple function. Since we don't care about the session-id any longer, we denote that by specifying the _. All the data of interest is in the V of the (K,V) tuple.


(f/flat-map-to-pair (f/fn [x]
  (let [[_ v] (f/untuple x)
        e (into [] (remove #(empty? (:recommendation-nodes %))
                   (traverse-session-events v)))]
(map (fn [y] (ft/tuple (str (:cart-item-type y) "_" (:cart-id y))(identity y))) e))))


Within traverse-session-events, we create the session graph of all the connected events. Once we have this graph, a list comprehension can be used to process each of the PAID event nodes within a session. Each "PAID" node is checked to see if there's a connected parent event node of interest. These "parent nodes of interest" are events that have a :label matching a defined set. These checks are done in the walk-nodes function, which is called in the let binding. If there are no such nodes, we just return an empty :recommendation-nodes list, which is filtered out.

(defn traverse-session-events
  "given a graph, traverse all the end nodes and check to see if they were triggered by any recommendation driven events"
  [events]
  (let [g (session-graph events)]
    (for [node (paid-cart-events g)
          :let [x (mapv #(attr g node %) [:total :event-date :currency-code :id])
                reco-nodes (walk-nodes g node)
                event-map {:cart-event-id node
                           :cart-total (nth x 0)
                           :event-date (nth x 1)
                           :currency-code (nth x 2)
                           :cart-id (nth x 3)
                           :triggering-event-reco-source (:reco-source (first reco-nodes))
                           :recommendation-nodes reco-nodes}]]
      event-map)))

(defn walk-nodes
  "given an end-node and a graph, traverse the graph to see if the end-node has a valid connected start node."
  [g n]
  (for [node (bf-traverse g n)
        :let [x (mapv #(attr g node %) [:label :reco-source :thing-id])]
        :when (valid-connected-nodes (nth x 0))]
    {:triggering-event-id node :label (nth x 0) :reco-source (nth x 1)}))
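
The session-graph helper isn't shown here. A minimal sketch of how it could be built with Loom, assuming each event becomes a node keyed by its event-id (carrying the attributes read back above) and edges point from an event to its parent, so that bf-traverse from a PAID node walks back up the referral chain:

(require '[loom.graph :as lg]
         '[loom.attr  :as la])

(defn session-graph
  "Build a directed Loom graph from a collection of event maps."
  [events]
  (reduce (fn [g {:keys [event-id parent-event-id] :as e}]
            (let [g (-> (lg/add-nodes g event-id)
                        (la/add-attr event-id :label (:location e))
                        (la/add-attr event-id :reco-source (:reco-source e))
                        (la/add-attr event-id :total (:total e))
                        (la/add-attr event-id :event-date (:event-date e))
                        (la/add-attr event-id :currency-code (:currency-code e))
                        (la/add-attr event-id :id (:product-id e)))]
              (if parent-event-id
                ;; edge from the child event back to its parent/referring event
                (lg/add-edges g [event-id parent-event-id])
                g)))
          (lg/digraph)
          events))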

Once this step/transformation is complete, we have all the information needed for the first draft of this program. Now we’re able to see the estimated total revenue for that given recommendation source.


{:event-date "2015-10-07", 
:triggering-event-reco-source "algorithm-30", 
:estimated-reco-sales-channel-total "8089.56", 
:estimated-daily-sales-total "97789.76", 
:contribution-rate "8.27"}


Later on we added metadata, extracted from Redshift, to further categorize the data. The overall time to turn this attribution report around was rather quick - in less than a week and a half we had the data we needed.  Separating the compute from storage engines is very powerful! Having the capabilities and facilities to do such quick exploration and data discovery is also very powerful and extremely valuable and cost effective.  


So there you have it, Bob's your uncle!
