
In modern cloud architectures, message queues are commonly used to:

  • Decouple applications
  • Improve performance
  • Increase reliability
  • Implement granular scalability

Because MuleSoft has its roots in ESB, the platform ships with out-of-the-box connectors for popular messaging providers, plus a generic JMS Connector that can be used with any broker implementation. MuleSoft also offers Anypoint MQ, a multi-tenant cloud messaging service that enables customers to implement advanced asynchronous messaging scenarios between their applications. Anypoint MQ is fully integrated with Anypoint Platform, offering role-based access control, client management, and connectors.

The goal of this post is not to get into the specifics of Anypoint MQ. Instead, I would like to demonstrate a use case that leverages Anypoint MQ and prescribe an approach to error handling for such scenarios. Error handling logic varies with use cases, business requirements, organizational processes, and so on, but I believe the core approach for messaging-based integrations remains more or less the same. The most common approach I have seen consists of the following steps:

  1. Capture the error in the error handler logic.
  2. Enrich the message with additional metadata that can later be used for reprocessing, for example the source and destination queues, retry count, and error message or description.
  3. Keep track of the delivery count.
  4. Publish the message to an error queue.
  5. Process the message from the error queue after a time interval and then, depending on the redelivery count, route it either back to the source queue or to a dead letter queue.
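
Step 5 boils down to a single comparison. The following is a minimal DataWeave sketch of that decision, not the implementation used later in this post. The function name route, the variable maxRetries, and the sample counts are illustrative assumptions; the queue names source-queue and dlq match the ones used in the flows in this post.

```dataweave
%dw 2.0
output application/json

// Assumed retry threshold, for illustration only
var maxRetries = 3

// Hypothetical helper: choose the target queue for a failed message
fun route(count: Number, sourceQueue: String): String =
    if (count >= maxRetries) "dlq" else sourceQueue
---
// Evaluate the decision for three sample redelivery counts
[1, 2, 3] map (count) -> {
    count: count,
    target: route(count, "source-queue")
}
```

With this threshold, counts of 1 and 2 route back to source-queue and a count of 3 routes to dlq.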

The above set of steps is precisely what I am going to cover in the remainder of this post. This post assumes that you have hands-on experience with Mule development and also that you are familiar with Anypoint Platform and Anypoint MQ.

For the purposes of this post, we will use three flows:

PublishMessage – responsible for publishing the message to the source-queue.

QueueProcessor – responsible for fetching the messages from the source-queue and implementing the business logic. If an error occurs, it enriches the payload and routes the message to the error-queue.

ErrorHandler – responsible for fetching the messages from the error-queue and publishing them to either the source-queue or the dead letter queue, depending on the redelivery count.

All three flows are packaged within the same Mule application for convenience, but in reality they would be packaged as three separate Mule applications: PublishMessage, QueueProcessor, and ErrorHandler.

1. PublishMessage

This is a simple flow that exposes an HTTP endpoint for consumers to call. Upon receiving a message, the flow publishes the message to a queue named “source-queue” using the Anypoint MQ Connector. 

XML Snippet of the PublishMessage flow:

<flow name="PublishMessage" doc:id="a0e992da-89a4-48c2-b8db-ef2395487407" >
    <http:listener doc:name="Listener" doc:id="cc94dd19-1bd1-4114-a9f8-9ef99d811a1e" config-ref="HTTP_Listener_config" path="test"/>
    <anypoint-mq:publish doc:name="Publish to source-queue" doc:id="474767c2-9f68-4118-911c-ae649069f5e5" config-ref="Anypoint_MQ_Config" destination="source-queue"/>
</flow>

2. QueueProcessor

This flow picks up the message from the queue for further processing. A Transform Message component right after the subscriber sets three variables:

  • originalPayload: used to preserve the original incoming payload.
%dw 2.0
output application/json
---
payload
  • ackToken: used to capture the acknowledgement token that is later used to manually acknowledge the message.
%dw 2.0
output application/java
---
attributes.ackToken
  • redelivery_count: We rely on a custom user property to keep track of retry attempts. For simplicity, let’s call it count. All user properties are available as part of the incoming message attributes. The first time a message is received, the count property does not exist because it has not been set yet. The logic therefore checks whether the property is null: if it is, the count is initialized to 1; otherwise, it is incremented.
%dw 2.0
output application/java
---
if (attributes.properties.'count' == null)
    1
else
    attributes.properties.'count' + 1
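
The same check can be written more compactly with DataWeave's default operator. This is a sketch rather than the version used in the flow; the explicit as Number coercion is an added assumption to guard against the property arriving as a string.

```dataweave
%dw 2.0
output application/java
---
// Treat a missing count as 0, coerce to a number, then add 1
((attributes.properties.'count' default 0) as Number) + 1
```

On the first delivery, when the property is absent, this evaluates to 1, just like the if/else form.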

We then simulate an error scenario using the Raise Error component.

Error Handling Logic

The error handling logic uses a Transform Message component to enrich the payload with additional information such as errorDescription, errorCause, and sourceQueueName, alongside the original payload. The data element of the new payload contains the original payload that was received.

%dw 2.0
output application/json
---
{
    "data": vars.originalPayload,
    "errorDescription": error.description,
    "errorCause": error.cause.summaryMessage,
    "sourceQueueName": "source-queue"
}

The sourceQueueName can then be used to write the message back to the same queue if needed. The errorDescription and errorCause can be used to file a support incident or to drive notifications and alerts. The payload is then published to the error queue. Note that the publish operation sets a user property named count to the current redelivery_count value that was calculated in the main flow.

The last step of the error handling logic is to acknowledge the message using the Ack component. 

Note the use of the expression “#[vars.ackToken]” (variable that was initialized in the main flow) as a value for the parameter Ack token.

XML Snippet of the QueueProcessor flow with the error handling logic:


<flow name="QueueProcessor" doc:id="0a9c3dbd-ddb5-46d2-85b1-6a8a475ff64d" >
    <!-- MANUAL mode: the message is acknowledged explicitly in the error handler -->
    <anypoint-mq:subscriber doc:name="fetch from source-queue" doc:id="fb7ddf52-770b-42ec-a4d3-9467989acdac" config-ref="Anypoint_MQ_Config" destination="source-queue" acknowledgementMode="MANUAL"/>
    <!-- Capture the retry count, the ack token, and the original payload as variables -->
    <ee:transform doc:name="Transform Message" doc:id="d65b19c1-d03e-4090-a9ab-a0f8bdfe3c04" >
        <ee:message >
        </ee:message>
        <ee:variables >
            <ee:set-variable variableName="redelivery_count" ><![CDATA[%dw 2.0
output application/java
---
if ( attributes.properties.'count' == null)
    1
else
    attributes.properties.'count' + 1]]></ee:set-variable>
            <ee:set-variable variableName="ackToken" ><![CDATA[%dw 2.0
output application/java
---
attributes.ackToken]]></ee:set-variable>
            <ee:set-variable variableName="originalPayload" ><![CDATA[%dw 2.0
output application/json
---
payload]]></ee:set-variable>
        </ee:variables>
    </ee:transform>
    <!-- Simulated failure standing in for the business logic -->
    <raise-error doc:name="Raise error" doc:id="4cd9f541-3e94-4044-9ce2-ede1f9e19612" type="ACCOUNTS:INSUFFICIENT FUNDS" />
    <error-handler >
        <on-error-propagate enableNotifications="true" logException="true" doc:name="On Error Propagate" doc:id="0799111d-8448-4481-8741-bd32c37b19bd" type="ANY">
            <!-- Enrich the payload with error details and the source queue name -->
            <ee:transform doc:name="Transform Message" doc:id="8ea42bff-7129-465b-8a32-b3dce90aa8b1">
                <ee:message>
                    <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
{
    "data": vars.originalPayload,
    "errorDescription": error.description,
    "errorCause": error.cause.summaryMessage,
    "sourceQueueName": "source-queue"
}]]></ee:set-payload>
                </ee:message>
            </ee:transform>
            <logger level="INFO" doc:name="Log redelivery_count" doc:id="ce986698-1e27-4323-8931-63adbacd2544" message="RedeliveryCount in QueueProcessor : #[vars.'redelivery_count' ]" />
            <!-- Publish to the error queue, carrying the count as a user property -->
            <anypoint-mq:publish doc:name="Publish to error-queue" doc:id="d02eaff6-5cb5-4af6-a0e2-60eb2383dc74" config-ref="Anypoint_MQ_Config" destination="error-queue">
                <anypoint-mq:properties><![CDATA[#[output application/java
---
{
    count : vars.'redelivery_count'
}]]]></anypoint-mq:properties>
            </anypoint-mq:publish>
            <!-- Acknowledge the original message so it is removed from source-queue -->
            <anypoint-mq:ack doc:name="Ack" doc:id="53edbd13-6dfc-4c8b-97c5-8229790be897" config-ref="Anypoint_MQ_Config" ackToken="#[vars.ackToken]"/>
        </on-error-propagate>
    </error-handler>
</flow>

3. ErrorHandler

The logic of the ErrorHandler flow (named error-handler in the XML snippet) is straightforward. The message is fetched from the error-queue using the Anypoint MQ Connector. The Transform Message component sets three variables: ackToken, sourceQueueName, and redelivery_count. A Choice router then checks the count: if redelivery_count is greater than or equal to the preconfigured number of retries (3 in this example), we publish the message to a dead letter queue; otherwise, we publish it back to the source queue for reprocessing. Lastly, we acknowledge the message using the Ack component. With this threshold, a message is attempted three times in total by QueueProcessor before it is routed to the dead letter queue.

Note: It is important for the error-handler flow to wait for a specified amount of time before it reprocesses a message, to give the systems involved some breathing space. Otherwise, the flow picks up each message as soon as it is published and immediately routes it back to the source-queue or to the dlq, leaving the systems involved no time to recover. To introduce a delay, we use an out-of-the-box feature of Anypoint MQ called Default Delivery Delay, which specifies how long to delay delivery of messages sent to the queue. The delay can be specified in seconds, minutes, hours, or days, and is configured in the queue settings.

XML Snippet of the error-handler flow:


<flow name="error-handler" doc:id="6c09d606-ca8c-4448-bd76-cda1c6dda304" >
    <anypoint-mq:subscriber doc:name="fetch from error-queue" doc:id="990a64d5-272f-4eb1-bd35-0ae958669a5d" config-ref="Anypoint_MQ_Config" destination="error-queue" acknowledgementMode="MANUAL"/>
    <!-- Read the retry count, the source queue name, and the ack token -->
    <ee:transform doc:name="Transform Message" doc:id="a03a7f8b-07e2-4db6-ad40-3fba8ee6974e" >
        <ee:message >
        </ee:message>
        <ee:variables >
            <ee:set-variable variableName="redelivery_count" ><![CDATA[%dw 2.0
output application/java
---
attributes.properties.count]]></ee:set-variable>
            <ee:set-variable variableName="sourceQueueName" ><![CDATA[%dw 2.0
output application/java
---
payload.sourceQueueName]]></ee:set-variable>
            <ee:set-variable variableName="ackToken" ><![CDATA[%dw 2.0
output application/java
---
attributes.ackToken]]></ee:set-variable>
        </ee:variables>
    </ee:transform>
    <logger level="INFO" doc:name="log redelivery count" doc:id="e399c5ed-c0d2-4709-b64e-cc1a0cc35604" message="#[vars]" />
    <choice doc:name="Choice" doc:id="4bcb32f5-6dc5-44bf-b5ed-26c88f984b1b" >
        <!-- Retries exhausted: send the original payload to the dead letter queue -->
        <when expression="#[vars.redelivery_count &gt;= 3]">
            <logger level="INFO" doc:name="Logger" doc:id="b91e608e-5ce4-4c2e-9c6d-69ed3ffe3f40" message="redelivery count is :  #[vars.redelivery_count]"/>
            <anypoint-mq:publish doc:name="Publish to dlq" doc:id="a35feed1-aa47-4fa7-9715-5f25176560cc" config-ref="Anypoint_MQ_Config" destination="dlq">
                <anypoint-mq:body ><![CDATA[#[payload.data]]]></anypoint-mq:body>
                <anypoint-mq:properties ><![CDATA[#[output application/java
---
{
    count : vars.'redelivery_count'
}]]]></anypoint-mq:properties>
            </anypoint-mq:publish>
        </when>
        <!-- Retries remaining: send the original payload back to the source queue -->
        <otherwise >
            <logger level="INFO" doc:name="Logger" doc:id="6a65afff-9002-402a-9a88-04d24c2164e2" message="redelivery count is :  #[vars.redelivery_count]"/>
            <anypoint-mq:publish doc:name="Publish to source-queue" doc:id="44401db5-a054-48f3-b0e7-a3acabe365bf" config-ref="Anypoint_MQ_Config" destination="#[vars.sourceQueueName]">
                <anypoint-mq:body ><![CDATA[#[payload.data]]]></anypoint-mq:body>
                <anypoint-mq:properties ><![CDATA[#[output application/java
---
{
    count : vars.'redelivery_count'
}]]]></anypoint-mq:properties>
            </anypoint-mq:publish>
        </otherwise>
    </choice>
    <!-- Acknowledge the message so it is removed from error-queue -->
    <anypoint-mq:ack doc:name="Ack" doc:id="c5699be9-3c0a-4987-813c-f18ef3168f48" config-ref="Anypoint_MQ_Config" ackToken="#[vars.ackToken]"/>
</flow>
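
The retry threshold of 3 in the Choice router above is hard-coded. One way to make it configurable is to read it from a configuration property with the p() function. The following is a sketch, not part of the original application: the property name retry.max and the file name config.yaml are hypothetical, and the as Number coercions are added because property values are read as strings.

```xml
<!-- Assumed: config.yaml defines a hypothetical property retry.max with the value "3" -->
<configuration-properties file="config.yaml"/>

<!-- Possible replacement for the hard-coded condition in the error-handler flow -->
<when expression="#[(vars.redelivery_count as Number) &gt;= (p('retry.max') as Number)]">
    <!-- publish to dlq, exactly as in the snippet above -->
</when>
```

Keeping the threshold in a property file would let each environment use a different retry limit without changing the flow itself.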

In closing, we saw how to implement error handling logic for Anypoint MQ-based applications using user properties (count in our example), Default Delivery Delay, and some orchestration. For advanced use cases, Anypoint MQ provides a built-in circuit breaker capability that lets you control how the connector handles errors that occur while processing a consumed message.