Reading Time: 3 minutes

Anypoint Platform is fast. The legacy systems that it often connects to? Not so much.

Therefore, in real world use cases, the requirements often call for limiting the message throughput to protect the endpoint systems from being overwhelmed by traffic. Architectural designs that support message throttling commonly incorporate some elements of message queues to stage and hold messages in-flight, so that the endpoints can process them at a steadier pace.

For architectures with throttling, it is often important to be able to count how many messages in the queues are still waiting to be consumed. For example, if a certain threshold has been crossed, an alert may need to be sent to the operations center, or the inbound polling may have to be stopped completely; message intake can resume only once the queue size falls below the threshold again.

The QueueBrowser interface can be used to count the number of messages in-flight on the queues. When working with a Mule flow, one option is to write a custom Mule Java component that uses the QueueBrowser. Note that it is best to make the custom component a Spring bean in order to take advantage of property placeholders, which make it easier to specify the host address and port number.

        <!--
            Registers the custom queue-depth component (CurrentQueueSize) as a Spring bean
            so that a Mule flow can reference it by the id "queue-counter".
            host1/port1 and host2/port2 are injected through the bean's setters and are
            used to build the two tcp:// addresses of the failover broker URL; their
            values come from the jms.* property placeholders.
        -->
        <spring:bean id="queue-counter" name="queue-counter" class="org.mule.api.jms.extension.CurrentQueueSize">
            <spring:property name="port1" value="${jms.port1}"/>
            <spring:property name="host1" value="${jms.host1}"/>
            <spring:property name="port2" value="${jms.port2}"/>
            <spring:property name="host2" value="${jms.host2}"/>
        </spring:bean>
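With this approach, counting in the flow is as simple as referencing the bean from a Java component:

        <component>
            <spring-object bean="queue-counter"/>
        </component>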

latest report
Learn why we are the Leaders in API management and iPaaS

The sample code for a Mule Java component, written for ActiveMQ, is shown below. Note that failover is fully supported. In this example, performance is not a paramount concern, so all the queues are counted in one call. The code can easily be modified, however, so that the queue name is also passed in as a property and counting is done only for the specified queue.

package org.mule.api.jms.extension;

import java.util.Enumeration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Set;

import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.QueueBrowser;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.log4j.Logger;
import org.mule.api.MuleEventContext;
import org.mule.api.lifecycle.Callable;

/**
 * Mule {@link Callable} component that reports the current depth of every queue
 * known to an ActiveMQ broker.
 *
 * <p>On each invocation the component opens a short-lived connection through the
 * ActiveMQ failover transport (primary {@code host1:port1}, secondary
 * {@code host2:port2}), browses each queue with a {@link QueueBrowser}, and
 * returns a map of queue name to message count.
 *
 * <p>If {@code host2} is not configured, the primary address is used for both
 * entries of the failover URL.
 *
 * <p>NOTE(review): the queue list comes from ActiveMQ's {@code DestinationSource},
 * which is believed to be fed by broker advisory messages; queues may therefore be
 * missing immediately after connecting. Browsing is also believed to be bounded by
 * the broker's browse page size, which would cap the reported count. Confirm both
 * against the ActiveMQ documentation for the broker version in use.
 *
 * @author adrianhsieh
 */

public class CurrentQueueSize implements Callable {

	/** Shared logger; looked up once instead of on every call. */
	private static final Logger LOGGER = Logger.getLogger(CurrentQueueSize.class);

	private String host1;
	private String host2;
	private int port1;
	private int port2;

	/**
	 * Counts the messages currently waiting on each queue of the broker.
	 *
	 * @param eventContext the Mule event context (not used by this component)
	 * @return a {@code HashMap<String, Integer>} mapping each queue's physical name
	 *         to the number of messages seen while browsing it
	 * @throws IllegalStateException if {@code host1} has not been configured
	 * @throws Exception if connecting to the broker or browsing a queue fails
	 */
	@Override
	public Object onCall(MuleEventContext eventContext) throws Exception {

		// Fail fast: without a primary host the URL would point at "tcp://null:...".
		if (isBlank(host1)) {
			throw new IllegalStateException("host1 must be configured before queues can be counted");
		}

		// Resolve the secondary broker into locals so that the bean's configured
		// state is never modified by a call.
		String secondaryHost = host2;
		int secondaryPort = port2;
		if (isBlank(secondaryHost)) {
			secondaryHost = host1;
			secondaryPort = port1;
		}

		String brokerURL = "failover:(tcp://" + host1 + ":" + port1 + ",tcp://" + secondaryHost + ":" + secondaryPort + ")?randomize=false";
		LOGGER.debug("Broker URL: " + brokerURL);

		HashMap<String, Integer> payload = new HashMap<String, Integer>();

		ConnectionFactory factory = new ActiveMQConnectionFactory(brokerURL);
		ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
		try {
			connection.start();
			Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			try {
				Set<ActiveMQQueue> queues = connection.getDestinationSource().getQueues();
				for (ActiveMQQueue queue : queues) {
					String queueName = queue.getPhysicalName();
					LOGGER.debug("Queue = " + queueName);

					int numMsgs = countMessages(session, queue);
					LOGGER.debug(queueName + ": No of messages = " + numMsgs);

					payload.put(queueName, numMsgs);
				}
			} finally {
				// Always release the session, even when browsing fails.
				session.close();
			}
		} finally {
			// Always release the broker connection, even when an error occurred.
			connection.close();
		}

		return payload;
	}

	/**
	 * Browses a single queue and counts the messages the browser returns.
	 * The browser is closed whether or not the enumeration completes.
	 */
	private int countMessages(Session session, ActiveMQQueue queue) throws Exception {
		QueueBrowser queueBrowser = session.createBrowser(queue);
		try {
			Enumeration<?> messages = queueBrowser.getEnumeration();
			boolean traceEnabled = LOGGER.isTraceEnabled();

			int numMsgs = 0;
			while (messages.hasMoreElements()) {
				Object message = messages.nextElement();
				// Only build the (potentially large) message string when it will be logged.
				if (traceEnabled) {
					LOGGER.trace("Current message: " + message);
				}
				numMsgs++;
			}
			return numMsgs;
		} finally {
			queueBrowser.close();
		}
	}

	/** Returns true when the value is null, empty, or whitespace only. */
	private static boolean isBlank(String value) {
		return value == null || value.trim().length() == 0;
	}

	/** @return the primary broker host */
	public String getHost1() {
		return host1;
	}

	/** @param host1 the primary broker host */
	public void setHost1(String host1) {
		this.host1 = host1;
	}

	/** @return the secondary broker host as configured; may be null when unset */
	public String getHost2() {
		return host2;
	}

	/** @param host2 the secondary broker host; leave unset to reuse the primary address */
	public void setHost2(String host2) {
		this.host2 = host2;
	}

	/** @return the primary broker port */
	public int getPort1() {
		return port1;
	}

	/** @param port1 the primary broker port */
	public void setPort1(int port1) {
		this.port1 = port1;
	}

	/** @return the secondary broker port as configured */
	public int getPort2() {
		return port2;
	}

	/** @param port2 the secondary broker port; ignored when host2 is unset */
	public void setPort2(int port2) {
		this.port2 = port2;
	}

}