Pub/Sub with a NodeJS Oracle Database App

Did you know you can easily add pub/sub capabilities to your NodeJS/TypeScript app using Oracle Database?

In this article, we’ll implement a simple pub/sub TypeScript example using Oracle Database Transactional Event Queues (TxEventQ). If you’re not familiar with TxEventQ or database-driven messaging, I recommend reading my article Pub/Sub in your DB.

You’ll learn patterns you can use in any NodeJS/TypeScript app, making it easy to implement database-driven messaging between one or more distributed services.

Just looking for code? Click Here to view the sample.

Install Dependencies

We’ll use the node-oracledb driver to connect with Oracle Database. The testcontainers-node package lets us spin up disposable Oracle Database containers for testing.

Install these with npm or the dependency manager of your choice:

npm i testcontainers oracledb

Database Configuration

From an Oracle Database instance, we’ll set up queue user permissions, create a queue, and install pub/sub functions. Set this up with any Oracle Database version 23ai or later, including Oracle Database Free.

You can find the full SQL, including permissions and PL/SQL statements, in the init.sql script.

NodeJS Thick Client?

Equivalent setup is possible with the NodeJS thick client. In this example, we’ll use PL/SQL and a thin client.

Permissions

Because we’re using database-driven messaging, your database app user should have at least these minimum permissions to produce and consume events using TxEventQ:

-- Configure testuser with the necessary privileges to use Transactional Event Queues.

-- pub/sub permissions
grant execute on dbms_aq to testuser;
-- queue administration
grant execute on dbms_aqadm to testuser;
-- optional: jms permissions, if using jms
grant execute on dbms_aqin to testuser;
grant execute on dbms_aqjms to testuser;

Creating a queue

Next, we’ll create a queue that our app will produce messages to and consume messages from. The create_transactional_event_queue procedure creates the queue. After a queue is created, it must be started with the start_queue procedure before messages can be sent.

In this example, the queue is created under the testuser schema:

-- Create a Transactional Event Queue
begin
    -- See https://docs.oracle.com/en/database/oracle/oracle-database/23/arpls/DBMS_AQADM.html#GUID-93B0FF90-5045-4437-A9C4-B7541BEBE573
    -- For documentation on creating Transactional Event Queues.
    dbms_aqadm.create_transactional_event_queue(
            queue_name         => 'testuser.event_stream',
        -- Payload can be RAW, JSON, DBMS_AQADM.JMS_TYPE, or an object type.
        -- Default is DBMS_AQADM.JMS_TYPE.
            queue_payload_type => 'JSON'
    );
    -- Start the queue
    dbms_aqadm.start_queue(
            queue_name => 'testuser.event_stream'
    );
end;
/
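If you would rather build this block from TypeScript (for example, in a test setup step) than keep it in init.sql, a minimal sketch might look like the following. The function name buildCreateQueueBlock and the identifier check are illustrative assumptions, not part of the sample project; the PL/SQL it emits mirrors the block above.

```typescript
// Hypothetical helper: builds the PL/SQL block that creates and starts a queue.
type QueuePayloadType = "JSON" | "RAW";

// Allow only simple, optionally schema-qualified identifiers, because the
// queue name is interpolated into the PL/SQL text rather than bound.
const QUEUE_NAME_PATTERN = /^[A-Za-z][A-Za-z0-9_$#]*(\.[A-Za-z][A-Za-z0-9_$#]*)?$/;

function buildCreateQueueBlock(
    queueName: string,
    payloadType: QueuePayloadType = "JSON"
): string {
    if (!QUEUE_NAME_PATTERN.test(queueName)) {
        throw new Error(`Invalid queue name: ${queueName}`);
    }
    return [
        "begin",
        "    dbms_aqadm.create_transactional_event_queue(",
        `        queue_name         => '${queueName}',`,
        `        queue_payload_type => '${payloadType}'`,
        "    );",
        "    dbms_aqadm.start_queue(",
        `        queue_name => '${queueName}'`,
        "    );",
        "end;"
    ].join("\n");
}
```

The resulting string could be passed to conn.execute(...) by a user that has execute on dbms_aqadm.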

We use the JSON payload type in our queue.

The JSON type works well for flexible serialized objects, and you can also use JMS, Raw (for generic binary payloads), or custom database object payload types.

Java applications that use the Java Message Service API typically rely on JMS payloads.

Pub/Sub functions

Next, we’ll create database procedures/functions to produce and consume JSON messages. We’ll invoke these database functions from our TypeScript client.

The produce_json_event procedure takes a JSON event as a parameter, and enqueues it to the event_stream queue we created earlier.

This procedure wraps dbms_aq.enqueue, so the invocation from our TypeScript client is simpler:

-- Procedure to produce a JSON event to the event_stream queue.
create or replace procedure testuser.produce_json_event (
    event in json
) as
    enqueue_options dbms_aq.enqueue_options_t;
    message_properties dbms_aq.message_properties_t;
    msg_id raw(16);
begin
    enqueue_options := dbms_aq.enqueue_options_t();
    message_properties := dbms_aq.message_properties_t();
    dbms_aq.enqueue(
            queue_name => 'testuser.event_stream',
            enqueue_options => enqueue_options,
            message_properties => message_properties,
            payload => event,
            msgid => msg_id
    );
    -- remember to commit the enqueue, so the message is published to the database.
    commit;
end;
/

The consume_json_event function takes no arguments, and returns a JSON value if one exists on the event_stream queue.

The dbms_aq.dequeue procedure pops a message from the queue, if one is available:

-- Function to consume a JSON event from the event_stream queue.
create or replace function testuser.consume_json_event return json is
    dequeue_options dbms_aq.dequeue_options_t;
    message_properties dbms_aq.message_properties_t;
    msg_id raw(16);
    event json;
begin
    dequeue_options := dbms_aq.dequeue_options_t();
    message_properties := dbms_aq.message_properties_t();
    dequeue_options.navigation := dbms_aq.first_message;
    dequeue_options.wait := dbms_aq.no_wait;

    dbms_aq.dequeue(
            queue_name => 'testuser.event_stream',
            dequeue_options => dequeue_options,
            message_properties => message_properties,
            payload => event,
            msgid => msg_id
    );
    -- return the JSON event to the caller
    return event;
end;
/
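One thing to watch when calling this function from a client: with dbms_aq.no_wait, an empty queue may surface as an ORA-25228 error (timeout or end-of-fetch during dequeue) rather than as a null return value. Treat that as an assumption to verify against your database version. If it holds, and given that node-oracledb errors expose a numeric errorNum property, a small guard can turn that error into "no event". The names isQueueEmptyError and pollOrUndefined below are hypothetical and not part of the sample.

```typescript
// Assumption: ORA-25228 is the error raised when a no-wait dequeue finds
// nothing to read. Verify this against your database before relying on it.
const DEQUEUE_TIMEOUT_ERROR_NUM = 25228;

// Returns true when an unknown thrown value looks like an "empty queue" error.
function isQueueEmptyError(err: unknown): boolean {
    if (typeof err !== "object" || err === null) {
        return false;
    }
    const errorNum = (err as { errorNum?: unknown }).errorNum;
    return errorNum === DEQUEUE_TIMEOUT_ERROR_NUM;
}

// Wraps any poll function so an empty queue yields undefined instead of throwing.
async function pollOrUndefined<T>(poll: () => Promise<T>): Promise<T | undefined> {
    try {
        return await poll();
    } catch (err) {
        if (isQueueEmptyError(err)) {
            return undefined;
        }
        throw err; // anything else is a real failure
    }
}
```

Any other error is rethrown unchanged, so connection or permission problems still fail loudly.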

TypeScript producer/consumer

The TypeScript SQLProducer takes an arbitrary object, converts it to serialized JSON, and sends it to the event_stream queue.

We use OSON for serialized events – an efficient, Oracle Database native, binary JSON format.

import {type Result, type Connection, DB_TYPE_JSON, BIND_IN, type BindParameters} from "oracledb";

export class SQLProducer {
    constructor(private readonly conn: Connection) {
    }

    public async send(event: any): Promise<Result<unknown>> {
        // serialize the event to OSON
        const oson = this.conn.encodeOSON(event);
        // Bind the binary oson value as Oracle Database JSON
        // Specifically, as an input parameter
        const params: BindParameters = {
            oson: {
                type: DB_TYPE_JSON,
                dir: BIND_IN,
                val: oson
            }
        }
        
        // Invoke the produce_json_event procedure using the oson val
        return await this.conn.execute(
            `BEGIN
              produce_json_event(:oson);
            END;`,
            params
        )
    }
}
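To publish several events, a thin helper over the producer is enough. The sketch below is an illustration only: EventSender is a hypothetical structural interface that SQLProducer happens to satisfy, and sendAll is not part of the sample. Events are sent one at a time because a single connection runs one statement at a time, and each call to produce_json_event commits on its own.

```typescript
// Hypothetical structural interface; SQLProducer above matches it.
interface EventSender {
    send(event: unknown): Promise<unknown>;
}

// Sends events in order and returns how many were sent.
// Stops at the first failure; events sent before it stay committed.
async function sendAll(
    sender: EventSender,
    events: readonly unknown[]
): Promise<number> {
    let sent = 0;
    for (const event of events) {
        await sender.send(event);
        sent++;
    }
    return sent;
}
```

With a live connection, this could be called as sendAll(new SQLProducer(conn), [{message: "one"}, {message: "two"}]).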

The TypeScript SQLConsumer polls an event from the queue, if one exists.

When an event is received, the OSON is deserialized to an arbitrary object.

import {BIND_OUT, type BindParameters, type Connection, DB_TYPE_JSON} from "oracledb";

export class SQLConsumer {
    constructor(private readonly conn: Connection) {
    }

    public async poll(): Promise<any> {
        // Bind an output parameter as Oracle Database JSON
        const params: BindParameters = {
            oson: {
                type: DB_TYPE_JSON,
                dir: BIND_OUT,
            }
        }
        
        // Receive an event from the queue, if one exists
        let result = await this.conn.execute(
            `BEGIN
              :oson := consume_json_event();
            END;`,
            params
        );
        // outBinds is keyed by bind name, so check the bound value itself
        // @ts-ignore
        const oson = result.outBinds?.oson
        if (oson) {
            // If we got an event, deserialize it to an object
            return this.conn.decodeOSON(oson)
        }
        return undefined
    }
}
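Because poll() returns any, it can help to validate the shape of an event before using it. Here is a minimal sketch of a type guard for events shaped like { message: "Hello from TxEventQ!" }. GreetingEvent and isGreetingEvent are illustrative names, not part of the sample.

```typescript
// Hypothetical event shape with a single string field.
interface GreetingEvent {
    message: string;
}

// Narrows an unknown value (e.g. the result of consumer.poll()) to GreetingEvent.
function isGreetingEvent(value: unknown): value is GreetingEvent {
    return (
        typeof value === "object" &&
        value !== null &&
        typeof (value as { message?: unknown }).message === "string"
    );
}
```

Inside an if (isGreetingEvent(result)) branch, TypeScript then treats result.message as a string without any casts.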

Try it with a simple test

txeventq.test.ts uses a disposable Oracle Database Free container, creating a queue and installing the pub/sub functions in the database on startup. The linked test class uses a gvenzl/oracle-free container.

A SQLProducer and SQLConsumer are initialized after the database starts. The producer sends a message, and the consumer receives it.

import { describe, it, expect, beforeAll, afterAll } from 'vitest'
import {
    OracleDatabaseContainer,
    StartedOracleDatabaseContainer
} from "../../src/testcontainers/generic_oracle_database_container.js";
import type {Connection} from "oracledb";
import {SQLProducer} from "../../src/txeventq/SQLProducer.js";
import {SQLConsumer} from "../../src/txeventq/SQLConsumer.js";

describe("Transactional Event Queues", () => {
    let startedContainer: StartedOracleDatabaseContainer
    let conn: Connection

    beforeAll(async () => {
        let container = new OracleDatabaseContainer()
            // Mount the SQL initialization script on the container
            .withCopyFilesToContainer([{
                source: "./test/txeventq/init.sql",
                target: "/tmp/init.sql"
            }])
        startedContainer = await container.start()
        
        // Create the queue and install the pub/sub functions
        let {output, exitCode} = await startedContainer.exec("sqlplus / as sysdba @/tmp/init.sql")
        expect(exitCode).toBe(0)
        conn = await startedContainer.getDatabaseConnection()
    }, 10 * 60 * 1000); // With a pre-pulled image, the container should start up in seconds.

    afterAll(async () => {
        await startedContainer.stop()
    });

    it("plsql pub-sub", async () => {
        // Start a producer and send a message
        let producer = new SQLProducer(conn)
        let event = {
            message: "Hello from TxEventQ!"
        }
        await producer.send(event)
        console.log("Produced event: " + JSON.stringify(event))

        let result: any
        
        // Start a consumer and receive a message
        let consumer = new SQLConsumer(await startedContainer.getDatabaseConnection())
        while (!result) {
            result = await consumer.poll()
        }
        expect(result.message).toEqual(event.message)
        console.log("Consumed event: " + JSON.stringify(result))
    }, 5000)
});
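The while loop in the test polls as fast as the database responds and relies on the test timeout to stop. Outside of a test, you would likely want a pause between polls and an explicit deadline. Here is a sketch under those assumptions; pollUntil and its option names are hypothetical, not part of the sample.

```typescript
interface PollOptions {
    timeoutMs: number;   // give up after this long
    intervalMs: number;  // pause between empty polls
}

const sleep = (ms: number): Promise<void> =>
    new Promise<void>((resolve) => setTimeout(resolve, ms));

// Calls poll() until it returns a value other than undefined or null,
// or until the deadline passes. Resolves to undefined on timeout.
async function pollUntil<T>(
    poll: () => Promise<T | undefined | null>,
    { timeoutMs, intervalMs }: PollOptions
): Promise<T | undefined> {
    const deadline = Date.now() + timeoutMs;
    while (true) {
        const result = await poll();
        if (result !== undefined && result !== null) {
            return result;
        }
        // Stop if the next pause would run past the deadline
        if (Date.now() + intervalMs > deadline) {
            return undefined;
        }
        await sleep(intervalMs);
    }
}
```

In the test above, the loop could then become pollUntil(() => consumer.poll(), {timeoutMs: 5000, intervalMs: 100}), followed by an assertion that the result is defined.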

From the project’s typescript directory, run the test using npm:

npm run sql-producer-consumer

In the test output, you should see a JSON event produced and consumed:

[Image: terminal output showing the results of a TypeScript test for transactional event queues, including produced and consumed events with timestamps and durations.]

That’s it! With this setup, you can implement pub/sub between apps using low-latency, database-driven messaging.

Expect a follow-up post that demonstrates how to implement TypeScript clients with multiple producers and consumers using consumer groups!
