Keeping two or more data stores in sync is always a fun task. With the help of Apache NiFi and some decent tooling, I've been able to craft a solution here at Looker that allows us to easily keep our Salesforce data in sync and up to date in Google BigQuery. We've been running this in production for close to a year now, and it's proven to be very flexible and rather solid. In this post I'll cover how we accomplish this.
Before going into the plumbing details, here's a very high-level overview of the process:
1. Extract the data from Salesforce, as either incremental or full exports.
2. Use NiFi's Record Reader and Record Writer Controller Services to transform the data from CSV to JSON.
3. Load the data into Google BigQuery.
Below is a high-level overview of the extraction flow:
[Extraction Flow diagram]
Plumbing
The first task was to get data out of Salesforce in a fast and reliable manner. After researching both the REST/SOAP-based APIs and Data Loader, it seemed that Data Loader was the right tool for our use case. Data Loader is a client application for the bulk import or export of data; it can be used to insert, update, delete, or export Salesforce records. I won't cover the details of getting Data Loader set up, as it's covered pretty well in the Developer Guide.
When running Data Loader in batch mode, you provide a configuration directory that contains the necessary config.properties, log-conf.xml, and process-conf.xml files. All of our Salesforce extract processes share the same config.properties and log-conf.xml, so those are easily symlinked to a shared location. The interesting component that drives what data is extracted lives in the process-conf.xml file. It contains the information that Data Loader needs to process a "bean"; each bean refers to a single process such as an insert, upsert, or export, so a single process-conf.xml can contain multiple processes. One of the parameters you can configure for each bean is sfdc.extractionSOQL, which allows valid SOQL (Salesforce Object Query Language) to be run for that bean.
Wanting the extractionSOQL to be driven at runtime rather than hardcoded into process-conf.xml, I opted to leverage xsltproc, since we're dealing with XML files. I already use xsltproc to manage and manipulate Hadoop XML configuration files via Ansible, so I knew it was the right tool for the job. Xsltproc is a command-line tool for applying XSLT stylesheets, which allow parsing of XML and various tree manipulations. With xsltproc, you provide an output file, an XSLT stylesheet, and an existing XML file to apply the transformation to. In addition, you can pass parameters into the XSLT via the --stringparam argument. For example, running the command below, the extractionSOQL parameter defined in the XSLT would be overridden by the command-line argument.
xsltproc --timing --stringparam extractionSOQL "SELECT foo FROM bar WHERE baz = 'green'" -o conf/bar/process-conf.xml xsl/bar.xsl conf/nifi-process-conf-master.xml
<?xml version="1.0" encoding="UTF-8"?>
<xsl:stylesheet version="1.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform">
  <!-- Default SOQL; overridden at runtime via xsltproc's stringparam argument -->
  <xsl:param name="extractionSOQL" select="'Select Foo FROM Bar'"/>

  <!-- Identity template: copy everything through unchanged -->
  <xsl:template match="@*|node()">
    <xsl:copy>
      <xsl:apply-templates select="@*|node()"/>
    </xsl:copy>
  </xsl:template>

  <!-- Append the runtime SOQL entry to the BarExtract bean's configOverrideMap -->
  <xsl:template match="bean[@id='BarExtract']/property[@name='configOverrideMap']/map">
    <xsl:copy>
      <xsl:apply-templates select="@*|node()"/>
      <entry key="sfdc.extractionSOQL" value="{$extractionSOQL}"/>
    </xsl:copy>
  </xsl:template>
</xsl:stylesheet>
<bean id="BarExtract" class="com.salesforce.dataloader.process.ProcessRunner" singleton="false" lazy-init="default" autowire="default" dependency-check="default">
<description>Foo Extract</description>
<property name="name" value="Lead"/>
<property name="configOverrideMap">
<map>
<entry key="sfdc.entity" value="Foo"/>
<entry key="process.operation" value="extract"/>
<entry key="process.mappingFile" value="/opt/nifi-common/scripts/salesforce/dataloader/mapping-files/Foo.sdl"/>
<entry key="dataAccess.type" value="csvWrite"/>
<entry key="sfdc.extractionSOQL" value="SELECT Foo FROM Bar WHERE baz = 'green'"/>
</map>
</property>
</bean>
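With the rendered process-conf.xml in place, the extract itself is just a batch-mode Data Loader run against that configuration directory. Below is a minimal sketch of what that invocation looks like; the jar path and directory layout are illustrative, and the exact entry point can vary between Data Loader versions, but ProcessRunner is the same class referenced in the bean above.
# Minimal sketch of a batch-mode Data Loader run against the rendered config directory.
# Jar path and config location are illustrative.
java -cp /opt/dataloader/dataloader-uber.jar \
  -Dsalesforce.config.dir=conf/bar \
  com.salesforce.dataloader.process.ProcessRunner \
  process.name=BarExtract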
Now that we're exporting the data out of Salesforce in CSV format, the next piece of the puzzle is converting that CSV data into JSON so we can load it into Google BigQuery. A combination of FetchFile -> UpdateAttribute -> QueryRecord takes care of the heavy lifting. The QueryRecord processor lets us set a Record Reader and a Record Writer; in our case we're using the CSVReader and JsonRecordSetWriter, which allow us to map/convert the CSV data to JSON fairly easily.
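Conceptually, that record conversion just turns each CSV row into a JSON object. Outside of NiFi, a rough command-line equivalent looks like the sketch below; the file names are made up, and in the real flow the CSVReader and JsonRecordSetWriter (with their schema handling) do this work for us.
# Rough command-line equivalent of the CSV-to-JSON record conversion.
# Foo.csv / Foo.json are made-up names; NiFi's record services do this in the actual flow.
python3 -c '
import csv, json, sys
for row in csv.DictReader(sys.stdin):
    print(json.dumps(row))
' < Foo.csv > Foo.json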
From there we prep the JSON for BigQuery by running it through a simple JQ filter, compress it, and store the results in Google Cloud Storage. The flowfile is then passed out of the Salesforce process group and routed to the process group responsible for loading the data into Google BigQuery.
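Outside of NiFi, that prep, stage, and load sequence boils down to something like the following sketch; the jq filter, bucket, dataset, and table names are placeholders, and in our flow the equivalent work is done by NiFi processors.
# Hedged sketch of the prep/stage/load steps; the jq filter, bucket, dataset, and
# table names are placeholders for what the NiFi flow actually does.
jq -c '.' Foo.json | gzip > Foo.json.gz
gsutil cp Foo.json.gz gs://example-bucket/salesforce/Foo.json.gz
# bq load appends by default, matching the WRITE_APPEND disposition used in the scheduling config below.
bq load --source_format=NEWLINE_DELIMITED_JSON \
  example_dataset.Bar gs://example-bucket/salesforce/Foo.json.gz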
Scheduling
Once all the necessary flows were wired together, we scheduled them to run via a GenerateFlowFile processor with some custom JSON text. Additionally, we have a HandleHttpRequest processor listening for ad hoc requests to kick off a run of the flow. When GenerateFlowFile triggers a flowfile, it's routed to an EvaluateJsonPath processor, which extracts all the needed JSON values into flowfile attributes.
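For the ad hoc path, the values are sent as HTTP headers, which HandleHttpRequest exposes as http.headers.* flowfile attributes. A request could look roughly like the following; the host, port, and header names are assumptions based on the attribute names used in the flow.
# Illustrative ad hoc trigger; host, port, and header names are assumptions.
curl -X POST http://nifi-host:8011/ \
  -H 'looker_bq_write_disposition: WRITE_APPEND' \
  -H 'looker_table: Bar' \
  -H 'looker_SOQL: SELECT foo FROM Bar WHERE IsDeleted = false'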
For the scheduled path, the following is added to the GenerateFlowFile Custom Text property; it generates the JSON needed to extract all data from the Bar object/table that was modified within the last 30 minutes and is not deleted.
{"http.headers.looker_bq_write_disposition":"WRITE_APPEND","http.headers.looker_table":"Bar","http.headers.looker_SOQL":"Select foo FROM Bar WHERE IsDeleted = false AND SystemModstamp > ${now():toNumber():minus(1800000):format("yyyy-MM-dd'T'HH:mm:ss'Z'","GMT")}"}
So there you have it. Bob's your uncle!