The batch module was first introduced in Mule 3.5 and it aims to simplify integration use cases where basic ETL functionality is needed. If you’re unfamiliar with it, I recommend you look at this post before continuing. If you are already old friends with batch, then keep reading for a breakdown of all the changes!
We recently introduced our HowTo blog series, which is designed to present simple use-case tutorials to help you as you evaluate MuleSoft’s Anypoint Platform. This blog post is a follow-up to our last blog post, “How to ETL” using Mule. In that post, we demonstrated how Anypoint Platform could be leveraged to read a large number of messages, transform/enrich these messages, and then load them into a target system/application.
One of the biggest issues when processing multiple messages is handling unclean data. Anypoint Platform is designed to ease the burden of data integration, and data integrity checks are a very important component of that process. It’s crucial to have such checks in place to prevent bad data from proliferating to downstream applications.
In the previous blog post, we polled for all new order records in a MySQL database table (extract). For each order record, we do the following –
Look up the product details for each SKU using the SKU number in the record.
Look up the order status for the order using the Order ID in the record.
Insert the merged data with product details and order status into the Status Report table.
We are now adding the following data integrity checks to this process:
Deduplicate records in the source table based on Order ID
Ensure that all the required fields in the payload are present
Ensure that the values of fields such as Order ID are between 1 and 10000
Ensure the length of the retailer name is not greater than 70 characters
If there is an error in any of these checks, we insert an error record with the appropriate error message into the ORDERS_ERRORS table.
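To make the intent of these rules concrete, here is a minimal plain-Python sketch of the same checks. It is not how the Mule flow implements them (the steps that follow use DataWeave and a JSON schema). It only illustrates which records would be routed to ORDERS_ERRORS and with what message. The field names mirror the columns used in this example, and the exact error wording is hypothetical.

```python
# Hypothetical field names modeled on the ORDERS_ERRORS columns in this example.
REQUIRED_FIELDS = ("OrderID", "RetailerName", "SKUs", "OrderDate")
MAX_RETAILER_NAME_LENGTH = 70


def integrity_errors(order: dict) -> list:
    """Return the data integrity violations for one order record (empty if clean)."""
    errors = [f"missing required field {name}" for name in REQUIRED_FIELDS
              if order.get(name) in (None, "")]
    order_id = order.get("OrderID")
    # Range check: Order ID must be between 1 and 10000 inclusive.
    if isinstance(order_id, int) and not 1 <= order_id <= 10000:
        errors.append(f"OrderID {order_id} is outside the range 1-10000")
    name = order.get("RetailerName")
    # Length check: retailer name must not exceed 70 characters.
    if isinstance(name, str) and len(name) > MAX_RETAILER_NAME_LENGTH:
        errors.append("RetailerName is longer than 70 characters")
    return errors


def split_records(orders):
    """Separate clean records from (record, message) pairs bound for ORDERS_ERRORS."""
    good, failed = [], []
    for order in orders:
        problems = integrity_errors(order)
        if problems:
            failed.append((order, "; ".join(problems)))
        else:
            good.append(order)
    return good, failed
```

Collecting every violation for a record, rather than stopping at the first one, gives the person fixing the source data the full picture in one error row.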
To follow the example in this guide, run the SQL script order.sql to set up the database and tables and load them with some sample data. You will find the order_integrity.sql file in the project under the folder src/main/resources.
Import the completed project from the previous ETL blog post. Click on File -> Import -> Anypoint Studio Deployable Studio Archive -> dbetl.zip. Rename the project to “data integrity”.
Add the Transform Message transformer (DataWeave) from the palette into the flow after the “Capture Changed Records” poller component. Rename the component to “Deduplicate records using Order ID”. Configure the DataWeave transformer using the following transform expression: “payload distinctBy $.OrderID”. You can also optionally set the metadata for the output to define a map object called Order as shown below –
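The `distinctBy` expression keeps one record per Order ID. As a rough Python model of that behavior, the sketch below keeps the first record seen for each key and preserves input order. Treating "first occurrence wins" as the tie-breaking rule is an assumption of this sketch, not something stated in this post.

```python
def distinct_by(records, key):
    """Keep the first record seen for each key value, preserving input order."""
    seen = set()
    unique = []
    for record in records:
        k = key(record)
        if k not in seen:  # later duplicates of the same key are dropped
            seen.add(k)
            unique.append(record)
    return unique
```

Usage: `distinct_by(rows, lambda r: r["OrderID"])` plays the role of `payload distinctBy $.OrderID`.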
Add a batch step before the “Transform_QuerySKU” step. Rename it to “DataIntegrityCheck” as shown below –
Add the DataWeave transformer into this new batch step and rename it “Transform to JSON”. This transformer is used to convert the map object into a JSON string as shown below. You can also optionally set the metadata for the output JSON structure.
Copy the completed JSON schema file order_json_schema.json from the src/main/resources folder of the downloaded project into the corresponding folder in your project as shown below –
Search for the “Validate JSON Schema” component in the palette and drop it into the Mule flow after the “Transform to JSON” component. Configure it as shown below to use the JSON schema file –
Drag and drop the “JSON to Object” transformer from the palette into the flow. This transformer converts the JSON back into a HashMap. Configure it as shown below –
Insert a batch step at the end of the Mule flow to handle and log failed records. Configure it as shown below to accept “ONLY_FAILURES” –
When the message enters the “FailedRecords” step, it arrives as a byte array. Insert the Byte Array to Object transformer in the flow to convert the message back to an object, using the configuration shown below –
Insert a DataWeave transformer to convert the JSON object into a Java hashmap so that we can extract individual values from the payload. Rename the component to “Convert JSON to Object” and configure it as shown below. You can also optionally set the metadata for the source and target to show the Order map object.
Insert a Variable component from the palette into the Mule flow after the DataWeave transformer. Rename it to “Store exception” and configure it to store the exception in a variable called “exception”. Use the following MEL (Mule Expression Language) expression to store the variable: “#[getStepExceptions()]”
Insert a database connector from the palette after “Store exception”. Rename it to “Insert Error”, as it will be used to insert the error records into a separate table. Configure it as shown below with the following SQL statement: “INSERT INTO ORDERS_ERRORS(OrderID,RetailerName,SKUs,OrderDate,Message) VALUES (#[payload.OrderID],#[payload.RetailerName],#[payload.SKUs],#[payload.OrderDate],#[flowVars.exception.DataIntegrityCheck.message])”
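As an illustration of what the “Insert Error” step writes, here is a sketch that uses Python’s built-in `sqlite3` instead of MySQL. The table definition is a hypothetical stand-in for the one created by the project’s SQL script. The `step_exceptions` argument models `flowVars.exception` as a mapping from batch step name to the exception raised in that step, which is how the expression `flowVars.exception.DataIntegrityCheck.message` reads it.

```python
import sqlite3


def create_errors_table(conn):
    """Hypothetical ORDERS_ERRORS schema; the real table comes from the project's SQL script."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS ORDERS_ERRORS ("
        "OrderID INTEGER, RetailerName TEXT, SKUs TEXT, OrderDate TEXT, Message TEXT)"
    )


def insert_error(conn, order, step_exceptions):
    """Insert one failed record, mirroring the "Insert Error" statement.

    step_exceptions stands in for flowVars.exception: a mapping from batch
    step name to the exception that step raised.
    """
    message = str(step_exceptions["DataIntegrityCheck"])
    # "?" placeholders bind the values instead of splicing them into the SQL text
    conn.execute(
        "INSERT INTO ORDERS_ERRORS (OrderID, RetailerName, SKUs, OrderDate, Message) "
        "VALUES (?, ?, ?, ?, ?)",
        (order["OrderID"], order["RetailerName"], order["SKUs"],
         order["OrderDate"], message),
    )
```

Bound parameters matter here because rejected records are, by definition, the ones with unexpected content.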
As you can see from the above example, it is not only straightforward to set up a variety of ETL scenarios using Anypoint Platform, but you can also include a sophisticated data integrity component to weed out the “bad” data. In this way, you can ensure that downstream systems receive only good data. The error records persisted to the error table can be fixed in the source system and then retried.
Welcome to the final post in the three-post series about batch improvements in Mule 3.8!
The last new feature is a simple one that comes in quite handy when you need to read through logs. As you know, batch jobs are just programs processed in batch mode, and each time the job is triggered, a new job instance is created and tracked separately. Each of those instances is unique and therefore has a unique ID.
By default, Mule automatically generates a UUID for it. However, because UUIDs are not easy to read, correlating a log message with the actual execution can be challenging. It is even more difficult when several job instances are running at the same time.
To solve this problem, we added a new property to the batch:job definition called “Job Instance ID”:
This attribute takes an MEL expression, which is evaluated each time the job is triggered. The resulting value is then used as the job instance ID.
This attribute is:
Optional: If not set, then Mule will generate a UUID as usual.
An MEL expression: Constant values are not allowed.
Unique: You have to make sure that the MEL expression always returns unique values. If the expression returns a non-unique ID, an error occurs and no job instance is created.
Let’s see what a simple batch log looks like without using this feature:
As you can see, the job instance ID is ‘20ae8500-d1a7-11e5-af90-6003089b5498’, which is not meaningful at all. It is hard to correlate that ID with an actual execution, especially if you have many jobs running at the same time or run many jobs a day.
This is how the same log looks with a custom job instance ID:
As you can see, the job instance ID is now ‘Job from 2016-02-12T13:34:10.481-03:00’ which makes it a lot easier to identify the instance from its timestamp. Depending on how often your jobs trigger and depending on what business purpose they serve, you might choose to use another criterion to build the ID.
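In Mule, the ID comes from an MEL expression. The Python sketch below only models the naming idea and the uniqueness requirement. A bare millisecond timestamp could collide if two triggers fire within the same millisecond, so this hypothetical generator appends a per-process sequence number. Neither the class nor the `#n` suffix is part of Mule. Both are illustrative choices.

```python
import itertools
from datetime import datetime, timezone


class JobInstanceIdGenerator:
    """Builds readable IDs such as 'Job from 2016-02-12T13:34:10.481-03:00 #1'.

    The sequence suffix keeps IDs unique within one process even when two
    triggers share a timestamp (it restarts at 1 if the process restarts).
    """

    def __init__(self, prefix="Job from"):
        self.prefix = prefix
        self._seq = itertools.count(1)

    def next_id(self, now=None):
        # Default to the current local time, including its UTC offset
        now = now or datetime.now(timezone.utc).astimezone()
        stamp = now.isoformat(timespec="milliseconds")
        return f"{self.prefix} {stamp} #{next(self._seq)}"
```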
This update was a short one and hopefully, it was useful too!
Welcome back! Continuing the series about new batch features in Mule 3.8, the second most common request we received was the ability to configure the batch block size.
What’s the block size?
In a traditional online processing model, each request is usually mapped to a worker thread. Regardless of whether the processing is synchronous, asynchronous, one-way, or request-response, and even if the requests are temporarily buffered before being processed (as in the Disruptor or SEDA models), servers usually end up with this 1:1 relationship between a request and a running thread.
However, batch is not an online tool. It can do near real-time processing, but the fact that all records are first stored in a persistent queue before the processing phase begins disqualifies it from that category. Not surprisingly, we quickly realized that this traditional threading model didn’t actually apply to batch.
Batch was designed to work with millions of records. It was also designed to be reliable, which meant that transactional persistent storage was needed. The problem is that if you have 1 million records to place in a queue for a batch job that has 3 steps, in a best-case scenario you’re looking at 3 million I/O operations to take and re-queue each record as it moves through the steps. That’s far from efficient, so we came up with the concept of block size. Instead of queueing each record individually, we queue blocks of 100 records, reducing the I/O overhead at the cost of higher working memory requirements.
I/O was not the only problem, though. The queue -> thread -> queue handoff process also needed optimization. So, instead of having a 1:1 relationship between each record and a working thread, we have a block:1 relationship. For example:
By default, each job has 16 threads (you can change this using a threading-profile element)
Each of those threads is given a block of 100 records.
Each thread iterates through that block processing each record
Each block is queued back and the process continues
Again, this is great for performance but comes at the cost of working memory. Although you’re only processing 16 records in parallel, you need enough memory to move a grand total of 1,600 records from persistent storage into RAM.
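The arithmetic behind the last few paragraphs can be written as a back-of-the-envelope model. It assumes, as the best-case figure above does, one persistent-queue operation per queued unit per step. It ignores every other source of I/O.

```python
import math


def queue_operations(records, steps, block_size=1):
    """Approximate persistent-queue hand-offs: one per queued unit per step."""
    return math.ceil(records / block_size) * steps


def records_in_memory(threads=16, block_size=100):
    """Records pulled from persistent storage into RAM while every thread holds a block."""
    return threads * block_size
```

With 1 million records and 3 steps, `queue_operations(1_000_000, 3)` gives the 3 million operations of record-by-record queueing. `queue_operations(1_000_000, 3, 100)` drops that to 30,000. `records_in_memory()` gives the 1,600 records that the default 16 threads and blocks of 100 keep in RAM.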
The decision to use 100 as the block size came from performance analysis. We took a set of applications we considered representative of the most common batch use cases and tested them with records that varied from 64KB to 1MB, and datasets that went from 100,000 records up to 10 million. 100 turned out to be a nice convergence number between performance and reasonable working memory requirements.
The problem with the design decisions above surfaced when users started to use batch for cases that exceeded the original intent. For example:
Some users like the programming model that batch offers but don’t really have a ton of records to process. It became common to hear about users trying batch with as few as 200 records, or even fewer than a hundred. Because the block size is set at 100, a job with 200 records results in only 2 records being processed in parallel at any given time. If you hit it with fewer than 101 records, processing becomes sequential.
Other users tried to use batch to process really large payloads, such as images. In this case, the block size of 100 takes a heavy toll on working memory. For example, if the average image is 3MB, the default threading-profile setting would require 4,800MB (about 4.7GB) of working memory (16 threads × 100 records × 3MB) just to keep the blocks in memory. These kinds of use cases could definitely use a smaller block size.
And finally, there’s the exact opposite of the case above: jobs that have 5 million records whose payloads are so small you could fit blocks of 500 in memory with no problem. A larger block size would give these cases a huge performance boost.
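The three cases above can be checked with two rough formulas. Each thread needs its own block, so parallelism is capped by the number of blocks. Working memory is roughly one block per thread. This is a simplified sketch under those two assumptions, not a description of batch’s internal scheduler.

```python
import math


def effective_parallelism(records, block_size=100, threads=16):
    """Threads that can be busy at once: each one needs its own block of records."""
    return min(threads, math.ceil(records / block_size))


def working_memory_mb(avg_record_mb, block_size=100, threads=16):
    """Memory needed to keep one block per thread in RAM."""
    return avg_record_mb * block_size * threads
```

For example, `effective_parallelism(200)` returns 2 and `effective_parallelism(100)` returns 1 (sequential). Dropping the block size to 10 lets all 16 threads work on those 200 records. For 3MB images, `working_memory_mb(3)` returns 4800, and a block size of 10 cuts that to 480.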
The solution is quite simple: we made the block size configurable:
As you can see in the image above, the batch job now has a “Batch Block Size” property you can configure. The property is optional and defaults to 100, so no behaviour changes will be seen by default.
The feature is simple yet powerful, since it allows you to overcome all the cases above. It has only one catch: you need to understand how block sizes work in order to set a value that actually helps. You also need to do some due diligence before moving this setting into production. We still believe that the default of 100 works best for most cases. If you believe that your particular case is better suited to a custom size, make sure to run comparative tests with different values to find your optimum size. If you’ve never perceived this as a problem, just keep the default value and be happy.
Hello there! If you’ve been using Mule for a while now, you probably remember that the batch module was introduced back in the 3.5 release. If you’re not familiar with it, you can familiarize yourself by following these links:
We received a lot of love for this feature, but as adoption grew, so did the requests for improvements. We introduced 3 popular improvements in the 3.8 release of Mule, and this is the first post of a three-part series describing them.
In a nutshell, the batch module now gives you:
Read/Write access to a record’s payload and variables when inside a commit block
The ability to customize the queue block size
Configurable job instance IDs
Let’s discuss the first feature…
As you know, commit blocks are used to group a set of records together in order to perform bulk operations. For example, if you want to insert contacts into Salesforce or write to a DB, it would be quite inefficient to do that on a record-by-record basis, mainly because of the I/O cost (network overhead, disk overhead, etc.). It’s better to perform one bulk operation that inserts many records at a time. If you want more context and examples around uses for the commit block, please check the links above.
The problem is that the commit block used to expose only the grouped records’ payloads. It did not allow you to change those payloads, nor to access (for either read or write) the associated record vars. For example, imagine a batch job in which you want to insert contacts into Salesforce and then log the generated Salesforce IDs. There was no way of doing this with a commit block prior to 3.8. You would have to do it on a record-by-record basis, which would not only be slow but would probably exhaust your Salesforce API quota in no time.
The solution is simply to expose the grouped records’ payloads and record vars as context variables you can use through the Mule Expression Language (MEL). So, let’s consider an example:
I then modified it a little bit to express the same thing as a batch job:
The above job has:
An input phase that polls for those files and uses DataWeave to transform the file into a List of Maps
Then a step with a fixed-size commit that inserts those records into Salesforce, and then uses a foreach block to extract and set the generated Salesforce IDs (more details on that below)
Finally, a second step that simply logs the extracted IDs.
As you have probably figured out by now, the magic happens in the foreach block. The block has one simple expression component:
As you can see, whenever you place a foreach block inside a commit block, you will automatically get a variable called record which holds a reference to one of the aggregated records. This example shows you how to access the record vars and set the generated id.
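To make the sequential model explicit, here is a Python sketch of what the foreach does conceptually. It walks the bulk-operation results and the aggregated records in lockstep and sets a var on each record. Records are modeled as plain dicts with `payload` and `vars` keys, and each result as a dict with an `id` key. Both shapes and the `salesforceId` var name are hypothetical stand-ins, not Mule types.

```python
def set_ids_sequentially(records, results):
    """Pair the i-th result with the i-th aggregated record, as the foreach does.

    records: list of {"payload": ..., "vars": {...}} (hypothetical shape)
    results: list of {"id": ...}, one per record, in the same order
    """
    if len(results) != len(records):
        # Pairing by position only makes sense when the sizes match
        raise ValueError("results must correlate one-to-one with the aggregated records")
    for record, result in zip(records, results):
        record["vars"]["salesforceId"] = result["id"]
```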
Other valid expressions are:
This approach has a limitation though: It assumes that the list you’re iterating complies with the following restrictions:
The size matches the amount of aggregated records
There’s a direct correlation between the aggregated records and the items in the list, meaning that the first item in the list being iterated correlates to the first aggregated record, and so on…
This is true for most use cases, including the example above, and although these restrictions are something you must not lose sight of, they make most cases easy to deal with. However, this is not enough for all use cases…
Random access of records
For those cases that don’t fit the above restrictions, we also expose a variable called records, which is a random-access list. That variable is accessible all across the commit block, not just inside a foreach, and can be used more freely:
The example above just plays with the first record, which is not something that makes much sense in the real world, but it does show that as long as you know how to handle list indexes, you can do pretty much whatever you want. Just for the sake of it, I’m now going to show you how to use random access to produce code that is semantically equivalent to the sequential access example:
In this example, you see how the counter variable that foreach uses to keep track of the iteration is used to access the correct record.
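In the same simplified Python model (records as dicts with a `vars` key, results as dicts with an `id` key, both hypothetical), the random-access version indexes into the records list with the iteration counter. The sketch assumes a counter that starts at 1, as foreach’s does, so it subtracts 1 before indexing into a zero-based list. If your counter starts at 0, drop the shift.

```python
def set_ids_by_index(records, results):
    """Random-access equivalent: use the iteration counter to pick the record."""
    for counter, result in enumerate(results, start=1):
        # counter is one-based here; Python lists are zero-based, hence the shift
        records[counter - 1]["vars"]["salesforceId"] = result["id"]
```

Because it indexes a list, this variant needs the whole set of records in memory. That is exactly what streaming commits cannot offer.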
The last case to consider is that of streaming commits. As you know, there are two types of commit blocks: fixed size and streaming. Fixed-size commits aggregate sets of records up to a certain limit, which must fit in memory, and the record payloads are exposed as an immutable List. Streaming commits give access to the entire set of records, which is presumed not to fit into memory. So instead of a List, streaming commits use a forward-only Iterator that knows how to stream records in order to avoid the memory problem. How does this relate to this feature? Simple: because of the memory restriction, random access is not supported on streaming commits. You can only use the sequential access flavour explained in the first example.
This new feature enables many use cases that users had reported problems with, and that we found challenging ourselves while building the examples on Anypoint Exchange. Do you find it useful as well? Feedback welcome!
A popular request among users of the Batch module is the ability to grab the job instance ID in any of a Batch job’s phases. Why is that useful? Well, there could be a number of useful scenarios.
Sometimes (more often than we think), less concurrency is actually more. Not too long ago, I found myself in a conversation about non-blocking architectures, tuning, and performance. We were discussing that tuning for those models often starts with “2 threads per core” (2TPC). The discussion made me curious about how Mule’s batch module would perform if tested with 2TPC. I knew beforehand that 2TPC wouldn’t be so impressive on batch, mainly because batch doesn’t use a non-blocking threading model. However, I found myself thinking that the default threading profile of 16 threads might be a little excessive (again, because sometimes less is more) and wanted to see what would happen if we tried it. You know, just out of curiosity.
Designing the Test
I explained my hypothesis to the mighty Luciano Gandini from our performance team. He pointed out that there were two different cases that needed to be tested:
Jobs that are mainly IO bound, which spend most of their execution time performing IO operations (to disk, databases, external APIs, etc.)
Jobs that are mainly CPU bound. They might also do a fair amount of IO, but most of the processing time is spent on CPU operations (transformations, validations, etc.).
GOTCHA: Because batch relies on persistent queues to support large datasets while guaranteeing reliability and resilience, no batch job is truly light on IO. However, because the impact of that fact is similar no matter how many threads you use, for the purpose of this test (and for that purpose only) we can pretend that factor doesn’t exist.
Luciano then designed two batch jobs, one IO-intensive and one CPU-intensive. Each job was executed several times with a dataset of 1 million records. The tests were executed on a 24-core computer (the performance guys have pretty amazing toys!), and a different threading profile was used on each run. These were the results:
IO bound Jobs
The first two runs used 8 and 16 threads, the latter being much faster. That’s easy to explain: since the job is hard on IO, many threads find themselves blocked, waiting for IO operations to finish. By adding more threads, you can have more work going on. Then there was a third run that used 24 threads (1 per core). Again, this was faster, but not by much. And again, this didn’t come as a surprise. Although more work is being done, the new threads also block on IO operations at basically the same time and for about as long, while adding increasing context-switching and thread-management overhead. The last run used 48 threads (true 2TPC), and while it was still faster, the improvement was not significant compared to the extra CPU and memory cost.
Conclusion: More threads do increase performance, but only up to a point, which you’ll reach pretty fast. The 16-thread default was validated.
GOTCHA: If your job’s IO includes consuming an external API, adding more threads might turn out to be far more harmful than shown here. That’s because some APIs limit how many calls you can perform a day, or even how many you can perform concurrently. Exceeding those thresholds might result in your requests being rejected or throttled.
CPU bound jobs
These results did come as a surprise. First, the behavior was very similar to that of the IO-bound jobs, so the hypothesis that the two cases would differ was the first thing the test disproved.
For the first two runs, with 8 and 16 threads, the results were similar to the IO-bound case: 16 threads did the job in less than half the time. The big difference, however, was that the 24- and 48-thread runs gave almost the same running time. This is because the threads in this job didn’t spend any time at all blocked by IO operations, so the overhead of the added threads consumed pretty much all the gained processing power. Gotta admit, I didn’t see that one coming.
Conclusion: The behavior is pretty similar no matter the nature of the job, although for CPU-intensive jobs the decay is more noticeable once the 16-thread barrier is surpassed. The 16-thread default was validated again.
The good news is that it looks like we don’t need to change any defaults in batch. But most importantly, the results provided some lessons on tuning and performance that we weren’t expecting. Keep in mind, however, that no two jobs are born alike. You may well have cases that don’t follow these trends. The purpose of this post is not to tell you what to do, but to give you ideas on how to test what you’re doing. Hope it helps in that regard.
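If you want to try the same kind of comparison on your own workload, here is a minimal harness in the spirit of the tests above. It is not the performance team’s setup. It runs the same records through thread pools of different sizes and times each run. `simulated_io_record` is a hypothetical stand-in that sleeps to mimic an IO call. Replace it with something closer to your job.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def simulated_io_record(record):
    """Hypothetical IO-bound record processor: sleeps to mimic a DB or API call."""
    time.sleep(0.001)
    return record


def time_run(records, threads, process=simulated_io_record):
    """Process every record with a fixed-size thread pool and return elapsed seconds."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=threads) as pool:
        # list() forces every task to complete before the clock stops
        list(pool.map(process, records))
    return time.perf_counter() - start


if __name__ == "__main__":
    data = range(1000)
    for threads in (8, 16, 24, 48):
        print(f"{threads:>2} threads: {time_run(data, threads):.3f}s")
```

One caveat: in CPython, threads do not run CPU-bound Python code in parallel, so this harness is only meaningful for the IO-bound case. On the JVM that Mule runs on, CPU-bound work does spread across cores, which is why the CPU-bound results above behave differently.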
We’d really like for you to share what kind of batch jobs you have and how they react to variations in the threading profile. Looking forward to validating these trends or getting more surprises!
Fact: Batch jobs are tricky to handle when exceptions are raised. The problem is the huge amount of data that these jobs are designed to take. If you’re processing 1 million records, you simply can’t log everything: logs would become huge and unreadable, not to mention the performance toll it would take. On the other hand, if you log too little, it’s impossible to know what went wrong, and if 30 thousand records failed, not knowing what’s wrong with them can be a royal pain. This trade-off is not simple to overcome.
The idea of this blog post is to give you a short introduction to doing real-time sync with Mule ESB. We’ll use several of the newest features Mule has to offer, like the improved Poll component with watermarking and the Batch Module. Finally, we’ll use one of our Anypoint Templates as an example application to illustrate the concepts.
What is it?
Near real-time sync is the term we’ll use throughout this blog post to refer to the following scenario:
“When you want to keep data flowing constantly from one system to another”
As you might imagine, the keyword in the scenario definition here is constantly. That means the application will periodically move whatever entity/data you care about from the source system to its destination system. The periodicity really depends on what the application is synchronizing. If you are moving purchase orders for a large retailer, you could allow the application a few minutes between each synchronization. Now, if we are dealing with banking transactions, you’ll probably want to change that to something on the order of a few seconds or even a hundred milliseconds, unless you really like dealing with very angry people.
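The watermarking mentioned above is what lets each periodic poll pick up only what changed since the previous one. Here is a minimal Python sketch of the idea, not of Mule’s Poll component. It filters rows newer than the stored watermark and returns the advanced watermark for the next run. The `LastModified` column name is a hypothetical assumption.

```python
from datetime import datetime


def poll_changes(rows, watermark: datetime) -> tuple:
    """Return rows modified after the watermark, plus the advanced watermark.

    rows: iterable of dicts with a hypothetical 'LastModified' datetime column.
    """
    changed = sorted((r for r in rows if r["LastModified"] > watermark),
                     key=lambda r: r["LastModified"])
    # Keep the old watermark when nothing changed, so the next poll starts from the same point
    new_watermark = changed[-1]["LastModified"] if changed else watermark
    return changed, new_watermark
```

The strict `>` comparison avoids re-sending the last synchronized row. It can, however, miss a row that is committed later with exactly the watermark’s timestamp, which is worth keeping in mind when choosing the watermark column.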
A limitation in the first release of batch was that all records needed to have a Serializable payload. This is because batch uses persistent queues to buffer the records, making it possible to process “larger than memory” sets of data. However, we found that non-Serializable payloads were far more common than we initially thought. So, we decided to have batch use the Kryo serializer instead of Java’s standard one. Kryo is a very cool serialization library that allows:
Serializing objects that do not implement the Serializable interface
Serializing objects that do not have (nor inherit) a default constructor
Serializing much faster than the Java serializer and producing smaller outputs
Introducing Kryo into the project not only made batch more versatile by removing limitations, it also had a great impact on performance. During our testing, we saw performance improvements of up to 40% just from using Kryo. Of course, the speed boost depends on the job’s characteristics: if you have a batch job that spends 90% of its time doing IO, the impact on performance won’t be as visible as in one that juggles between IO and CPU processing.