Sunday, November 15, 2015

Parquet file generation with Spark and Clojure

I'm working on an ETL project that requires converting existing JSON data to Parquet. Once the data is processed and stored back into S3, we'll be using Hive and Presto to query it via SQL/HiveQL. Once again, separating storage from compute is the name of the game here... A sample of the existing JSON looks something like this:

{"name":"checkout:payment:item:purchase","visitor_id":"123456","created":1447239382,"type":"U","version":1,"details":{"ext":1305221,"listing_id":573845,"quantity":1}}

The interesting part is the details key. This is a JSON object that can have an arbitrary number of key:value pairs. The current contract with the developers is that it will be flat, with no nested objects or arrays of objects, but it can contain any number of different key:value pairs. For instance, an impression event might have a details object containing only the ext and gallery_type keys:



{"name":"filteredGallery:gallery:gallery:impression","visitor_id":"487464540","created":1447242806,"type":"U","version":1,"details":{"ext":1206491,"gallery_type":"course"}}


The goal is to have all these events in the same table, and I prefer to have them in the Parquet format. The approach outlined here is one where all the common fields/keys like name, visitor_id, created, type, and version are promoted to individual columns, while the details column contains a JSON string of the original content. This way we're able to maintain a unified table while still allowing arbitrary content to be stored in the details payload.

Using Apache Spark and Clojure for the ETL processing worked out nicely. Spark has built-in support for reading JSON records via SQLContext.read.json() into a DataFrame, and once in a DataFrame the content can be saved to Parquet rather easily. To achieve the format above, though, I couldn't rely on the automatic JSON schema inference that Spark provides, since I needed to do a little data reshaping (a sketch of that default path follows the step list below). Instead, the schema would need to be defined at runtime and the DataFrame created programmatically. This involved the following three steps:


  1. Create an RDD of Rows from the original RDD.
  2. Create the schema, represented by a StructType, matching the structure of the Rows in the RDD created in step 1.
  3. Apply the schema to the RDD of Rows via the createDataFrame method provided by SQLContext.
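For contrast, the automatic-inference path mentioned above would look roughly like this (a sketch; c is the SQLContext used in the snippets that follow). It isn't used here because Spark would infer details as a nested struct column rather than keep it as a plain JSON string.

;; Not used in this pipeline; shown only for comparison.
(def inferred-df (-> (.read c)
                     (.json input-path)))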

The first thing to do was to create the RDD of Rows in the desired shape. After reading the text file of JSON strings into an RDD, a map operation is run across each line, performing the following series of transformations:

  1. The JSON string is decoded into a Clojure map.
  2. The Clojure map is run through Prismatic Schema.
  3. The fields that are going to be promoted to columns are selected out of the map.
  4. The details are encoded back to a JSON string.
  5. The necessary Row is created from the resulting Clojure vector.
  6. Finally, the schema is applied to the RDD of Rows to create the DataFrame.
I've tried to place the above step numbers next to the appropriate lines of Clojure code below.

;; sc is the JavaSparkContext, c the SQLContext
(let [java-rdd (.textFile sc input-path)
      row-rdd  (-> java-rdd
                   (f/map (f/fn [x]
                            (let [json (json/decode x true)                                          ;; 1.
                                  v    (-> (fuel-event-coercer json)                                 ;; 2.
                                           (select-vals [:name :visitor_id :created :version :type]) ;; 3.
                                           (conj (json/encode (:details json))))]                    ;; 4.
                              (RowFactory/create (into-array Object v)))))                           ;; 5.
                   (.repartition 5))
      df (.applySchema c row-rdd fuel-schema)]                                                       ;; 6.
  df)
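The select-vals helper used in step 3 isn't defined in the post; a minimal sketch, assuming it simply pulls the listed keys out of the map in a fixed order (so the resulting vector lines up with the column order of the schema defined below), might look like:

;; Assumed helper: returns a vector of the values of ks in m, in key order.
(defn select-vals [m ks]
  (mapv #(get m %) ks))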

In step 2, Prismatic Schema is used not only to validate the incoming data, but also to coerce a couple of fields to the data types required by the Spark StructType.



(def FuelEvent
  "A Fuel event"
  {:name String
   :visitor_id String
   :created long
   :version s/Int
   :type String
   :details s/Any})

(def fuel-event-coercer
  ;; The map acts as the coercion matcher: values matching s/Int are cast to
  ;; int and values matching long are cast to long, so they line up with the
  ;; IntegerType/LongType columns in the Spark schema defined below.
  (coerce/coercer FuelEvent {FuelEvent identity s/Int #(int %) long #(long %)}))
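A quick REPL-style sketch of how the coercer behaves on the sample event from the top of the post (illustrative only, not output from the original run):

;; Values taken from the sample event above.
(fuel-event-coercer
  {:name "checkout:payment:item:purchase"
   :visitor_id "123456"
   :created 1447239382
   :version 1
   :type "U"
   :details {:ext 1305221 :listing_id 573845 :quantity 1}})
;; => the same map with :created coerced to a long and :version to an int,
;;    or a schema error container if the event doesn't match FuelEvent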

In step 3, the new schema is applied to the RDD of Row objects. A StructType composed of StructFields is used to define the necessary schema.



(defonce metadata
         (.build (MetadataBuilder.)))

(defonce fuel-schema
  (-> (StructType.)
      (.add (StructField. "name" DataTypes/StringType false metadata))
      (.add (StructField. "visitor_id" DataTypes/StringType false metadata))
      ;; the original created epoch value lands in a column named message_date
      (.add (StructField. "message_date" DataTypes/LongType false metadata))
      (.add (StructField. "version" DataTypes/IntegerType false metadata))
      (.add (StructField. "type" DataTypes/StringType false metadata))
      (.add (StructField. "details" DataTypes/StringType false metadata))))
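The earlier snippet used SQLContext's applySchema; on Spark 1.3+ the same thing can be written with createDataFrame, as mentioned in step 3 of the outline. A sketch, with c being the SQLContext and row-rdd the RDD of Rows from above:

;; Equivalent to (.applySchema c row-rdd fuel-schema)
(def df (.createDataFrame c row-rdd fuel-schema))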

Once the schema was applied to the RDD, we had the required DataFrame. DataFrames can easily be saved as Parquet via the writer interface.



(-> (.write df)
      (.parquet output-path))
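The queries later in the post filter on a dt partition column. One way to lay the output out that way straight from Spark (an option on Spark 1.4+, not necessarily what was done for this job) is the writer's partitionBy, assuming a dt column had been added to the DataFrame first:

;; Hypothetical variant: writes one directory per dt value, which Hive/Presto
;; can then register as partitions. Assumes df has a dt column, which the
;; snippets above do not add.
(-> (.write df)
    (.partitionBy (into-array String ["dt"]))
    (.parquet output-path))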

After compiling the program to an uberjar and running it on our Spark cluster, we can pull down one of the output Parquet files and quickly check the schema and sample some of the content.



MBP-200729:~ dan.young$ java -jar parquet-tools-1.6.0.jar head part-r-00000-47de5165-eec1-4101-b43d-ecf1128f7f8c.gz.parquet 
name = checkout:payment:item:purchase
visitor_id = 9416357
message_date = 1447239382
version = 1
type = U
details = {"ext":1305221,"listing_id":1004749,"quantity":1}

name = checkout:payment:item:purchase
visitor_id = 1377646
message_date = 1447240124
version = 1
type = U
details = {"ext":1350243,"listing_id":1005346,"quantity":1}

name = checkout:payment:item:purchase
visitor_id = 8689480
message_date = 1447203872
version = 1
type = U
details = {"ext":1346542,"listing_id":1000099,"quantity":1}

name = checkout:payment:item:purchase
visitor_id = 1738281
message_date = 1447264721
version = 1
type = U
details = {"ext":349788,"listing_id":1004696,"quantity":1}

name = checkout:payment:item:purchase
visitor_id = 7731053
message_date = 1447257543
version = 1
type = U
details = {"ext":1346186,"listing_id":1004812,"quantity":1}

MBP-200729:~ dan.young$ java -jar parquet-tools-1.6.0.jar schema part-r-00000-47de5165-eec1-4101-b43d-ecf1128f7f8c.gz.parquet 
message root {
  optional binary name (UTF8);
  optional binary visitor_id (UTF8);
  optional int64 message_date;
  optional int32 version;
  optional binary type (UTF8);
  optional binary details (UTF8);
}

So far things look good! After quickly jumping into Hive, creating the necessary external table, and registering all the partitions, we can start querying the data.


[hadoop@ip-10-88-2-100 ~]$ ./presto-cli --catalog hive --schema default

presto:default> select count(*) from fuel_events;
   _col0   
-----------
 550206316 
(1 row)

Query 20151115_150136_00014_2sdtj, FINISHED, 4 nodes
Splits: 821 total, 821 done (100.00%)
0:02 [550M rows, 5.9GB] [278M rows/s, 2.98GB/s]


presto:default> select name,json_extract_scalar(details,'$.ext'),count(*) as cnt
-> from fuel_events 
-> where dt>='2015-11-01' 
-> and name='filteredGallery:gallery:item:impression' 
-> group by name,json_extract_scalar(details,'$.ext')
-> order by cnt desc 
-> limit 10;
                  name                   |  _col1  |   cnt   
-----------------------------------------+---------+---------
 filteredGallery:gallery:item:impression | 460615  | 5517904 
 filteredGallery:gallery:item:impression | 772464  |  823135 
 filteredGallery:gallery:item:impression | 460597  |  656068 
 filteredGallery:gallery:item:impression | 745003  |  577317 
 filteredGallery:gallery:item:impression | 460614  |  566551 
 filteredGallery:gallery:item:impression | 1348671 |  466633 
 filteredGallery:gallery:item:impression | 1340510 |  385342 
 filteredGallery:gallery:item:impression | 1340493 |  372207 
 filteredGallery:gallery:item:impression | 1343133 |  364170 
 filteredGallery:gallery:item:impression | 1304395 |  253733 
(10 rows)

Query 20151115_150052_00013_2sdtj, FINISHED, 4 nodes
Splits: 60 total, 60 done (100.00%)
0:18 [40.8M rows, 458MB] [2.22M rows/s, 24.9MB/s]


Joining data in Presto against metadata stored in a different datastore, in our case Amazon Redshift, is quite easy:

presto:default> select d.summary, a.ext,count(*) as cnt from 
-> ( select cast(json_extract_scalar(details,'$.ext') as bigint) as ext
->   from  fuel_events 
->   where dt='2015-11-11'
->   and name='filteredGallery:gallery:item:impression') a
-> LEFT JOIN postgresql.public.external_link as b ON b.external_link_id = a.ext
-> LEFT JOIN postgresql.public.marketing_action as c ON c.marketing_action_id = b.marketing_action_id
-> LEFT JOIN postgresql.public.marketing_channel as d ON d.marketing_channel_id = c.marketing_channel_id
-> group by d.summary, a.ext
-> order by cnt desc
-> limit 10;
    summary     |   ext   |  cnt   
----------------+---------+--------
 Organic        |  460615 | 401889 
 Internal Email |  745003 | 105655 
 Paid Search    |  772464 |  57294 
 Organic        |  460597 |  55419 
 Organic        |  460614 |  42677 
 Retargeting    | 1042328 |  18420 
 External Email |  966708 |  17270 
 Organic        |  413614 |  17127 
 Paid Search    | 1237421 |  16899 
 Display        |  881853 |  14353 
(10 rows)

Query 20151115_154533_00024_2sdtj, FINISHED, 4 nodes
Splits: 37 total, 37 done (100.00%)
0:04 [3.98M rows, 28.7MB] [963K rows/s, 6.94MB/s]

We can also query the data via Hive.

hive> select a.name,b.f1,a.dt,count(*) as cnt from  
> fuel_events a lateral view json_tuple(a.details,'ext','gallery_item_type') b as f1, f2 
> where a.dt='2015-11-10' 
> and a.name='filteredGallery:gallery:item:impression' 
> and b.f2='pfs'
> group by a.name,a.dt,b.f1,b.f2
> order by cnt desc 
> limit 10;
Query ID = hadoop_20151115151717_22a6b8a1-b39e-49ab-9084-22e32b817368
Total jobs = 2
Launching Job 1 out of 2

OK
filteredGallery:gallery:item:impression 460615  2015-11-10      12620
filteredGallery:gallery:item:impression 1348671 2015-11-10      6961
filteredGallery:gallery:item:impression 772464  2015-11-10      4879
filteredGallery:gallery:item:impression 460597  2015-11-10      3022
filteredGallery:gallery:item:impression 460614  2015-11-10      2551
filteredGallery:gallery:item:impression 1160651 2015-11-10      2367
filteredGallery:gallery:item:impression 1348635 2015-11-10      2302
filteredGallery:gallery:item:impression 1348675 2015-11-10      2125
filteredGallery:gallery:item:impression 1348676 2015-11-10      1798
filteredGallery:gallery:item:impression 1348683 2015-11-10      1711
Time taken: 76.551 seconds, Fetched: 10 row(s)
hive> 

I really like the concept and power of separating storage from compute. We can easily provide powerful and fast SQL access to data stored in S3, while at the same time allowing for more interesting analysis that might prove too hard or too time consuming to do in SQL, via something like Spark core and/or Hadoop MapReduce.


So there you have it, Bob's your uncle!
