Event-driven CDC, all within the database

In this article, we’ll describe a simple Spring Boot app that implements event-driven CDC (Change Data Capture), all within the database.

The app will insert data into a table, and automatically publish events to an in-database queue for every insert. A consumer will then react to the event for processing.

What’s neat about this? The entire pipeline, from the insert to the queued event, runs inside the database. This means fewer round trips, fewer services to orchestrate, and higher-fidelity data: nothing needs to leave the database server until events are consumed.

Just looking for the code? Check out the GitHub repo here.

Let’s define database objects for CDC

We’ll first define a table called tickets that our app inserts rows into. We’ll wire up this table so any INSERT is captured as an event for the purposes of CDC.

create table testuser.tickets (
    id      number(10) generated always as identity primary key,
    title   varchar2(100),
    status  varchar2(50),
    created timestamp default systimestamp
);

Next, we create a Transactional Event Queue (TxEventQ) named ticket_event accepting JMS events. We’ll use this queue for all our CDC events. If you’re not familiar with TxEventQ, it’s a high-throughput event streaming system built into Oracle Database available since 21c.

-- Create a Transactional Event Queue
begin
    dbms_aqadm.create_transactional_event_queue(
            queue_name         => 'testuser.ticket_event',
            queue_payload_type => DBMS_AQADM.JMS_TYPE
    );

    -- Start the queue
    dbms_aqadm.start_queue(
            queue_name         => 'testuser.ticket_event'
    );
end;
/

Lastly, we need a database trigger that fires on each insert into the tickets table. For each row, we build a JSON object and publish it to the ticket_event queue using the dbms_aq.enqueue procedure.

create or replace trigger testuser.tickets_insert_trigger
    after insert on testuser.tickets
    for each row
declare
    enqueue_options    dbms_aq.enqueue_options_t;
    message_properties dbms_aq.message_properties_t;
    message_handle     raw(16);
    jms_msg            sys.aq$_jms_text_message;
    ticket_json        varchar2(400);
begin
    ticket_json := json_object(
            'id'   value :new.id,
            'title' value :new.title,
            'status' value :new.status,
            'created' value :new.created
            returning varchar2(400)
    );

    -- Create a JMS message for the event
    jms_msg := sys.aq$_jms_text_message.construct;
    jms_msg.set_text(ticket_json);

    -- Send the message to the ticket_event queue
    dbms_aq.enqueue(
            queue_name          => 'testuser.ticket_event',
            enqueue_options     => enqueue_options,
            message_properties  => message_properties,
            payload             => jms_msg,
            msgid               => message_handle
    );
    -- commit isn't required in a trigger, due to the enclosing txn
end;
/

You can find the full schema file here.

Writing the simple Spring Boot app

The Spring Boot app is pretty simple: it has two components, TicketService and CDCConsumer. The TicketService inserts tickets into the tickets table using JDBC, and the CDCConsumer responds to JMS events in the ticket_event queue.

We’ll use the spring-boot-starter-data-jdbc and oracle-spring-boot-starter-aqjms Spring Starters to pull in all the dependencies for our project. You can also find the full pom file here.

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jdbc</artifactId>
</dependency>
<dependency>
    <groupId>com.oracle.database.spring</groupId>
    <artifactId>oracle-spring-boot-starter-aqjms</artifactId>
    <version>${oracle.starters.version}</version>
</dependency>

First, let’s look at the CDCConsumer. It’s a basic Spring JMS consumer that receives messages and prints them to the console. We add a CountDownLatch purely for testing purposes, so we can stop the consumer after it receives 10 events.

If you’re writing a real app, the receiveMessage method is where you’d put your business logic: processing the ticket, doing an update, or whatever Java application logic you need.

package com.example.cdc;

import java.util.concurrent.CountDownLatch;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class CDCConsumer {
    private final CountDownLatch countDownLatch;

    public CDCConsumer(@Value("${expected.messages:10}") int numMessages) {
        countDownLatch = new CountDownLatch(numMessages);
    }

    @JmsListener(destination = "${txeventq.queue.name:ticket_event}", id = "cdcConsumer")
    public void receiveMessage(String message) {
        System.out.printf("[CONSUMER] Processing ticket: %s%n", message);
        countDownLatch.countDown();
    }

    public void await() throws InterruptedException {
        countDownLatch.await();
    }
}
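
As a starting point for that business logic, here is a minimal sketch that turns the JSON payload into a typed value. TicketEvent and its parse method are hypothetical names, not part of the original app. The regex-based parsing is an assumption that only holds for the flat payload our trigger builds; a real app would more likely use a JSON library such as Jackson.

```java
import java.time.LocalDateTime;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical value type for one CDC event; not part of the original app.
record TicketEvent(long id, String title, String status, LocalDateTime created) {

    // Matches "key":"text" or "key":123 pairs in a flat JSON object.
    private static final Pattern FIELD = Pattern.compile(
            "\"(\\w+)\"\\s*:\\s*(?:\"((?:[^\"\\\\]|\\\\.)*)\"|(-?\\d+))");

    // Parses the flat payload built by the trigger. Escape sequences inside
    // strings are left as-is, and null or missing fields stay null.
    static TicketEvent parse(String json) {
        long id = -1;
        String title = null;
        String status = null;
        LocalDateTime created = null;
        Matcher m = FIELD.matcher(json);
        while (m.find()) {
            String text = m.group(2);   // set for string values
            String number = m.group(3); // set for numeric values
            switch (m.group(1)) {
                case "id" -> { if (number != null) id = Long.parseLong(number); }
                case "title" -> title = text;
                case "status" -> status = text;
                case "created" -> { if (text != null) created = LocalDateTime.parse(text); }
                default -> { } // ignore unknown fields
            }
        }
        if (id < 0) {
            throw new IllegalArgumentException("payload has no numeric id: " + json);
        }
        return new TicketEvent(id, title, status, created);
    }
}
```

Inside receiveMessage, you could then call TicketEvent.parse(message) and work with event.id() or event.status() instead of the raw string.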

The TicketService is also quite simple. It takes a Java DataSource and inserts rows into our tickets table, solely for demonstrating the database trigger and consumer:

package com.example.cdc;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import javax.sql.DataSource;
import org.springframework.stereotype.Component;

@Component
public class TicketService {
    private final DataSource dataSource;

    public TicketService(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    public void createTicket(String title, String status) {
        final String sql = "insert into tickets (title, status) values (?, ?)";
        try (Connection conn = dataSource.getConnection();
             PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setString(1, title);
            ps.setString(2, status);
            ps.executeUpdate();
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }
}

Let’s test it out!

I’ve included a test suite with Testcontainers and Oracle Database Free in the module – download it from GitHub to try it out. To run the test, you’ll need Java 21+, Maven, and a Docker-compatible environment to run the Oracle Database Free container.

The test suite starts up an Oracle Database Free container, inserts 10 rows into the tickets table, and waits for the CDCConsumer to asynchronously process each event. If you run the test with mvn test, you should see the following output:

[MAIN] Starting Spring Boot CDC Example
[MAIN] Created tickets
[MAIN] Waiting for consumer to finish processing messages...
[CONSUMER] Processing ticket: {"id":1,"title":"ticket 1","status":"NEW","created":"2025-06-26T18:42:32.285317"}
[CONSUMER] Processing ticket: {"id":2,"title":"ticket 2","status":"NEW","created":"2025-06-26T18:42:33.282544"}
[CONSUMER] Processing ticket: {"id":3,"title":"ticket 3","status":"NEW","created":"2025-06-26T18:42:33.295571"}
[CONSUMER] Processing ticket: {"id":4,"title":"ticket 4","status":"NEW","created":"2025-06-26T18:42:33.304789"}
[CONSUMER] Processing ticket: {"id":5,"title":"ticket 5","status":"NEW","created":"2025-06-26T18:42:33.312474"}
[CONSUMER] Processing ticket: {"id":6,"title":"ticket 6","status":"NEW","created":"2025-06-26T18:42:33.319317"}
[CONSUMER] Processing ticket: {"id":7,"title":"ticket 7","status":"NEW","created":"2025-06-26T18:42:33.327824"}
[CONSUMER] Processing ticket: {"id":8,"title":"ticket 8","status":"NEW","created":"2025-06-26T18:42:33.337015"}
[CONSUMER] Processing ticket: {"id":9,"title":"ticket 9","status":"NEW","created":"2025-06-26T18:42:33.348141"}
[CONSUMER] Processing ticket: {"id":10,"title":"ticket 10","status":"NEW","created":"2025-06-26T18:42:33.360091"}
[MAIN] Consumer finished.

Find the full test code here for reference:

package com.example.cdc;

import java.time.Duration;

import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
import org.springframework.jms.config.JmsListenerEndpointRegistry;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.oracle.OracleContainer;
import org.testcontainers.utility.MountableFile;

import static org.junit.jupiter.api.Assertions.assertTimeout;

@Testcontainers
@SpringBootTest
public class SpringBootCDCTest {
    /**
     * Use a containerized Oracle Database instance for testing.
     */
    @ServiceConnection
    static OracleContainer oracleContainer = new OracleContainer("gvenzl/oracle-free:23.26.0-slim-faststart")
            .withStartupTimeout(Duration.ofMinutes(5))
            .withUsername("testuser")
            .withPassword("testpwd");

    /**
     * Set up the test environment:
     * 1. start the container; @ServiceConnection supplies its connection details to Spring.
     * 2. run a SQL script to configure the test database for our JMS example.
     */
    @BeforeAll
    static void setUp() throws Exception {
        oracleContainer.start();
        // Configures the test database, granting the test user access to TxEventQ, creating and starting a queue for JMS.
        oracleContainer.copyFileToContainer(MountableFile.forClasspathResource("init.sql"), "/tmp/init.sql");
        oracleContainer.execInContainer("sqlplus", "sys / as sysdba", "@/tmp/init.sql");
    }


    @Autowired
    private JmsListenerEndpointRegistry jmsListenerEndpointRegistry;

    @Autowired
    TicketService ticketService;

    @Autowired
    CDCConsumer consumer;

    @Test
    void springBootJMSExample() {
        System.out.println("[MAIN] Starting Spring Boot CDC Example");

        for (int i = 0; i < 10; i++) {
            ticketService.createTicket("ticket " + (i+1), "NEW");
        }
        System.out.println("[MAIN] Created tickets");

        System.out.println("[MAIN] Waiting for consumer to finish processing messages...");
        assertTimeout(Duration.ofSeconds(5), () -> {
            consumer.await();
        });

        // Do a clean shutdown of the JMS listener.
        jmsListenerEndpointRegistry.getListenerContainer("cdcConsumer").stop();
        System.out.println("[MAIN] Consumer finished.");
    }
}
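
One thing to be aware of in the test above: JUnit’s assertTimeout runs the lambda on the calling thread and only checks the elapsed time after it returns. If fewer than 10 events ever arrive, consumer.await() keeps blocking instead of failing after 5 seconds. Below is a minimal sketch of a bounded wait, using a hypothetical EventLatch helper (not part of the repo) built on CountDownLatch.await(timeout, unit), which returns false when the time runs out.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: the same latch idea as CDCConsumer, with a bounded wait.
class EventLatch {
    private final CountDownLatch latch;

    EventLatch(int expectedEvents) {
        this.latch = new CountDownLatch(expectedEvents);
    }

    // Call once per processed event, e.g. at the end of receiveMessage.
    void eventProcessed() {
        latch.countDown();
    }

    // Returns true if every expected event arrived before the timeout elapsed.
    boolean await(long timeout, TimeUnit unit) {
        try {
            return latch.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            return false;
        }
    }

    // Number of events still outstanding, useful in a failure message.
    long remaining() {
        return latch.getCount();
    }
}
```

A test could then assert that await(5, TimeUnit.SECONDS) returns true and report remaining() when it does not. Swapping assertTimeout for JUnit’s assertTimeoutPreemptively is another option, since it runs the lambda on a separate thread and aborts it at the deadline.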

Final Thoughts

To scale this up for a “production-ready” app, I’d implement parallel JMS consumers in Spring Boot to process insert, update, and other events at scale. You can have as many concurrent consumers as you have event streams (essentially partitions) in TxEventQ.

You may also want multiple types of CDC processing, with different consumer groups. Think one consumer group to handle ticket processing, like summary generation, and another to trigger ticket-related workflows. You can enable this using the multiple_consumers => true parameter during TxEventQ creation.

If you use the Oracle JMS APIs for messaging (lower-level than Spring JMS), you can also leverage the JMS database connection to run SQL statements in the same transaction as the consumer. I have an example of this, which also uses multiple consumers, here: Use JMS Topics for Messaging.

Questions?

Leave me a comment or reach out on LinkedIn, and I’ll be happy to chat!
