Once the basic Lego blocks were in place (the Kinesis stream, the IAM role, the web application's API gateway wired up to write events to the stream, and so on), the next step was to write the code that processes the incoming events and gets them into Redis. For this I leveraged AWS Lambda, which allows you to run code without provisioning or managing servers. You write your code, upload it to Lambda, and off to the races you go! Currently Lambda supports Python, Java, and Node.js. I chose to write mine in Clojure since it runs on the JVM, provides great Java interoperability, and it's just an awesome data-oriented language. As a starting reference, I used the great AWS blog post by Tim Wagner and Bryan Moffat. This was a huge help and really jump-started things, so thank you gentlemen!
(defn -handleRequest [_ input-stream _ _]
  (-> (json/parse-stream (io/reader input-stream) true)
      (handle-event)))
The code block above gets the data into a more Clojure-friendly format; below is a sample of such a Kinesis event. The main piece of information we're interested in is the :data key. I've only included one "event" for demonstration purposes, but :Records will be a vector of :kinesis events, up to the batch size that you've configured on the event source for the Lambda function. I'm starting off with 1000. The payload arrives base64-encoded, so we'll need to decode it to get the actual JSON data out...
{:Records
 [{:kinesis {:kinesisSchemaVersion "1.0",
             :partitionKey "foo",
             :sequenceNumber "<big-ass-number>",
             :data "bXIuIG1hZ29vIHJvY2tzIQ=="},
   :eventSource "aws:kinesis",
   :eventVersion "1.0",
   :eventID "shardId-000000000000:<big-ass-number>",
   :eventName "aws:kinesis:record",
   :invokeIdentityArn "arn:aws:iam::<snowden-knows>:role/lambda_exec_role_kinesis",
   :awsRegion "us-east-1",
   :eventSourceARN "arn:aws:kinesis:us-east-1:<snowden-knows>:stream/fuel-test"}]}
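To make the decoding step concrete, here is a minimal sketch that decodes the :data value from the sample event using the JDK's java.util.Base64 (Java 8 or later). The decode-data name is made up for illustration and is not taken from the actual project. Note that this particular sample payload happens to decode to a short plain-text string rather than JSON.

```clojure
(import 'java.util.Base64)

;; Hypothetical helper (name is an assumption, not from the original code):
;; decode a base64 string into a UTF-8 string.
(defn decode-data [^String b64]
  (String. (.decode (Base64/getDecoder) b64) "UTF-8"))

;; The :data value from the sample Kinesis event above.
(decode-data "bXIuIG1hZ29vIHJvY2tzIQ==")
;; => "mr. magoo rocks!"
```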
So for every batch of "events", the handle-event function is called. It runs the records through a transducer (via get-keys), then the write-records function submits the results to Redis via the Carmine library. Finally, we take the first two entries off the records collection and log them along with the collection's count.
(defn handle-event [event]
  (let [records (k/get-keys event)]
    (r/write-records records)
    (log/info (clojure.string/join ";" (take 2 records)) " Event count => " (count records))))
The log/info output ends up in CloudWatch Logs.
The get-keys function applies a transducer to our collection of events. Transducers are really cool in that they allow for composable transformations of your data. Since they are decoupled from input and output sources, they can be used in many different processes. The other awesome thing about them is that they compose directly, without creating intermediate collections. It's batshit crazy and really powerful. The get-keys function and the transducer look a bit like this:
(def data-xf
  (comp (get-data-records) (decode-messages) (json-string->map) (build-keys)))

(defn get-keys [events]
  (sequence data-xf (get-in events [:Records])))
data-xf is a transducer composed of a series of transformations, and it is applied to the incoming collection of records. Walking through it: the first step extracts the :data value from each record, which is base64 encoded. Next we do a base64 decode, which yields JSON strings. From there we parse the JSON strings into Clojure maps, and finally we build the keys that will be sent off to Redis. Note that the transducer makes a single pass over the original collection and doesn't create intermediate collections between the individual steps.
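As a rough illustration of how such stages can be written, here is a sketch of the first two steps only. The bodies of get-data-records and decode-messages are my guesses at what they might look like (the real ones may differ), and the JSON parsing and key-building stages are left out because they depend on a JSON library and on the payload format. Each function returns a transducer, which is why data-xf calls them with parentheses. The sketch also shows the same transducer being reused with sequence, into, and transduce.

```clojure
(import 'java.util.Base64)

;; Hypothetical stage: pull the base64 :data string out of each record.
(defn get-data-records []
  (map #(get-in % [:kinesis :data])))

;; Hypothetical stage: base64-decode each string into UTF-8 text.
(defn decode-messages []
  (map (fn [^String b64]
         (String. (.decode (Base64/getDecoder) b64) "UTF-8"))))

;; Only the first two stages are composed here (an assumption-laden
;; stand-in for data-xf, not the real thing).
(def partial-xf
  (comp (get-data-records) (decode-messages)))

;; Made-up sample input with two records.
(def sample-event
  {:Records [{:kinesis {:data "bXIuIG1hZ29vIHJvY2tzIQ=="}}
             {:kinesis {:data "aGVsbG8="}}]})

;; The same transducer plugs into different contexts:
(sequence partial-xf (:Records sample-event))  ; lazy sequence of strings
(into [] partial-xf (:Records sample-event))   ; vector of strings
(transduce partial-xf                          ; count the decoded messages
           (completing (fn [n _] (inc n)))
           0
           (:Records sample-event))
```

Because the stages know nothing about where the data comes from or where it ends up, swapping sequence for into or transduce requires no change to the stages themselves.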
("2015-12-17:cart::checkoutnow:impression_2276" "2015-12-17:cart:ordersummary::impression_2276")
The return value of get-keys is a sequence of compound keys like the ones above. We send these over to Redis, where the INCR operation is called against each key.
redis-cli
127.0.0.1:6379> auth <snowden-knows>
OK
127.0.0.1:6379> get 2015-12-17:cart::checkoutnow:impression_2276
"455"
127.0.0.1:6379>
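For the Redis side, a write-records function along these lines would do the job. This is only a sketch of how it could look with Carmine, not the actual code: the namespace, the connection options, and the placeholder password are all assumptions, and it needs the Carmine dependency plus a reachable Redis server to run.

```clojure
(ns example.redis
  (:require [taoensso.carmine :as car]))

;; Hypothetical connection options; host, port, and password are placeholders.
(def conn-opts
  {:pool {}
   :spec {:host "127.0.0.1" :port 6379 :password "<your-password>"}})

(defn write-records
  "Issue one INCR per key. Commands issued inside wcar are pipelined,
  so the whole batch is sent over a single connection."
  [ks]
  (car/wcar conn-opts
    (doseq [k ks]
      (car/incr k))))

;; Example call with the keys shown earlier:
;; (write-records ["2015-12-17:cart::checkoutnow:impression_2276"
;;                 "2015-12-17:cart:ordersummary::impression_2276"])
```

INCR creates a missing key with a value of 0 before incrementing, so there is no need to initialize the counters up front.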
So there you have it, Bob's your uncle!