Leveraging Apache NiFi for agile dataflows
Mar 8, 2017
At Looker, we’ve come a long way from our humble single MySQL database and nightly extraction, transformation, and loading (ETL) jobs via cron.
Today we have multiple data sources, such as Salesforce, Zendesk, and Marketo, as well as multiple databases, including Redshift and BigQuery. Over the years we’ve cobbled together a number of homegrown scripts running on a fleet of ambiguously named instances to assemble our analytical backend. This architecture resulted in challenges that compounded over time: job visibility, completion status, error detection and notification, logging, and one really important one, dependencies between jobs (a job that can't start until another has completed). On top of that, this workflow increased the operational support load for various teams.
Adding to the operational challenges, we are also consolidating more data into Massively Parallel Processing (MPP) databases like Google BigQuery and Amazon Redshift, and as we do this, we need to maintain existing data pipelines while also being able to test new ones.
Enter Apache NiFi
Apache NiFi was originally created by the United States National Security Agency (NSA) and was open sourced in 2014. It automates the flow of data between systems. The main challenges we faced that NiFi addressed include, but are not limited to:
- Understanding the big picture of a data flow
- Making immediate changes and tightening the feedback loop
- Maintaining a chain of custody for data
NiFi provides a number of high-level capabilities and objectives that can help in tackling the complex issues of ETL that most companies face.
Some of these important capabilities include:
- Highly configurable
  - Flow can be modified at runtime
  - Back pressure
- Data Provenance
  - Track data flow from beginning to end
- Designed for extension
  - Build your own processors and more
  - Enables rapid development and effective testing
I’ve been rolling out Apache NiFi here at Looker to handle more of our ETL jobs. As I began migrating more of our old ETL processes to NiFi, as well as developing new ones, I decided it was time to invest in a NiFi cluster. A cluster provides more processing capacity along with a single interface through which to make changes and monitor the various dataflows. A change is made only once and is then replicated to all the nodes of the cluster, and the same interface lets us monitor the health and status of every node.
NiFi employs a Zero-Master Clustering model. Each node in the cluster performs the same tasks on the data, but each operates on a different set of data. One of the nodes is automatically elected (via Apache ZooKeeper) as the Cluster Coordinator. All nodes in the cluster will then send heartbeat/status information to this node, and this node is responsible for disconnecting nodes that do not report any heartbeat status for some amount of time. Additionally, when a new node elects to join the cluster, the new node must first connect to the currently-elected Cluster Coordinator in order to obtain the most up-to-date flow.
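The coordinator's heartbeat bookkeeping can be pictured with a toy model. The Python below is a simplified sketch, not NiFi's implementation: the class, the node names, and the 40-second timeout are hypothetical. In a real cluster the heartbeat cadence is set with `nifi.cluster.protocol.heartbeat.interval` in nifi.properties, and the point at which a silent node is disconnected is governed by NiFi's own settings rather than by a class like this one.

```python
from dataclasses import dataclass, field


@dataclass
class ClusterCoordinator:
    """Toy model of a coordinator tracking node heartbeats (illustrative only)."""

    # Hypothetical threshold: seconds of silence before a node is disconnected.
    timeout_s: float
    last_heartbeat: dict = field(default_factory=dict)
    connected: set = field(default_factory=set)

    def heartbeat(self, node: str, now: float) -> None:
        # Each node periodically reports status; remember when we last heard from it.
        self.last_heartbeat[node] = now
        self.connected.add(node)

    def disconnect_stale(self, now: float) -> set:
        # Disconnect every node whose last heartbeat is older than the timeout.
        stale = {n for n in self.connected if now - self.last_heartbeat[n] > self.timeout_s}
        self.connected -= stale
        return stale


coord = ClusterCoordinator(timeout_s=40.0)
for node in ("nifi-1", "nifi-2", "nifi-3"):
    coord.heartbeat(node, now=0.0)
coord.heartbeat("nifi-1", now=30.0)
coord.heartbeat("nifi-2", now=30.0)
print(sorted(coord.disconnect_stale(now=45.0)))  # nifi-3 has been silent for 45 s
```

A node that later wants to rejoin would, as described above, first contact the coordinator to fetch the current flow; that step is left out of the sketch.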
Although it's beyond the scope of this post, I used Ansible to spin up and configure all the necessary nodes and cluster configuration. Without some sort of orchestration tool like this, you're pretty much doomed from the beginning. Currently we have a cluster of three nodes that's handling a number of data flows between a variety of systems including S3, Redshift, MySQL and Google BigQuery. The images below show the main canvas page for this cluster.
A few NiFi terms will help in understanding what follows. I lifted these straight from the NiFi documentation:
- FlowFile: represents each object moving through the system. For each one, NiFi keeps track of a map of key/value pair attribute strings and its associated content of zero or more bytes.
- Processor: actually performs the work. NiFi 1.1.x ships with 188 of them.
- Process Group: a specific set of processes and their connections, which can receive data via input ports and send data out via output ports. In this manner, process groups allow the creation of entirely new components simply by composition of other components.
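The FlowFile definition maps naturally onto a small data structure. The Python below is a conceptual model only (NiFi's real FlowFile is a Java interface backed by its FlowFile and content repositories), and the class and method names are made up for illustration. It shows that attributes and content are kept separately, and that changing either produces a new FlowFile version instead of mutating the original.

```python
import uuid
from dataclasses import dataclass, field, replace


@dataclass(frozen=True)
class FlowFile:
    """Conceptual model: a map of string attributes plus zero or more bytes of content."""

    content: bytes = b""
    attributes: dict = field(default_factory=dict)

    def with_attributes(self, **updates: str) -> "FlowFile":
        # Attribute changes yield a new FlowFile; the original stays untouched.
        return replace(self, attributes={**self.attributes, **updates})

    def with_content(self, content: bytes) -> "FlowFile":
        # Content changes also yield a new FlowFile, keeping the attributes.
        return replace(self, content=content)


ff = FlowFile(attributes={"uuid": str(uuid.uuid4()), "filename": "chats.json"})
tagged = ff.with_attributes(looker_table="chats")
print(tagged.attributes["looker_table"], len(tagged.content))  # chats 0
```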
I tend to leverage Process Groups quite a bit. They let you organize Processors logically, and they can be either local or remote; the latter are called Remote Process Groups (RPGs). As your flows get more complicated, you'll almost certainly want to use Process Groups to at least organize and compartmentalize the various dataflows. RPGs allow you to send data to different clusters or to different machines within the same cluster. In our setup, we employ an RPG to spread the data processing across the nodes within the same cluster. Once we source data on the primary node, the FlowFiles are sent via an RPG to their respective downstream processors and process groups, so that the work is distributed across the nodes of the cluster.
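The "source once on the primary node, then fan out" pattern can be modeled in a few lines. Round-robin assignment here is a simplifying assumption: the site-to-site protocol that RPGs use takes each node's load into account rather than strictly rotating. The node and table names are hypothetical.

```python
from itertools import cycle


def distribute(flowfiles: list[str], nodes: list[str]) -> dict[str, list[str]]:
    """Assign each FlowFile to exactly one node, rotating through the nodes."""
    assignment: dict[str, list[str]] = {node: [] for node in nodes}
    for flowfile, node in zip(flowfiles, cycle(nodes)):
        assignment[node].append(flowfile)
    return assignment


# Work sourced once, on the primary node only (table names are hypothetical).
sourced_on_primary = ["tickets", "ticket_comments", "ticket_history", "users"]
print(distribute(sourced_on_primary, ["nifi-1", "nifi-2", "nifi-3"]))
# {'nifi-1': ['tickets', 'users'], 'nifi-2': ['ticket_comments'], 'nifi-3': ['ticket_history']}
```

Because every FlowFile lands on exactly one node, nothing is processed twice, while all three nodes share the work.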
I'm briefly going to cover one of our more interesting dataflows: the Zendesk Chat sync. Let's take the red pill and follow the chat event extract and its downstream flows. This flow extracts our Zendesk chats and parses each conversation into individual chat lines, which are then loaded into Redshift and Google BigQuery. The high-level steps are as follows:
- Extract raw events from the zendesk._ticket_comments table for a given date
- Parse the raw chat transcripts into individual chat lines
- Format the chats as JSON for loading into Redshift and Google BigQuery
- Load into Redshift and Google BigQuery
- Rinse, lather, and repeat...
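To make the parse and format steps concrete, here is a minimal Python sketch. It assumes each transcript has one message per line in the form `(HH:MM:SS AM) Name: message`; that format and the output field names (`ticket_id`, `seq`, `time`, `author`, `text`) are assumptions for illustration, since the raw data isn't shown here and our production parser is a separate Java program. The output is newline-delimited JSON, one object per line, which is the shape BigQuery's JSON loader expects.

```python
import json
import re

# Assumed transcript line format: "(10:32:15 AM) Visitor 123: Hi, I need help"
CHAT_LINE = re.compile(r"^\((?P<time>[0-9:]+ [AP]M)\) (?P<author>[^:]+): (?P<text>.*)$")


def parse_chat(ticket_id: int, transcript: str) -> list[dict]:
    """Split one raw transcript into one record per chat line."""
    rows = []
    for line in transcript.splitlines():
        match = CHAT_LINE.match(line.strip())
        if match is None:
            continue  # skip blank or unrecognized lines
        rows.append({"ticket_id": ticket_id, "seq": len(rows), **match.groupdict()})
    return rows


def to_ndjson(rows: list[dict]) -> str:
    """Newline-delimited JSON: one object per line."""
    return "".join(json.dumps(row) + "\n" for row in rows)


raw = "(10:32:15 AM) Visitor 123: Hi, I need help\n(10:32:40 AM) Jane: Sure, what's up?\n"
print(to_ndjson(parse_chat(42, raw)), end="")
```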
Drilling into the Process Group labelled ZendeskExtract, we see the details of the extract process.
As you can see, there's a variety of components on the canvas; these are processors, which, as noted above, do the actual work. Most of what you need is available out of the box and requires little or no coding. A processor does some combination of data routing, transformation, or mediation between systems. Processors have access to the attributes of a given FlowFile and to its content stream, and each processor either commits its work or rolls it back. Below is an example of the UpdateAttribute processor, which updates a FlowFile's attributes using the Attribute Expression Language.
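As an illustration of the kind of derivation UpdateAttribute handles, suppose we want a BigQuery partition suffix derived from chat_date. In Expression Language that could be written as `${chat_date:toDate('yyyy-MM-dd'):format('yyyyMMdd')}`; the target attribute name `bq_partition` is hypothetical. The Python below mirrors what that expression computes and, like UpdateAttribute, touches only attributes, never content.

```python
from datetime import datetime


def update_attributes(attributes: dict[str, str]) -> dict[str, str]:
    """Mimic an UpdateAttribute rule: derive a new attribute, leave content alone."""
    chat_date = datetime.strptime(attributes["chat_date"], "%Y-%m-%d")
    # Equivalent of ${chat_date:toDate('yyyy-MM-dd'):format('yyyyMMdd')}
    return {**attributes, "bq_partition": chat_date.strftime("%Y%m%d")}


print(update_attributes({"chat_date": "2017-02-13"})["bq_partition"])  # 20170213
```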
This flow runs every 30 minutes and is triggered by a GenerateFlowFile processor. GenerateFlowFile can generate either empty FlowFiles for testing or, as in our case, a FlowFile with custom text. Since an ExecuteSQL processor can be parameterized with attributes from a FlowFile, I put a JSON document containing chat_date into the body of the FlowFile. Later in the flow, chat_date is extracted into an attribute that becomes a predicate for the SQL statement run by the downstream ExecuteSQL processor.
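The round trip from FlowFile content to SQL predicate can be sketched in Python. The JSON body, the `$.chat_date` path, and the query template (including its `created_at` column) are hypothetical stand-ins; in NiFi the extraction is done by EvaluateJsonPath and the substitution by Expression Language in ExecuteSQL's query. The date-format check is our own addition to keep arbitrary text out of the SQL.

```python
import json
import re

# Hypothetical ExecuteSQL query with an Expression Language placeholder.
QUERY_TEMPLATE = "SELECT * FROM zendesk._ticket_comments WHERE created_at::date = '${chat_date}'"


def extract_chat_date(content: bytes) -> str:
    """Like EvaluateJsonPath with $.chat_date: pull a value from content into an attribute."""
    chat_date = json.loads(content)["chat_date"]
    if not re.fullmatch(r"\d{4}-\d{2}-\d{2}", chat_date):
        raise ValueError(f"unexpected chat_date: {chat_date!r}")
    return chat_date


def render_query(attributes: dict[str, str]) -> str:
    """Replace ${name} placeholders with attribute values (a tiny subset of NiFi EL)."""
    return re.sub(r"\$\{(\w+)\}", lambda m: attributes[m.group(1)], QUERY_TEMPLATE)


body = json.dumps({"chat_date": "2017-02-13"}).encode()  # custom text from GenerateFlowFile
attrs = {"chat_date": extract_chat_date(body)}
print(render_query(attrs))
```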
I’ve also configured a ListenHTTP processor that lets us rerun the flow for any given date. For example, you can trigger it with a simple curl command like this:
curl -X POST -H 'Content-Type: application/json' -H 'chat_date:2017-02-13' --data "invoked chat sync for 2017-02-13" 10.190.3.108:9000/chatListener
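For scripted backfills, the same request can be issued from Python's standard library. This is a sketch that mirrors the curl call above; the helper names are hypothetical, and it assumes ListenHTTP is configured (through its "HTTP Headers to receive as Attributes (Regex)" property) to turn the chat_date header into a FlowFile attribute. It uses `http.client` rather than `urllib.request` because the latter normalizes the capitalization of header names, which could matter if the header-to-attribute mapping is case-sensitive.

```python
import http.client
from datetime import date, timedelta


def chat_sync_request(chat_date: str) -> tuple[str, bytes, dict[str, str]]:
    """Path, body, and headers matching the curl example."""
    body = f"invoked chat sync for {chat_date}".encode()
    headers = {"Content-Type": "application/json", "chat_date": chat_date}
    return "/chatListener", body, headers


def trigger_chat_sync(chat_date: str, host: str = "10.190.3.108", port: int = 9000) -> int:
    """POST to the ListenHTTP endpoint and return the HTTP status code."""
    path, body, headers = chat_sync_request(chat_date)
    conn = http.client.HTTPConnection(host, port, timeout=10)
    try:
        # http.client sends header names as given, so "chat_date" stays lowercase as in curl.
        conn.request("POST", path, body=body, headers=headers)
        return conn.getresponse().status
    finally:
        conn.close()


def dates_between(start: date, end: date) -> list[str]:
    """Inclusive list of ISO dates, handy for backfilling a range."""
    return [(start + timedelta(days=i)).isoformat() for i in range((end - start).days + 1)]


for day in dates_between(date(2017, 2, 13), date(2017, 2, 15)):
    print("would POST", chat_sync_request(day))
    # trigger_chat_sync(day)  # uncomment to actually rerun the flow for this date
```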
This FlowFile is then passed downstream to the EvaluateJsonPath and UpdateAttribute processors before reaching the zendesk-output port. These processors extract chat_date from the FlowFile and set a number of attributes used in downstream processing.
At this stage in the ETL flow, we're ready to run the queries that extract the data from the database. Since we have a cluster of nodes, we want to be able to execute the SQL on any node; this is what allows the flow to scale its processing. At the same time, we need to ensure that each SQL statement runs on only one node, otherwise we'll get duplicates.
With the GenerateTableFetch processors, I specify a Partition Size for each table. GenerateTableFetch generates FlowFiles containing SQL statements, each covering a different page of the table via its LIMIT and OFFSET clauses. Each FlowFile is then sent to the cluster and assigned to run on a single node via an ExecuteSQL processor. For example, the SQL coming out of the GenerateTableFetch processor might look like this:
SELECT * FROM zendesk._ticket_history
WHERE id > 244779086088 ORDER BY id LIMIT 200000
Example of the ExecuteSQL running on different nodes within the cluster
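The paging idea behind GenerateTableFetch can be sketched in Python. This is an approximation under stated assumptions: the processor obtains the row count itself with a COUNT query, and its exact SQL depends on the configured database type and NiFi version. The row count and node names below are hypothetical; the table, column, and starting id come from the example above.

```python
import math
from typing import Optional


def generate_table_fetch(table: str, max_value_column: str, last_max: Optional[int],
                         row_count: int, partition_size: int) -> list[str]:
    """One paged SELECT per partition of new rows (in the spirit of GenerateTableFetch)."""
    where = f" WHERE {max_value_column} > {last_max}" if last_max is not None else ""
    statements = []
    for page in range(math.ceil(row_count / partition_size)):
        sql = f"SELECT * FROM {table}{where} ORDER BY {max_value_column} LIMIT {partition_size}"
        offset = page * partition_size
        if offset:  # the first page needs no OFFSET clause
            sql += f" OFFSET {offset}"
        statements.append(sql)
    return statements


# Hypothetical: 450,000 new rows since the last maximum id we saw.
statements = generate_table_fetch("zendesk._ticket_history", "id", 244779086088, 450_000, 200_000)
nodes = ["nifi-1", "nifi-2", "nifi-3"]
# Each statement is executed by exactly one node, so no statement runs twice.
assignment = {sql: nodes[i % len(nodes)] for i, sql in enumerate(statements)}
for sql, node in assignment.items():
    print(node, "|", sql)
```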
All the Zendesk tables are sent to the same process group and generally follow the same path. The chats, however, require a couple of extra transformation steps, and the newly transformed data is loaded back into Redshift as well as into Google BigQuery. This parallel execution path is needed while we develop the LookML for transitioning from Redshift to Google BigQuery. Once the transition is complete, we can simply turn off the path that loads to Redshift without ever disrupting the path to Google BigQuery. This transformation path is handled via routing.
A FlowFile whose looker_table attribute is chats is routed to the parse chats processor. The parse chats processor uses ExecuteStreamCommand to feed an external Java program that parses and formats the chats.
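The routing rule amounts to a predicate on one attribute. In a RouteOnAttribute processor it could be a dynamic property named `chats` with the value `${looker_table:equals('chats')}`; the Python below mirrors that decision. The `chats` relationship name is hypothetical, while `unmatched` is the relationship RouteOnAttribute uses for FlowFiles that match no rule.

```python
def route(attributes: dict[str, str]) -> str:
    """Return the relationship a FlowFile takes, like RouteOnAttribute (sketch)."""
    # Equivalent of ${looker_table:equals('chats')}
    if attributes.get("looker_table") == "chats":
        return "chats"      # extra steps: parse chats, then format
    return "unmatched"      # other tables continue on the common load path


for table in ("chats", "ticket_history"):
    print(table, "->", route({"looker_table": table}))
```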
After parsing, a new FlowFile is created from the program's output stream. This FlowFile makes its way down to the formatting processor before being sent back into the Google BigQuery and Redshift flows.
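ExecuteStreamCommand's contract is essentially: write the FlowFile content to the command's stdin and take its stdout as the content of the new FlowFile. The Python below reproduces that contract with `subprocess`; the function name is hypothetical, and the inline uppercase filter merely stands in for our Java parser.

```python
import subprocess
import sys


def execute_stream_command(command: list[str], content: bytes, timeout: float = 60.0) -> bytes:
    """Pipe content to a command's stdin and return its stdout as the new content (sketch)."""
    # check=True raises on a non-zero exit, a rough analogue of treating the run as failed.
    result = subprocess.run(command, input=content, capture_output=True, timeout=timeout, check=True)
    return result.stdout


# Stand-in for the external Java parser: a tiny Python filter that upper-cases stdin.
filter_cmd = [sys.executable, "-c", "import sys; sys.stdout.write(sys.stdin.read().upper())"]
print(execute_stream_command(filter_cmd, b"hello from zendesk\n"))
```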
The next few steps in the flow are pretty boring. On the Redshift path, we compress the FlowFile content, write it to S3, and load it into Redshift via a COPY command. On the Google BigQuery path, we compress the JSON and write it to disk along with a metadata file that contains all the information needed for loading into Google BigQuery. The final steps are straightforward: we read in the metadata file, extract the metadata JSON into FlowFile attributes, route the load to partitioned or non-partitioned tables, and finally load the data via the BigQuery command-line tool.
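Both load commands can be generated from a few metadata values. The sketch assumes a hypothetical metadata layout (`dataset`, `table`, `partition`, `local_path`) and invented bucket, role, and file names. The Redshift COPY options (`IAM_ROLE`, `FORMAT AS JSON 'auto'`, `GZIP`) and the `bq load --source_format=NEWLINE_DELIMITED_JSON` flag are standard; the `$YYYYMMDD` suffix is BigQuery's partition decorator, which is how a load targets a single day of a partitioned table.

```python
def redshift_copy_sql(table: str, s3_uri: str, iam_role: str) -> str:
    """COPY gzipped JSON from S3 into Redshift (sketch)."""
    return f"COPY {table} FROM '{s3_uri}' IAM_ROLE '{iam_role}' FORMAT AS JSON 'auto' GZIP;"


def bq_load_argv(meta: dict) -> list[str]:
    """Build a `bq load` command; target one partition when the metadata names one."""
    destination = f"{meta['dataset']}.{meta['table']}"
    if meta.get("partition"):  # partitioned table: load a single day via $YYYYMMDD
        destination += f"${meta['partition']}"
    return ["bq", "load", "--source_format=NEWLINE_DELIMITED_JSON", destination, meta["local_path"]]


meta = {"dataset": "zendesk", "table": "chats", "partition": "20170213",
        "local_path": "/data/bq/chats_20170213.json.gz"}  # hypothetical metadata file contents
print(bq_load_argv(meta))  # pass to subprocess.run(..., check=True) to execute
print(redshift_copy_sql("zendesk.chats", "s3://example-bucket/chats/2017-02-13.json.gz",
                        "arn:aws:iam::123456789012:role/example-redshift-copy"))
```

Passing `bq` an argument list, rather than a shell string, keeps the `$` in the partition decorator from being expanded by a shell.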
One of the most important features of NiFi is built-in support for data provenance. Data provenance documents the inputs, entities, systems, and processes that influence data of interest, in effect providing a historical record of the data and its origins. For each processor within NiFi, one can click on the component and inspect the data provenance.
For example, let's look at the data provenance for the ExecuteScript processor that loads the chat events into a partitioned Google BigQuery table. The data provenance can be accessed by right-clicking on the processor and selecting Data Provenance.
If we look at the details of an event and open the Content tab, we can inspect both the input and output claims. The input claim in this example is a FlowFile with the metadata specifying which file will be loaded into Google BigQuery.
The output claim, in turn, contains the output of the BigQuery load:
Waiting on bqjob_r67aaeabc3cb7ebe7_0000015a379d509e_1 ... (0s) Current status: RUNNING
Waiting on bqjob_r67aaeabc3cb7ebe7_0000015a379d509e_1 ... (1s) Current status: RUNNING
Waiting on bqjob_r67aaeabc3cb7ebe7_0000015a379d509e_1 ... (1s) Current status: DONE
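Conceptually, provenance is a log of events, each tied to a FlowFile and, for events such as FORK, to its parent FlowFiles; walking those links backwards answers "where did this data come from?". The Python below is a toy model of that idea. RECEIVE, FORK, CONTENT_MODIFIED, and SEND are real NiFi provenance event types, but the record layout, the `lineage` helper, and the pairing of events with components in the sample log are illustrative.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ProvenanceEvent:
    timestamp: int
    flowfile_id: str
    event_type: str         # e.g. RECEIVE, FORK, CONTENT_MODIFIED, SEND
    component: str          # processor that reported the event
    parent_ids: tuple = ()  # FlowFiles this one was derived from (e.g. for FORK)


def lineage(events: list[ProvenanceEvent], flowfile_id: str) -> list[ProvenanceEvent]:
    """Every event for a FlowFile and all of its ancestors, oldest first."""
    by_id: dict[str, list[ProvenanceEvent]] = {}
    for event in events:
        by_id.setdefault(event.flowfile_id, []).append(event)
    seen, stack, trail = set(), [flowfile_id], []
    while stack:
        current = stack.pop()
        if current in seen:
            continue
        seen.add(current)
        for event in by_id.get(current, []):
            trail.append(event)
            stack.extend(event.parent_ids)  # follow links back to parent FlowFiles
    return sorted(trail, key=lambda e: e.timestamp)


log = [
    ProvenanceEvent(1, "ff-1", "RECEIVE", "ExecuteSQL"),
    ProvenanceEvent(2, "ff-2", "FORK", "ExecuteStreamCommand", ("ff-1",)),
    ProvenanceEvent(3, "ff-2", "CONTENT_MODIFIED", "format chats"),
    ProvenanceEvent(4, "ff-2", "SEND", "ExecuteScript"),
    ProvenanceEvent(5, "ff-9", "RECEIVE", "ExecuteSQL"),  # unrelated FlowFile
]
for event in lineage(log, "ff-2"):
    print(event.timestamp, event.event_type, event.component)
```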
I can’t stress enough how important data provenance is in any ETL pipeline or job. As things get more complex, it becomes even more critical to gain insight into what's going on at each step of the way. Although this is just one aspect of NiFi, there are a number of other really important and useful features that I'll cover in later posts.
NiFi focuses purely on the task of connecting systems that produce and process data, while providing the user experience and core functions necessary to do that. Once you get data into a dataflow, you need a way to manage and deliver it. This is what NiFi does, and it does it well.
Today at Looker, we’re using NiFi to efficiently and reliably deliver millions of events hourly to a variety of data stores like Amazon Redshift and Google BigQuery. NiFi has been extremely useful in allowing us to quickly adjust, augment, and add various dataflows. At the same time, NiFi allows us to more easily wrap our heads around various dependencies and steps used to move bits of information from A to B, and sometimes through C.