This article is the third post in a three-part series about progressively migrating Apache Kafka applications to Oracle Database Transactional Event Queues, adding advanced database features along the way.
- If you’re just getting started, begin with Part I: Migrating an Apache Kafka Application to Oracle Database before continuing with this guide.
- If you’ve read Part I but not Part II, see Part II: Adding OSON Support.
In this guide, we’ll add Transactional Messaging to our app. Transactional Messaging is a robust messaging pattern where consumers and/or producers combine event streaming with database operations (DML) in atomic database transactions. We’ll use the Kafka Java Client for Oracle Database Transactional Event Queues to implement it.
You can also refer to the checkpointed code:
- The kafka-app-step-2 module contains the app with Oracle Database and OSON support. You should start here for this guide.
- The kafka-app-step-3 module contains the app after Transactional Messaging has been added. You may refer here if you get stuck, or just want to see the end result.
Why is Transactional Messaging important? When we process related data operations, like consuming an event and saving a record into the database, these operations must be atomic to ensure data reliability and consistency. If either operation fails, both should be rolled back to avoid data loss, duplication, or corruption.
There are many ways to solve the problem of data consistency with event streaming, like two-phase commits (costly and slow) or the Transactional Outbox Pattern. However, Transactional Messaging offers a simple solution that is easy to implement using Oracle Database.
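To make the atomicity requirement concrete, here’s a minimal sketch in plain JDBC (not the Kafka client API) of an all-or-nothing unit of work: commit when every step succeeds, roll back when any step fails. The names AtomicUnitOfWork, SqlWork, and runAtomically are hypothetical and exist only for illustration.

```java
import java.sql.Connection;
import java.sql.SQLException;

// Illustrative helper only; these names are not part of any Oracle or Kafka API.
final class AtomicUnitOfWork {

    /** Hypothetical callback holding the database work for one unit. */
    @FunctionalInterface
    interface SqlWork {
        void run(Connection conn) throws SQLException;
    }

    /**
     * Runs the work in a single transaction.
     * Commits if the work completes, rolls back and rethrows if it fails.
     */
    static void runAtomically(Connection conn, SqlWork work) throws SQLException {
        boolean previousAutoCommit = conn.getAutoCommit();
        conn.setAutoCommit(false); // group all statements into one transaction
        try {
            work.run(conn);
            conn.commit();
        } catch (SQLException | RuntimeException e) {
            try {
                conn.rollback(); // undo every statement executed by the work
            } catch (SQLException rollbackFailure) {
                e.addSuppressed(rollbackFailure);
            }
            throw e;
        } finally {
            conn.setAutoCommit(previousAutoCommit);
        }
    }
}
```

Transactional Messaging applies the same idea to event streaming: the message consumption and the DML belong to one database transaction, so they succeed or fail together.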
Implementing Transactional Messaging in the kafka-app
To follow along, clone the kafka-app-step-2 module, which contains the app after adding OSON support (from Part II of this series). You’ll also need an Oracle Database instance; I suggest Oracle Database Free.
Creating the database resources
Because we’ll be inserting records into the database with Transactional Messaging, we’ll need a simple schema to use, with two tables: a weather station and related weather events, where each weather station is associated with some number of weather events:
CREATE TABLE weather_station (
    id NUMBER GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
    station_name VARCHAR2(100) NOT NULL,
    location_lat NUMBER(9,6), -- latitude
    location_lon NUMBER(9,6), -- longitude
    elevation_m NUMBER(6,2)
);
CREATE TABLE weather_event (
    id NUMBER GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
    station_id NUMBER NOT NULL,
    timestamp TIMESTAMP DEFAULT SYSTIMESTAMP,
    temperature NUMBER(5,2),
    humidity_percent NUMBER(5,2),
    uv_index NUMBER(3,1),
    CONSTRAINT fk_weather_station
        FOREIGN KEY (station_id)
        REFERENCES weather_station (id)
        ON DELETE CASCADE
);
Next, let’s insert a few example weather stations for our app, which will be used by our sample events. I picked a few of my favorite mountains in Oregon to serve as example stations.
INSERT INTO weather_station (station_name, location_lat, location_lon, elevation_m)
    VALUES ('Mt. Hood', 45.373611, -121.695833, 3426.00);
INSERT INTO weather_station (station_name, location_lat, location_lon, elevation_m)
    VALUES ('South Sister', 44.103889, -121.768056, 3157.00);
INSERT INTO weather_station (station_name, location_lat, location_lon, elevation_m)
    VALUES ('Steens Mountain', 42.647500, -118.596389, 2965.00);
Let’s create a JSON Relational Duality View that combines the weather station and weather event tables into a single read-write JSON object. The duality view allows us to insert our OSON objects containing weather event and station data using a single client round-trip, letting the database server handle the JSON processing and the database insert.
CREATE OR REPLACE FORCE EDITIONABLE JSON RELATIONAL DUALITY VIEW weather_event_dv
AS
weather_event @insert {
    _id : id
    timestamp
    temperature
    humidityPercent : humidity_percent
    uvIndex : uv_index
    station: weather_station @insert
    @link (from : [STATION_ID]) {
        id
        name : station_name
    }
};
Note the GraphQL syntax used to create the JSON Relational Duality View. These views can be created over any relational data, and may be written by hand or generated using tools like the SQL Developer extension for VS Code.
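To see how the view’s field names map onto a document, here’s a small sketch that builds a JSON string in the shape the weather_event_dv view defines. The WeatherEventDocument class, its toJson method, and the sample values are made up for illustration; the app’s own WeatherEvent class may look different. The sketch leaves out _id on the assumption that the identity column generates it, and it uses station id 1 on the assumption that the first inserted station received that id.

```java
import java.util.Locale;

// Hypothetical helper that mirrors the field names declared in weather_event_dv.
final class WeatherEventDocument {

    /**
     * Builds a JSON document with the view's field names.
     * Strings are not escaped here, so this only suits simple sample values.
     */
    static String toJson(String timestamp, double temperature, double humidityPercent,
                         double uvIndex, long stationId, String stationName) {
        return String.format(Locale.ROOT,
                "{\"timestamp\":\"%s\",\"temperature\":%.2f,\"humidityPercent\":%.2f,"
                        + "\"uvIndex\":%.1f,\"station\":{\"id\":%d,\"name\":\"%s\"}}",
                timestamp, temperature, humidityPercent, uvIndex, stationId, stationName);
    }

    public static void main(String[] args) {
        // Sample values are invented for the example.
        System.out.println(toJson("2025-01-15T08:30:00", 1.5, 82.0, 2.3, 1L, "Mt. Hood"));
    }
}
```

The nested station object corresponds to the weather_station table, and the top-level fields correspond to columns of weather_event.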
Updating the consumer to use transactional messaging
To implement transactional messaging, let’s write a new method, processEvent, which will be called by our consumer. This method takes a database connection and an event, and writes the event to the WEATHER_EVENT_DV JSON Relational Duality View.
private static void processEvent(Connection conn, JSONB jsonb, byte[] event) throws SQLException, IOException {
    // Deserialize only for logging; the raw OSON bytes are what get inserted.
    String val = jsonb.fromOSON(event, WeatherEvent.class)
            .toString();
    System.out.println("[CONSUMER] Deserialized: " + val);
    // Insert the OSON payload through the duality view.
    final String sql = "INSERT INTO WEATHER_EVENT_DV (data) VALUES (?)";
    try (PreparedStatement ps = conn.prepareStatement(sql)) {
        ps.setObject(1, event, OracleTypes.JSON);
        ps.executeUpdate();
    }
}
We then update our consumer to use the processEvent method. Note the handling of SQLException: this is where rollback or compensation logic belongs should the database operation fail.
private static Future<?> startConsumer() {
    JSONB jsonb = JSONB.createDefault();
    return EXECUTOR.submit(() -> {
        int consumedMessages = 0;
        try (Consumer<String, byte[]> consumer = createConsumer(connectionProperties())) {
            // Reuse the consumer's database connection so the inserts and the
            // offset commit take part in the same transaction.
            Connection conn = ((KafkaConsumer<?, ?>) consumer).getDBConnection();
            consumer.subscribe(List.of(TOPIC_NAME));
            while (consumedMessages < TOTAL_RECORDS) {
                ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(3));
                try {
                    for (ConsumerRecord<String, byte[]> record : records) {
                        processEvent(conn, jsonb, record.value());
                    }
                    consumer.commitSync();
                    consumedMessages += records.count();
                } catch (IOException | SQLException e) {
                    // Roll back while the connection is still open, then stop consuming.
                    conn.rollback();
                    throw e;
                }
            }
            System.out.println("[CONSUMER] Committed all messages");
        } catch (IOException e) {
            System.out.println("[CONSUMER] Deserialization error: " + e.getMessage());
        } catch (SQLException e) {
            System.out.println("[CONSUMER] SQL error: " + e.getMessage());
        }
    });
}
Create and view JSON weather events
Run the app again, noting that this time it will insert a record using the JSON Relational Duality View for each weather event:
mvn clean compile exec:java
We can query the JSON Relational Duality View to pretty print the JSON events:
select json_serialize(w.data pretty) as events
from weather_event_dv w;
Wrapping up
We successfully migrated an Apache Kafka app to Oracle Database Transactional Event Queues, progressively added support for OSON serialization and JSON Relational Duality Views, and incorporated Transactional Messaging for robust event streaming.
More Code Samples and Demo Apps
Life is better with code samples. I maintain a number of Java demo apps in the oracle-database-java-samples GitHub repo. The following modules provide sample code for developing Java event-streaming applications using Oracle Database Transactional Event Queues:
- JDBC Producer/Consumer example
- JMS Queue/Topic examples
- Spring JMS example
- Oracle Kafka Java Client APIs
