Event streaming is widely used in modern application architectures, providing high-throughput, real-time data flow between microservices.
However, ensuring data consistency in distributed systems is challenging. Traditional two-phase commit (2PC) implementations can be costly and slow, making them impractical for high-throughput environments.
In this article, we’ll solve data consistency challenges in event streaming systems using Oracle Database Transactional Event Queues and transactional messaging, a simplified version of the Transactional Outbox Pattern.

What is an Event Streaming System?
Event streaming systems facilitate communication between distributed systems, acting as an intermediary for asynchronous message exchange.
Specifically, event streaming systems have several key functions:
- Facilitate asynchronous communication between services.
- Enforce conformant message formats.
- Route messages with delivery guarantees.
- Persist messages on reliable, durable storage.
- Support scalable, high-throughput messaging across network and disk I/O.
There are many event streaming implementations on the market today, like Apache Kafka, RabbitMQ, NATS, Oracle Database Transactional Event Queues, and others.
When do you use an Event Streaming System?
Synchronous protocols like REST or gRPC are highly effective for stateless services that are consumed by clients over the internet.
However, synchronous protocols tightly couple the participating services, all of which must be available for the duration of a client request.
Tightly coupled services are typically more challenging to scale, and can amplify failures into cascading outages.
In contrast, an event streaming system enables fault-tolerant, decoupled asynchronous workflows. Event-driven workflows improve resilience against delays and service failures. Decoupling allows for retryable processing by independent, scalable components.
However, event streaming systems add complexity in implementation, monitoring, and DevOps management. Many service architectures use synchronous communication for public interfaces, and asynchronous, event-driven communication for internal services.
How are Messages Sent and Received?
- Producers publish messages to a topic, which contains an ordered log or queue of messages.
- Messages are stored persistently in a database, log file, or other disk-based mechanism by the event streaming system.
- One or more Consumers subscribe to topics, reading messages from the event streaming system.
Throughout the process, end-to-end communication remains asynchronous, allowing producers and consumers to retry communication.
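The publish and subscribe flow above can be sketched with a small in-memory model. This is only an illustration, not how any real event streaming system is implemented: the class TopicLogSketch and its methods are hypothetical names, and the "topic" is a plain list standing in for durable storage.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for a topic: an append-only, ordered log.
class TopicLogSketch {
    private final List<String> log = new ArrayList<>();
    // Each consumer group tracks its own read position (offset) in the log.
    private final Map<String, Integer> offsets = new HashMap<>();

    // Producer side: append a message and return its offset in the log.
    synchronized int publish(String message) {
        log.add(message);
        return log.size() - 1;
    }

    // Consumer side: read up to maxRecords starting at the group's offset.
    // The offset is not advanced here, so an unacknowledged batch is read again.
    synchronized List<String> poll(String group, int maxRecords) {
        int from = offsets.getOrDefault(group, 0);
        int to = Math.min(log.size(), from + maxRecords);
        return new ArrayList<>(log.subList(from, to));
    }

    // Acknowledge processed records by advancing the group's offset.
    synchronized void commit(String group, int processed) {
        offsets.merge(group, processed, Integer::sum);
    }

    public static void main(String[] args) {
        TopicLogSketch topic = new TopicLogSketch();
        topic.publish("order-created");
        topic.publish("order-paid");

        List<String> batch = topic.poll("billing", 10);
        System.out.println("billing read: " + batch);
        topic.commit("billing", batch.size());

        // A second group reads the same messages independently.
        System.out.println("audit read: " + topic.poll("audit", 1));
    }
}
```

Because the offset only moves on commit, a consumer that fails midway simply polls the same records again, which is what makes retries possible.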

The Transactional Outbox Pattern
Applications that perform database operations and send or receive messages must ensure data consistency: on failure, both the messaging operation and the DML must be rolled back.
Without data consistency, increasingly costly data divergence or loss may occur.
One method of providing consistency is transactional messaging, a simplified version of the Transactional Outbox Pattern. In event streaming systems like Oracle Database Transactional Event Queues, you may produce or consume a message in the same database transaction as any other database operation.
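To make the atomicity requirement concrete, here is a minimal in-memory sketch of the producer side. It does not use Oracle Database or any real client API: OutboxSketch, Transaction, and placeOrder are hypothetical names, and two lists stand in for a table and a queue that live in the same database.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: one "database" that holds both a table and an event queue.
class OutboxSketch {
    final List<String> ordersTable = new ArrayList<>();
    final List<String> eventQueue = new ArrayList<>();

    // A unit of work that stages changes and applies all of them, or none.
    class Transaction {
        private final List<String> stagedRows = new ArrayList<>();
        private final List<String> stagedEvents = new ArrayList<>();

        void insertOrder(String row) { stagedRows.add(row); }

        void publish(String event) { stagedEvents.add(event); }

        // Make the row and the message visible together.
        void commit() {
            ordersTable.addAll(stagedRows);
            eventQueue.addAll(stagedEvents);
            stagedRows.clear();
            stagedEvents.clear();
        }

        // Discard the row and the message together.
        void rollback() {
            stagedRows.clear();
            stagedEvents.clear();
        }
    }

    // Writes an order and publishes its event in one unit of work.
    // Returns true if the unit of work committed.
    boolean placeOrder(String orderId, boolean failBeforeCommit) {
        Transaction tx = new Transaction();
        try {
            tx.insertOrder(orderId);
            tx.publish("order-created:" + orderId);
            if (failBeforeCommit) {
                throw new IllegalStateException("simulated failure");
            }
            tx.commit();
            return true;
        } catch (RuntimeException e) {
            tx.rollback();
            return false;
        }
    }

    public static void main(String[] args) {
        OutboxSketch db = new OutboxSketch();
        db.placeOrder("o-1", false);
        db.placeOrder("o-2", true); // fails: no row and no event for o-2
        System.out.println("orders: " + db.ordersTable);
        System.out.println("events: " + db.eventQueue);
    }
}
```

The point of the sketch is that there is never a state where the order row exists without its event, or the event without its row. With a separate database and message broker, that guarantee would normally need 2PC or an outbox table plus a relay process.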

The following Java code uses the Kafka Java Client for Oracle Database Transactional Event Queues (okafka) to set up a consumer that reads from a topic and processes each batch of messages within a database transaction, so that message consumption and the SQL operations commit or roll back atomically:
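import java.sql.Connection;
import java.sql.SQLException;
import java.time.Duration;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.oracle.okafka.clients.consumer.KafkaConsumer;

// oracleDatabaseProps is a java.util.Properties object with Oracle Database connection information.
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(
    oracleDatabaseProps
);
consumer.subscribe(List.of("topic1"));
while (true) {
    try {
        // 1. Poll a batch of records from the subscribed topics.
        // This poll is part of the current transaction.
        ConsumerRecords<String, String> records = consumer.poll(
            Duration.ofMillis(100)
        );
        System.out.println("Consumed records: " + records.count());
        // 2. Get the consumer's current database connection.
        Connection conn = consumer.getDBConnection();
        for (ConsumerRecord<String, String> record : records) {
            // 3. Perform a database operation (DML) with each record.
            processRecord(record, conn);
        }
        // 4. Do a blocking commit on the current batch of records.
        // For non-blocking, use commitAsync()
        consumer.commitSync();
    } catch (Exception e) {
        // 5. If auto-commit is disabled, nothing has been committed yet:
        // commits only happen in commitSync() or commitAsync().
        System.out.println("Unexpected error processing records. Aborting transaction!");
        // Roll back the current transaction, including message consumption
        // and DML. rollback() can itself fail, so handle that separately.
        try {
            consumer.getDBConnection().rollback();
        } catch (SQLException rollbackError) {
            System.out.println("Rollback failed: " + rollbackError.getMessage());
        }
    }
}

Breaking Down the Code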
- Create an org.oracle.okafka.clients.consumer.KafkaConsumer instance: A consumer is initialized with a Java Properties object using connection information for Oracle Database.
- Subscribe to a topic: The consumer subscribes to topic1 so it may receive messages from this topic.
- Poll for records: The consumer calls poll(Duration.ofMillis(100)) to fetch new messages from the topic in batches. The number of records retrieved is printed to the console.
- Obtain the current database connection: The consumer retrieves a database connection from consumer.getDBConnection().
- Process each record in the database: The method processRecord(record, conn) executes a database operation (a DML statement like INSERT, UPDATE, or DELETE) for each consumed message.
- Commit transaction: After processing all records, the consumer commits both message consumption and database changes using consumer.commitSync(). If an asynchronous commit is preferred, commitAsync() could be used instead.
- Handle errors and roll back: If an exception occurs, an error message is logged, and rollback() is called on the consumer's database connection. Rollback ensures that neither message consumption nor database changes take effect, preventing potentially disastrous data inconsistencies.
Final Thoughts
The transactional messaging pattern is simple to implement with a relational database, and enables exactly-once message processing for work done in that database.
When message consumption and database updates are part of the same transaction, they become atomic. If processing fails, neither message offsets nor database changes are committed, and data integrity is preserved.
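The consumer-side behavior can be illustrated with another small in-memory sketch, under the assumption that the offset and the processed rows are committed as one unit. AtomicConsumeSketch and consumeBatch are hypothetical names, and the poisonMessage parameter only exists to simulate a processing failure.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: the consumer offset and the table live in one "database".
class AtomicConsumeSketch {
    final List<String> topic = new ArrayList<>();          // ordered message log
    final List<String> processedTable = new ArrayList<>(); // committed rows
    int committedOffset = 0;                                // committed read position

    // Processes every record after the committed offset as one unit of work.
    // Returns the number of records committed (0 if the batch was rolled back).
    int consumeBatch(String poisonMessage) {
        List<String> stagedRows = new ArrayList<>();
        int offset = committedOffset;
        try {
            while (offset < topic.size()) {
                String record = topic.get(offset);
                if (record.equals(poisonMessage)) {
                    throw new IllegalStateException("cannot process " + record);
                }
                stagedRows.add("processed:" + record);
                offset++;
            }
            // Commit: rows and offset become visible together.
            processedTable.addAll(stagedRows);
            int committed = offset - committedOffset;
            committedOffset = offset;
            return committed;
        } catch (RuntimeException e) {
            // Rollback: staged rows are discarded and the offset is unchanged,
            // so the same records are delivered again on the next attempt.
            return 0;
        }
    }

    public static void main(String[] args) {
        AtomicConsumeSketch db = new AtomicConsumeSketch();
        db.topic.add("a");
        db.topic.add("b");
        System.out.println("first attempt committed: " + db.consumeBatch("b"));
        System.out.println("retry committed: " + db.consumeBatch(null));
        System.out.println("table: " + db.processedTable);
    }
}
```

After the failed attempt nothing is half-applied, and the retry processes both records exactly once from the table's point of view.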
References
Want to see this in action? These resources use Oracle Database Free to implement transactional messaging and address the 2PC problem:
- Combine message operations with DML in the same commit (PL/SQL)
- Using Transactional Kafka APIs with Oracle Database (Java)
- More articles on TxEventQ
- Oracle Database’s open-source Kafka compatible API
Questions? Leave a comment here, or connect with me on LinkedIn.
