The JMS API provides a standardized way for Java-based applications to communicate asynchronously by producing and consuming messages. Using JMS, different applications can efficiently communicate asynchronously, promoting loose coupling.
JMS includes the following key features to enable service-to-service messaging:
- Asynchronous messaging: messages are produced and consumed by applications without blocking.
- Exactly Once Delivery: JMS messages sent over Oracle Database are delivered only once to their intended destination
- JMS supports both point-to-point (queue) and pub-sub (topic) messaging patterns
In this article, we’ll walk through sample code that implements a parallel JMS producer/consumer example using the pub-sub topic model to enable concurrent consumers on a given topic. We’ll use Oracle Database Transactional Event Queues (TxEventQ) as the JMS message broker.
If you’re unfamiliar with TxEventQ, it’s a high-throughput messaging system capable of multiple producers/consumers and exactly-once messaging. I’ve published a more detailed write-up on TxEventQ Here.
Want to skip the article and go straight to the code? Find the full sample on GitHub Here.
Create a JMS Topic
As Oracle Database is our JMS message broker, we’ll write a short SQL script to create a JMS topic within the database.
-- Create a Transactional Event Queue
begin
-- See https://docs.oracle.com/en/database/oracle/oracle-database/23/arpls/DBMS_AQADM.html#GUID-93B0FF90-5045-4437-A9C4-B7541BEBE573
-- For documentation on creating Transactional Event Queues.
dbms_aqadm.create_transactional_event_queue(
queue_name => 'testuser.mytopic',
-- Payload can be RAW, JSON, DBMS_AQADM.JMS_TYPE, or an object type.
-- Default is DBMS_AQADM.JMS_TYPE.
queue_payload_type => DBMS_AQADM.JMS_TYPE,
-- FALSE means queues can only have one consumer for each message. This is the default.
-- TRUE means queues created in the table can have multiple consumers for each message.
multiple_consumers => true
);
-- 6 queue shards for consumer parallelization
dbms_aqadm.set_queue_parameter('testuser.mytopic', 'shard_num', 6);
-- must be set to make sure that order is guaranteed at the jms consumer.
dbms_aqadm.set_queue_parameter('testuser.mytopic', 'sticky_dequeue', 1);
-- must be set to make sure that order of the events per correlation in place
DBMS_AQADM.set_queue_parameter('testuser.mytopic', 'KEY_BASED_ENQUEUE', 1);
-- Start the queue
dbms_aqadm.start_queue(
queue_name => 'testuser.mytopic'
);
end;
/
begin
dbms_aqadm.add_subscriber(
queue_name => 'testuser.mytopic',
subscriber => sys.aq$_agent('example_subscriber_1', null, null)
);
dbms_aqadm.add_subscriber(
queue_name => 'testuser.mytopic',
subscriber => sys.aq$_agent('example_subscriber_2', null, null)
);
end;
/Let’s break down this script, examining the PL/SQL procedures used to create and configure the topic:
- We use the dbms_aqadm.create_transactional_event_queue procedure to create a topic by passing the
multiple_consumersflag. When present, this flag allows multiple consumers per message, and message persistence within the database after consumption. - After creating the topic, we start it using the dbms_aqadm.start_queue procedure.
- We use the dbms_aqadm.set_queue_parameter to configure the topic with six shards (parallel event streams), sticky dequeue for strict ordering on the JMS consumers, and key based enqueue to allow routing of messaging a specific shard/event stream within the topic.
- Two subscribers are added to the topic, allowing us to form up to two consumer groups. Our Java consumers will use this subscriber name when subscribing to a topic.
JMS Producer
Let’s now implement a JMSProducer class that produces messages to a JMS topic. Our JMSProducer uses a Java DataSource connected to Oracle Database to create a JMS session, and an associated JMS publisher for that session.
Taking a list of input data, the JMSProducer writes each String as a JMS TextMessage and produces it to the topic.
import java.util.List;
import java.util.UUID;
import jakarta.jms.JMSException;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import jakarta.jms.Topic;
import jakarta.jms.TopicConnection;
import javax.sql.DataSource;
import oracle.jakarta.jms.AQjmsFactory;
import oracle.jakarta.jms.AQjmsSession;
import oracle.jakarta.jms.AQjmsTopicPublisher;
public class JMSProducer implements Runnable {
private final DataSource dataSource;
private final String username;
private final String topicName;
private final List<String> input;
public JMSProducer(DataSource dataSource, String username, String topicName, List<String> input) {
this.dataSource = dataSource;
this.username = username;
this.topicName = topicName;
this.input = input;
}
@Override
public void run() {
// Create a new JMS connection and session.
try (TopicConnection connection = AQjmsFactory.getTopicConnectionFactory(dataSource).createTopicConnection();
AQjmsSession session = (AQjmsSession) connection.createTopicSession(true, Session.AUTO_ACKNOWLEDGE)) {
connection.start();
Topic jmsTopic = session.getTopic(username, topicName);
// The JMS Connection must be started before use.
AQjmsTopicPublisher publisher = (AQjmsTopicPublisher) session.createPublisher(jmsTopic);
// Write the input data as JMS text messages to a JMS topic.
for (String s : input) {
TextMessage message = session.createTextMessage(s);
message.setJMSCorrelationID(UUID.randomUUID().toString());
publisher.publish(message);
}
session.commit();
} catch (JMSException e) {
System.out.println("JMSException caught: " + e);
throw new RuntimeException(e);
}
System.out.println("[PRODUCER] Sent all JMS messages. Closing producer!");
}
}JMS Consumer
The JMSConsumer class also uses a Java DataSource to create a JMS session, using that session to subscribe to topic using the subscriber name created during the topic initialization.
The consumer then enters a polling loop to receive messages — once all messages have been consumed using the count variable, the consumer exits.
Because we are using JMS over JDBC, the consumer can insert a record into a database table in the same transaction as the that it received the message on. This allows us to combine database operations (DML) with message receipt in an atomic manner — if an error occurs during message processing, both the message receipt and DML are rolled back.
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import jakarta.jms.JMSException;
import jakarta.jms.MessageConsumer;
import jakarta.jms.Session;
import jakarta.jms.Topic;
import jakarta.jms.TopicConnection;
import javax.sql.DataSource;
import oracle.jakarta.jms.AQjmsException;
import oracle.jakarta.jms.AQjmsFactory;
import oracle.jakarta.jms.AQjmsSession;
import oracle.jakarta.jms.AQjmsTextMessage;
import oracle.jdbc.OracleTypes;
public class JMSConsumer implements Runnable {
private final DataSource dataSource;
private final int consumerID;
private final String username;
private final String topicName;
private final String groupName;
private final AtomicInteger count;
public JMSConsumer(DataSource dataSource, int consumerID, String groupName, String username, String topicName, AtomicInteger count) {
this.dataSource = dataSource;
this.consumerID = consumerID;
this.username = username;
this.topicName = topicName;
this.groupName = groupName;
this.count = count;
}
@Override
public void run() {
int consumedMessages = 0;
// Create a new JMS connection and session.
try (TopicConnection topicConn = AQjmsFactory.getTopicConnectionFactory(dataSource).createTopicConnection();
AQjmsSession session = (AQjmsSession) topicConn.createTopicSession(true, Session.AUTO_ACKNOWLEDGE);
Connection dbConn = session.getDBConnection()) {
Topic jmsTopic = session.getTopic(username, topicName);
// The JMS Connection must be started before use.
topicConn.start();
MessageConsumer consumer = session.createDurableSubscriber(jmsTopic, groupName);
while (true) {
AQjmsTextMessage message = (AQjmsTextMessage) consumer.receive(1_000); // Timeout: 1 second
if (message != null) {
// The atomic count abstraction is for example purposes only.
// We want to stop all the consumers after the count reaches 0.
if (count.decrementAndGet() >= 0) {
String msg = message.getText();
processMessage(msg, dbConn);
session.commit(); // Only commit if message received and processed successfully
consumedMessages++;
}
}
if (count.get() <= 0) {
System.out.printf("[CONSUMER %d (%s)] Received %d JMS messages. Closing consumer!%n", consumerID, groupName, consumedMessages);
return;
}
}
} catch (JMSException | SQLException e) {
System.out.println("Exception caught: " + e);
throw new RuntimeException(e);
}
}
private void processMessage(String message, Connection dbConn) throws SQLException {
final String sql = """
insert into weather_events (data) values(?)
""";
try (PreparedStatement stmt = dbConn.prepareStatement(sql)) {
stmt.setObject(1, message.getBytes(), OracleTypes.JSON);
stmt.executeUpdate();
}
}
}Testing it out with Oracle Database Free
Let’s write a test that takes both JMSProducer and JMSConsumer classes and runs a parallel message processing scenario using Oracle Database Free.
Our test class (JMSMultiConsumerTest) will do the following:
- Spin up a containerized instance of Oracle Database Free.
- Initialize the database, running the testuser-topic.sql script.
- Create two consumer groups, each with three instances of JMSConsumer and start them in virtual threads.
- Start an instance of JMSProducer in a virtual thread.
- Wait for the JMSConsumer instances to receive all messages.
- Verify that all the messages were inserted into the database by the JMSConsumer(s).
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import javax.sql.DataSource;
import oracle.ucp.jdbc.PoolDataSource;
import oracle.ucp.jdbc.PoolDataSourceFactory;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.oracle.OracleContainer;
import org.testcontainers.utility.MountableFile;
import static java.util.concurrent.Executors.newVirtualThreadPerTaskExecutor;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.fail;
@Testcontainers
public class JMSMultiConsumerTest {
private static final String oracleImage = "gvenzl/oracle-free:23.26.0-slim-faststart";
private static final String testUser = "testuser";
private static final String testPassword = "Welcome123#";
private static final String topicName = "mytopic";
@Container
private static final OracleContainer oracleContainer = new OracleContainer(oracleImage)
.withStartupTimeout(Duration.ofMinutes(3)) // allow possible slow startup
.withInitScripts(
"create-table.sql"
)
.withUsername(testUser)
.withPassword(testPassword);
private static DataSource dataSource;
private static List<String> input;
@BeforeAll
static void setUp() throws Exception {
// Configure the Oracle Database container with the TxEventQ test user.
oracleContainer.start();
oracleContainer.copyFileToContainer(MountableFile.forClasspathResource("testuser-topic.sql"), "/tmp/init.sql");
oracleContainer.execInContainer("sqlplus", "sys / as sysdba", "@/tmp/init.sql");
dataSource = getDataSource();
input = Files.readAllLines(Paths.get("src", "test", "resources", "producer-events.txt"));
}
@Test
void produceConsume() throws Exception {
// Used for tracking the number of messages consumed. Once all messages have been consumed and the latch is empty,
// the test completes.
AtomicInteger count = new AtomicInteger(input.size());
AtomicInteger count2 = new AtomicInteger(input.size());
// Number of consumer threads, may be 1 - 6.
final int consumerThreads = 3;
// Create an executor to submit producer and consumer threads.
ExecutorService executor = newVirtualThreadPerTaskExecutor();
List<Future<?>> consumers = new ArrayList<>();
// Start the consumer thread(s) concurrently.
for (int i = 0; i < consumerThreads; i++) {
consumers.add(executor.submit(getConsumer(i+1, count, "example_subscriber_1")));
consumers.add(executor.submit(getConsumer(i+1, count2, "example_subscriber_2")));
}
// Start the producer thread.
executor.submit(getProducer());
// Wait for the consumer(s) to receive all messages.
for (Future<?> consumer : consumers) {
consumer.get();
}
// Verify consumer inserted all the messages to the weather_events database table.
// Total records should be input.size * 2, because there are two parallel consumer groups.
verifyEventsSent(input.size() * 2);
}
private void verifyEventsSent(int count) throws SQLException {
try (Connection conn = dataSource.getConnection();
Statement stmt = conn.createStatement()) {
String sql = "select count(*) from weather_events";
ResultSet rs = stmt.executeQuery(sql);
if (rs.next()) {
assertThat(count).isEqualTo(rs.getInt(1));
} else {
fail("no records found");
}
}
}
private JMSProducer getProducer() {
return new JMSProducer(
dataSource,
testUser,
topicName,
input
);
}
private JMSConsumer getConsumer(int id, AtomicInteger count, String group) {
return new JMSConsumer(
dataSource,
id,
group,
testUser,
topicName,
count
);
}
private static DataSource getDataSource() throws SQLException {
PoolDataSource ds = PoolDataSourceFactory.getPoolDataSource();
ds.setConnectionFactoryClassName("oracle.jdbc.pool.OracleDataSource");
ds.setConnectionPoolName(UUID.randomUUID().toString());
ds.setURL(oracleContainer.getJdbcUrl());
ds.setUser(oracleContainer.getUsername());
ds.setPassword(oracleContainer.getPassword());
ds.setConnectionPoolName(UUID.randomUUID().toString());
ds.setMaxPoolSize(30);
ds.setInitialPoolSize(10);
ds.setMinPoolSize(1);
return ds;
}
}To run the test, you’ll need Java 21+, Maven, and a Docker-compatible environment. You can run it using Maven like so:
mvn test
After the database container is initialized, you should see output about the producer and consumers. Note that the ordering of the consumers may differ due to their parallel nature:
[PRODUCER] Sent all JMS messages. Closing producer!
[CONSUMER 2 (example_subscriber_2)] Received 28 JMS messages. Closing consumer!
[CONSUMER 1 (example_subscriber_2)] Received 36 JMS messages. Closing consumer!
[CONSUMER 3 (example_subscriber_2)] Received 22 JMS messages. Closing consumer!
[CONSUMER 1 (example_subscriber_1)] Received 21 JMS messages. Closing consumer!
[CONSUMER 2 (example_subscriber_1)] Received 36 JMS messages. Closing consumer!
[CONSUMER 3 (example_subscriber_1)] Received 29 JMS messages. Closing consumer!
References
These additional resources can help you get started with TxEventQ and Oracle Database Free:

Leave a Reply