Picture an architecture where production data gets painstakingly replicated to a very expensive secondary database, where, eventually, yesterday’s information gets analyzed. What’s the name for this “pattern”? If you answered “Traditional Business Intelligence (BI)”, you’ve won a rubber Mule and a warm handshake at the next Mule Summit!
As the volume of data to analyze kept increasing and the need to react in real-time became more pressing, new approaches to BI came to life: the so-called Big Data problem was recognized and a range of tools to deal with it started to emerge.
Apache Hadoop is one of these tools. It’s “an open-source software framework that supports data-intensive distributed applications. It supports the running of applications on large clusters of commodity hardware. Hadoop was derived from Google’s MapReduce and Google File System (GFS) papers” (Wikipedia). So how do you feed real-time data into Hadoop? There are several ways, but one of them is to write directly to its primary data store, HDFS (the Hadoop Distributed File System). Thanks to its Java client, this is very easily done in simple scenarios. Once you throw concurrent writes and the need to organize data in specific directory hierarchies into the mix, it’s a good time to bring Mule into the equation.
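To give an idea of that “simple scenario” baseline, here is a minimal sketch of a direct write with the HDFS Java client; the NameNode URI, the target path and the class name are placeholders you would adapt to your own cluster, not part of the example project.

import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsWriteExample {
    public static void main(String[] args) throws IOException {
        // Connect to the cluster; the NameNode address below is a placeholder.
        Configuration conf = new Configuration();
        try (FileSystem fs = FileSystem.get(URI.create("hdfs://namenode:8020"), conf);
             FSDataOutputStream out = fs.create(new Path("/data/sample.dat"))) {
            // Write one line of data; parent directories are created automatically.
            out.write("210145,2013-04-26T21:36:52,99.9\n".getBytes(StandardCharsets.UTF_8));
        }
    }
}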
In this post we will look at how Mule’s HDFS Connector can help you write time series data to HDFS, ready to be map-reduced to your heart’s content.
Show me the data
The time series data we will insert into HDFS is very basic. It follows this pattern:
{source_id},{YYYY-MM-DD'T'HH:mm:ss},{value}
It’s simply a value at a point in time for a specific data source. If you’ve dealt with smart meter data, this should look familiar. Here is a sample data frame that follows this format:
210145,2013-04-26T21:36:52,99.9
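Parsing such a frame is trivial. Here is a hypothetical helper (not part of the example project; class and field names are illustrative only) that splits a frame into its three fields:

// Hypothetical holder for one parsed data frame.
public class DataFrame {
    public final String sourceId;
    public final String timestamp; // kept as the raw ISO-8601 string
    public final double value;

    public DataFrame(String sourceId, String timestamp, double value) {
        this.sourceId = sourceId;
        this.timestamp = timestamp;
        this.value = value;
    }

    // Splits "{source_id},{timestamp},{value}" into its three fields.
    public static DataFrame parse(String frame) {
        String[] parts = frame.split(",");
        return new DataFrame(parts[0], parts[1], Double.parseDouble(parts[2]));
    }
}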
We want the data to be organized in HDFS in a specific hierarchy of directories that follows this path pattern:
${data_root}/{source_id}/{YYYYMMDD}/raw.dat
This means that each data source gets a sub-root named after its ID and, below it, all data accumulates in a single file per day named raw.dat.
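For illustration, here is a small sketch of how such a path could be derived from a frame’s source ID and timestamp; the method name and data root are assumptions, not taken from the example project.

// Derives "${data_root}/{source_id}/{YYYYMMDD}/raw.dat" for one data frame.
public static String hdfsPathFor(String dataRoot, String sourceId, String isoTimestamp) {
    // "2013-04-26T21:36:52" -> "20130426"
    String day = isoTimestamp.substring(0, 10).replace("-", "");
    return dataRoot + "/" + sourceId + "/" + day + "/raw.dat";
}

For instance, hdfsPathFor("/data", "210145", "2013-04-26T21:36:52") yields /data/210145/20130426/raw.dat.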
Note also that only a best effort is required to ensure that no data gets lost. If a crash occurs in the processing chain and some data is lost, processing will not be drastically affected (for example, some sources produce data that can be interpolated if a gap exists).
Finally, the other constraints we have to deal with are:
- raw data is received over HTTP and must be acknowledged right away, whether it’s successfully processed or not (clients do not support retrying),
- concurrently appending to the same HDFS file is not supported.
Implementation
To use the HDFS Connector for Mule, add it as a Maven dependency to your project’s POM file as explained in the installation guide. If you’re a Mule Studio user, you can add the HDFS plug-in via its Eclipse Update Site.
We implement the storage solution with two flows, as illustrated below:
- The data-receiver flow accepts data over HTTP. It uses a one-way inbound endpoint in order to immediately respond 200 OK and let the rest of the processing happen asynchronously. The data is received as an input stream which we deserialize to a String before sending it to a VM queue for accumulation. We use a non-persistent in-memory VM queue because it fits our constraints.
- The data-writer flow consumes the data from the VM queue with a single-threaded processing strategy (configuration not visible above). It takes care of adding a CRLF at the end of each data frame before persisting it to the file. It also computes the HDFS path under which the data must be written and, depending on whether this path already exists, either creates it or appends to it (a sketch of this logic follows the list).
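If you prefer to see the create-or-append decision spelled out, here is a minimal sketch in plain Java against the Hadoop FileSystem API (the actual flow does the equivalent with the HDFS Connector’s operations; class and method names here are illustrative). Note that appending requires a Hadoop version and configuration where append is supported.

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FrameWriter {
    // Appends one CRLF-terminated frame to the daily file, creating the file on first write.
    public static void writeFrame(FileSystem fs, String hdfsPath, String frame) throws IOException {
        Path path = new Path(hdfsPath);
        byte[] payload = (frame + "\r\n").getBytes(StandardCharsets.UTF_8);
        try (FSDataOutputStream out = fs.exists(path) ? fs.append(path) : fs.create(path)) {
            out.write(payload);
        }
    }
}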
Note that, in theory, we could use one thread per source ID in the data-writer flow instead of a single thread for all. This would increase throughput while still ensuring that no more than one thread appends to a particular HDFS file at a time. In practice, since there is a potentially unlimited number of source IDs, this approach would be problematic. A better option would be to have a limited number of VM queues and bind each source ID to a particular queue using a consistent source-ID-to-queue mapping algorithm (a minimal sketch of such a mapping follows). With most of the above flow factored into a common sub-flow, the flow in charge of consuming these VM queues would stay very small.
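Such a mapping can be as simple as hashing the source ID into a fixed number of buckets; the queue names below are purely hypothetical.

// Maps a source ID to one of a fixed pool of VM queues (queue names are hypothetical).
public static String queueFor(String sourceId, int queueCount) {
    // Take the remainder first so the absolute value always falls in [0, queueCount).
    int bucket = Math.abs(sourceId.hashCode() % queueCount);
    return "data-queue-" + bucket;
}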
Analyze this!
Loading data into HDFS is of course just the beginning of Mule’s adventure with Big Data. As data gets loaded in near real-time, map/reduce jobs can be run on it. For this, tools like Apache Pig or Apache Mahout come in handy. But that’s a story for another post.
In the meantime you can start playing with this example: its source code is available on GitHub.
What is your success story using Mule for Big Data? Any tips, tricks, or suggestions you’d like to share? As always, your comments are very valuable to us.