Thursday, December 17, 2015

Using Clojure and AWS Lambda to process Amazon Kinesis Streams

I'm working on a development project that will move our event logging from Apache Kafka to Amazon Kinesis Streams.  As part of this project, I wanted to leverage the stream-processing model that Kinesis exposes to provide real-time/near-real-time analytics to our Data Science team. As a first step, we want to capture a variety of basic usage metrics like user actions, impressions, and clicks. By using a combination of Kinesis for the event logging, AWS Lambda as the execution engine, and Redis as the data store, we'll be able to build some interesting real-time dashboards to visually display this information.

Once the basic Lego blocks were in place (the Kinesis stream, the IAM role, wiring up the web application's API gateway to write events to the stream, etc.), the next step was to write the code to process the incoming events and get them into Redis.  For this I leveraged AWS Lambda, which allows you to run code without provisioning or managing servers. You write your code, upload it to Lambda, and off to the races you go! Currently Lambda supports Python, Java, and Node.js.  I chose to write mine in Clojure since it runs on the JVM, provides great Java interoperability, and it's just an awesome data-oriented language. As a starting reference, I used the great AWS blog post by Tim Wagner and Bryan Moffat. This was a huge help and really jump-started things, so thank you, gentlemen!


The first thing needed was to implement the predefined Java interface RequestStreamHandler, which is provided by the AWS Lambda Java core library, aws-lambda-java-core.  The handleRequest method is called whenever the Lambda function is invoked.  Since in theory this will be a "continuous" stream of data, once the initial Lambda function is called and the container is running, Lambda should reuse the container as long as the code hasn't changed and not too much time has gone by since the last invocation. Here's an interesting AWS blog post on that topic.  Although that post references Node.js, I imagine it's similar for Java/JVM-based containers. Below is the handleRequest function that handles the incoming stream of data. As you can see, we parse the incoming JSON stream into a more friendly Clojure data structure, and then thread that into the handle-event function.


(defn -handleRequest [_ input-stream _ _]
  ;; Parse the incoming Kinesis event JSON into a keywordized Clojure map,
  ;; then hand it off to handle-event.
  (-> (json/parse-stream (io/reader input-stream) true)
      (handle-event)))
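
For completeness, the namespace declaration that wires this up via :gen-class looks roughly like the sketch below. The namespace name is my own placeholder, and I'm assuming Cheshire for the JSON parsing; the requires for the key-building, Redis, and logging namespaces used later are omitted.

(ns kinesis-processor.core
  (:require [cheshire.core :as json]
            [clojure.java.io :as io])
  (:gen-class
   :implements [com.amazonaws.services.lambda.runtime.RequestStreamHandler]))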

The handleRequest function above gets the data into a more Clojure-friendly format, and below is a sample of such a Kinesis event. The main information we're interested in is the :data key.  I've only included one "event" for demonstration purposes, but :Records will be a vector of :kinesis events, up to the batch size you've configured in the event source for the Lambda function.  I'm starting off with 1000.  Kinesis delivers the payload base64-encoded, so we'll need to decode it to get the actual JSON data out...

{:Records
 [{:kinesis {:kinesisSchemaVersion "1.0",
             :partitionKey "foo",
             :sequenceNumber "<big-ass-number>",
             :data "bXIuIG1hZ29vIHJvY2tzIQ=="},
   :eventSource "aws:kinesis",
   :eventVersion "1.0",
   :eventID "shardId-000000000000:<big-ass-number>",
   :eventName "aws:kinesis:record",
   :invokeIdentityArn "arn:aws:iam::<snowden-knows>:role/lambda_exec_role_kinesis",
   :awsRegion "us-east-1",
   :eventSourceARN "arn:aws:kinesis:us-east-1:<snowden-knows>:stream/fuel-test"}]}
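
Decoding the :data payload is a one-liner with java.util.Base64. Here's a quick REPL sketch; the decode-data name is just my own helper for illustration:

(defn decode-data
  "Base64-decode a Kinesis :data payload into a UTF-8 string."
  [^String s]
  (String. (.decode (java.util.Base64/getDecoder) s) "UTF-8"))

(decode-data "bXIuIG1hZ29vIHJvY2tzIQ==")
;; => "mr. magoo rocks!"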

So for every batch of "events", the handle-event function is called.  This function applies a transducer to turn the raw records into Redis keys, then the write-records function submits those to Redis via the Carmine library. Finally we take the first two records and log them along with the total record count.

(defn handle-event [event]
  (let [records (k/get-keys event)]
    (r/write-records records)
    (log/info (clojure.string/join ";" (take 2 records)) " Event count => " (count records))))

The log/info output is written to CloudWatch Logs.

The get-keys function applies a transducer to our collection of events. Transducers are really cool in that they allow for composable transformations of your data. Since they are decoupled from input or output sources, they can be used in many different processes. The other awesome thing about them is that they compose directly, without the creation of intermediate collections.  It's batshit crazy and really powerful. The get-keys function and the transducer look a bit like this:

(def data-xf
  (comp (get-data-records) (decode-messages) (json-string->map) (build-keys)))

(defn get-keys [events]
  (sequence data-xf (get-in events [:Records])))

The data-xf is the transducer, composed of a series of transformations, and it's applied to the incoming collection. Walking through it: the first step extracts all the :data payloads, which are base64-encoded. Next we base64-decode them, which yields a collection of JSON strings. From there we parse the JSON strings into Clojure maps, and finally we build the keys that will be sent off to Redis. Note that the transducer does a single pass over the original collection and doesn't create intermediate collections between the various steps.
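
The individual steps aren't shown here, but a rough sketch of what they might look like follows; these implementations (and the field names in build-keys) are my own guesses, assuming Cheshire for JSON and the decode-data helper from earlier:

(defn get-data-records []
  ;; Pull the base64-encoded :data payload out of each Kinesis record.
  (map #(get-in % [:kinesis :data])))

(defn decode-messages []
  ;; Base64-decode each payload into a JSON string.
  (map decode-data))

(defn json-string->map []
  ;; Parse each JSON string into a Clojure map with keyword keys.
  (map #(json/parse-string % true)))

(defn build-keys []
  ;; Build the compound Redis key for each event. The field names here are
  ;; hypothetical; judging by the sample output below, the format is roughly
  ;; date:page:section:element:action_siteId.
  (map (fn [{:keys [date page section element action site-id]}]
         (str date ":" page ":" section ":" element ":" action "_" site-id))))

Because each step is a zero-arity function returning a map transducer, they compose directly with comp in data-xf.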

("2015-12-17:cart::checkoutnow:impression_2276" "2015-12-17:cart:ordersummary::impression_2276")

The return from get-keys is a sequence of compound keys, which we send over to Redis, where the INCR operation is called against each key.
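
The write-records function isn't shown above, but with Carmine it might look something like this sketch; the connection details are placeholders:

(ns kinesis-processor.redis
  (:require [taoensso.carmine :as car]))

;; Placeholder connection details -- swap in the real host/port/password.
(def conn-opts {:pool {} :spec {:host "127.0.0.1" :port 6379}})

(defn write-records [ks]
  ;; INCR each compound key; wcar pipelines the commands in one round trip.
  (car/wcar conn-opts
    (doseq [k ks]
      (car/incr k))))

Checking one of the counters from redis-cli shows the counts accumulating: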

redis-cli
127.0.0.1:6379> auth <snowden-knows>
OK
127.0.0.1:6379> get 2015-12-17:cart::checkoutnow:impression_2276
"455"
127.0.0.1:6379> 


So there you have it, Bob's your uncle!
