Migrate Apache Kafka applications to Oracle Database: Part III

Transactional messaging using Oracle Database

This article is the third post in a three-part series about progressively migrating Apache Kafka applications to Oracle Database Transactional Event Queues, adding advanced database features along the way.

  1. If you’re just getting started, start at Part I, Migrating an Apache Kafka Application to Oracle Database before continuing with this guide.
  2. If you’ve read Part I but not Part II, see Part II: Adding OSON Support.

In this guide, we’ll add Transactional Messaging to our app. Transactional Messaging is a robust messaging pattern where consumers and/or producers combine event streaming with database operations (DML) in atomic database transactions. In this article, we’ll use Kafka Java Client for Oracle Database Transactional Event Queues to implement transactional messaging capabilities.

You can also refer to the checkpointed code:

  • The kafka-app-step-2 module contains the app with Oracle Database and OSON support. You should start here for this guide.
  • The kafka-app-step-3 module contains the app after Transactional Messaging has been added. You may refer here if you get stuck, or just want to see the end result.

Why is Transactional Messaging important? When we process related data operations, like consuming an event and saving a record into the database, these operations must be atomic to ensure data reliability and consistency. If either operation fails, both should be rolled back to avoid data loss, duplication, or corruption.

There are many ways to solve the problem of data consistency with event streaming, like two-phase commits (costly and slow) or the Transactional Outbox Pattern. However, Transactional Messaging offers a simple solution that is easy to implement using Oracle Database.

Implementing Transactional Messaging in the kafka-app

To follow along, clone the kafka-app-step-2 module which contains the app after adding OSON support (from Part II of this series). You’ll also need an Oracle Database instance: I suggest Oracle Database Free.

Creating the database resources

Because we’ll be inserting records into the database with Transactional Messaging, we’ll need a simple schema to use, with two tables: a weather station and related weather events, where each weather station is associated with some number of weather events:

CREATE TABLE weather_station (
    id                NUMBER GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
    station_name      VARCHAR2(100) NOT NULL,
    location_lat      NUMBER(9,6), -- latitude
    location_lon      NUMBER(9,6), -- longitude
    elevation_m       NUMBER(6,2)
);

CREATE TABLE weather_event (
    id                NUMBER GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
    station_id        NUMBER NOT NULL,
    timestamp         TIMESTAMP DEFAULT SYSTIMESTAMP,
    temperature       NUMBER(5,2),
    humidity_percent  NUMBER(5,2),
    uv_index          NUMBER(3,1),
    CONSTRAINT fk_weather_station
    FOREIGN KEY (station_id)
    REFERENCES weather_station (id)
    ON DELETE CASCADE
);

Next, let’s insert a few example weather stations for our app, which will be used by our sample events. I picked a few of my favorite mountains in Oregon to serve as example stations.

INSERT INTO weather_station (station_name, location_lat, location_lon, elevation_m)
VALUES ('Mt. Hood', 45.373611, -121.695833, 3426.00);

INSERT INTO weather_station (station_name, location_lat, location_lon, elevation_m)
VALUES ('South Sister', 44.103889, -121.768056, 3157.00);

INSERT INTO weather_station (station_name, location_lat, location_lon, elevation_m)
VALUES ('Steens Mountain', 42.647500, -118.596389, 2965.00);

Let’s create a JSON Relational Duality View that combines the weather station and weather event tables into a single read-write JSON object. The duality view allows us to insert our OSON objects containing weather event and station data using a single client round-trip, letting the database server handle the JSON processing and database insert.

CREATE OR REPLACE FORCE EDITIONABLE JSON RELATIONAL DUALITY VIEW weather_event_dv
 AS 
weather_event @insert {
    _id : id
    timestamp
    temperature
    humidityPercent : humidity_percent
    uvIndex : uv_index
    station: weather_station @insert
             @link (from : [STATION_ID]) {
        id
        name : station_name
    }
};

Note the GraphQL syntax used to create the JSON Relational Duality View. These views can be created over any relational data, and may be written by hand or generated using tools like the SQL Developer extension for VS Code.

Updating the consumer to use transactional messaging

To implement transactional messaging, let’s write a new method, processEvent, which will be called by our consumer. This method takes a database connection and an event, and writes the event to the WEATHER_EVENT_DV JSON Relational Duality View.

private static void processEvent(Connection conn, JSONB jsonb, byte[] event) throws SQLException, IOException {
    String val = jsonb.fromOSON(event, WeatherEvent.class)
            .toString();
    System.out.println("[CONSUMER] Deserialized: " + val);
    final String sql = "INSERT INTO WEATHER_EVENT_DV (data) VALUES (?)";
    try (PreparedStatement ps = conn.prepareStatement(sql)) {
        ps.setObject(1, event, OracleTypes.JSON);
        ps.executeUpdate();
    }
}

We then update our consumer to use the processEvent method. Note the handling of a SQLException – this is where we could implement any rollback or compensation logic should the database operation fail.

private static Future<?> startConsumer() {
    JSONB jsonb = JSONB.createDefault();
    return EXECUTOR.submit(() -> {
        Connection conn = null;
        int consumedMessages = 0;
        try (Consumer<String, byte[]> consumer = createConsumer(connectionProperties())) {
            conn = ((KafkaConsumer<?, ?>) consumer).getDBConnection();
            consumer.subscribe(List.of(TOPIC_NAME));
            while (consumedMessages < TOTAL_RECORDS) {
                ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(3));
                for (ConsumerRecord<String, byte[]> record : records) {
                    processEvent(conn, jsonb, record.value());
                }
                // Commits the consumed records together with the DML done in processEvent
                consumer.commitSync();
                consumedMessages += records.count();
            }
            // Only reached when every batch was processed and committed
            System.out.println("[CONSUMER] Committed all messages");
        } catch (IOException e) {
            System.out.println("[CONSUMER] Deserialization error: " + e.getMessage());
        } catch (SQLException e) {
            // Handle rollback / error handling
            System.out.println("[CONSUMER] SQL error: " + e.getMessage());
        }
    });
}
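
The SQLException handler in the consumer only logs the error. As a rough sketch of what rollback handling could look like, the helper below treats each polled batch as one unit of work: it runs the DML for every event, runs the commit step, and rolls the connection back if anything fails. AtomicBatch and EventHandler are hypothetical names that are not part of the sample app or the Kafka Java Client for Oracle Database Transactional Event Queues; the only database API used is the standard java.sql.Connection.rollback().

```java
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;

// Hypothetical helper, not part of the sample app or the Kafka client library.
public final class AtomicBatch {

    /** Hypothetical callback that performs the DML for a single event. */
    @FunctionalInterface
    public interface EventHandler {
        void handle(Connection conn, byte[] event) throws SQLException, IOException;
    }

    private AtomicBatch() {
    }

    /**
     * Handles every event in the batch, then runs the commit step.
     * On any failure the connection is rolled back and false is returned,
     * leaving the retry/skip/stop decision to the caller.
     */
    public static boolean process(Connection conn, List<byte[]> events,
                                  EventHandler handler, Runnable commit) throws SQLException {
        try {
            for (byte[] event : events) {
                handler.handle(conn, event);
            }
            // Assumption: the commit step commits the DML and the consumed records together
            commit.run();
            return true;
        } catch (SQLException | IOException | RuntimeException e) {
            conn.rollback(); // discard any DML performed for this batch
            System.out.println("[CONSUMER] Rolled back batch: " + e.getMessage());
            return false;
        }
    }
}
```

Inside the poll loop, this could be called as AtomicBatch.process(conn, values, (c, e) -> processEvent(c, jsonb, e), () -> consumer.commitSync()), where values is a list of the record.value() payloads from the current poll. The rollback belongs inside the try block rather than in its catch clauses, because try-with-resources closes the consumer before any catch clause runs, and the connection is obtained from that consumer. Whether rolled-back records are redelivered on the next poll depends on the client, so it is worth verifying before relying on retries.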

Create and view JSON weather events

Run the app again, noting that this time it will insert a record using the JSON Relational Duality View for each weather event:

mvn clean compile exec:java

We can query the JSON Relational Duality View to pretty print the JSON events:

select json_serialize(w.data pretty) as events
from weather_event_dv w;

Wrapping up

We successfully migrated an Apache Kafka app to Oracle Database Transactional Event Queues, progressively added support for OSON serialization and JSON Relational Duality Views, and incorporated Transactional Messaging for robust event streaming.

More Code Samples and Demo Apps

Life is better with code samples. I maintain a number of Java demo apps in the oracle-database-java-samples GitHub repo, including modules with sample code for developing Java event-streaming applications using Oracle Database Transactional Event Queues.

Have a use case that’s not already covered? Leave a comment or reach out directly on LinkedIn and I’ll be happy to help you out.
