Sunday, November 15, 2015

Parquet file generation with Spark and Clojure

I'm working on an ETL project that requires converting existing JSON data to Parquet. Once the data is processed and stored back into S3, we'll be using Hive and Presto to query it via SQL/HiveQL. Once again, separating storage from compute is the name of the game here... A sample of the existing JSON looks something like this:

{"name":"checkout:payment:item:purchase","visitor_id":"123456","created":1447239382,"type":"U","version":1,"details":{"ext":1305221,"listing_id":573845,"quantity":1}}

The interesting part is in the details key. This is a JSON object that can have an arbitrary number of key:value pairs. The current contract with the developers is that it will be flat, with no nested objects or arrays of objects in it, but it can have any number of different key:value pairs. For instance, an impression event might have a details payload containing only the ext and gallery_type keys:



{"name":"filteredGallery:gallery:gallery:impression","visitor_id":"487464540","created":1447242806,"type":"U","version":1,"details":{"ext":1206491,"gallery_type":"course"}}


The goal is to have all these events in the same table, and I prefer to have them in the Parquet format. The approach outlined here is one where all the common fields/keys like name, visitor_id, created, type, and version are promoted to individual columns, and the details column contains a JSON string of the original content. This way we're able to maintain a unified table while still allowing arbitrary content to be stored in the details payload.

Using Apache Spark and Clojure for the ETL processing worked out nicely. Spark has built-in support for reading JSON records into a DataFrame via SQLContext.read.json(), and once in a DataFrame the content can be saved to Parquet rather easily. To achieve the format above, I couldn't rely on the automatic JSON schema inference that Spark provides, since I needed to do a little data reshaping. Instead, the schema had to be defined at runtime and the DataFrame created programmatically. This involved the following three steps:


  1. Create an RDD of Rows from the original RDD;
  2. Create the schema, represented by a StructType, matching the structure of the Rows in the RDD created in step 1;
  3. Apply the schema to the RDD of Rows via the createDataFrame method provided by SQLContext.

The first thing to do was to create the RDD of Rows in the desired shape. After reading the text file of JSON strings into an RDD, a map operation is run across each line which performs the following series of transformations:

  1. Decode the JSON string into a Clojure map
  2. Run the Clojure map through Prismatic Schema
  3. Select the fields that are going to be promoted to columns out of the map
  4. Encode the details back to a JSON string
  5. Create the necessary Row from the Clojure vector
  6. Finally, apply the schema to the RDD of Rows and create the DataFrame

I've tried to place the above step numbers next to the appropriate lines of the Clojure code below.

(let [java-rdd (.textFile sc input-path)
      row-rdd (-> java-rdd
                  (f/map (f/fn [x]
                           (let [json (json/decode x true)                              ;; 1.
                                 v (-> (fuel-event-coercer json)                        ;; 2.
                                       (select-vals [:name :visitor_id :created :version :type]) ;; 3.
                                       (conj (json/encode (:details json))))]           ;; 4.
                             (RowFactory/create (into-array Object v)))))               ;; 5.
                  (.repartition 5))
      df (.applySchema c row-rdd fuel-schema)]                                          ;; 6.
  ...)
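
Note that select-vals isn't part of clojure.core; it's a small helper. A minimal sketch of what such a helper might look like, assuming it simply returns the values for the given keys, in order:

;; Hypothetical sketch of the select-vals helper used above: pull the values
;; for the given keys out of the map, preserving key order, and return a
;; vector so that the conj above appends the encoded details string at the end.
(defn select-vals [m ks]
  (mapv #(get m %) ks))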

In step 2, Prismatic Schema is used not only to validate the incoming data, but also to coerce a couple of fields to the data types required by the Spark StructType.



(def FuelEvent
  "A Fuel event"
  {:name String
   :visitor_id String
   :created long
   :version s/Int
   :type String
   :details s/Any})

(def fuel-event-coercer
  (coerce/coercer FuelEvent {FuelEvent identity
                             s/Int #(int %)
                             long #(long %)}))
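
To make the coercion concrete, here's a hypothetical REPL check of fuel-event-coercer against a decoded event (illustrative values only):

(fuel-event-coercer
  {:name "checkout:payment:item:purchase"
   :visitor_id "123456"
   :created 1447239382
   :type "U"
   :version 1
   :details {:ext 1305221 :listing_id 573845 :quantity 1}})
;; => the same map, with :version coerced to an int and :created to a long,
;;    which is what the Spark schema defined below expects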

In step 6, the new schema is applied to the RDD of Row objects. A StructType built from StructFields is used to define the necessary schema.



(defonce metadata
         (.build (MetadataBuilder.)))

(defonce fuel-schema
  (-> (StructType.)
      (.add (StructField. "name" DataTypes/StringType false metadata))
      (.add (StructField. "visitor_id" DataTypes/StringType false metadata))
      (.add (StructField. "message_date" DataTypes/LongType false metadata))
      (.add (StructField. "version" DataTypes/IntegerType false metadata))
      (.add (StructField. "type" DataTypes/StringType false metadata))
      (.add (StructField. "details" DataTypes/StringType false metadata))))

Once the schema was applied to the RDD, we had the required DataFrame. DataFrames can easily be saved as Parquet via the writer interface.



(-> (.write df)
      (.parquet output-path))

After compiling the program to an uberjar and running it on our Spark cluster, we can pull down one of the output Parquet files to quickly check the schema and sample some of the content.



MBP-200729:~ dan.young$ java -jar parquet-tools-1.6.0.jar head part-r-00000-47de5165-eec1-4101-b43d-ecf1128f7f8c.gz.parquet 
name = checkout:payment:item:purchase
visitor_id = 9416357
message_date = 1447239382
version = 1
type = U
details = {"ext":1305221,"listing_id":1004749,"quantity":1}

name = checkout:payment:item:purchase
visitor_id = 1377646
message_date = 1447240124
version = 1
type = U
details = {"ext":1350243,"listing_id":1005346,"quantity":1}

name = checkout:payment:item:purchase
visitor_id = 8689480
message_date = 1447203872
version = 1
type = U
details = {"ext":1346542,"listing_id":1000099,"quantity":1}

name = checkout:payment:item:purchase
visitor_id = 1738281
message_date = 1447264721
version = 1
type = U
details = {"ext":349788,"listing_id":1004696,"quantity":1}

name = checkout:payment:item:purchase
visitor_id = 7731053
message_date = 1447257543
version = 1
type = U
details = {"ext":1346186,"listing_id":1004812,"quantity":1}

MBP-200729:~ dan.young$ java -jar parquet-tools-1.6.0.jar schema part-r-00000-47de5165-eec1-4101-b43d-ecf1128f7f8c.gz.parquet 
message root {
  optional binary name (UTF8);
  optional binary visitor_id (UTF8);
  optional int64 message_date;
  optional int32 version;
  optional binary type (UTF8);
  optional binary details (UTF8);
}

So far things look good!  After quickly jumping into Hive, creating the necessary external table, and registering all the partitions, we can start querying the data.


[hadoop@ip-10-88-2-100 ~]$ ./presto-cli --catalog hive --schema default

presto:default> select count(*) from fuel_events;
   _col0   
-----------
 550206316 
(1 row)

Query 20151115_150136_00014_2sdtj, FINISHED, 4 nodes
Splits: 821 total, 821 done (100.00%)
0:02 [550M rows, 5.9GB] [278M rows/s, 2.98GB/s]


presto:default> select name,json_extract_scalar(details,'$.ext'),count(*) as cnt
-> from fuel_events 
-> where dt>='2015-11-01' 
-> and name='filteredGallery:gallery:item:impression' 
-> group by name,json_extract_scalar(details,'$.ext')
-> order by cnt desc 
-> limit 10;
                  name                   |  _col1  |   cnt   
-----------------------------------------+---------+---------
 filteredGallery:gallery:item:impression | 460615  | 5517904 
 filteredGallery:gallery:item:impression | 772464  |  823135 
 filteredGallery:gallery:item:impression | 460597  |  656068 
 filteredGallery:gallery:item:impression | 745003  |  577317 
 filteredGallery:gallery:item:impression | 460614  |  566551 
 filteredGallery:gallery:item:impression | 1348671 |  466633 
 filteredGallery:gallery:item:impression | 1340510 |  385342 
 filteredGallery:gallery:item:impression | 1340493 |  372207 
 filteredGallery:gallery:item:impression | 1343133 |  364170 
 filteredGallery:gallery:item:impression | 1304395 |  253733 
(10 rows)

Query 20151115_150052_00013_2sdtj, FINISHED, 4 nodes
Splits: 60 total, 60 done (100.00%)
0:18 [40.8M rows, 458MB] [2.22M rows/s, 24.9MB/s]


Joining data in Presto against metadata stored in a different datastore (in our case, Amazon Redshift) is quite easy:

presto:default> select d.summary, a.ext,count(*) as cnt from 
-> ( select cast(json_extract_scalar(details,'$.ext') as bigint) as ext
->   from  fuel_events 
->   where dt='2015-11-11'
->   and name='filteredGallery:gallery:item:impression') a
-> LEFT JOIN postgresql.public.external_link as b ON b.external_link_id = a.ext
-> LEFT JOIN postgresql.public.marketing_action as c ON c.marketing_action_id = b.marketing_action_id
-> LEFT JOIN postgresql.public.marketing_channel as d ON d.marketing_channel_id = c.marketing_channel_id
-> group by d.summary, a.ext
-> order by cnt desc
-> limit 10;
    summary     |   ext   |  cnt   
----------------+---------+--------
 Organic        |  460615 | 401889 
 Internal Email |  745003 | 105655 
 Paid Search    |  772464 |  57294 
 Organic        |  460597 |  55419 
 Organic        |  460614 |  42677 
 Retargeting    | 1042328 |  18420 
 External Email |  966708 |  17270 
 Organic        |  413614 |  17127 
 Paid Search    | 1237421 |  16899 
 Display        |  881853 |  14353 
(10 rows)

Query 20151115_154533_00024_2sdtj, FINISHED, 4 nodes
Splits: 37 total, 37 done (100.00%)
0:04 [3.98M rows, 28.7MB] [963K rows/s, 6.94MB/s]

We can also query the data via Hive.

hive> select a.name,b.f1,a.dt,count(*) as cnt from  
> fuel_events a lateral view json_tuple(a.details,'ext','gallery_item_type') b as f1, f2 
> where a.dt='2015-11-10' 
> and a.name='filteredGallery:gallery:item:impression' 
> and b.f2='pfs'
> group by a.name,a.dt,b.f1,b.f2
> order by cnt desc 
> limit 10;
Query ID = hadoop_20151115151717_22a6b8a1-b39e-49ab-9084-22e32b817368
Total jobs = 2
Launching Job 1 out of 2

OK
filteredGallery:gallery:item:impression 460615  2015-11-10      12620
filteredGallery:gallery:item:impression 1348671 2015-11-10      6961
filteredGallery:gallery:item:impression 772464  2015-11-10      4879
filteredGallery:gallery:item:impression 460597  2015-11-10      3022
filteredGallery:gallery:item:impression 460614  2015-11-10      2551
filteredGallery:gallery:item:impression 1160651 2015-11-10      2367
filteredGallery:gallery:item:impression 1348635 2015-11-10      2302
filteredGallery:gallery:item:impression 1348675 2015-11-10      2125
filteredGallery:gallery:item:impression 1348676 2015-11-10      1798
filteredGallery:gallery:item:impression 1348683 2015-11-10      1711
Time taken: 76.551 seconds, Fetched: 10 row(s)
hive> 

I really like the concept and power of separating storage from compute. We can easily provide powerful and fast SQL access to data stored in S3, while at the same time allowing for more involved analysis, which might prove too hard or too time consuming to do in SQL, via something like Spark core and/or Hadoop MapReduce.


So there you have it, Bob's your uncle!

Tuesday, November 3, 2015

Spark + Clojure + Snowflake DB = Fun!



Recently I had the pleasure of working on a small project that required converting event data from one of our sources into a new/different format. This new format would be JSON and would require a little Extract, Transform, and Load (ETL) work on the data before further analysis and storage. Once this data was transformed, we planned on loading it into Snowflake DB and, from there, layering Looker on top for some visual analysis and eye candy...

The first thing I needed to do was convert the data into the new format. The volume of data was rather small by "Big Data" standards, averaging ~300M events a month. I decided to use a combination of Apache Spark and Clojure to do the "heavy" ETL work. Why these? Because Spark is awesome... Clojure is awesome... so awesome + awesome = more awesome. The core event data was already in S3 as Parquet files, and the other metadata needed was in Redshift.

The data we were converting needed to be transformed ten different ways. We had "base/core" key:value pairs in the JSON record that all entries had, i.e. name, visitorId, created, etc., and then each record type had variable contents in the "details" payload. The details payload is kept rather simple and doesn't allow for any nested objects. The two records below illustrate this:

{
  "name": "checkout:payment:item:purchase",
  "visitorId": 4227262,
  "created": 1441098500022,
  "type": "U",
  "version": 1,
  "details": {
    "ext": 1236295,
    "listingId": 1004870,
    "quantity": 1
  }
}
{
  "name": "filteredGallery:gallery:item:impression",
  "visitorId": 6499643,
  "created": 1441073457564,
  "type": "U",
  "version": 1,
  "details": {
    "ext": 1245372,
    "galleryItemType": "product-for-sale-group"
  }
}


Loading this core dataset into Spark was relatively easy; the event data was loaded into a DataFrame via the sqlContext.read.load method. Flambo has a nice wrapper function around it.

(sql/load c input-path "org.apache.spark.sql.parquet")

Once the event data was loaded in as a DataFrame, we registered it as a temp table, which allows us to run SQL queries against it. Here we cache the table if we have more than one event type to process for a given day, since we'll be querying the same dataset over and over again.

(let [...
      ...
      _ (do
          (sql/register-temp-table event-df "events")
          (when (> (count (:events options)) 1)
            (sql/sql c "CACHE TABLE events")))]
  ...)

A number of the events being converted need to be joined against metadata tables in Redshift. This was pretty straightforward, since Spark provides a nice way to get the data in via JDBC. For instance, the function below makes the course table available to Spark SQL as a temporary table backed by JDBC.

(defn get-course-df [c]
  (let [url (str "CREATE TEMPORARY TABLE course USING org.apache.spark.sql.jdbc "
                 "OPTIONS (url \"jdbc:" db-uri "\", "
                 "dbtable \"(select course_id,base_product_id from course)\")")]
    (sql/sql c url)))
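
Once registered, the temporary table can be queried (and joined against the events table) through SparkSQL. A quick, hypothetical sanity check:

;; Register the JDBC-backed course table, then query it like any other table.
(get-course-df c)
(sql/sql c "select count(*) from course")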

Once we have the events and all the necessary "join tables" loaded into Spark as DataFrames, we can get to work on the queries and transformations. The way I approached this was to have a Clojure map with all the necessary queries predefined within it. This way I could easily look up the query that needed to be run on the DataFrame based on the event type. Since keywords can act as functions in Clojure, you can simply call the keyword function on the incoming event string and then use that to look the query up in the map. We have two nested loops: one over the days we're converting, and the other over the event types we're converting. Once we have the query for the given event, we can execute it against the SparkSQL engine and thread the DataFrame through a series of transformations using the thread-first macro.


(loop ...
  (let [event (first events-to-process)
        event-query (get-in q/query-map [(keyword event) :query])]
    ...
    ...
    (-> (sql/sql c event-query)
        ...
        ...)))

Now that we have all the data we need, we must extract and format the final JSON record accordingly. An additional piece of information stored in the Clojure map that defines all the queries is the event-specific detail fields. These will end up in the :details key of the final JSON record, and are expressed in the map as a vector of keywords. Looking at the JSON records I mentioned earlier, you can see that the checkout:payment:item:purchase record requires ext, listingId, and quantity, while the filteredGallery:gallery:item:impression record requires ext and galleryItemType.

{:checkout:payment:item:purchase:ecomm 
   {:query "select foo from bar" :details [:ext :listingId :quantity]}}

From here, we can build a simple function that takes the vector of row data, along with the event type we're working on, and builds the necessary JSON record.


(defn get-event [event v]
  (let [e (-> (zipmap base-events (subvec v 0 5))
              (dissoc :sessionOrdinal :sessionId)
              (assoc :type "U" :version 1))
        detail (zipmap (get-in query-map [event :details]) (subvec v 5))]
    (json/generate-string (merge e {:details detail}))))
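
For illustration, here's a hypothetical call, assuming base-events is [:name :visitorId :created :sessionOrdinal :sessionId] and the row vector carries those five base values followed by the event-specific ones (all values below are made up):

;; Hypothetical invocation under the assumptions stated above.
(get-event :checkout:payment:item:purchase:ecomm
           ["checkout:payment:item:purchase" 8402509 1433149846291 2 "c0ffee"
            1089212 1000471 1])
;; => a JSON string with name/visitorId/created/type/version at the top level
;;    and {"ext":1089212,"listingId":1000471,"quantity":1} under "details"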


Then all we need to do from there is run a couple of map operations over the RDD and save the results to a file. ...


...
(f/map (f/fn [x] (sql/row->vec x)))
(f/map (f/fn [x] (q/get-event (keyword event) x)))
...
...
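
Pulled together, the tail of the pipeline might look roughly like the sketch below; output-path is a placeholder here, and the real code threads this inside the loop shown earlier:

;; Rough sketch: run the event query, convert each Row to a Clojure vector,
;; build the JSON record for the event type, and save the results as text.
(-> (sql/sql c event-query)
    (.toJavaRDD)
    (f/map (f/fn [x] (sql/row->vec x)))
    (f/map (f/fn [x] (q/get-event (keyword event) x)))
    (.saveAsTextFile output-path))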

From here we easily loaded the JSON data into Snowflake DB for further querying and analysis.


{
    "name": "checkout:payment:item:purchase",
    "visitorId": 8402509,
    "created": 1433149846291,
    "type": "U",
    "version": 1,
    "details": {
        "ext": 1089212,
        "listingId": 1000471,
        "quantity": 1
    }
}

The next step is to write the data to Kinesis instead of a text file, but that's still a work in progress.

So there you have it, Bob's your uncle!




Keeping in-sync with Salesforce

Keeping two or more data stores in-sync is always a fun task. With the help of Apache NiFi and some decent tooling, I've been able to...