Friday, May 26, 2017

Kickin' the tires on NiFi's Wait/Notify Processors


With the release of NiFi 1.2, a number of new processors were introduced, including the Wait/Notify and Google Cloud Storage (GCS) processors. The Notify processor allows you to cache a value, called a release signal, in the Map Cache Server. The Map Cache Server is primarily used to store lookup values and to keep user-defined state. Using the Notify processor along with the Wait processor, you can hold up the processing of a particular flow until a "release signal" is stored in the Map Cache Server. This is very useful for a number of ETL flows.

The initial flow I planned on applying these processors to is the Zendesk Chat service, extracting both individual chat messages and session information. I've previously written about how we're currently processing chats. Although that has been working fine, it depends on a third-party database sync process from Zendesk to Redshift, which only runs once an hour. By processing the chats directly from the Zendesk API, Looker's Department of Customer Love (DCL) gets quicker chat metrics, and we cut out the middleman and remove an integration point. Now we're able to load directly into Google BigQuery at a much quicker cadence. So let's get to the meat of things...

The high level overview of the flow is as follows:
  • Trigger the event flow for a given date range.
  • Extract all the chat "sessions" for the given range; this may require a looping construct.
  • Query and get all the messages for the given "sessions".
  • Load into Google Cloud Storage.
  • Load into Big Query.

I won't cover in detail the first step of triggering the flow, but essentially we use a combination of HandleHttpRequest and GenerateFlowFile processors. The HandleHttpRequest allows us to process a given range of dates in an ad-hoc manner, whereas the GenerateFlowFile allows for scheduling the flow via a cron expression and/or running it every X seconds/minutes.

Once we have the date range, the next part is to get all the "sessions" from the API. We don't know ahead of time how many chats we'll have, so we need to handle pagination with some sort of looping construct. The search API returns a next_url if there are more chats for the given range; in that case, we loop around and get the next batch of sessions. This continues until the next_url is null. Below is the flow we're using to get all the chats for a given date range.
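The flow itself is a chain of NiFi processors, but as a rough sketch of what that loop does, here is the equivalent logic in Python. The endpoint, auth header, and query syntax are assumptions for illustration, not the actual processor configuration:

```python
import time

import requests

# Assumed Zendesk Chat search endpoint and credentials -- placeholders for
# illustration only, not the flow's actual configuration.
BASE_URL = "https://www.zopim.com/api/v2/chats/search"
HEADERS = {"Authorization": "Bearer <access-token>"}

def fetch_all_session_pages(start_date, end_date):
    """Follow next_url until it comes back null, collecting every page of results."""
    pages = []
    # Assumed query syntax for restricting the search to a date range.
    params = {"q": f"timestamp:[{start_date} TO {end_date}]"}
    url = BASE_URL
    while url:
        resp = requests.get(url, headers=HEADERS, params=params)
        resp.raise_for_status()
        body = resp.json()
        pages.append(body["results"])
        url = body.get("next_url")  # None/null once there are no more chats
        params = None               # next_url already carries the query string
        time.sleep(1)               # simple throttle to respect the rate limit
    return pages
```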


Since we need to be careful not to exceed the rate limit for the API, a delay is implemented that throttles each call to the search API. With each iteration, the HTTP response is a JSON object that has a results array. The results array includes the url of the chat, a starting timestamp, a preview message, a type attribute, and the id of the chat session. It's rather easy to extract all the ids from the results array using jq.
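For illustration, pulling the ids out of one page looks something like this; the jq filter in the comment is my guess at the kind of expression used, not necessarily the exact one in the flow:

```python
import json

def extract_session_ids(page_json: str) -> list:
    """Pull the chat session ids out of one page of search results.
    A roughly equivalent jq filter would be: [.results[].id]"""
    body = json.loads(page_json)
    return [result["id"] for result in body["results"]]

# Example with an abbreviated search response page:
page = '{"results": [{"id": "abc123", "type": "chat"}, {"id": "def456", "type": "chat"}]}'
print(extract_session_ids(page))  # ['abc123', 'def456']
```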



Next, all the "pages of chat sessions" are sent downstream to a MergeContent processor, which merges them into a single FlowFile. Once all the session ids are merged, we immediately split the entire group via the SplitText processor into chunks of 50 (the Zendesk API limit). Now we've set the stage to use the bulk chat API to efficiently extract the session messages.
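To make the chunking step concrete, here is a sketch of the same logic outside of NiFi; the bulk endpoint shown in the comment is an assumption for illustration:

```python
def chunk_ids(session_ids, chunk_size=50):
    """Split the merged list of session ids into chunks of 50 (the Zendesk API limit)."""
    return [session_ids[i:i + chunk_size] for i in range(0, len(session_ids), chunk_size)]

# Each chunk would become one bulk request, e.g. (assumed endpoint):
#   GET https://www.zopim.com/api/v2/chats?ids=<comma-separated ids in the chunk>
chunks = chunk_ids([f"session-{n}" for n in range(120)])
print(len(chunks), [len(c) for c in chunks])  # 3 [50, 50, 20]
```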


The SplitText processor provides some useful attributes that we'll use in the Wait processor. The key one in this flow is fragment.count. Each FlowFile resulting from the split will have a fragment.index attribute, which indicates the ordering of that file in the split, and a fragment.count, which is the number of splits from the parent. For example, if we have 120 sessions to process and we split those into 50 sessions per chunk, we will have three chunks. Each of these FlowFiles will have a fragment.count equal to three, and each one will have a fragment.index giving its position in the split. The original FlowFile is sent off to the Wait processor, which we'll discuss later, and all the merged/split data is sent downstream for processing.
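Just to illustrate that bookkeeping (this is not NiFi code, only the arithmetic it performs for the 120-session example):

```python
import math

total_sessions = 120
chunk_size = 50
fragment_count = math.ceil(total_sessions / chunk_size)  # 3 chunks

# Each split FlowFile carries the same fragment.count and its own fragment.index.
splits = [{"fragment.index": i, "fragment.count": fragment_count} for i in range(fragment_count)]
print(splits)
# fragment.count is what the Wait processor uses later to know how many Notify
# signals to expect before it releases the original FlowFile.
```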

Each "chunk of 50" works its way through the flow and is eventually written to Google Cloud Storage via the new PutGCSObject processor. The success relationship of the PutGCSObject is routed to the Notify processor, which in turn updates the DistributedMapCacheServer. The counter being updated in the DistributedMapCacheServer is identified by the Signal Counter Name, and the delta applied to the update is the Signal Counter Delta. So for each file written successfully, the counter named by the Signal Counter Name is incremented by 1.



Notify Processor properties

Now we need to back up a bit in the flow to the SplitText processor labelled "chunk chats", where we can see a fork in the processing. This is where we set up the Wait processor. The original FlowFile will sit and wait for the appropriate "release signal" to be written to the DistributedMapCacheServer before proceeding. Once the "release signal" is detected, the FlowFile is routed to the success queue, which is sent on to the process group that loads the data into a Google BigQuery partitioned table. In the event that one or more of the files are not successfully written to Google Cloud Storage, the original FlowFile in the wait queue will eventually (10 min default) expire. This results in the FlowFile being routed to the expired queue, where we route it to an Amazon SNS topic that notifies me.
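To make the release mechanics concrete, here is a toy simulation of both halves of the handshake, with a plain dict standing in for the DistributedMapCacheServer. The counter name, the polling, and using a parent-FlowFile identifier as the release signal id are all assumptions for illustration, not the actual processor properties:

```python
import time

# Toy stand-in for the DistributedMapCacheServer: counters keyed by the
# release signal identifier (assumed here to be derived from the parent FlowFile).
signal_cache = {}

def notify(release_signal_id, counter_name="chunks-written", delta=1):
    """Mimics the Notify processor: increment the named counter by the configured delta."""
    counters = signal_cache.setdefault(release_signal_id, {})
    counters[counter_name] = counters.get(counter_name, 0) + delta

def wait_for_release(release_signal_id, target_count, counter_name="chunks-written",
                     expiration_secs=600, poll_secs=5):
    """Mimics the Wait processor: release once the counter reaches the target
    (here fragment.count), otherwise expire after 10 minutes (the default)."""
    deadline = time.time() + expiration_secs
    while time.time() < deadline:
        counters = signal_cache.get(release_signal_id, {})
        if counters.get(counter_name, 0) >= target_count:
            return "success"   # routed on to the BigQuery load process group
        time.sleep(poll_secs)
    return "expired"           # routed to the Amazon SNS notification

# e.g. three successful PutGCSObject writes release a Wait on fragment.count = 3:
for _ in range(3):
    notify("parent-flowfile-uuid")
print(wait_for_release("parent-flowfile-uuid", target_count=3))  # "success"
```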








Wait Processor properties




I really enjoy the flexibility of being able to adjust and change workflows so easily in NiFi. The Wait/Notify processors are a great addition that further extends NiFi's usefulness, and I keep finding more and more ways to leverage NiFi for our ETL and data processing needs.


So there you have it, Bob's your uncle
