Monday, October 26, 2015

Presto


I've been using Presto more frequently for a number of quick ad hoc reports against various event sources in S3. It's fast, easy to deploy, and did I mention very fast? So what is Presto? Presto is an "open source distributed SQL query engine for running interactive analytic queries against data sources of all sizes ranging from gigabytes to petabytes," open sourced by Facebook. Presto lets you query data where it resides via a number of connectors. For us, this is primarily data stored in S3 and Redshift.


Getting Presto running on Elastic MapReduce (EMR) is pretty straightforward. Presto is a very active project and its release cycle can be quick. Although EMR currently supports Presto 0.119 on EMR 4.1, I like to have the ability to try out the latest release as soon as it comes out. For this reason, Ansible is a perfect fit for installing and configuring Presto, along with any other configuration management and orchestration tasks.


Since we run both Spark and Hive, I opt for using Amazon's EMR service with spot pricing. This allows us to quickly spin up, resize and shut down clusters in a very cost-effective way. Once we're done with our computing, we shut down the cluster and only pay for what we use, while at the same time persisting the processed data in S3. The first thing to do is to get a cluster running. This can be easily accomplished via the AWS Command Line Interface (CLI) tool. Below is an example of provisioning an EMR cluster with Spark and Hive running:


aws emr create-cluster \
--release-label emr-4.1.0 \
--applications Name=Spark Name=Hive \
--name "(Spark 1.5.0 + Hive + Presto) $( date '+%Y%m%d%H%M' )" \
--service-role EMR_DefaultRole \
--tags Name=spark-cluster enviro=datascience app="Data Science" \
--ec2-attributes KeyName="<my-key>",InstanceProfile=EMR_EC2_DefaultRole,SubnetId="subnet-ABC" \
--instance-groups file://./large-instance-setup.json.SPOT \
--log-uri 's3://<my-bucket>/emr/logs/' \
--no-auto-terminate --visible-to-all-users
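
The --instance-groups option points at a JSON file describing the node groups and spot bids. That file isn't shown here, but a minimal sketch might look like the following; the instance types and bid prices are purely illustrative, though one master plus four core nodes would match the five hosts that show up in the Ansible run below.

[
  {
    "Name": "Master",
    "InstanceGroupType": "MASTER",
    "InstanceType": "m3.xlarge",
    "InstanceCount": 1,
    "BidPrice": "0.10"
  },
  {
    "Name": "Core",
    "InstanceGroupType": "CORE",
    "InstanceType": "r3.2xlarge",
    "InstanceCount": 4,
    "BidPrice": "0.25"
  }
]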

This will return a cluster ID that can be used to monitor and interact with the cluster and, once we're done, shut it down. When the cluster is up and in the "WAITING" state, we can run the Ansible playbook to do the necessary configuration and setup. I've omitted a number of the PLAY output steps for brevity's sake, but essentially it downloads, installs and configures Presto along with the various connectors and JVM settings.

ansible-playbook -i e configure-emr-cluster-presto.yml 

PLAY [Configure EMR/Presto cluster...] **************************************** 

TASK: [Gather EC2 facts] ****************************************************** 
ok: [10.88.2.212]
ok: [10.88.2.40]
ok: [10.88.2.89]
ok: [10.88.2.244]
ok: [10.88.2.191]

TASK: [Download Presto Installer script] ************************************** 
changed: [10.88.2.212]
changed: [10.88.2.40]
changed: [10.88.2.191]
changed: [10.88.2.89]
changed: [10.88.2.244]

TASK: [file path=~hadoop/install-presto-emr-4.0 state=file mode=0755] ********* 
changed: [10.88.2.191]
changed: [10.88.2.89]
changed: [10.88.2.40]
changed: [10.88.2.244]
changed: [10.88.2.212]

TASK: [Install Presto Server] ************************************************* 
changed: [10.88.2.89]
changed: [10.88.2.212]
changed: [10.88.2.244]
changed: [10.88.2.191]
changed: [10.88.2.40]
....
....
....
PLAY RECAP ******************************************************************** 
10.88.2.191                : ok=12   changed=11   unreachable=0    failed=0   
10.88.2.212                : ok=15   changed=14   unreachable=0    failed=0   
10.88.2.244                : ok=12   changed=11   unreachable=0    failed=0   
10.88.2.40                 : ok=12   changed=11   unreachable=0    failed=0   
10.88.2.89                 : ok=12   changed=11   unreachable=0    failed=0  
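
The playbook itself isn't shown here, but reconstructing from the task names above, its core might look roughly like this; the installer URL and version variable are my assumptions, not the actual playbook.

# configure-emr-cluster-presto.yml -- a rough sketch reconstructed from the
# task names above; presto_installer_url and presto_version are assumptions
- hosts: emr-cluster
  remote_user: hadoop
  tasks:
    - name: Gather EC2 facts
      ec2_facts:

    - name: Download Presto Installer script
      get_url: url={{ presto_installer_url }} dest=~hadoop/install-presto-emr-4.0

    # unnamed task, so Ansible prints the module args, as in the output above
    - file: path=~hadoop/install-presto-emr-4.0 state=file mode=0755

    - name: Install Presto Server
      shell: ~hadoop/install-presto-emr-4.0 {{ presto_version }}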


Now we can start running queries! You can query Presto via its command line client, via a JDBC driver, or even via plain REST calls. The JDBC driver is just a thin wrapper over the REST interface, which means you can query Presto from virtually any language that can make HTTP calls. For example, if I wanted to get the top 5 products sold on a given day, I could run the following curl command to submit the query to Presto for execution.



curl -v -H "Content-Type: text/plain" \
> -H "X-Presto-User: hadoop" \
> -H "X-Presto-Catalog: hive" \
> -H "X-Presto-Schema: default" \
> -X POST \
> -d "select item_id,
>            count(*),
>            dt from conversion_event 
>            where event_type='PAID' 
>            and item_type IN ('PRODUCT_FOR_SALE') 
>            and dt='2015-10-20' 
>            group by item_id,dt
>            order by count(*) desc limit 5" \
> http://10.88.2.115:8080/v1/statement | jq '.nextUri'
*   Trying 10.88.2.115...
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0* Connected to 10.88.2.115 (10.88.2.115) port 8080 (#0)
> POST /v1/statement HTTP/1.1
> Host: 10.88.2.115:8080
> User-Agent: curl/7.43.0
> Accept: */*
> Content-Type: text/plain
> X-Presto-User: hadoop
> X-Presto-Catalog: hive
> X-Presto-Schema: default
> Content-Length: 311
> 
} [311 bytes data]
* upload completely sent off: 311 out of 311 bytes
< HTTP/1.1 200 OK
< Date: Mon, 26 Oct 2015 14:18:44 GMT
< Content-Type: application/json
< X-Content-Type-Options: nosniff
< Content-Length: 408
< 
{ [408 bytes data]
100   719  100   408  100   311   1418   1081 --:--:-- --:--:-- --:--:--  1421
* Connection #0 to host 10.88.2.115 left intact
"http://10.88.2.115:8080/v1/statement/20151026_141844_00013_amvv6/1"

The response contains a query identifier that can be used to gather detailed information about the query. Here I'm using jq to extract the nextUri field from the JSON response. Issuing a GET against this URI (/v1/statement/{query-id}/{token}) either returns a status update for the query in progress or delivers the next batch of results to the client.



curl -v -H "X-Presto-User: hadoop" -H "X-Presto-Catalog: hive" -X GET http://10.88.2.115:8080/v1/statement/20151026_141844_00013_amvv6/1 | jq -c '.data'
*   Trying 10.88.2.115...
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0* Connected to 10.88.2.115 (10.88.2.115) port 8080 (#0)
> GET /v1/statement/20151026_141844_00013_amvv6/1 HTTP/1.1
> Host: 10.88.2.115:8080
> User-Agent: curl/7.43.0
> Accept: */*
> X-Presto-User: hadoop
> X-Presto-Catalog: hive
> 
< HTTP/1.1 200 OK
< Date: Mon, 26 Oct 2015 16:26:47 GMT
< Content-Type: application/json
< X-Content-Type-Options: nosniff
< Vary: Accept-Encoding, User-Agent
< Content-Length: 1896
< 
{ [1170 bytes data]
100  1896  100  1896    0     0  10018      0 --:--:-- --:--:-- --:--:--  9978
* Connection #0 to host 10.88.2.115 left intact
[[5441,155,"2015-10-20"],[7458,154,"2015-10-20"],[5527,144,"2015-10-20"],[5393,141,"2015-10-20"],[5762,135,"2015-10-20"]]
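
A response may carry a data batch, another nextUri to follow for more, or both; a client simply keeps following nextUri until it disappears. A minimal shell sketch of that loop, using the same curl and jq tools as above (the one-second sleep is arbitrary):

nextUri="http://10.88.2.115:8080/v1/statement/20151026_141844_00013_amvv6/1"
while [ -n "$nextUri" ] && [ "$nextUri" != "null" ]; do
  response=$(curl -s -H "X-Presto-User: hadoop" "$nextUri")
  echo "$response" | jq -c '.data // empty'   # print rows as they arrive
  nextUri=$(echo "$response" | jq -r '.nextUri')
  sleep 1
done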

You can also navigate to the web UI on the master node to check the status of submitted and running queries.
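
In our case the cluster lives in a private subnet, so an SSH tunnel through the master node is a quick way to reach the UI (the key and hostname are placeholders):

ssh -i <my-key>.pem -L 8080:localhost:8080 hadoop@<master-node-dns>
# then browse to http://localhost:8080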

As I mentioned earlier, Presto has a number of connectors that can be used to query the data where it resides. This eliminates the need to take processed data out of, say, S3 and load it into Redshift just to run SQL queries against it. For example, the query below joins one of our "larger" event data sources stored in S3 to "smaller" metadata tables stored in Redshift. This time I'm using the presto-cli, but you could do the same thing via the REST endpoint.
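
A note on the redshift catalog used below: each Presto catalog is just a properties file under etc/catalog on every node, and since Redshift speaks the PostgreSQL wire protocol, Presto's PostgreSQL connector can be pointed at it. A sketch, with the endpoint and credentials as placeholders:

# etc/catalog/redshift.properties
connector.name=postgresql
connection-url=jdbc:postgresql://<my-redshift-endpoint>:5439/<my-db>
connection-user=<user>
connection-password=<password>

With that file in place on every node (and the Presto servers restarted), redshift.public.* tables become queryable right alongside the Hive tables, and the join below runs entirely inside Presto.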


presto:default> WITH
c1 AS (SELECT session_id,
              COALESCE(NULLIF(user_id, 0), pre_user_id) AS calculated_user_id,
              external_link_id,
              percent_rank() OVER (PARTITION BY session_id ORDER BY message_date, session_step) AS r,
              device_type, dt
       FROM clickstream WHERE dt = '2015-10-25'),
c2 AS (SELECT DISTINCT session_id, calculated_user_id, external_link_id, dt FROM c1 WHERE r = 1.0),
t1 AS (SELECT a.session_id, a.calculated_user_id, a.external_link_id, d.summary AS summary, a.dt FROM c2 AS a
       LEFT JOIN redshift.public.external_link AS b ON b.external_link_id = a.external_link_id
       LEFT JOIN redshift.public.marketing_action AS c ON c.marketing_action_id = b.marketing_action_id
       LEFT JOIN redshift.public.marketing_channel AS d ON d.marketing_channel_id = c.marketing_channel_id)
SELECT summary, count(*) AS cnt, dt FROM t1 GROUP BY summary, dt ORDER BY cnt DESC, dt;

    summary     |   cnt   |     dt     
----------------+---------+------------
 Organic        | 1337036 | 2015-10-25 
 Internal Email |   76592 | 2015-10-25 
 Paid Search    |   49842 | 2015-10-25 
(3 rows)

Query 20151027_051606_00008_m3y74, FINISHED, 4 nodes
Splits: 86 total, 86 done (100.00%)

0:10 [15.7M rows, 652MB] [1.56M rows/s, 64.9MB/s]

Go ahead and kick the tires and have some fun with Presto. It's incredibly fast.

So there you have it, Bob's your uncle!



