Being picky with JMS


Mule ESB offers out-of-the-box integration with ActiveMQ. There are plenty of examples on the internet that show how to use ActiveMQ with Mule. Here we will explore how to use a filter with ActiveMQ and Mule to pick up only the messages we need.

Consider an environment where there is a JMS queue and there are multiple consumers listening to that queue; how can a Mule flow filter and pick up a particular JMS message of choice from all the messages available in the queue?

Let’s assume we have a JMS queue named MyQueue with many consumers listening to it:

beingpickywithjms_1

We can see the queue contains a few messages. Any of the consumers listening to this queue can consume them. But how can the JMS consumer in our Mule flow select and consume only particular messages?

We can filter JMS messages based on JMS properties such as JMS priority, JMS type and custom headers. We will first look at filtering on JMS priority and then on a header.

To consume a JMS message based on JMS priority, the message must be put on the queue with that priority. Let’s say we only want to consume JMS messages whose priority is 9.

So, let’s first insert a couple of JMS messages in the queue MyQueue with priority 9.

beingpickywithjms_2

So now our queue holds 5 messages in total, of which 2 have priority 9 and the remaining 3 have the default priority of 0.

beingpickywithjms_3

With this in place, we will consume the messages in MyQueue with 2 consumers listening to the same queue: one that consumes only the messages with priority 9, and another that consumes the remaining messages with default priority.

beingpickywithjms_4

We will now create a flow that filters and consumes messages from the JMS queue based on priority. That means we will consume only those messages from MyQueue whose JMS priority is 9. We will use jms:selector to filter the JMS messages as follows:-

<flow name="JMSReceiver" doc:name="JMSReceiver">
  <jms:inbound-endpoint connector-ref="Active_MQ" doc:name="JMS" exchange-pattern="request-response" address="jms://tcp:MyQueue">
    <jms:selector expression="JMSPriority = 9" />
  </jms:inbound-endpoint>
  <logger level="INFO" message="Received Payload :-#[message.payload]" doc:name="Logger"/>
</flow>

Now if we start our flow, we will find that the JMS consumer in our Mule flow has selectively consumed the messages from MyQueue whose JMS priority is 9, leaving the other messages in the queue.

beingpickywithjms_5

You can see here that it has consumed only the messages with priority 9 and has left the remaining messages in the queue.

So we can put JMS messages on a queue with a particular priority and later consume them based on that priority.

Now let’s have another Mule flow, with another JMS consumer, this time without a filter, listening to the same queue MyQueue:-

<flow name="JMSReceiver2" doc:name="JMSReceiver">
  <jms:inbound-endpoint connector-ref="Active_MQ" doc:name="JMS" exchange-pattern="request-response" address="jms://tcp:MyQueue"/>
  <logger level="INFO" message="Received Payload :-#[message.payload]" doc:name="Logger"/>
</flow>

This time we haven’t set any filter, and we will find that this consumer takes all the other messages, leaving the queue empty:-

beingpickywithjms_6
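
If the second consumer should never compete for the priority 9 messages, it could carry the complementary selector rather than no selector at all. The following is only a sketch, assuming the same Active_MQ connector and queue as in this post; the flow name JMSReceiverOthers is made up for illustration. The JMS selector "not equal" operator is <>, which has to be escaped because it sits inside an XML attribute:-

```xml
<flow name="JMSReceiverOthers">
  <jms:inbound-endpoint queue="MyQueue" connector-ref="Active_MQ" doc:name="JMS">
    <!-- &lt;&gt; is the XML-escaped form of the selector operator "not equal" -->
    <jms:selector expression="JMSPriority &lt;&gt; 9"/>
  </jms:inbound-endpoint>
  <logger level="INFO" message="Received Payload :-#[message.payload]" doc:name="Logger"/>
</flow>
```

With both selectors in place, the two consumers no longer depend on start-up order to decide who gets which message.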

Using a dynamic JMS selector:-

Now the question arises: can we do this dynamically? Can we select the messages by JMS properties whose values are only known at runtime?

Yes, we can. To select dynamically, we can pass the selector as an expression to Mule Requester, which can be called in the middle of a Mule flow to select and consume messages:-

<flow name="JMSReceiverDynamic" >
     <http:listener config-ref="HTTP_Listener_Configuration" path="/dynamic" doc:name="HTTP"/>
     <set-variable variableName="priority" value="9" doc:name="Variable"/> 
     <mulerequester:request config-ref="Mule_Requester" resource="jms://MyQueue?selector=Priority%3D'#[flowVars.priority]'" doc:name="Mule Requester" timeout="120000"/>   
     <logger level="INFO" message="Received Payload with selecting Dynamically:-#[message.payload]" doc:name="Logger"/>
   </flow>    

Here we have selected the messages with priority 9 dynamically, taking the value from a flow variable through an expression. The value can equally come from inbound properties, a properties file or the message payload.
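
As an illustration of taking the value from an inbound property, the variable could be filled from an HTTP query parameter instead of a fixed value. This is a sketch only: the flow name, the path /dynamic-priority and the query parameter name priority are assumptions made for this example, and it reuses the HTTP_Listener_Configuration and Mule_Requester configurations referenced elsewhere in this post:-

```xml
<flow name="JMSReceiverByQueryParam">
  <http:listener config-ref="HTTP_Listener_Configuration" path="/dynamic-priority" doc:name="HTTP"/>
  <!-- Read the value of ?priority=... from the request URL (hypothetical parameter name) -->
  <set-variable variableName="priority" value="#[message.inboundProperties.'http.query.params'.priority]" doc:name="Variable"/>
  <!-- Same selector as before; only the source of the value has changed -->
  <mulerequester:request config-ref="Mule_Requester" resource="jms://MyQueue?selector=Priority%3D'#[flowVars.priority]'" doc:name="Mule Requester" timeout="120000"/>
  <logger level="INFO" message="Received Payload with selecting Dynamically:-#[message.payload]" doc:name="Logger"/>
</flow>
```

Calling http://localhost:8081/dynamic-priority?priority=9 would then request a message whose Priority property is 9. Since the value goes straight into the selector string, it is worth validating it before use in anything beyond a demo.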

Pushing a message to a JMS queue with JMS priority from Mule:-

Next we will see how to put a message on our JMS queue directly from a Mule flow, with the JMS priority of the message set to 9.

Here is the Mule flow that sends the message to the JMS queue with priority 9:-

<http:listener-config name="HTTP_Listener_Configuration" host="0.0.0.0" port="8081" doc:name="HTTP Listener Configuration"/>
<jms:activemq-connector name="Active_MQ" numberOfConcurrentTransactedReceivers="20" brokerURL="tcp://localhost:61616"/>

<flow name="JMSSender" >
  <http:listener config-ref="HTTP_Listener_Configuration" path="/jms" doc:name="HTTP"/>
  <set-payload value="This is a test JMS message with priority 9" doc:name="Set Payload"/>
  <logger message="Payload :- #[message.payload]" level="INFO" doc:name="Logger" />
  <jms:outbound-endpoint queue="MyQueue" connector-ref="Active_MQ" doc:name="JMS">
    <message-properties-transformer>
       <add-message-property key="Priority" value="9" />
    </message-properties-transformer>
  </jms:outbound-endpoint> 
</flow>

Here the priority is set as a property on the outgoing message, not in the payload itself. The relevant part of the configuration is the following:-

<message-properties-transformer>
   <add-message-property key="Priority" value="9"/>  
</message-properties-transformer>

So, if we invoke the Mule flow by hitting the URL http://localhost:8081/jms, the message will be pushed into the queue MyQueue with priority 9 as follows:-

beingpickywithjms_7

In a similar way, we can set other properties on the JMS message, such as a custom header, in our Mule flow as follows:-

<message-properties-transformer>
  <add-message-property key="Header" value="MyCustom-Header" />             
</message-properties-transformer>

Each message will be sent to the queue MyQueue with the header attached to it. So, if we check the properties of the message in the queue, we can see our header:-

beingpickywithjms_8

And then consume it based on the Header:-

<jms:inbound-endpoint connector-ref="Active_MQ" doc:name="JMSHeader" exchange-pattern="request-response" address="jms://tcp:MyQueue">
  <jms:selector expression="Header = 'MyCustom-Header'"/>
</jms:inbound-endpoint>

So here messages from MyQueue will be consumed only if their header is MyCustom-Header, in the same way that messages were filtered and consumed based on priority.
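
JMS selectors follow an SQL-like syntax, so conditions can also be combined with AND and OR. As a small sketch, assuming the same queue, connector and property names used in this post (the doc:name value is made up), a consumer could require both the header and the priority to match:-

```xml
<jms:inbound-endpoint queue="MyQueue" connector-ref="Active_MQ" doc:name="JMSHeaderAndPriority">
  <!-- Both conditions must hold for a message to be delivered to this consumer -->
  <jms:selector expression="Header = 'MyCustom-Header' AND JMSPriority = 9"/>
</jms:inbound-endpoint>
```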

Using dynamic JMS selector again:-

Again, if we want to consume it with dynamic selection, we can use an expression in Mule Requester and select the message based on the header, fetching the header value from a variable:-

<set-variable variableName="Custom-Header" value="MyCustom-Header" doc:name="Variable"/>
<!-- Bracket notation is used because a hyphen in flowVars.Custom-Header would be read as a minus sign -->
<mulerequester:request config-ref="Mule_Requester" resource="jms://MyQueue?selector=Header%3D'#[flowVars['Custom-Header']]'" doc:name="Mule Requester" timeout="120000"/>

At this point, you should have a good idea of how to set JMS properties on messages and how to consume messages from a queue in a multi-consumer environment based on those properties, applying filters both statically and dynamically.

Now you can experiment on your own, configuring JMS messages and implementing the example.

Please share your feedback and experiences in the comments section below.


One Response to “Being picky with JMS”

  1. What’s the status of the Mule Requester module mentioned for being able to set properties of the JMS selector dynamically?
    I didn’t see it as part of the Mule runtime yet (looking at the 3.8 beta runtime under the /lib/mule/ folder), and the GitHub link to it says to install it from the MuleSoft Maven repo rather than from e.g. Anypoint Exchange. Will it become part of the core Mule platform at some future point?
