JMS Queue: When Size Does Matter

motif

Anypoint Platform is fast. The legacy systems that it often connects to? Not so much.

Therefore, in real world use cases, the requirements often call for limiting the message throughput to protect the endpoint systems from being overwhelmed by traffic. Architectural designs that support message throttling commonly incorporate some elements of message queues to stage and hold messages in-flight, so that the endpoints can process them at a steadier pace.

For architectures with throttling, it is often important to be able to count how many messages in the queues are still waiting to be consumed. For example, if certain threshold has been crossed, an alert may need to be sent to the operation center, or the inbound polling may have to be stopped completely until the queue size falls below threshold again, only then can intaking of messages resume.

Muley counting his new friends.

Muley’s counting his new friends.

The QueueBrowser interface can be used to count the number of messages in-flight on the queues. For working with a Mule flow, an option is to write a custom Mule component that uses the . Note that it is best to make the custom component a Spring bean to take advantages of using property placeholders, to make it easier to specify host address and port number.  

        <spring:bean id="queue-counter" name="queue-counter" class="org.mule.api.jms.extension.CurrentQueueSize">
            <spring:property name="port1" value="${jms.port1}"/>
            <spring:property name="host1" value="${jms.host1}"/>
            <spring:property name="port2" value="${jms.port2}"/>
            <spring:property name="host2" value="${jms.host2}"/>
        </spring:bean>
 With this approach, counting in the flow is as simple as this:

The sample code for a Mule Java component, written for ActiveMQ, is shown below. Note that failover is fully supported. In the example, performance is not a paramount concern, therefore, all the queues are being counted in one call. The code can be easily modified though, so the queue name is also passed in as a property and counting will be done only for the specified queue.

package org.mule.api.jms.extension;

import java.util.Enumeration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Set;

import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.QueueBrowser;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.log4j.Logger;
import org.mule.api.MuleEventContext;
import org.mule.api.lifecycle.Callable;

/**
 * A way to browse queue depth
 * @author adrianhsieh
 *
 */

public class CurrentQueueSize implements Callable {

	private String host1;
	private String host2;
	private int port1;
	private int port2;

	@SuppressWarnings("unchecked")
	@Override
	public Object onCall(MuleEventContext eventContext) throws Exception {
		
		Logger logger = Logger.getLogger(org.mule.api.jms.extension.CurrentQueueSize.class);
		
		if (host2 == null) {
			host2 = host1;
			port2 = port1;
		}
		
		String brokerURL = "failover:(tcp://" + host1 + ":" + port1 + ",tcp://" + host2 + ":" + port2 + ")?randomize=false";
		logger.debug("Broker URL: " + brokerURL);
		
		ConnectionFactory out = new ActiveMQConnectionFactory(brokerURL);
		ActiveMQConnection connection = (ActiveMQConnection) out.createConnection();

		connection.start();
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		
		Set<ActiveMQQueue> amqs = connection.getDestinationSource().getQueues(); 
		Iterator<ActiveMQQueue> queues = amqs.iterator(); 

		HashMap<String, Integer> payload = new HashMap<String, Integer>();
		
		while ( queues.hasNext() ) 
		{ 
		    ActiveMQQueue queue_t = queues.next(); 
		    String q_name = queue_t.getPhysicalName();
		    logger.debug( "Queue = " + q_name);

		    QueueBrowser queueBrowser = session.createBrowser(queue_t);
		    Enumeration<Message> e = (Enumeration<Message>) queueBrowser.getEnumeration();

		    int numMsgs = 0;
		    while(e.hasMoreElements()) 
		    {
		         Message message = (Message) e.nextElement();
		         logger.trace("Current message: " + message.toString());
		         numMsgs++;
		    }
		    logger.debug(q_name + ": No of messages = " + numMsgs);
		    queueBrowser.close();
		    
		    payload.put(q_name, numMsgs);
		}       
		
		
		session.close();
		connection.close();
		
		return payload;
	}	

	public String getHost1() {
		return host1;
	}

	public void setHost1(String host1) {
		this.host1 = host1;
	}

	public String getHost2() {
		return host2;
	}

	public void setHost2(String host2) {
		this.host2 = host2;
	}

	public int getPort1() {
		return port1;
	}

	public void setPort1(int port1) {
		this.port1 = port1;
	}

	public int getPort2() {
		return port2;
	}

	public void setPort2(int port2) {
		this.port2 = port2;
	}

}


We'd love to hear your opinion on this post


2 Responses to “JMS Queue: When Size Does Matter”

  1. Thank you for this article, it’s something that people regularly ask.

    A better design IMO would be to inject a JMS Connector into the CurrentQueueSize component (a call getConnection on it), instead of creating a connection inside of the component. This would be more flexible as it would allow changing the JMS Connector (for example at test time) without needing to change the component.

  2. 1) In case of ActiveMQ, we can introspect the JMX bean for the Queue and get:
    a) Enqueued message count (i.e. total number of messages added to Queue after last restart of MQ)
    b) In flight message count (i.e. number of messages being processed at an instance and not Ack’d by consumers)
    c) Dequeued message count (i.e number of messages processed)
    c) Queue size (message available in the Queue including inflight messages)

    2) QueueBrowser may provide uniform way of identifying JMS message count, but using JMX for ActiveMQ can avoid an extra/frequent connections to Active MQ.

    3) Irrespective of using QueueBrowser or JMX, it should be possible to raise an Alert using MMC. The consumer count in JMS endpoint throttles the number of messages Mule process at any point of time. Is there any other option to throttle other than that?

    – ananth