Recently I decided to start migrating our internal ETL processes over to Apache NiFi. As at most companies, our ETL scripts have ended up scattered all over the place over time. It becomes rather challenging not only to know what's running where and when, but also to gain visibility into the execution and dependencies of these flows. I've been very pleased with the flexibility and breadth of functionality that NiFi provides. There are hundreds of built-in processors that cover most ETL needs. If you do find yourself needing functionality that's not provided, you can easily extend NiFi, for example by using the ExecuteScript processor to wrap existing ETL code.
It was rather easy for me to port my existing Drake workflows over to NiFi. In this post, I'll focus at a high level on one workflow, our Zendesk Chat Parser. This ETL flow extracts Zendesk chat data from Salesforce, parses and formats the chats, and then loads them into Google BigQuery.
At the top level, there are three Process Groups, which are used solely to organize the flows logically. The first flow extracts the raw chats from the database for a given date. The start of this flow is designed to be invoked in two ways. The first is on a timer: every 30 minutes the ExecuteProcess processor triggers the flow for the current date. The second is via the ListenHTTP processor, which creates an HTTP endpoint that can be called to trigger the flow for a specific date. Below is an example of triggering the flow via curl:
curl -X POST -H 'Content-Type: application/json' -H 'chat_date:2016-12-20' \
  --data "invoked chat sync" <nifi-server>:8082/chatSyncListener
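For anyone who would rather script the trigger than shell out to curl, here is a minimal Python sketch of the same request using only the standard library. The function names `build_trigger` and `send_trigger` are made up for illustration; the port, path, header name, and body are copied from the curl example, and the date check is an added assumption that the listener expects `YYYY-MM-DD`.

```python
import http.client
from datetime import datetime


def build_trigger(chat_date: str) -> dict:
    """Describe the POST request that asks the flow to sync one date."""
    # Fail fast on malformed dates (assumes the flow expects YYYY-MM-DD).
    datetime.strptime(chat_date, "%Y-%m-%d")
    return {
        "method": "POST",
        "path": "/chatSyncListener",
        "headers": {"Content-Type": "application/json", "chat_date": chat_date},
        "body": "invoked chat sync",
    }


def send_trigger(host: str, chat_date: str, port: int = 8082) -> int:
    """Send the trigger to the ListenHTTP endpoint and return the HTTP status."""
    req = build_trigger(chat_date)
    conn = http.client.HTTPConnection(host, port, timeout=10)
    try:
        conn.request(req["method"], req["path"], body=req["body"], headers=req["headers"])
        return conn.getresponse().status
    finally:
        conn.close()
```

Calling `send_trigger("<nifi-server>", "2016-12-20")` with the real hostname substituted would issue the same request as the curl command above.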
Below is a screenshot of the complete flow for the extract:
One thing to note about the chat extract: an UpdateAttribute processor is connected to the output of the RouteOnAttribute processor. It adds various attributes (metadata) to the FlowFile, which downstream processors use to load the data into the correct BigQuery project, table, and table partition.
The second stage of the pipeline is kicked off automatically once data is sent to the output port; process groups are connected to one another via output and input port components. Since all the data extracted from the database by the ExecuteSQL processor is in Avro format, the records need to be converted to JSON before loading into BigQuery. This is easily accomplished with the ConvertAvroToJSON processor.
The ExecuteStreamCommand processor labeled "parse zendesk chats" runs the jar file that actually parses all the raw incoming chat events. I've previously blogged about using Instaparse for the parsing; you can read about it in more detail here. The output of this processor is sent downstream to the next processor in the flow, "format chats for big query". This step uses jq to reformat the JSON, converting the "chat-line-timestamp" into a UNIX timestamp and renaming various fields.
JSON content before the ExecuteStreamCommand:
{"chat-date":"2016-12-20","ticket-id":"64085","chat-line-timestamp":"02:19:50 PM","chatter":"Joe","chat-line":"Good morning.","created-at":"2016-12-20 02:19:50 PM"}
{"chat-date":"2016-12-20","ticket-id":"64085","chat-line-timestamp":"02:19:58 PM","chatter":"Susan","chat-line":"Good morning Joe!","created-at":"2016-12-20 02:19:58 PM"}
{"chat-date":"2016-12-20","ticket-id":"64085","chat-line-timestamp":"02:20:00 PM","chatter":"Susan","chat-line":"How are you?","created-at":"2016-12-20 02:20:00 PM"}
{"chat-date":"2016-12-20","ticket-id":"64085","chat-line-timestamp":"02:20:12 PM","chatter":"Joe","chat-line":"Great. I have a quick question on Dashboards this morning.","created-at":"2016-12-20 02:20:12 PM"}
JSON content after the ExecuteStreamCommand:
{"ticket_id":"64085","chatter":"Joe","chat_line":"Good morning.","created_at":1482243590}
{"ticket_id":"64085","chatter":"Susan","chat_line":"Good morning Joe!","created_at":1482243598}
{"ticket_id":"64085","chatter":"Susan","chat_line":"How are you?","created_at":1482243600}
{"ticket_id":"64085","chatter":"Joe","chat_line":"Great. I have a quick question on Dashboards this morning.","created_at":1482243612}
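To make the before/after transformation concrete, here is a rough Python sketch of the same reshaping. It is not the jq filter the flow actually runs, just an illustration of the mapping visible in the samples. Two assumptions: it parses the "created-at" field (which carries both date and time), and it treats that value as UTC, which happens to reproduce the sample timestamps above. The names `FIELD_RENAMES`, `format_chat`, and `format_lines` are invented for this example.

```python
import json
from datetime import datetime, timezone

# Hyphenated parser fields mapped to the underscore names seen in the "after"
# sample. Fields not listed here (chat-date, chat-line-timestamp) are dropped.
FIELD_RENAMES = {
    "ticket-id": "ticket_id",
    "chatter": "chatter",
    "chat-line": "chat_line",
}


def format_chat(record: dict) -> dict:
    """Rename fields and convert "created-at" to a UNIX timestamp (UTC assumed)."""
    formatted = {new: record[old] for old, new in FIELD_RENAMES.items()}
    created = datetime.strptime(record["created-at"], "%Y-%m-%d %I:%M:%S %p")
    formatted["created_at"] = int(created.replace(tzinfo=timezone.utc).timestamp())
    return formatted


def format_lines(lines):
    """Apply format_chat to newline-delimited JSON, skipping blank lines."""
    for line in lines:
        if line.strip():
            yield json.dumps(format_chat(json.loads(line)), separators=(",", ":"))
```

Feeding the first "before" record through `format_chat` yields the first "after" record, including `created_at` of 1482243590.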
The final stage of the flow is loading the data into BigQuery. This step is triggered by the ListFile processor, which watches a directory for newly dropped files. ListFile maintains state, so the same file will not be listed or processed twice. Since I needed to embed a project ID, table name, and partition in the filename, another UpdateAttribute processor is needed to parse those values back out into FlowFile attributes. These attributes are then used by the downstream ExecuteScript processor. NiFi has a pretty rich Expression Language that handles such tasks quite easily.
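The idea of round-tripping metadata through a filename can be sketched in a few lines of Python. This is only an illustration: the flow itself does this with NiFi Expression Language inside UpdateAttribute, and the post does not show the real naming scheme. The `<project-id>__<table>__<YYYYMMDD>.json` convention, the pattern, and the function name below are all hypothetical.

```python
import re

# Hypothetical naming convention (not taken from the actual flow):
#   <project-id>__<table>__<YYYYMMDD>.json
FILENAME_PATTERN = re.compile(
    r"^(?P<project_id>[a-z][a-z0-9-]*)__(?P<table>\w+?)__(?P<partition>\d{8})\.json$"
)


def attributes_from_filename(filename: str) -> dict:
    """Split a filename back into the attributes that were embedded in it."""
    match = FILENAME_PATTERN.match(filename)
    if match is None:
        raise ValueError(f"unexpected filename: {filename!r}")
    return match.groupdict()
```

Under that assumed convention, `attributes_from_filename("my-project__zendesk_chats__20161220.json")` gives back the project ID, table, and partition as separate values, which is the role the extracted FlowFile attributes play downstream.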
I happen to be using Groovy to wrap the Google BigQuery command line tool. Once I obtain the FlowFile via session.get(), its attributes can be accessed by calling the getAttributes() method on the FlowFile object.
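// Fetch the next FlowFile from the incoming queue; exit if none is available.
def flowFile = session.get()
if (flowFile == null) {
    return
}
// Map of attribute name to value for this FlowFile.
def attributes = flowFile.getAttributes()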
Now we're able to leverage the bq command line tool to upload the file and load the data into the correct project, table, and table partition. I'm calling bq using an executeOnShell helper method in Groovy. If loading into BigQuery fails, a notification is sent to me by text and email via AWS SNS, and I route the FlowFile to the failure relationship via:
session.transfer(flowFile, REL_FAILURE)
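For readers curious what such a bq invocation might look like, here is a hedged Python sketch of assembling the command from the extracted attributes. It is not the Groovy script used in the flow. The function names and the `dataset` parameter are assumptions (the post mentions only project, table, and partition, but a BigQuery table reference also needs a dataset), and the newline-delimited JSON source format is inferred from the sample records.

```python
import subprocess


def build_bq_load_command(project_id, dataset, table, partition, path):
    """Build the argument list for loading one file into one table partition."""
    # "$<YYYYMMDD>" is BigQuery's partition decorator for day-partitioned tables.
    destination = f"{project_id}:{dataset}.{table}${partition}"
    return [
        "bq", "load",
        "--source_format=NEWLINE_DELIMITED_JSON",
        destination,
        path,
    ]


def run_bq_load(command) -> bool:
    """Run bq and report success; a False result maps to the failure route."""
    # Passing a list (no shell) keeps the "$" in the decorator from being expanded.
    result = subprocess.run(command, capture_output=True, text=True)
    return result.returncode == 0
```

A caller would check the boolean from `run_bq_load` and, on `False`, do the equivalent of the notification and failure transfer described above.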
The use of NiFi is expanding quickly here at Looker. The seamless experience between design, control, feedback, and monitoring makes for a very powerful tool in any data engineering toolkit.