Thursday, December 31, 2015

Lua scripting with Redis

The last post covered how we're processing real-time events with Clojure, AWS Lambda, and Redis. Since that post, I decided to structure the keys differently, making it easier to query and organize the data. Originally the keys were in the format YYYY-MM-DD:<event>_<listing_id>, which represented a given event for a listing id on a specific date. After some additional thought, I decided that moving the events into a hash for a given listing and date would be more flexible. So we went from keys with this structure:

"2015-12-12:cart::checkoutnow:impression_2155"

to hash keys with this structure: "2015-12-22_2155". Each key has a variety of key/value pairs that represent a given event type for the listing id and date. For example:



127.0.0.1:6379> hgetall "2015-12-22_2155"
 1) "ldp:sellPanel:addToCart:impression"
 2) "6"
 3) "ldp:sellPanel::impression"
 4) "1"
 5) "ldp:::impression"
 6) "1"
 7) "ldp:sellPanel:addToCart:click"
 8) "3"
 9) "cart::checkoutNow:impression"
10) "3"
11) "cart::checkoutNow:click"
12) "1"
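
To make the mapping between the two layouts concrete, here is a minimal sketch in Python (used purely for illustration; it is not part of the Lambda or Lua code). The helper name split_flat_key is hypothetical. It splits an old-style flat key into the new hash key and the field that would hold the counter.

```python
def split_flat_key(flat_key):
    """Split 'YYYY-MM-DD:<event>_<listing_id>' into (hash_key, field)."""
    # The date is everything before the first colon.
    date, rest = flat_key.split(":", 1)
    # The listing id follows the last underscore; the event name itself
    # may contain colons, so it is kept intact as the hash field.
    event, listing_id = rest.rsplit("_", 1)
    return f"{date}_{listing_id}", event


hash_key, field = split_flat_key("2015-12-12:cart::checkoutnow:impression_2155")
print(hash_key, field)
```

With the sample key from above, the hash key comes out as 2015-12-12_2155 and the field as cart::checkoutnow:impression.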


Redis hashes are maps between string fields and string values, so they are a natural data type for representing objects. A hash with a few fields, where a few is defined as a hundred or so, is stored in a way that takes very little space, making it possible to store millions of objects in a relatively small Redis instance. A single hash in Redis can store 2^32 - 1 (more than 4 billion) field/value pairs. More information on these memory optimizations can be found on the Redis website.

Since our requirements are fuzzy at best, I wanted a way to iterate quickly and prototype different functionality along with ways to query the data. For this I chose to leverage the powerful scripting language Lua. Starting in version 2.6, the Lua interpreter is built into Redis. Lua is very powerful and extremely fast, and is a great way to go if you have to do "advanced" data processing with Redis. A full description of the language can be found on the Lua.org website; I would encourage you to take a quick read through the documentation. One key point to highlight is that the table is the only data structure in Lua. All structures that other languages offer (arrays, records, lists, queues, sets) are represented with tables in Lua. At first this may seem like a constraint, but in practice it isn't: you can store pretty much anything in tables.

The initial requirements were as follows:
A. Run a predefined calculation against a listing id for a given day
B. Be able to run multiple predefined calculations against a given listing id for a given day
C. Run A and B over a list of listing ids
D. Run C for a given date range

The first part of data wrangling was to get the list of keys we're interested in. For this I used the SCAN command. SCAN provides a cursor-based iterator, allowing for incremental iteration and returning only a small number of elements per call. This is the recommended way to work with keys, since calling KEYS may block the server for long periods of time. The SCAN call looks something like this:

repeat
  -- SCAN replies with a two-element table: { next_cursor, { key, ... } }
  ok = redis.call("scan", cursor,
                  "match", start_year .. "-" .. start_month .. "-*_*",
                  "count", 100)
  cursor = ok[1]
  scan_keys = ok[2]
....
....
until cursor == "0"
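
To make the cursor contract concrete without needing a Redis server, here is a small Python sketch. The fake_scan function is a made-up stand-in for SCAN: it uses a list offset as the cursor, whereas real SCAN cursors are opaque values, and it uses fnmatch as an approximation of Redis glob matching. The only part that mirrors the real protocol is the loop shape: keep passing back the returned cursor until it comes back as "0".

```python
import fnmatch


def fake_scan(keys, cursor, match, count):
    """Hypothetical stand-in for SCAN: returns (next_cursor, batch)."""
    start = int(cursor)
    window = keys[start:start + count]
    # Like SCAN, matching happens after a batch is picked, so a batch
    # may contain fewer than `count` keys (or none at all).
    batch = [k for k in window if fnmatch.fnmatchcase(k, match)]
    nxt = start + count
    return ("0" if nxt >= len(keys) else str(nxt)), batch


def scan_all(keys, match, count=100):
    """Collect every matching key by following the cursor until it is "0"."""
    cursor, found = "0", []
    while True:
        cursor, batch = fake_scan(keys, cursor, match, count)
        found.extend(batch)
        if cursor == "0":
            break
    return found


keys = ["2015-12-22_2155", "2015-11-30_2155", "2015-12-23_2149", "other"]
print(scan_all(keys, "2015-12-*_*", count=2))
```

Note that an empty batch does not mean the iteration is finished; only a "0" cursor does.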

The ok variable is a Lua table whose first element is the cursor returned by the server and whose second element is the list of keys. Each time we call redis.call(), we pass the cursor that the server returned from the previous call. This continues until the server returns "0", at which point we've iterated through all the keys. Since the above call returns every key for our starting year and month, we need to whittle the result down to the data we're interested in. A simple higher-order function called "filter" does the trick.

...
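    -- Keep only the values of tbl for which func returns a truthy value.
    filter = function (func, tbl)
      local t = {}
      for _, v in pairs(tbl) do
        if func(v) then
          table.insert(t, v)
        end
      end
      return t
    end,
    -- True when key k (YYYY-MM-DD_<listing_id>) falls in the requested
    -- date range and belongs to one of the requested listings.
    within_date_range = function (k)
      local _, _, y, m, d, l = string.find(k, "(%d%d%d%d)-(%d%d)-(%d%d)_(%d+)")
      -- Summing year, month and day only orders dates within a single month.
      local key_ts = y + m + d
      local start_ts = start_year + start_month + start_day
      local end_ts = end_year + end_month + end_day
      if (key_ts >= start_ts and key_ts <= end_ts and listing_ids[l]) then
        return true
      else
        return false
      end
    end,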
...

Calling the filter function prunes the keys down to just the date range and listings we're interested in. Note that there's currently no support for ranges that span months; this is something we can work on later. Once we have the key set of interest, we can iterate over the metrics we want and calculate them.
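
One possible way to lift the single-month limitation is to compare real dates instead of summed numbers. The following is a sketch in Python rather than Lua, and the function signature is an assumption, not the script's actual interface. The SCAN match pattern would also need widening to cover more than one month, which is not shown here.

```python
import re
from datetime import date

# Keys look like YYYY-MM-DD_<listing_id>.
KEY_RE = re.compile(r"^(\d{4})-(\d{2})-(\d{2})_(\d+)$")


def within_date_range(key, start, end, listing_ids):
    """True if the key's date is in [start, end] and its listing is wanted."""
    m = KEY_RE.match(key)
    if not m:
        return False
    year, month, day, listing = m.groups()
    key_date = date(int(year), int(month), int(day))
    # date objects compare chronologically, so month boundaries just work.
    return start <= key_date <= end and listing in listing_ids


keys = ["2015-11-30_2149", "2015-12-01_2149", "2015-12-02_2150", "2015-12-05_2149"]
wanted = [k for k in keys
          if within_date_range(k, date(2015, 11, 30), date(2015, 12, 1), {"2149"})]
print(wanted)
```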

The initial calculations are all stored as Lua functions in a table. The first two metrics we want to report on are click-through rates: for example, how many people who add an item to their cart proceed to checkout rather than abandon the cart. When the need for additional metric calculations arises, we can easily add functions to the metrics table inside utils.

metrics = {
  -- Add-to-cart click-through rate: clicks / impressions, as a percentage.
  ['ldp_add_to_cart_ctr'] = function (k)
    local r = nil
    local a = redis.call("hget", k, "ldp:sellPanel:addToCart:click")
    local b = redis.call("hget", k, "ldp:sellPanel:addToCart:impression")
    -- HGET yields false for a missing field, so only divide when both exist.
    if a and b then
      r = (a / b) * 100
    end
    return k, r
  end,
  -- Cart-to-checkout click-through rate, computed the same way.
  ['cart_to_checkout_ctr'] = function (k)
    local r = nil
    local a = redis.call("hget", k, "cart::checkoutNow:click")
    local b = redis.call("hget", k, "cart::checkoutNow:impression")
    if a and b then
      r = (a / b) * 100
    end
    return k, r
  end

...
...
local x = utils.filter(utils.within_date_range, scan_keys)
for _, key in ipairs(x) do
  for metric, _ in pairs(metrics_to_gather) do
    local k, ctr = utils.metrics[metric](key)
    if ctr then
      table.insert(results, metric .. '=>' .. k .. ' : ' .. utils.format_percentage(ctr))
    end
  end
end
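
As a quick sanity check of the arithmetic, here is the same click-through calculation sketched in Python against the sample hash shown earlier (2015-12-22_2155). The ctr and format_percentage helpers here are illustrative; the Lua format_percentage is not shown in this post, so the two-decimal format is an assumption based on the output further down.

```python
def ctr(hash_fields, click_field, impression_field):
    """Return clicks / impressions as a percentage, or None if data is missing."""
    clicks = hash_fields.get(click_field)
    impressions = hash_fields.get(impression_field)
    if clicks is None or impressions is None or float(impressions) == 0:
        return None
    # Redis returns hash values as strings, so convert before dividing.
    return float(clicks) / float(impressions) * 100


def format_percentage(value):
    """Format a number as a percentage with two decimals (assumed format)."""
    return f"{value:.2f}%"


sample = {
    "ldp:sellPanel:addToCart:impression": "6",
    "ldp:sellPanel::impression": "1",
    "ldp:::impression": "1",
    "ldp:sellPanel:addToCart:click": "3",
    "cart::checkoutNow:impression": "3",
    "cart::checkoutNow:click": "1",
}

add_to_cart = ctr(sample, "ldp:sellPanel:addToCart:click",
                  "ldp:sellPanel:addToCart:impression")
checkout = ctr(sample, "cart::checkoutNow:click", "cart::checkoutNow:impression")
print(format_percentage(add_to_cart), format_percentage(checkout))
```

For that hash, 3 clicks over 6 impressions gives 50.00% and 1 click over 3 impressions gives 33.33%.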

There are a few ways to test this out. Of course you can run the code against your production Redis instance, but I wrote an Ansible script that logs in to the instance, runs a BGSAVE, and copies the dump.rdb file over to my workstation with scp, where I run redis-server locally.

MBP-200729:~ dan.young$ redis-cli -a '<snowden-knows>' --eval ~/projects/redis/lua/listing-stats.lua 0 , cart_to_checkout_ctr,ldp_add_to_cart_ctr 2015-12-30 2015-12-31 2149
1) "cart_to_checkout_ctr=>2015-12-31_2149 : 100.00%"
2) "ldp_add_to_cart_ctr=>2015-12-31_2149 : 33.33%"
3) "cart_to_checkout_ctr=>2015-12-30_2149 : 25.00%"
4) "ldp_add_to_cart_ctr=>2015-12-30_2149 : 13.64%"

The first argument to --eval is the script name. It is followed by the key names, which the script can access via the global KEYS variable, then a comma, then the remaining arguments, which should not represent key names and can be accessed via the global ARGV variable. Since we generate the list of keys within the script itself, no real keys are needed; the 0 before the comma is only a placeholder that the script ignores. In the above, ARGV[1] is cart_to_checkout_ctr,ldp_add_to_cart_ctr, ARGV[2] and ARGV[3] are 2015-12-30 and 2015-12-31 respectively, and ARGV[4] is 2149. For testing purposes, using --eval is fine, but once you get to a stable point where you want to run this on your production instance, I recommend caching the code on the Redis server using the SCRIPT LOAD command and calling it with EVALSHA, where the 0 is the number of key names being passed.

 redis-cli -h 127.0.0.1 -p 6379 -a '<snowden-knows>' \
> SCRIPT LOAD "$(cat ~/projects/redis/lua/listing-stats.lua)"
"64b561bb63a178360e02f8d63a3c1cbf6925e387"

 redis-cli -h 127.0.0.1 -p 6379 -a '<snowden-knows>' \
> EVALSHA "64b561bb63a178360e02f8d63a3c1cbf6925e387" 0 cart_to_checkout_ctr,ldp_add_to_cart_ctr 2015-12-30 2015-12-31 2149
1) "cart_to_checkout_ctr=>2015-12-31_2149 : 100.00%"
2) "ldp_add_to_cart_ctr=>2015-12-31_2149 : 33.33%"
3) "cart_to_checkout_ctr=>2015-12-30_2149 : 25.00%"
4) "ldp_add_to_cart_ctr=>2015-12-30_2149 : 13.64%"
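
For reference, the ARGV layout described above can be unpacked along these lines. This is a Python sketch, not the Lua from the script, and treating every argument after the two dates as a listing id is an assumption on my part; the post only shows a single listing id being passed.

```python
from datetime import date


def parse_args(argv):
    """Unpack [metrics_csv, start_date, end_date, listing_id, ...]."""
    metrics = argv[0].split(",")
    start = date.fromisoformat(argv[1])
    end = date.fromisoformat(argv[2])
    # Assumption: any further arguments are additional listing ids.
    listing_ids = set(argv[3:])
    return metrics, start, end, listing_ids


argv = ["cart_to_checkout_ctr,ldp_add_to_cart_ctr", "2015-12-30", "2015-12-31", "2149"]
metrics, start, end, listing_ids = parse_args(argv)
print(metrics, start, end, listing_ids)
```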

Redis is cool...what else can be said....

So there you have it, Bob's your uncle !

Thursday, December 17, 2015

Using Clojure and AWS Lambda to process Amazon Kinesis Streams

I'm working on a development project that will move our event logging from Apache Kafka to Amazon Kinesis Streams. As part of this project, I wanted to leverage the stream processing model that Kinesis exposes to provide real-time or near-real-time analytics to our Data Science team. As a first step, we want to capture a variety of basic usage metrics like user actions, impressions, and clicks. By using a combination of Kinesis for the event logging, AWS Lambda as the execution engine, and Redis as the data store, we'll be able to build some interesting real-time dashboards to visually display this information.

Once the basic Lego blocks were in place (the Kinesis stream, the IAM role, wiring up the web application API gateway to write events to the stream, etc.), the next step was to write the code to process the incoming events and get these into Redis. For this I leveraged AWS Lambda, which allows you to run code without provisioning or managing servers. You write your code, upload it to Lambda, and off to the races you go! Currently Lambda supports Python, Java, and Node.js. I chose to write mine in Clojure since it runs on the JVM, provides great Java interoperability, and it's just an awesome data-oriented language. As a starting reference, I used the great AWS blog post by Tim Wagner and Bryan Moffat. This was a huge help and really jump-started things, so thank you gentlemen!


The first thing needed was to implement the predefined Java interface RequestStreamHandler, which is provided by the AWS Lambda Java core library, aws-lambda-java-core. The handleRequest method is called whenever the Lambda function is invoked. Since in theory this will be a "continuous" stream of data, once the initial Lambda function is called and the container is running, Lambda should reuse the container as long as the code hasn't changed and not too much time has gone by since the last invocation. Here's an interesting AWS blog post on that topic. Although it references Node.js, I imagine it's similar for Java/JVM-based containers. Below is the handleRequest function that handles the incoming stream of data. As you can see, we parse the incoming JSON stream into a more friendly Clojure data structure, and then thread that into the handle-event function.


(defn -handleRequest [_ input-stream _ _]
    (-> (json/parse-stream (io/reader input-stream) true)
        (handle-event)))

The code block above gets the data into a more Clojure-friendly format, and below is a sample of such a Kinesis event. The main information we're interested in is the :data key. I've only included one "event" for demonstration purposes, but :Records will be a vector of multiple :kinesis events, up to the batch size that you've configured within the event source for the Lambda function. I'm starting off with 1000. The payload arrives base64 encoded, so we'll need to decode it to get the actual JSON data out...

[:Records
 [{:kinesis {:kinesisSchemaVersion "1.0",
             :partitionKey "foo",
             :sequenceNumber "<big-ass-number>",
             :data "bXIuIG1hZ29vIHJvY2tzIQ=="},
   :eventSource "aws:kinesis",
   :eventVersion "1.0",
   :eventID "shardId-000000000000:<big-ass-number>",
   :eventName "aws:kinesis:record",
   :invokeIdentityArn "arn:aws:iam::<snowden-knows>:role/lambda_exec_role_kinesis",
   :awsRegion "us-east-1",
   :eventSourceARN "arn:aws:kinesis:us-east-1:<snowden-knows>:stream/fuel-test"}
  ]]
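
To show what the decode step amounts to, here is a minimal Python sketch (for illustration only; the real code is Clojure) that pulls the data field out of each record and base64-decodes it. The event dictionary is trimmed down to the fields the sketch needs. Note that the demo payload above is a plain string rather than JSON.

```python
import base64

# Trimmed-down version of the sample event shown above.
event = {
    "Records": [
        {"kinesis": {"partitionKey": "foo", "data": "bXIuIG1hZ29vIHJvY2tzIQ=="}},
    ]
}


def decode_payloads(event):
    """Base64-decode the data field of every Kinesis record in the event."""
    return [
        base64.b64decode(record["kinesis"]["data"]).decode("utf-8")
        for record in event["Records"]
    ]


print(decode_payloads(event))
```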

So for every group of "events", the handle-event function is called. This function runs the events through a transducer (via get-keys), then the write-records function submits the results to Redis via the Carmine library. Finally we take two events off the records collection and log them along with the collection's count.

(defn handle-event [event]
 (let [records (k/get-keys event)]
  (do
   (r/write-records records)
   (log/info (clojure.string/join ";" (take 2 records)) " Event count => " (count records)))))

The log/info is logged to CloudWatch Logs.



The get-keys function applies a transducer to our collection of events. Transducers are really cool in that they allow for composable transformations of your data. Since they are decoupled from input or output sources, they can be used in many different processes. The other awesome thing about them is that they compose directly without the creation of intermediate collections. It's bat shit crazy and really powerful. The get-keys function and the transducer look a bit like this:

(def data-xf
  (comp (get-data-records) (decode-messages) (json-string->map) (build-keys)))
(defn get-keys [events]
  (sequence data-xf  (get-in events [:Records])))

The data-xf is the transducer that is composed of a series of transformations. This transducer is then applied to the incoming collection. Walking through the transducer, the first thing is to extract all the :data events, which are base64 encoded. Next we need to do a base64 decode, which yields a collection of JSON strings. From here we decode the JSON strings into Clojure maps, and finally we build the keys that will be sent off to Redis. Note that the transducer does a single pass over the original collection, and doesn't create intermediate collections between the various function calls.

("2015-12-17:cart::checkoutnow:impression_2276" "2015-12-17:cart:ordersummary::impression_2276")

The return value of get-keys is a sequence of compound keys that we send over to Redis, where the INCR operation is called against each of the keys.

redis-cli
127.0.0.1:6379> auth <snowden-knows>
OK
127.0.0.1:6379> get 2015-12-17:cart::checkoutnow:impression_2276
"455"
127.0.0.1:6379> 
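
For readers who don't speak Clojure, the single-pass pipeline described above can be loosely approximated with Python generators. This is only an analogy: generators are tied to iteration, whereas transducers are independent of their source. The payload field names (date, event, listing_id) are hypothetical, since the real message schema isn't shown here, and a Counter stands in for the Redis INCR calls.

```python
import base64
import json
from collections import Counter


def get_data_records(records):
    for record in records:
        yield record["kinesis"]["data"]


def decode_messages(items):
    for item in items:
        yield base64.b64decode(item).decode("utf-8")


def json_string_to_map(items):
    for item in items:
        yield json.loads(item)


def build_keys(items):
    # Hypothetical payload fields: "date", "event", "listing_id".
    for msg in items:
        yield f'{msg["date"]}:{msg["event"]}_{msg["listing_id"]}'


def get_keys(event):
    """Chain the stages lazily; each record flows through all four in one pass."""
    return build_keys(json_string_to_map(decode_messages(
        get_data_records(event["Records"]))))


def encode(payload):
    """Helper for building test records: JSON, then base64."""
    return base64.b64encode(json.dumps(payload).encode("utf-8")).decode("ascii")


checkout = {"date": "2015-12-17", "event": "cart::checkoutnow:impression", "listing_id": 2276}
summary = {"date": "2015-12-17", "event": "cart:ordersummary::impression", "listing_id": 2276}
event = {"Records": [{"kinesis": {"data": encode(p)}} for p in (checkout, summary, checkout)]}

counts = Counter(get_keys(event))  # stand-in for calling INCR on each key
print(counts)
```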


So there you have it, Bob's your uncle !

Saturday, December 12, 2015

More fun with Facebook's Prestodb....

Presto is pretty cool: you can store a lot of data in HDFS/S3 and get at it quickly and with ease. As I outlined in a previous post, one way we're laying out our new event data stream is to store an arbitrary payload in a field called "details". As I watch the development go on, I see interesting stuff come through the event data via AWS Kinesis Firehose. This week, I started seeing records with array elements in them. I'll spare you all the other information, but the details payload tends to look like this:

{"listingIds":[2156,2156,2156,2152],"quantities":[2,1,1,1]} 

...I said cool, let's see how we can count these things in Presto...because that's what we like to do, count things....so the first thing was to convert the raw JSON into the binary Parquet format. The details are in a previous post. Since I'm fartin' around, I just took a single JSON file, converted it to Parquet, and then registered the resulting data as an external table in Hive:

CREATE EXTERNAL TABLE fuel_events_2 (
name string,
visitor_id string,
message_date bigint,
version int,
type string,
details string)
STORED AS parquet
LOCATION 's3n://<top-secret-bucket>/temp/foo-baz/p/dt=2015-12-11/'

Now that we have the external table registered with the Hive metastore, we can start querying the data.

presto:default> select message_date,details 
from fuel_events_2 where name='cart::impression';
 message_date |                           details
--------------+-------------------------------------------------------------
   1449855906 | {"listingIds":[2150],"quantities":[1]}
   1449854809 | {"listingIds":[2156,2156,2152],"quantities":[1,1,1]}
   1449854930 | {"listingIds":[2156,2156,2156,2152],"quantities":[2,1,1,1]}
   1449855279 | {"listingIds":[2152],"quantities":[1]}
   1449855345 | {"listingIds":[2150,2152],"quantities":[1,1]}
   1449855364 | {"listingIds":[2150,2150,2152],"quantities":[1,1,1]}
   1449855963 | {"listingIds":[2150],"quantities":[1]}
   1449855391 | {"listingIds":[2150,2150,2152],"quantities":[1,1,1]}
   1449855818 | {"listingIds":[2150,2152],"quantities":[1,1]}
   1449855898 | {"listingIds":[2157],"quantities":[3]}
   1449855958 | {"listingIds":[2157],"quantities":[3]}
   1449856259 | {"listingIds":[2150],"quantities":[1]}
(12 rows)

Query 20151212_134710_00034_fkuv8, FINISHED, 4 nodes
Splits: 6 total, 6 done (100.00%)
0:01 [171 rows, 14.1KB] [238 rows/s, 19.6KB/s]

For a simple grouping query, we need to first parse the JSON string into a data structure that will allow us to do the grouping and filtering. This is easily accomplished with a combination of the json_extract and cast functions.  Below we parse and cast the listing_ids into an array.


presto:default> select message_date,cast(json_extract(details, '$.listingIds') as array<bigint>) as listing_ids from fuel_events_2 where name='cart::impression';
 message_date |       listing_ids
--------------+--------------------------
   1449854809 | [2156, 2156, 2152]
   1449855906 | [2150]
   1449855279 | [2152]
   1449855345 | [2150, 2152]
   1449855364 | [2150, 2150, 2152]
   1449855963 | [2150]
   1449855391 | [2150, 2150, 2152]
   1449855818 | [2150, 2152]
   1449855898 | [2157]
   1449855958 | [2157]
   1449856259 | [2150]
   1449854930 | [2156, 2156, 2156, 2152]
(12 rows)

To get each listing_id represented as a row, we can use the Presto UNNEST function.

presto:default> with t AS (
             ->   select from_unixtime(message_date) as message_date,cast(json_extract(details, '$.listingIds') as array<bigint>) as listing_ids from fuel_events_2 where name='cart::impression')
             -> 
             -> select message_date,listing_id
             -> from t AS x(message_date,listing_ids)
             -> cross join unnest(listing_ids) as t2(listing_id)
             -> ;
      message_date       | listing_id
-------------------------+------------
 2015-12-11 17:45:06.000 |       2150
 2015-12-11 17:36:31.000 |       2150
 2015-12-11 17:36:31.000 |       2150
 2015-12-11 17:36:31.000 |       2152
 2015-12-11 17:43:38.000 |       2150
 2015-12-11 17:43:38.000 |       2152
 2015-12-11 17:44:58.000 |       2157
 2015-12-11 17:45:58.000 |       2157
 2015-12-11 17:50:59.000 |       2150
 2015-12-11 17:34:39.000 |       2152
 2015-12-11 17:35:45.000 |       2150
 2015-12-11 17:35:45.000 |       2152
 2015-12-11 17:36:04.000 |       2150
 2015-12-11 17:36:04.000 |       2150
 2015-12-11 17:36:04.000 |       2152
 2015-12-11 17:46:03.000 |       2150
 2015-12-11 17:28:50.000 |       2156
 2015-12-11 17:28:50.000 |       2156
 2015-12-11 17:28:50.000 |       2156
 2015-12-11 17:28:50.000 |       2152
 2015-12-11 17:26:49.000 |       2156
 2015-12-11 17:26:49.000 |       2156
 2015-12-11 17:26:49.000 |       2152
(23 rows)

Query 20151212_140413_00056_fkuv8, FINISHED, 4 nodes
Splits: 6 total, 6 done (100.00%)
0:01 [171 rows, 14.1KB] [238 rows/s, 19.6KB/s]
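
If the CROSS JOIN UNNEST is hard to picture, it behaves like a nested loop: every element of the array becomes its own row, paired with the other columns of the row it came from. Here is a small Python sketch of that idea using two of the rows above. The UTC conversion is an assumption that happens to match the timestamps in the Presto output.

```python
from datetime import datetime, timezone

# (message_date, listing_ids) pairs taken from the query output above.
rows = [
    (1449855906, [2150]),
    (1449854809, [2156, 2156, 2152]),
]


def unnest(rows):
    """Yield one (timestamp, listing_id) row per array element."""
    for message_date, listing_ids in rows:
        # Rough equivalent of from_unixtime, assuming a UTC session.
        ts = datetime.fromtimestamp(message_date, tz=timezone.utc)
        formatted = ts.strftime("%Y-%m-%d %H:%M:%S")
        for listing_id in listing_ids:
            yield formatted, listing_id


for row in unnest(rows):
    print(row)
```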

From here, we just add a group by and we're laughin'

presto:default> with t AS (
             ->   select date_format(from_unixtime(message_date),'%Y-%m-%d') as message_date,cast(json_extract(details, '$.listingIds') as array<bigint>) as listing_ids from fuel_events_2 where name='cart::impression')
             -> 
             -> select message_date,listing_id,count(*)
             -> from t AS x(message_date,listing_ids)
             -> cross join unnest(listing_ids) as t2(listing_id)
             -> group by message_date,listing_id
             -> ;
 message_date | listing_id | _col3
--------------+------------+-------
 2015-12-11   |       2156 |     5
 2015-12-11   |       2152 |     7
 2015-12-11   |       2150 |     9
 2015-12-11   |       2157 |     2
(4 rows)

Query 20151212_140440_00057_fkuv8, FINISHED, 4 nodes
Splits: 10 total, 10 done (100.00%)
0:01 [171 rows, 14.1KB] [239 rows/s, 19.7KB/s]
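
As a cross-check on those numbers, the same parse, unnest, and group-by can be reproduced in a few lines of Python over the twelve cart::impression rows listed earlier. This is a sketch for verification only and assumes UTC for the date conversion.

```python
import json
from collections import Counter
from datetime import datetime, timezone

# (message_date, details) rows copied from the first query's output.
events = [
    (1449855906, '{"listingIds":[2150],"quantities":[1]}'),
    (1449854809, '{"listingIds":[2156,2156,2152],"quantities":[1,1,1]}'),
    (1449854930, '{"listingIds":[2156,2156,2156,2152],"quantities":[2,1,1,1]}'),
    (1449855279, '{"listingIds":[2152],"quantities":[1]}'),
    (1449855345, '{"listingIds":[2150,2152],"quantities":[1,1]}'),
    (1449855364, '{"listingIds":[2150,2150,2152],"quantities":[1,1,1]}'),
    (1449855963, '{"listingIds":[2150],"quantities":[1]}'),
    (1449855391, '{"listingIds":[2150,2150,2152],"quantities":[1,1,1]}'),
    (1449855818, '{"listingIds":[2150,2152],"quantities":[1,1]}'),
    (1449855898, '{"listingIds":[2157],"quantities":[3]}'),
    (1449855958, '{"listingIds":[2157],"quantities":[3]}'),
    (1449856259, '{"listingIds":[2150],"quantities":[1]}'),
]


def count_listings(events):
    """Count listing ids per day, like the group by over the unnested rows."""
    counts = Counter()
    for message_date, details in events:
        day = datetime.fromtimestamp(message_date, tz=timezone.utc).strftime("%Y-%m-%d")
        for listing_id in json.loads(details)["listingIds"]:
            counts[(day, listing_id)] += 1
    return counts


counts = count_listings(events)
for (day, listing_id), n in sorted(counts.items()):
    print(day, listing_id, n)
```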


So there you have it, Bob's your uncle !

Keeping in-sync with Salesforce

Keeping two or more data stores in-sync is always a fun task. With the help of Apache NiFi and some decent tooling, I've been able to...