How to implement an ActiveMQ JMS Listener in a Spring Boot application?

There are thousands of use cases where an application needs some sort of Java messaging service API. I would like to walk through one use case to explain the need for a JMS listener.

Let’s consider a scenario where your application is deployed in a clustered environment and has to handle thousands of email requests. To solve this, you could simply call an API endpoint that posts each message to the process which sends the email. This works, but when the application receives hundreds of messages per second, you have to deal with concurrency and bursts of load. You need a queue to hold these messages, and that’s where JMS and ActiveMQ come in. ActiveMQ is a message broker that implements the JMS API; other options in the same space include RabbitMQ (through its JMS client) and the managed Amazon MQ service.

Keep in mind that Spring Boot controllers do not queue or buffer requests for you; each request is handled as it arrives.
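To make the role of the queue concrete, here is a minimal in-memory sketch using only the JDK. It is not ActiveMQ: `EmailQueueSketch`, `run`, and the message names are hypothetical, and a `LinkedBlockingQueue` stands in for the broker queue. It only illustrates the idea that a burst of requests can be accepted right away while a fixed number of workers drains them at their own pace.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative only: an in-memory stand-in for a broker queue.
// The class and method names are hypothetical.
public class EmailQueueSketch {

    // Enqueues `requests` messages, drains them with `workers` consumer threads,
    // and returns how many messages were processed.
    public static int run(int requests, int workers) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        AtomicInteger processed = new AtomicInteger();

        // Producer side: the whole burst is accepted immediately.
        for (int i = 0; i < requests; i++) {
            queue.add("email-request-" + i);
        }

        // Consumer side: a fixed number of workers drains the queue.
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int w = 0; w < workers; w++) {
            pool.submit(() -> {
                // poll() returns null once the queue is empty, which ends the worker.
                while (queue.poll() != null) {
                    processed.incrementAndGet(); // sending the email would go here
                }
            });
        }

        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println("processed = " + run(500, 4));
    }
}
```

A real broker adds what this sketch lacks: the queue lives outside the application process, so messages survive restarts and can be shared by every node in the cluster.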

In this post, I assume that the application is receiving messages and you need to implement an ActiveMQ listener to handle them. Follow each step below to implement the listener in your Spring Boot application.

Maven pom.xml Dependencies

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!-- Add these dependencies if you also have a producer in the same environment. -->
<dependency>
   <groupId>org.apache.activemq</groupId>
   <artifactId>activemq-client</artifactId>
   <version>5.15.8</version>
</dependency>
<dependency>
   <groupId>org.apache.activemq</groupId>
   <artifactId>activemq-pool</artifactId>
   <version>5.15.8</version>
</dependency>

ActiveMQ Configuration: application.properties

activemq.packages.trusted=<Optional, but for security you can configure the package name where the listener is>
activemq.brokerUrl=<brokerURL>
activemq.connection.pool=5
activemq.maximumThreads=10
activemq.prefetch.limit=50
#consumer
consumer.user=<userName>
consumer.password=<Password>
consumer.queues=<Queue Name>
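The `activemq.packages.trusted` value is read as a single string, while ActiveMQ's `ActiveMQConnectionFactory.setTrustedPackages` expects a `List<String>`. The sketch below shows one way to split the property before handing it over. It assumes the value is a comma-separated list, which this post does not specify, and `TrustedPackagesSketch` and `parse` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: turns a comma-separated property value into a list.
// The comma-separated format is an assumption, not something the post defines.
public class TrustedPackagesSketch {

    public static List<String> parse(String raw) {
        List<String> packages = new ArrayList<>();
        if (raw == null) {
            return packages;
        }
        for (String part : raw.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                packages.add(trimmed); // skip blanks left by stray commas
            }
        }
        return packages;
    }

    public static void main(String[] args) {
        System.out.println(parse("com.example.orders, com.example.mail"));
    }
}
```

The resulting list could then be passed to the connection factory, for example `connectionFactory.setTrustedPackages(TrustedPackagesSketch.parse(value))`.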

ActiveMQ Java configuration class mapped to the application.properties file

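import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

// Holds the values read from application.properties.
// @Configuration already registers this class as a bean, so @Service is not needed.
@Configuration
public class AMQConfig {

    // Optional property: falls back to an empty string when it is not set.
    @Value("${activemq.packages.trusted:}")
    private String trustedPackages;

    @Value("${activemq.connection.pool}")
    private String noOfConnections;

    @Value("${activemq.brokerUrl}")
    private String brokerUrl;

    @Value("${consumer.user}")
    private String consumerUser;

    @Value("${consumer.password}")
    private String consumerPassword;

    @Value("${consumer.queues}")
    private String queues;

    // The ":10" and ":50" suffixes supply defaults when the property is missing;
    // a field initializer would be overwritten by @Value injection.
    @Value("${activemq.maximumThreads:10}")
    private int maximumThreads;

    @Value("${activemq.prefetch.limit:50}")
    private int prefetchLimit;

    // Getters used by the listener configuration.
    public String getTrustedPackages() { return trustedPackages; }
    public String getNoOfConnections() { return noOfConnections; }
    public String getBrokerUrl() { return brokerUrl; }
    public String getConsumerUser() { return consumerUser; }
    public String getConsumerPassword() { return consumerPassword; }
    public String getQueues() { return queues; }
    public int getMaximumThreads() { return maximumThreads; }
    public int getPrefetchLimit() { return prefetchLimit; }
}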

Spring Boot Java JMS Listener Service

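import java.util.concurrent.CountDownLatch;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.util.StringUtils;

// @Configuration is itself a component stereotype, so @Component is not needed.
@Configuration
@EnableJms
public class MessageConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(MessageConsumer.class);

    @Autowired
    AMQConfig amqConfig;

    // Connection factory for the consumer: broker URL, credentials and prefetch policy.
    @Bean
    public ActiveMQConnectionFactory receiverActiveMQConnectionFactory() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL(amqConfig.getBrokerUrl());
        connectionFactory.setUserName(amqConfig.getConsumerUser());
        connectionFactory.setPassword(amqConfig.getConsumerPassword());

        ActiveMQPrefetchPolicy activeMQPrefetchPolicy = new ActiveMQPrefetchPolicy();
        activeMQPrefetchPolicy.setQueuePrefetch(amqConfig.getPrefetchLimit());
        activeMQPrefetchPolicy.setMaximumPendingMessageLimit(amqConfig.getPrefetchLimit());

        connectionFactory.setPrefetchPolicy(activeMQPrefetchPolicy);
        return connectionFactory;
    }

    // Container factory that @JmsListener methods use by default.
    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(receiverActiveMQConnectionFactory());
        return factory;
    }

    @Bean
    public MessageReceiver receiver() {
        return new MessageReceiver();
    }

    // Receiver bean; its @JmsListener method is invoked for each message on the queue.
    public static class MessageReceiver {

        // Counted down after a message has been handled.
        private final CountDownLatch latch = new CountDownLatch(1);

        public CountDownLatch getLatch() {
            return latch;
        }

        @JmsListener(destination = "${consumer.queues}")
        public void receive(String message) {
            try {
                LOGGER.info("***************** Message Received *****************");
                Thread.sleep(100); // artificial delay kept from the original example
                // Spring's StringUtils.hasText avoids an extra commons-lang3 dependency.
                if (StringUtils.hasText(message)) {
                    // Execute your code here.
                    LOGGER.info("Message received {}", message);
                }
                latch.countDown();
            } catch (Exception ex) {
                LOGGER.error("Error while processing message in receiver: {}", ex.getMessage(), ex);
            }
        }
    }
}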
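The receiver above exposes a `CountDownLatch` without saying what it is for. One common use, assumed here rather than stated in this post, is to let a test block until the asynchronous listener has handled a message. The sketch below shows that pattern with only the JDK: `LatchSketch` and `FakeReceiver` are hypothetical stand-ins, and a plain thread plays the role of the JMS listener container.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Illustrative only: no broker or Spring context is involved.
public class LatchSketch {

    // Hypothetical stand-in for the listener bean.
    static class FakeReceiver {
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile String lastMessage;

        CountDownLatch getLatch() { return latch; }
        String getLastMessage() { return lastMessage; }

        // Plays the role of the @JmsListener method.
        void receive(String message) {
            lastMessage = message;
            latch.countDown();
        }
    }

    // Delivers one message from another thread and waits up to one second for it.
    public static boolean deliverAndWait(String message) {
        FakeReceiver receiver = new FakeReceiver();
        Thread delivery = new Thread(() -> receiver.receive(message));
        delivery.start();
        try {
            boolean arrived = receiver.getLatch().await(1, TimeUnit.SECONDS);
            return arrived && message.equals(receiver.getLastMessage());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(deliverAndWait("hello"));
    }
}
```

Because the latch starts at 1, it only signals the first message; a test that expects several messages would need a latch created with a matching count.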

Note: This is a working solution; however, the code may contain typing mistakes that cause errors. Reach out to me and I will do my best to help you out.

References

https://activemq.apache.org/

https://activemq.apache.org/what-is-the-prefetch-limit-for

https://docs.aws.amazon.com/amazon-mq/latest/developer-guide/getting-started-activemq.html

https://activemq.apache.org/using-apache-activemq
