{"name":"checkout:payment:item:purchase","visitor_id":"123456","created":1447239382,"type":"U","version":1,"details":{"ext":1305221,"listing_id":573845,"quantity":1}}
The interesting part is the details key: a JSON object that can contain an arbitrary number of key:value pairs. The current contract with the developers is that it will be flat, with no nested objects or arrays of objects, but it can hold any combination of keys. For instance, an impression event might have details containing only the ext and gallery_type keys:
{"name":"filteredGallery:gallery:gallery:impression","visitor_id":"487464540","created":1447242806,"type":"U","version":1,"details":{"ext":1206491,"gallery_type":"course"}}
The goal is to have all these events in the same table, and I prefer to have them in the Parquet format. The approach outlined here is one where all the common fields/keys like name, visitor_id, created, type, and version are promoted to individual columns, while the details column contains a JSON string of the original content. This way we're able to maintain a unified table while still allowing arbitrary content to be stored in the details payload.

Using Apache Spark and Clojure for the ETL processing worked out nicely. Spark has built-in support for reading JSON records via SQLContext.read.json() into a DataFrame, and once in a DataFrame the content can be saved to Parquet rather easily. To achieve the format above, I couldn't rely on the automatic JSON schema inference that Spark provides, since I needed to do a little data reshaping. Instead, the schema had to be defined at runtime and the DataFrame created programmatically. This involved the following three steps:
- Create an RDD of Rows from the original RDD.
- Create the schema, represented by a StructType, matching the structure of the Rows in the RDD created in Step 1.
- Apply the schema to the RDD of Rows via the createDataFrame method provided by SQLContext.
The first thing to do was to create the RDD of Rows in the desired shape. After reading the text file of JSON strings into an RDD, a map operation is run across each line, performing the following series of transformations (numbered to match the annotations in the code below):
1. Decode the JSON string into a Clojure map
2. Run the Clojure map through Prismatic Schema for validation and coercion
3. Select the fields that will be promoted to columns out of the map
4. Encode the details back to a JSON string
5. Create the Row from the resulting Clojure vector
6. Finally, apply the schema to the RDD of Rows and create the DataFrame
(let [java-rdd (.textFile sc input-path)
      row-rdd (-> java-rdd
                  (f/map (f/fn [x]
                           (let [json (json/decode x true)                    ;; 1. decode the JSON string into a Clojure map
                                 v (-> (fuel-event-coercer json)              ;; 2. validate and coerce with Prismatic Schema
                                       (select-vals [:name :visitor_id :created :version :type]) ;; 3. select the promoted fields
                                       (conj (json/encode (:details json))))] ;; 4. re-encode details as a JSON string
                             (RowFactory/create (into-array Object v)))))     ;; 5. build the Row from the Clojure vector
                  (.repartition 5))
      df (.applySchema c row-rdd fuel-schema)]                                ;; 6. apply the schema (c is the SQLContext)
  df)
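One note about the code above: select-vals isn't part of clojure.core. It's a small helper; a minimal sketch of what it might look like (assuming it simply pulls the values for the given keys out of the map, in order) is:
;; Hypothetical helper -- a sketch, not the original implementation.
;; Returns the values for the given keys as a vector, in key order,
;; so the result lines up with the column order in fuel-schema.
(defn select-vals [m ks]
  (mapv #(get m %) ks))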
In step 2, Prismatic Schema is used not only to validate the incoming data, but also to coerce a couple of fields to the data types required by the Scala StructType.
(def FuelEvent
  "A Fuel event"
  {:name       String
   :visitor_id String
   :created    long
   :version    s/Int
   :type       String
   :details    s/Any})

(def fuel-event-coercer
  ;; the matcher map picks a coercion per sub-schema: :version (s/Int) is
  ;; narrowed to an int, :created (long) to a long, and the top-level map
  ;; passes through unchanged
  (coerce/coercer FuelEvent {FuelEvent identity s/Int #(int %) long #(long %)}))
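As a quick sketch of how the coercer behaves (reusing the sample purchase event from the top of the post): on success it returns the coerced map, and on failure it returns an error container that schema.utils/error? can detect.
(require '[schema.utils :as su])

;; Hypothetical REPL check -- reuses the sample purchase event shown earlier.
(def sample-event
  {:name "checkout:payment:item:purchase"
   :visitor_id "123456"
   :created 1447239382
   :version 1
   :type "U"
   :details {:ext 1305221 :listing_id 573845 :quantity 1}})

(let [coerced (fuel-event-coercer sample-event)]
  (if (su/error? coerced)
    (throw (ex-info "event failed schema coercion" {:error coerced}))
    coerced))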
In the final step, the new schema is applied to the RDD of Row objects. A StructType composed of StructFields is used to define the necessary schema.
(defonce metadata
  (.build (MetadataBuilder.)))

(defonce fuel-schema
  (-> (StructType.)
      (.add (StructField. "name" DataTypes/StringType false metadata))
      (.add (StructField. "visitor_id" DataTypes/StringType false metadata))
      (.add (StructField. "message_date" DataTypes/LongType false metadata)) ;; the original :created field, renamed
      (.add (StructField. "version" DataTypes/IntegerType false metadata))
      (.add (StructField. "type" DataTypes/StringType false metadata))
      (.add (StructField. "details" DataTypes/StringType false metadata))))
Once the schema was applied to the RDD, we had the required DataFrame. DataFrames can easily be saved as Parquet via the writer interface.
(-> (.write df)
(.parquet output-path))
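Before shipping the job off to the cluster, a quick sanity check from the REPL is to read the Parquet back through the same SQLContext and eyeball the schema and a few rows (a sketch, assuming c is the SQLContext used above):
;; Sketch of a local sanity check -- assumes c is the SQLContext from the ETL code.
(let [df (-> (.read c)
             (.format "parquet")
             (.load output-path))]
  (.printSchema df)  ;; confirm the column names/types line up with fuel-schema
  (.show df 5))      ;; sample a few rows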
After compiling the program to an uberjar and running it on our Spark cluster, we can pull down one of the output Parquet files and quickly check the schema and sample some of the content.
MBP-200729:~ dan.young$ java -jar parquet-tools-1.6.0.jar head part-r-00000-47de5165-eec1-4101-b43d-ecf1128f7f8c.gz.parquet
name = checkout:payment:item:purchase
visitor_id = 9416357
message_date = 1447239382
version = 1
type = U
details = {"ext":1305221,"listing_id":1004749,"quantity":1}
name = checkout:payment:item:purchase
visitor_id = 1377646
message_date = 1447240124
version = 1
type = U
details = {"ext":1350243,"listing_id":1005346,"quantity":1}
name = checkout:payment:item:purchase
visitor_id = 8689480
message_date = 1447203872
version = 1
type = U
details = {"ext":1346542,"listing_id":1000099,"quantity":1}
name = checkout:payment:item:purchase
visitor_id = 1738281
message_date = 1447264721
version = 1
type = U
details = {"ext":349788,"listing_id":1004696,"quantity":1}
name = checkout:payment:item:purchase
visitor_id = 7731053
message_date = 1447257543
version = 1
type = U
details = {"ext":1346186,"listing_id":1004812,"quantity":1}
MBP-200729:~ dan.young$ java -jar parquet-tools-1.6.0.jar schema part-r-00000-47de5165-eec1-4101-b43d-ecf1128f7f8c.gz.parquet
message root {
optional binary name (UTF8);
optional binary visitor_id (UTF8);
optional int64 message_date;
optional int32 version;
optional binary type (UTF8);
optional binary details (UTF8);
}
So far things look good! After quickly jumping into Hive, creating the necessary external table, and registering all the partitions, we can start querying the data.
[hadoop@ip-10-88-2-100 ~]$ ./presto-cli --catalog hive --schema default
presto:default> select count(*) from fuel_events;
_col0
-----------
550206316
(1 row)
Query 20151115_150136_00014_2sdtj, FINISHED, 4 nodes
Splits: 821 total, 821 done (100.00%)
0:02 [550M rows, 5.9GB] [278M rows/s, 2.98GB/s]
presto:default> select name,json_extract_scalar(details,'$.ext'),count(*) as cnt
-> from fuel_events
-> where dt>='2015-11-01'
-> and name='filteredGallery:gallery:item:impression'
-> group by name,json_extract_scalar(details,'$.ext')
-> order by cnt desc
-> limit 10;
name | _col1 | cnt
-----------------------------------------+---------+---------
filteredGallery:gallery:item:impression | 460615 | 5517904
filteredGallery:gallery:item:impression | 772464 | 823135
filteredGallery:gallery:item:impression | 460597 | 656068
filteredGallery:gallery:item:impression | 745003 | 577317
filteredGallery:gallery:item:impression | 460614 | 566551
filteredGallery:gallery:item:impression | 1348671 | 466633
filteredGallery:gallery:item:impression | 1340510 | 385342
filteredGallery:gallery:item:impression | 1340493 | 372207
filteredGallery:gallery:item:impression | 1343133 | 364170
filteredGallery:gallery:item:impression | 1304395 | 253733
(10 rows)
Query 20151115_150052_00013_2sdtj, FINISHED, 4 nodes
Splits: 60 total, 60 done (100.00%)
0:18 [40.8M rows, 458MB] [2.22M rows/s, 24.9MB/s]
Joining data in Presto against metadata stored in a different datastore, in our case Amazon Redshift, is quite easy:
presto:default> select d.summary, a.ext,count(*) as cnt from
-> ( select cast(json_extract_scalar(details,'$.ext') as bigint) as ext
-> from fuel_events
-> where dt='2015-11-11'
-> and name='filteredGallery:gallery:item:impression') a
-> LEFT JOIN postgresql.public.external_link as b ON b.external_link_id = a.ext
-> LEFT JOIN postgresql.public.marketing_action as c ON c.marketing_action_id = b.marketing_action_id
-> LEFT JOIN postgresql.public.marketing_channel as d ON d.marketing_channel_id = c.marketing_channel_id
-> group by d.summary, a.ext
-> order by cnt desc
-> limit 10;
summary | ext | cnt
----------------+---------+--------
Organic | 460615 | 401889
Internal Email | 745003 | 105655
Paid Search | 772464 | 57294
Organic | 460597 | 55419
Organic | 460614 | 42677
Retargeting | 1042328 | 18420
External Email | 966708 | 17270
Organic | 413614 | 17127
Paid Search | 1237421 | 16899
Display | 881853 | 14353
(10 rows)
Query 20151115_154533_00024_2sdtj, FINISHED, 4 nodes
Splits: 37 total, 37 done (100.00%)
0:04 [3.98M rows, 28.7MB] [963K rows/s, 6.94MB/s]
We can also query the data via Hive.
hive> select a.name,b.f1,a.dt,count(*) as cnt from
> fuel_events a lateral view json_tuple(a.details,'ext','gallery_item_type') b as f1, f2
> where a.dt='2015-11-10'
> and a.name='filteredGallery:gallery:item:impression'
> and b.f2='pfs'
> group by a.name,a.dt,b.f1,b.f2
> order by cnt desc
> limit 10;
Query ID = hadoop_20151115151717_22a6b8a1-b39e-49ab-9084-22e32b817368
Total jobs = 2
Launching Job 1 out of 2
OK
filteredGallery:gallery:item:impression 460615 2015-11-10 12620
filteredGallery:gallery:item:impression 1348671 2015-11-10 6961
filteredGallery:gallery:item:impression 772464 2015-11-10 4879
filteredGallery:gallery:item:impression 460597 2015-11-10 3022
filteredGallery:gallery:item:impression 460614 2015-11-10 2551
filteredGallery:gallery:item:impression 1160651 2015-11-10 2367
filteredGallery:gallery:item:impression 1348635 2015-11-10 2302
filteredGallery:gallery:item:impression 1348675 2015-11-10 2125
filteredGallery:gallery:item:impression 1348676 2015-11-10 1798
filteredGallery:gallery:item:impression 1348683 2015-11-10 1711
Time taken: 76.551 seconds, Fetched: 10 row(s)
hive>
I really like the concept and power of separating the storage from the compute. We can easily provide powerful and fast SQL access to data stored in S3, while at the same time allowing for more interesting analysis, via something like Spark core and/or Hadoop MapReduce, that might prove too hard or too time-consuming to do in SQL.
So there you have it, Bob's your uncle!