Did you know you can easily add pub/sub capabilities to your NodeJS/TypeScript app using Oracle Database?
In this article, we’ll implement a simple pub/sub TypeScript example using Oracle Database Transactional Event Queues (TxEventQ). If you’re not familiar with TxEventQ or database-driven messaging, I recommend reading my article Pub/Sub in your DB.
You’ll learn patterns you can use in any NodeJS/TypeScript app, making it easy to implement database-driven messaging between one or more distributed services.
Just looking for code? Click Here to view the sample.
Install Dependencies
We’ll use the node-oracledb driver to connect with Oracle Database. The testcontainers-node package lets us spin up disposable Oracle Database containers for testing.
Install these with npm or the dependency manager of your choice:
npm i testcontainers oracledbDatabase Configuration
From an Oracle Database instance, we’ll set up queue user permissions, create a queue, and install pub/sub functions. Set this up with any Oracle Database version 23ai or later, including Oracle Database Free.
You can find the full SQL, including permissions and PL/SQL statements in the init.sql script.
NodeJS Thick Client?
Equivalent setup is possible with the NodeJS thick client. In this example, we’ll use PL/SQL and a thin client.
Permissions
Because we’re using database-driven messaging, your database app user should have at least these minimum permissions to produce and consume events using TxEventQ:
-- Configure testuser with the necessary privileges to use Transactional Event Queues.
-- pub/sub permissions
grant execute on dbms_aq to testuser;
-- queue administration
grant execute on dbms_aqadm to testuser;
-- optional: jms permissions, if using jms
grant execute on dbms_aqin to testuser;
grant execute on dbms_aqjms to testuser;Creating a queue
Next, we’ll create a queue to produce and consume messages to/from. The create_transactional_event_queue procedure is used to create the queue. After a queue is created, it should be started using the start_queue procedure before messages are sent.
In this example, the queue is created under the testuser schema:
-- Create a Transactional Event Queue
begin
-- See https://docs.oracle.com/en/database/oracle/oracle-database/23/arpls/DBMS_AQADM.html#GUID-93B0FF90-5045-4437-A9C4-B7541BEBE573
-- For documentation on creating Transactional Event Queues.
dbms_aqadm.create_transactional_event_queue(
queue_name => 'testuser.event_stream',
-- Payload can be RAW, JSON, DBMS_AQADM.JMS_TYPE, or an object type.
-- Default is DBMS_AQADM.JMS_TYPE.
queue_payload_type => 'JSON'
);
-- Start the queue
dbms_aqadm.start_queue(
queue_name => 'testuser.event_stream'
);
end;
/
We use the JSON payload type in our queue. ChatGPT said:
The JSON type works well for flexible serialized objects, and you can also use JMS, Raw (for generic binary payloads), or custom database object payload types.
Java applications that use the Java Message Service API typically rely on JMS payloads.
Pub/Sub functions
Next, we’ll create database procedures/functions to produce and consume JSON messages. We’ll invoke these database functions from our TypeScript client.
The produce_json_event procedure takes a JSON event as a parameter, and enqueues it to the event_stream queue we created earlier.
This procedure wraps dbms_aq.enqueue, so the invocation from our TypeScript client is simpler:
-- Procedure to produce a JSON event to the event_stream queue.
create or replace procedure testuser.produce_json_event (
event in json
) as
enqueue_options dbms_aq.enqueue_options_t;
message_properties dbms_aq.message_properties_t;
msg_id raw(16);
begin
enqueue_options := dbms_aq.enqueue_options_t();
message_properties := dbms_aq.message_properties_t();
dbms_aq.enqueue(
queue_name => 'testuser.event_stream',
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => event,
msgid => msg_id
);
-- remember to commit the enqueue, so the message is published to the database.
commit;
end;
/The consume_json_event function takes no arguments, and returns a JSON value if one exists on the event_stream queue.
The dbms_aq.dequeue procedure pops a message from the queue, if one is available:
-- Procedure to consume a JSON event from the event_stream queue.
create or replace function testuser.consume_json_event return json is
dequeue_options dbms_aq.dequeue_options_t;
message_properties dbms_aq.message_properties_t;
msg_id raw(16);
event json;
begin
dequeue_options := dbms_aq.dequeue_options_t();
message_properties := dbms_aq.message_properties_t();
dequeue_options.navigation := dbms_aq.first_message;
dequeue_options.wait := dbms_aq.no_wait;
dbms_aq.dequeue(
queue_name => 'testuser.event_stream',
dequeue_options => dequeue_options,
message_properties => message_properties,
payload => event,
msgid => msg_id
);
-- return the JSON event to the caller
return event;
end;
/TypeScript producer/consumer
The TypeScript SQLProducer takes an arbitrary object, converts it to serialized JSON, and sends it to the event_stream queue.
We use OSON for serialized events – an efficient, Oracle Database native, binary JSON format.
import {type Result, type Connection, DB_TYPE_JSON, BIND_IN, type BindParameters} from "oracledb";
export class SQLProducer {
constructor(private readonly conn: Connection) {
}
public async send(event: any): Promise<Result<unknown>> {
// serialize the event to OSON
const oson = this.conn.encodeOSON(event);
// Bind the binary oson value as Oracle Database JSON
// Specifically, as an input parameter
const params: BindParameters = {
oson: {
type: DB_TYPE_JSON,
dir: BIND_IN,
val: oson
}
}
// Invoke the produce_json_event function using the oson val
return await this.conn.execute(
`BEGIN
produce_json_event(:oson);
END;`,
params
)
}
}The TypeScript SQLConsumer polls an event from the queue, if one exists.
When an event is received, the OSON is deserialized to an arbitrary object.
import {BIND_OUT, type BindParameters, type Connection, DB_TYPE_JSON} from "oracledb";
export class SQLConsumer {
constructor(private readonly conn: Connection) {
}
public async poll(): Promise<any> {
// Bind an output parameter as Oracle Database JSON
const params: BindParameters = {
oson: {
type: DB_TYPE_JSON,
dir: BIND_OUT,
}
}
// Receive an event from the queue, if one exists
let result = await this.conn.execute(
`BEGIN
:oson := consume_json_event();
END;`,
params
);
if (result.outBinds) {
// If we got an event, deserialize it to an object
// @ts-ignore
return this.conn.decodeOSON(result.outBinds.oson)
}
return undefined
}
}Try it with a simple test
txeventq.test.ts uses a disposable Oracle Database Free container, creating a queue and installing pub/sub functions in the database on startup. The linked test class uses the a gvenzl/oracle-free container.
A SQLProducer and SQLConsumer are initialized after the database starts. The producer sends a message, and the consumer receives it.
import { describe, it, expect, beforeAll, afterAll } from 'vitest'
import {
OracleDatabaseContainer,
StartedOracleDatabaseContainer
} from "../../src/testcontainers/generic_oracle_database_container.js";
import type {Connection} from "oracledb";
import {SQLProducer} from "../../src/txeventq/SQLProducer.js";
import {SQLConsumer} from "../../src/txeventq/SQLConsumer.js";
describe("Transactional Event Queues", () => {
let startedContainer: StartedOracleDatabaseContainer
let conn: Connection
beforeAll(async () => {
let container = new OracleDatabaseContainer()
// Mount the SQL initialization script on the container
.withCopyFilesToContainer([{
source: "./test/txeventq/init.sql",
target: "/tmp/init.sql"
}])
startedContainer = await container.start()
// Create the queue and install the pub/sub functions
let {output, exitCode} = await startedContainer.exec("sqlplus / as sysdba @/tmp/init.sql")
expect(exitCode).toBe(0)
conn = await startedContainer.getDatabaseConnection()
}, 10 * 60 * 1000); // With a pre-pulled image, the container should start up in seconds.
afterAll(async () => {
await startedContainer.stop()
});
it("plsql pub-sub", async () => {
// Start a producer and send a message
let producer = new SQLProducer(conn)
let event = {
message: "Hello from TxEventQ!"
}
await producer.send(event)
console.log("Produced event: " + JSON.stringify(event))
let result: any
// Start a consumer and receive a message
let consumer = new SQLConsumer(await startedContainer.getDatabaseConnection())
while (!result) {
result = await consumer.poll()
}
expect(result.message).toEqual(event.message)
console.log("Consumed event: " + JSON.stringify(result))
}, 5000)
});From the project’s typescript directory, run the test using npm:
npm run sql-producer-consumerIn the test output, you should see a JSON event produced and consumed:

That’s it! With this setup, you can implement pub/sub between apps using low-latency, database-driven messaging.
Expect a follow-up post that demonstrates how to implement TypeScript clients with multiple producers and consumers using consumer groups!
References
- Download the sample code
- Thick Client Sample (alternative to thin client in this blog post)
- What is Transactional Messaging?
- What is TxEventQ?

Leave a Reply