Saturday, December 12, 2015

More fun with Facebook's Prestodb....

Presto is pretty cool: you can store a lot of data in HDFS/S3 and get at it quickly with ease. As I outlined in a previous post, one way we're laying out our new event data stream is to store an arbitrary payload in a field called "details". As development goes on, I see interesting stuff come through the event data via AWS Kinesis Firehose. This week, I started seeing records with array elements in them. I'll spare you all the other information; the details payload tends to look like this:

{"listingIds":[2156,2156,2156,2152],"quantities":[2,1,1,1]} 

...I said cool, let's see how we can count these things in Presto...because that's what we like to do, count things. So the first step was to convert the raw JSON into the binary Parquet format; the details are in a previous post. Since I'm just fartin' around, I took a single JSON file, converted it to Parquet, and registered the resulting data as an external table in Hive:

CREATE EXTERNAL TABLE fuel_events_2 (
  name string,
  visitor_id string,
  message_date bigint,
  version int,
  type string,
  details string)
STORED AS parquet
LOCATION 's3n://<top-secret-bucket>/temp/foo-baz/p/dt=2015-12-11/';
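If you don't feel like digging up that previous post, here's a rough, untested sketch of one way to do the JSON-to-Parquet conversion entirely inside Hive: stage the raw files behind a JSON SerDe table, then CTAS into Parquet. The raw-table name and location are made up for illustration, and this isn't necessarily the pipeline I actually used:

-- Untested sketch: stage raw JSON behind a JSON SerDe table...
CREATE EXTERNAL TABLE fuel_events_raw (
  name string,
  visitor_id string,
  message_date bigint,
  version int,
  type string,
  details string)  -- if "details" arrives as a nested JSON object rather than
                   -- a string, you'd declare it as a struct/map or pre-extract it
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
LOCATION 's3n://<top-secret-bucket>/temp/foo-baz/raw/';

-- ...then rewrite the staged rows as Parquet
CREATE TABLE fuel_events_parquet
STORED AS parquet
AS SELECT * FROM fuel_events_raw;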

Now that we have the external table registered with the Hive metastore, we can start querying the data.

presto:default> select message_date,details 
from fuel_events_2 where name='cart::impression';
 message_date |                           details                           
--------------+-------------------------------------------------------------
1449855906 | {"listingIds":[2150],"quantities":[1]}                      
1449854809 | {"listingIds":[2156,2156,2152],"quantities":[1,1,1]}        
1449854930 | {"listingIds":[2156,2156,2156,2152],"quantities":[2,1,1,1]} 
1449855279 | {"listingIds":[2152],"quantities":[1]} 
1449855345 | {"listingIds":[2150,2152],"quantities":[1,1]}               
1449855364 | {"listingIds":[2150,2150,2152],"quantities":[1,1,1]}        
1449855963 | {"listingIds":[2150],"quantities":[1]}        
1449855391 | {"listingIds":[2150,2150,2152],"quantities":[1,1,1]}       
1449855818 | {"listingIds":[2150,2152],"quantities":[1,1]}        
1449855898 | {"listingIds":[2157],"quantities":[3]}                      
1449855958 | {"listingIds":[2157],"quantities":[3]}                      
1449856259 | {"listingIds":[2150],"quantities":[1]}                      
(12 rows)

Query 20151212_134710_00034_fkuv8, FINISHED, 4 nodes
Splits: 6 total, 6 done (100.00%)
0:01 [171 rows, 14.1KB] [238 rows/s, 19.6KB/s]

For a simple grouping query, we first need to parse the JSON string into a data structure that will let us group and filter. This is easily accomplished with a combination of the json_extract and cast functions.
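As a quick sanity check, the combination works on a plain string literal too. This one-liner is my own illustration rather than something from the original session, but it should come back as a real Presto array, [2150, 2152]:

presto:default> select cast(json_extract('{"listingIds":[2150,2152]}', '$.listingIds') as array<bigint>);

With that confirmed, below we parse and cast the listingIds from each row into an array.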


presto:default> select message_date,cast(json_extract(details, '$.listingIds') as array<bigint>) as listing_ids from fuel_events_2 where name='cart::impression';
 message_date |       listing_ids        
--------------+--------------------------
1449854809 | [2156, 2156, 2152]       
1449855906 | [2150]                   
1449855279 | [2152]                   
1449855345 | [2150, 2152]             
1449855364 | [2150, 2150, 2152]       
1449855963 | [2150]                   
1449855391 | [2150, 2150, 2152]       
1449855818 | [2150, 2152]             
1449855898 | [2157]                   
1449855958 | [2157]                   
1449856259 | [2150]                   
1449854930 | [2156, 2156, 2156, 2152] 
(12 rows)

To get each listing_id represented as a row, we can use the Presto UNNEST function.
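If you haven't run into UNNEST before, it turns an array into a relation with one row per element. A tiny standalone illustration (again, mine, not from the original session):

presto:default> select x from unnest(array[2150, 2152, 2157]) as t(x);

That returns three rows, one per element. Applied to our data, we CROSS JOIN the unnested array back against each source row, so every listing_id lands next to its message_date: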

presto:default> with t AS (
             ->   select from_unixtime(message_date) as message_date,cast(json_extract(details, '$.listingIds') as array<bigint>) as listing_ids from fuel_events_2 where name='cart::impression')
             -> 
             -> select message_date,listing_id
             -> from t AS x(message_date,listing_ids)
             -> cross join unnest(listing_ids) as t2(listing_id)
             -> ;
 message_date       | listing_id 
-------------------------+------------
2015-12-11 17:45:06.000 |       2150 
2015-12-11 17:36:31.000 |       2150 
2015-12-11 17:36:31.000 |       2150 
2015-12-11 17:36:31.000 |       2152 
2015-12-11 17:43:38.000 |       2150 
2015-12-11 17:43:38.000 |       2152 
2015-12-11 17:44:58.000 |       2157 
2015-12-11 17:45:58.000 |       2157 
2015-12-11 17:50:59.000 |       2150 
2015-12-11 17:34:39.000 |       2152 
2015-12-11 17:35:45.000 |       2150 
2015-12-11 17:35:45.000 |       2152 
2015-12-11 17:36:04.000 |       2150 
2015-12-11 17:36:04.000 |       2150 
2015-12-11 17:36:04.000 |       2152 
2015-12-11 17:46:03.000 |       2150 
2015-12-11 17:28:50.000 |       2156 
2015-12-11 17:28:50.000 |       2156 
2015-12-11 17:28:50.000 |       2156 
2015-12-11 17:28:50.000 |       2152 
2015-12-11 17:26:49.000 |       2156 
2015-12-11 17:26:49.000 |       2156 
2015-12-11 17:26:49.000 |       2152 
(23 rows)

Query 20151212_140413_00056_fkuv8, FINISHED, 4 nodes
Splits: 6 total, 6 done (100.00%)
0:01 [171 rows, 14.1KB] [238 rows/s, 19.6KB/s]

From here, we just add a GROUP BY and we're laughin':

presto:default> with t AS (
             ->   select date_format(from_unixtime(message_date),'%Y-%m-%d') as message_date,cast(json_extract(details, '$.listingIds') as array<bigint>) as listing_ids from fuel_events_2 where name='cart::impression')
             -> 
             -> select message_date,listing_id,count(*)
             -> from t AS x(message_date,listing_ids)
             -> cross join unnest(listing_ids) as t2(listing_id)
             -> group by message_date,listing_id
             -> ;
message_date | listing_id | _col3 
--------------+------------+-------
2015-12-11   |       2156 |     5 
2015-12-11   |       2152 |     7 
2015-12-11   |       2150 |     9 
2015-12-11   |       2157 |     2 
(4 rows)

Query 20151212_140440_00057_fkuv8, FINISHED, 4 nodes
Splits: 10 total, 10 done (100.00%)
0:01 [171 rows, 14.1KB] [239 rows/s, 19.7KB/s]
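Bonus thought: the payload also carries a parallel quantities array, and Presto's UNNEST will zip multiple arrays together positionally. So, as an untested sketch, you could weight the counts by quantity instead of counting raw impressions:

with t as (
  select date_format(from_unixtime(message_date),'%Y-%m-%d') as message_date,
         cast(json_extract(details, '$.listingIds') as array<bigint>) as listing_ids,
         cast(json_extract(details, '$.quantities') as array<bigint>) as quantities
  from fuel_events_2
  where name='cart::impression')
select message_date, listing_id, sum(quantity) as units
from t
cross join unnest(listing_ids, quantities) as t2(listing_id, quantity)
group by message_date, listing_id;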


So there you have it, Bob's your uncle!
