How to implement an ActiveMQ JMS Producer in a Spring Boot application?

Let’s consider a scenario where your application is deployed in a clustered environment and has to handle thousands of email requests. A simple approach is to expose an API endpoint and post each message to the process that sends the email. This works, but once the application receives hundreds of messages per second, you have to deal with concurrency and load. You need a queue to buffer these messages, and that’s where JMS and ActiveMQ come in. ActiveMQ is a message broker that implements the JMS API; other brokers such as RabbitMQ, or managed services such as Amazon MQ, fill a similar role.
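To make the buffering idea concrete, here is a minimal sketch that uses only the JDK. It is not ActiveMQ code: the class EmailQueueSketch and its methods are hypothetical, and an in-memory BlockingQueue stands in for the broker queue. It only shows how the receiving side and the sending side are decoupled by a queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical in-memory stand-in for a broker queue; this is not ActiveMQ code.
class EmailQueueSketch {

    // The receiving side only enqueues the request and returns at once.
    // offer() returns false instead of blocking when a bounded queue is full.
    static boolean submit(BlockingQueue<String> queue, String emailRequest) {
        return queue.offer(emailRequest);
    }

    // The sending side drains whatever has accumulated, in arrival order.
    static List<String> drain(BlockingQueue<String> queue) {
        List<String> batch = new ArrayList<>();
        queue.drainTo(batch);
        return batch;
    }

    public static void main(String[] args) {
        // Capacity of 3 is an arbitrary value chosen for the demo.
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(3);
        for (int i = 0; i < 5; i++) {
            System.out.println("email-" + i + " accepted: " + submit(queue, "email-" + i));
        }
        System.out.println("sent: " + drain(queue));
    }
}
```

An in-memory queue like this is lost when the process stops and is not shared across cluster nodes, which is the gap a broker such as ActiveMQ is meant to fill.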

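Keep in mind that a Spring Boot controller does not queue or buffer requests on its own: each request is processed on a server thread as it arrives, so a burst of traffic lands directly on the process that sends the email.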

In this post, I describe how to implement a producer job that sends JMS messages. If you want to know how to implement a JMS listener, read my previous post. Follow the steps below to implement an ActiveMQ producer job in your Spring Boot application.

NOTE: The code below configures a number of ActiveMQ policies. These settings vary based on your requirements, so I recommend reading the official documentation to understand them.

Sample Configurations: Session.CLIENT_ACKNOWLEDGE and DeliveryMode.PERSISTENT

Maven POM.XML Dependencies

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!-- Add these dependencies if the producer runs in the same environment too. -->
<dependency>
   <groupId>org.apache.activemq</groupId>
   <artifactId>activemq-client</artifactId>
   <version>5.15.8</version>
</dependency>
<dependency>
	<groupId>org.apache.activemq</groupId>
	<artifactId>activemq-pool</artifactId>
	<version>5.15.8</version>
</dependency>

ActiveMQ Configuration: application.properties

activemq.packages.trusted=<Optional but for security you can configure package name where the listener is>
activemq.brokerUrl=<brokerURL>
activemq.connection.pool=5
activemq.prefetch.limit=50
#producer
producer.user=<userName>
producer.password=<Password>
producer.queues=<Queue Name>
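The pool size above is read as text and only parsed with Integer.parseInt when the connection factory is built, so a typo surfaces late. The following is a minimal sketch, using only the JDK, of checking such numeric settings up front. The class AmqPropertiesSketch and its methods are hypothetical, and tcp://localhost:61616 is only an example value (the default OpenWire port of a local ActiveMQ broker).

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper for validating numeric settings; not part of Spring or ActiveMQ.
class AmqPropertiesSketch {

    // Loads key=value pairs from text, the same format as application.properties.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Returns the setting as a positive int, or defaultValue when the key is absent or blank.
    static int positiveInt(Properties props, String key, int defaultValue) {
        String raw = props.getProperty(key);
        if (raw == null || raw.trim().isEmpty()) {
            return defaultValue;
        }
        int value = Integer.parseInt(raw.trim());
        if (value <= 0) {
            throw new IllegalArgumentException(key + " must be positive, got " + value);
        }
        return value;
    }

    public static void main(String[] args) {
        // Example values only.
        Properties props = parse(
                "activemq.brokerUrl=tcp://localhost:61616\n"
              + "activemq.connection.pool=5\n");
        System.out.println(positiveInt(props, "activemq.connection.pool", 1));
        System.out.println(positiveInt(props, "activemq.prefetch.limit", 50));
    }
}
```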

ActiveMQ Java configuration class mapped to the application.properties file.

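import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

// Holds the ActiveMQ settings read from application.properties.
@Configuration
public class AMQConfig {

    @Value("${activemq.packages.trusted}")
    private String trustedPackages;

    @Value("${activemq.connection.pool}")
    private String noOfConnections;

    @Value("${activemq.brokerUrl}")
    private String brokerUrl;

    @Value("${producer.user}")
    private String producerUser;

    @Value("${producer.password}")
    private String producerPassword;

    @Value("${producer.queues}")
    private String queues;

    // Falls back to 50 when the property is not set.
    @Value("${activemq.prefetch.limit:50}")
    private int prefetchLimit;

    // Getters used by ProducerJob and AMQProducer.
    public String getTrustedPackages() { return trustedPackages; }
    public String getNoOfPooledConnections() { return noOfConnections; }
    public String getBrokerUrl() { return brokerUrl; }
    public String getProducerUser() { return producerUser; }
    public String getProducerPassword() { return producerPassword; }
    public String getQueues() { return queues; }
    public int getPrefetchLimit() { return prefetchLimit; }
}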

Spring Boot Java JMS Producer Job

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Session;

@Component
public class ProducerJob {

    private static final Logger LOGGER = LoggerFactory.getLogger(ProducerJob.class);

    // Number of sample messages sent on each run.
    private static final int MESSAGE_COUNT = 10;

    @Autowired
    AMQConfig amqConfig;

    // Replace with your own producer cron.
    @Scheduled(cron = "${PRODUCER_JOB_CRON}")
    public void run() {
        Session producerSession = null;
        try {
            Connection producerConnection = AMQProducer.getActiveMQProducerConnection(amqConfig);
            if (producerConnection != null) {
                producerSession = producerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                LOGGER.info(" Producer Job Starts. sending sample message ");
                for (int index = 0; index < MESSAGE_COUNT; index++) {
                    // Build the payload per iteration so that orderno actually varies.
                    String message = String.format(
                            "{\"orderno\": %d, \"type\": \"order\", \"amount\": \"202\"}", index * 100);
                    AMQProducer.sendMessage(producerSession, amqConfig, message);
                }
                LOGGER.info(" Job ran and sent {} messages ", MESSAGE_COUNT);
            }
        } catch (Exception e) {
            LOGGER.error(" Producer job failed ", e);
        } finally {
            if (producerSession != null) {
                try {
                    producerSession.close();
                } catch (JMSException e) {
                    LOGGER.error(" Could not close producer session ", e);
                }
            }
        }
    }
}
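The job above takes its schedule from the PRODUCER_JOB_CRON property. Spring cron expressions in their usual form have six space-separated fields, starting with seconds. The sketch below, which uses only the JDK, labels the fields of such an expression so you can check a value before using it. The class CronFieldsSketch is hypothetical, and "0 */5 * * * *" is only an example value.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that labels cron fields; it does not evaluate the schedule.
class CronFieldsSketch {

    private static final String[] FIELD_NAMES = {
        "second", "minute", "hour", "day of month", "month", "day of week"
    };

    // Splits a six-field cron expression and labels each field in order.
    static Map<String, String> label(String cron) {
        String[] parts = cron.trim().split("\\s+");
        if (parts.length != FIELD_NAMES.length) {
            throw new IllegalArgumentException(
                    "expected " + FIELD_NAMES.length + " fields but got " + parts.length);
        }
        Map<String, String> labeled = new LinkedHashMap<>();
        for (int i = 0; i < parts.length; i++) {
            labeled.put(FIELD_NAMES[i], parts[i]);
        }
        return labeled;
    }

    public static void main(String[] args) {
        // Example value only: fires at second 0 of every fifth minute.
        System.out.println(label("0 */5 * * * *"));
    }
}
```

Note that @Scheduled methods only run when scheduling is enabled in the application, for example with @EnableScheduling on a configuration class.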

AMQProducer.java: This file contains the code that sends messages to ActiveMQ.

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.*;

public final class AMQProducer {
    private AMQProducer(){ }

    private static final Logger LOGGER = LoggerFactory.getLogger(AMQProducer.class);

    static PooledConnectionFactory pooledConnectionFactory = null;
    static Connection producerConnection = null;

    private static PooledConnectionFactory getPooledConnectionFactory(final String brokerUrl, String userName, String password, String noOfConnections){
        final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
        // Pass the username and password.
        connectionFactory.setUserName(userName);
        connectionFactory.setPassword(password);
        // Create a pooled connection factory.
        final PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
        pooledConnectionFactory.setConnectionFactory(connectionFactory);
        pooledConnectionFactory.setMaxConnections(Integer.parseInt(noOfConnections));
        return pooledConnectionFactory;
    }
    // synchronized so that concurrent callers do not create more than one factory or connection.
    public static synchronized Connection getActiveMQProducerConnection(AMQConfig amqConfig) {
        if (pooledConnectionFactory == null) {
            LOGGER.info(" pooledConnectionFactory is null ");
            pooledConnectionFactory = getPooledConnectionFactory(amqConfig.getBrokerUrl(), amqConfig.getProducerUser(),
                    amqConfig.getProducerPassword(), amqConfig.getNoOfPooledConnections());
        }
        if (producerConnection == null){
            try {
                LOGGER.info(" producerConnection is null ");
                // Only keep the connection once it has started successfully.
                final Connection connection = pooledConnectionFactory.createConnection();
                connection.start();
                producerConnection = connection;
            } catch (JMSException e) {
                LOGGER.error(" Could not create producer connection ", e);
            }
        }
        return producerConnection;
    }

    public static void sendMessage(Session producerSession, AMQConfig amqConfig, final String message) throws JMSException {
        if (producerSession != null) {
            // producer.queues holds a queue name, so create a queue rather than a topic.
            final Destination producerDestination = producerSession.createQueue(amqConfig.getQueues());
            final MessageProducer producer = producerSession.createProducer(producerDestination);
            try {
                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
                TextMessage producerMessage = producerSession.createTextMessage(message);
                producer.send(producerMessage);
                LOGGER.info(" ******************* message sent  ******************* ");
            } finally {
                // Release the producer; failures propagate to the caller instead of being swallowed.
                producer.close();
            }
        }
    }
}
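AMQProducer keeps its connection factory and connection in static fields that are created on first use. When more than one thread can reach that code, the null check needs a guard, otherwise two threads may each create their own instance. Below is a minimal sketch of the idea using only the JDK. The class LazyHolderSketch is hypothetical and a string stands in for the real connection.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical helper: creates a value on first use, at most once.
class LazyHolderSketch<T> {

    private final Supplier<T> factory;
    private T instance;

    LazyHolderSketch(Supplier<T> factory) {
        this.factory = factory;
    }

    // synchronized: two threads cannot both observe null and both call the factory.
    synchronized T get() {
        if (instance == null) {
            instance = factory.get();
        }
        return instance;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger created = new AtomicInteger();
        LazyHolderSketch<String> holder =
                new LazyHolderSketch<>(() -> "connection-" + created.incrementAndGet());

        // Several threads ask for the value at the same time.
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(holder::get);
            threads[i].start();
        }
        for (Thread thread : threads) {
            thread.join();
        }
        System.out.println("factory calls: " + created.get());
    }
}
```

Synchronizing the whole method is the simplest option and is usually cheap enough for a call that happens once per job run.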

Note: This is a working solution; however, the code as typed here may contain mistakes that cause errors. Reach out to me and I will do my best to help you out.

References

https://activemq.apache.org/

https://activemq.apache.org/what-is-the-prefetch-limit-for

https://docs.aws.amazon.com/amazon-mq/latest/developer-guide/getting-started-activemq.html

https://activemq.apache.org/using-apache-activemq
