Request/Reply
JMS to JMS – Request/Reply with Temporary Queues
Section titled “JMS to JMS – Request/Reply with Temporary Queues”Overview
Section titled “Overview”This example demonstrates the request/reply messaging pattern in JMS using temporary queues with ActiveMQ Artemis over TCP transport.
- A Sender sends a request message to a queue (
quick-start-request
) and creates aTemporaryQueue
to receive the reply. - A Receiver listens on the request queue, processes the message, and sends a reply back to the temporary queue.
This setup is useful for RPC-style communication where the client expects a direct reply for each request.
Prerequisites
Section titled “Prerequisites”- ActiveMQ Artemis running with TCP acceptor enabled:
<acceptor name="artemis-tcp">tcp://0.0.0.0:61616</acceptor>
- Add Maven dependency:
<dependency><groupId>org.apache.activemq</groupId><artifactId>artemis-jms-client</artifactId><version>2.31.2</version> <!-- Use your Artemis version --></dependency>
Sender Code (Request with Temporary Queue)
Section titled “Sender Code (Request with Temporary Queue)”package org.example.JMSTOJMS;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import javax.jms.*;import java.util.UUID;
public class MultiTempMessageSender { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); String requestQueueName = "quick-start-request";
try (Connection connection = connectionFactory.createConnection()) { connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue requestQueue = session.createQueue(requestQueueName); MessageProducer requestProducer = session.createProducer(requestQueue);
// Create a temporary queue for replies TemporaryQueue responseQueue = session.createTemporaryQueue(); System.out.println("Temporary queue created for replies.");
// Launch sender thread for (int i = 1; i <= 1; i++) { int senderId = i; new Thread(() -> { try { sendRequest(session, requestProducer, responseQueue, senderId); } catch (JMSException e) { e.printStackTrace(); } }).start(); }
Thread.sleep(Long.MAX_VALUE); // Keep alive } }
private static void sendRequest(Session session, MessageProducer requestProducer, TemporaryQueue replyQueue, int senderId) throws JMSException { String correlationId = UUID.randomUUID().toString();
TextMessage requestMessage = session.createTextMessage("Request from Sender " + senderId); requestMessage.setJMSReplyTo(replyQueue); requestMessage.setJMSCorrelationID(correlationId);
System.out.println("Sender " + senderId + " sent request with CorrelationID: " + correlationId); requestProducer.send(requestMessage);
try (MessageConsumer replyConsumer = session.createConsumer(replyQueue)) { Message replyMessage = replyConsumer.receive(); if (replyMessage instanceof TextMessage) { String replyText = ((TextMessage) replyMessage).getText(); System.out.println("Sender " + senderId + " received reply: " + replyText + " with CorrelationID: " + replyMessage.getJMSCorrelationID()); } else { System.out.println("Sender " + senderId + " timed out waiting for reply."); } } }}
Receiver Code (Consumes Requests and Sends Replies)
Section titled “Receiver Code (Consumes Requests and Sends Replies)”package org.example.JMSTOJMS;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import javax.jms.*;
public class MultiTempMessageReceiver { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); String requestQueueName = "quick-start-request";
try (Connection connection = connectionFactory.createConnection()) { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue requestQueue = session.createQueue(requestQueueName); MessageConsumer requestConsumer = session.createConsumer(requestQueue); connection.start();
// Launch multiple receivers for (int i = 1; i <= 3; i++) { int receiverId = i; new Thread(() -> { try { receiveAndReply(session, requestConsumer, receiverId); } catch (JMSException e) { e.printStackTrace(); } }).start(); }
Thread.sleep(Long.MAX_VALUE); // Keep alive } }
private static void receiveAndReply(Session session, MessageConsumer requestConsumer, int receiverId) throws JMSException { while (true) { Message requestMessage = requestConsumer.receive(); if (requestMessage instanceof TextMessage) { String requestText = ((TextMessage) requestMessage).getText(); String correlationId = requestMessage.getJMSCorrelationID(); Destination replyTo = requestMessage.getJMSReplyTo();
System.out.println("Receiver " + receiverId + " received request: " + requestText + " with CorrelationID: " + correlationId);
try (MessageProducer replyProducer = session.createProducer(replyTo)) { TextMessage replyMessage = session.createTextMessage("Response from Receiver " + receiverId); replyMessage.setJMSCorrelationID(correlationId); replyProducer.send(replyMessage);
System.out.println("Receiver " + receiverId + " sent reply with CorrelationID: " + correlationId); } } } }}
Key Points
Section titled “Key Points”- TemporaryQueue: Sender creates it for replies; automatically deleted when session closes.
- Correlation ID: Ensures replies are matched with original requests.
- Scaling: Multiple receivers can listen on the same request queue.
- Concurrency: Example uses threads to simulate parallel senders/receivers.
✅ This completes the request/reply workflow: the Sender publishes to quick-start-request
, Receivers process and respond back to the sender’s TemporaryQueue
.