Monday, February 13, 2017

More Apache NiFi fun...



In my last post, I briefly went over some basic extraction, transformation, and load (ETL) processing that I've begun doing with Apache NiFi. As I tackle migrating more and more of our old ETL processes to NiFi, I decided now was the time to invest in getting a NiFi cluster running. We're going all in with NiFi; it's proving itself very capable and flexible. By employing a NiFi cluster, it's possible to have increased processing capability along with a single interface through which to make changes and to monitor the various dataflows. Clustering allows changes to be made only once, and that change is then replicated to all the nodes of the cluster. Through the single interface, we can also monitor the health and status of all the nodes.

NiFi Cluster

NiFi employs a Zero-Master Clustering model. Each node in the cluster performs the same tasks on the data, but each operates on a different set of data. One of the nodes is automatically elected (via Apache ZooKeeper) as the Cluster Coordinator. All nodes in the cluster will then send heartbeat/status information to this node, and this node is responsible for disconnecting nodes that do not report any heartbeat status for some amount of time. Additionally, when a new node elects to join the cluster, the new node must first connect to the currently-elected Cluster Coordinator in order to obtain the most up-to-date flow.

Although it's beyond the scope of this post, I used Ansible to spin up all the necessary nodes and manage the cluster configuration. Without some sort of orchestration tool like this, you're pretty much doomed from the beginning. Currently we have a cluster of three nodes that's handling a number of dataflows between a variety of systems including S3, Redshift, MySQL and Google Big Query. The images below show the main canvas page for this cluster.
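Without going into the Ansible roles themselves, the cluster-related portion of nifi.properties that gets templated out to each node looks roughly like the following (hostnames and ports here are placeholders, not our actual values):

# each node participates in the cluster; flow election and coordination go through ZooKeeper
nifi.cluster.is.node=true
nifi.cluster.node.address=nifi-node-1.internal
nifi.cluster.node.protocol.port=11443
nifi.cluster.flow.election.max.candidates=3
nifi.zookeeper.connect.string=zk-1.internal:2181,zk-2.internal:2181,zk-3.internal:2181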


A few NiFi terms will help with the understanding of the various things that I'll be discussing below. I lifted these straight from the NiFi documentation:
  • FlowFile - represents each object moving through the system; for each one, NiFi keeps track of a map of key/value pair attribute strings and its associated content of zero or more bytes.
  • Processors - actually perform the work; with NiFi 1.1.x there are currently 188 of them.
  • Process Group - a specific set of processors and their connections, which can receive data via input ports and send data out via output ports. In this manner, process groups allow creation of entirely new components simply by composition of other components.

I tend to leverage Process Groups quite a bit. They allow one to logically organize various Processors and can be either local or what's called a Remote Process Group (RPG). As your flows get more complicated, you'll most certainly want to leverage Process Groups to at least organize and compartmentalize the various Processors. RPGs allow you to send data to different clusters or to different machines within the same cluster. In our setup, we're employing an RPG to spread the data processing across the various nodes within the same cluster. Once we source the data from the primary node, the Flowfiles are sent via an RPG to their respective downstream processors and process groups, which in turn distribute work to a given node in the cluster.



I'm briefly going to cover one of our dataflows: the Zendesk Chat sync. Let's take the red pill and follow the chat event extract and its downstream flows. This process extracts the Zendesk chats and parses each conversation into individual chat lines. These are then loaded into Redshift and Google Big Query. The high level steps are as follows:

1. Extract raw events from the zendesk._ticket_comments table for a given date

2. Convert the raw chat lines into individual chat lines. I covered this process in a previous blog post here.

3. Format the chats in JSON for loading into Redshift and Google Big Query

4. Load into Redshift and Google Big Query


Drilling into the Process Group labelled ZendeskExtract, we see the details of the extract process.




As you can see, there are a variety of components on the canvas; these are called "processors". As noted above, processors actually perform the work, and most of what you need to do is available out of the box, requiring little or no coding. A processor does some combination of data routing, transformation, or mediation between systems. Processors also have access to the attributes of a given FlowFile and its content stream, and a processor can either commit its work or roll it back. Below is an example of the UpdateAttribute processor, which updates the attributes of a FlowFile using the Attribute Expression Language.
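As a rough illustration, an UpdateAttribute rule is just a set of attribute-name/expression pairs. The attribute names below mirror ones that show up later in this flow, but the exact expressions are illustrative rather than lifted from our configuration:

looker_egress_file     ${now():toNumber()}.gz
looker_bq_partition    ${chat_date:toDate('yyyy-MM-dd'):format('yyyyMMdd')}

The first stamps a unique file name onto the FlowFile; the second reshapes the chat_date attribute into the YYYYMMDD form that Big Query partitions expect.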






This flow runs every 30 minutes and is triggered via the GenerateFlowFile processor. GenerateFlowFile allows you to either generate empty Flowfiles for testing or, as in our case, a Flowfile with custom input text. Since you can parameterize an ExecuteSQL processor with attributes from a Flowfile, I inject the JSON attribute chat_date into the body of the Flowfile. This value is later promoted to an attribute and used as a predicate in the SQL statement run by the downstream ExecuteSQL processor. For example:


{"chat_date":"2017-02-10"}




This Flowfile is then passed downstream to the EvaluateJsonPath and UpdateAttribute processors before reaching the zendesk-output port. These processors extract the chat_date from the Flowfile and set a number of attributes used in downstream processing.
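The EvaluateJsonPath configuration for this is about as simple as it gets; with the Destination property set to flowfile-attribute, a single user-defined property pulls the date out of the JSON body, roughly:

chat_date    $.chat_date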







At this stage in the ETL flow, we're ready to actually run the queries to extract the data from the database. Since we have a cluster of nodes, we want to be able to execute the SQL on any of them. This is important because when data is sourced, we need to ensure that only one node runs a given SQL statement; otherwise we'll get duplicates. For the chat events, we only generate a single Flowfile every 30 minutes, so this is fine. With the GenerateTableFetch processors, I specify a Partition Size for each table. The GenerateTableFetch processor will generate FlowFiles containing SQL statements with different offset and limit predicates. Each Flowfile is then sent to the cluster and assigned to run on a single node via an ExecuteSQL processor. For example, the SQL might look like this:


SELECT * FROM zendesk._ticket_history WHERE id > 244779086088 ORDER BY id LIMIT 200000

Example of the ExecuteSQL running on different nodes within the cluster
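For the chat-event extract itself, the ExecuteSQL query references the chat_date attribute set earlier via the Expression Language. It's something in the spirit of the following, where the created_at column name is a placeholder of mine rather than the actual Zendesk column:

SELECT * FROM zendesk._ticket_comments WHERE DATE(created_at) = '${chat_date}'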





All the Zendesk tables are sent to the same process group and generally follow the same path. The chats do require a couple of extra transformation steps, with the newly transformed data being loaded into both Redshift and Google Big Query. This parallel execution path is needed while we develop the necessary LookML for transitioning from Redshift to Google Big Query. Once the transition is complete, we can simply turn off the path that loads into Redshift without ever disrupting the path to Google Big Query. This transformation path is handled via routing.



So a FlowFile that has a looker_table attribute of chats will be routed to the parse chats processor. From there the FlowFile makes its way down to the formatting processor before being sent back into the Big Query and Redshift flows.
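This routing is the kind of thing RouteOnAttribute handles with a one-line Expression Language rule; assuming that's the processor in play here, the rule looks something like this, with matching FlowFiles leaving via a chats relationship wired to the parse chats processor:

chats    ${looker_table:equals('chats')}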




The next few steps in the flow are pretty boring. On the Redshift path we compress the Flowfile content, write it to S3, and load it into Redshift via a COPY command. On the Google Big Query path we compress the JSON and write it to disk along with a metadata file containing the JSON needed to drive the load into Google Big Query. The final steps in the flow are fairly straightforward: we read in the metadata file, extract the needed JSON into a FlowFile attribute, route the loading to partitioned vs. non-partitioned tables, and finally load the data via the Big Query command line tool.
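To make the Redshift side concrete, the COPY issued against the S3 object looks roughly like the following; the bucket, target table, and IAM role are placeholders:

COPY zendesk.chat_events
FROM 's3://example-bucket/zendesk/chat-events/1698334643841923.gz'
CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789012:role/redshift-copy'
JSON 'auto' GZIP;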




Data Provenance


One of the most important features of NiFi, IMHO, is its built-in support for data provenance. Data provenance documents the inputs, entities, systems, and processes that influence data of interest, in effect providing a historical record of the data and its origins. For each processor within NiFi, one can click on the component and inspect the data provenance.

For example, let's look at the data provenance for the ExecuteScript processor that loads the chat events into a Google Big Query partitioned table. The data provenance can be accessed by right-clicking on the processor and selecting Data Provenance.




If we look at the details of an event, and then the Content tab, we can inspect both the input and output claims. The input claim in this example is a Flowfile with the metadata specifying which file will be loaded into Google Big Query.



{
  "JSONAttributes": {
    "looker_egress_path": "/opt/nifi-working/zendesk/chat-events-egress-to-bq/",
    "looker_table_key": "",
    "looker_schema": "chat_events",
    "looker_temp_schema": "",
    "looker_bq_write_disposition": "WRITE_TRUNCATE",
    "looker_egress_file": "1698334643841923.gz",
    "looker_table": "chats",
    "looker_bq_partition": "20170213",
    "looker_temp_table": ""
  }
}
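Those metadata fields map almost one-for-one onto the bq invocation the ExecuteScript processor ends up issuing. The command below is my sketch of the shape of that call rather than the actual script; the WRITE_TRUNCATE disposition becomes --replace and the looker_bq_partition value becomes the partition decorator on the table name:

bq load --source_format=NEWLINE_DELIMITED_JSON --replace \
    'chat_events.chats$20170213' \
    /opt/nifi-working/zendesk/chat-events-egress-to-bq/1698334643841923.gz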

The output claim will contain the output of the Big Query load:





Upload complete.
Waiting on bqjob_r67aaeabc3cb7ebe7_0000015a379d509e_1 ... (0s) Current status: RUNNING
                                                                                      
Waiting on bqjob_r67aaeabc3cb7ebe7_0000015a379d509e_1 ... (1s) Current status: RUNNING
                                                                                      
Waiting on bqjob_r67aaeabc3cb7ebe7_0000015a379d509e_1 ... (1s) Current status: DONE   


Data provenance is extremely important in any ETL pipeline or job. As things get more complex, it becomes even more critical to have insight into what's going on along the way. There are a number of other really important and useful features of Apache NiFi that I'll cover in later posts. But if you need to move bits from A to B, then you should really consider investing some time into learning how to leverage NiFi.


So there you have it, Bob's your uncle
