Migrate Apache Kafka applications to Oracle Database: Part I

[Image: the Kafka Java client for Oracle Database, shown as an XML dependency and GitHub link]

In this article, we’ll walk through migrating a simple Apache Kafka Java application to Oracle Database Transactional Event Queues (TxEventQ) using example code from the migrate-kafka-to-oracle sample module on GitHub.

If you’re not familiar, TxEventQ is an event-streaming system that functions similarly to Apache Kafka — but with a few key differences:

  • TxEventQ is built into Oracle Database, effectively making Oracle Database the “broker” in Apache Kafka terms.
  • Your event data is co-located with the rest of your data, backed by Oracle’s table architecture, and can be operated on with native SQL.
  • Message operations in TxEventQ are provided through database transactions, meaning produce and consume are atomic and may be combined with database operations like inserts or updates.

Now, why would you want to migrate an Apache Kafka application to Oracle Database TxEventQ?

  1. Combining messaging with database transactions allows powerful, simple data operations that commit or roll back atomically. Patterns like this are difficult to implement reliably across separate systems without risking data corruption or loss (see the sketch after this list).
  2. You are looking to simplify your distributed systems architecture: By using a database-native messaging system, you have one less set of services to install, maintain, and upgrade.
  3. Co-location of messaging data in a shared data platform opens up access patterns without implementing cross-service queries, data sinks, or change data capture: if all your data lives in one platform, it becomes easily accessible using common authentication patterns and query language(s).
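To make the first point concrete, here’s a minimal sketch of an atomic send-plus-insert using the OKafka producer introduced later in this article. The weather_events table is hypothetical, and the configuration details of transactional producers are deferred to Part III:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.oracle.okafka.clients.producer.KafkaProducer;

public class AtomicSendExample {
    static void sendAndInsert(Properties props) throws Exception {
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Transactional producer configuration is covered in Part III
            producer.initTransactions();
            // OKafka exposes the producer's underlying JDBC connection
            Connection conn = producer.getDBConnection();
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("test_topic", "hello"));
            // JDBC work on the same connection joins the message's transaction.
            // weather_events is a hypothetical table.
            try (PreparedStatement stmt = conn.prepareStatement(
                    "insert into weather_events (payload) values (?)")) {
                stmt.setString(1, "hello");
                stmt.executeUpdate();
            }
            // Commits the send and the insert together; abortTransaction()
            // would roll both back
            producer.commitTransaction();
        }
    }
}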

If you get stuck anywhere in this article, the kafka-app-step-1 module contains a migrated version of the app configured to use an Oracle Database Free container.

See also: Part II, which adds OSON support as the producer-consumer serialization format.

[Illustration: a complex data platform featuring various databases and Apache Kafka on the left, compared with a simple converged database architecture on the right]

OK, let’s migrate that Kafka App to TxEventQ

First, let’s start with a basic Kafka app — a sample application that creates a topic, produces 10 messages, and consumes those 10 messages. If you’ve written Kafka code before, this code probably looks very familiar.

While simple, the app uses three key Kafka client APIs: Admin for managing topics, Producer for sending messages, and Consumer for receiving messages.

package com.example.kafka;

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.TopicExistsException;

import static java.util.concurrent.Executors.newVirtualThreadPerTaskExecutor;

public class KafkaApp {
    private static final String TOPIC_NAME = "test_topic";
    private static final int TOTAL_RECORDS = 10;
    private static final ExecutorService EXECUTOR = newVirtualThreadPerTaskExecutor();

    private static Properties connectionProperties() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("security.protocol", "PLAINTEXT");
        return props;
    }

    private static Admin createAdmin(Properties props) {
        return Admin.create(props);
    }

    private static Producer<String, String> createProducer(Properties props) {
        props.put("enable.idempotence", "true");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<>(props);
    }

    private static Consumer<String, String> createConsumer(Properties props) {
        props.put("group.id" , "MY_CONSUMER_GROUP");
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "earliest");
        props.put("max.poll.records", 50);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<>(props);
    }

    private static void createTopic() {
        NewTopic testTopic = new NewTopic(TOPIC_NAME, 1, (short) 1);
        try (Admin admin = createAdmin(connectionProperties())) {
            admin.createTopics(List.of(testTopic))
                    .all()
                    .get();
            System.out.println("[ADMIN] Created topic: " + testTopic.name());
        } catch (ExecutionException | InterruptedException e) {
            if (e.getCause() instanceof TopicExistsException) {
                System.out.println("[ADMIN] Topic already exists");
            } else {
                throw new RuntimeException(e);
            }
        }
    }

    private static Future<?> startProducer() {
        // Start the producer, which sends 10 total messages (value of TOTAL_RECORDS).
        return EXECUTOR.submit(() -> {
            try (Producer<String, String> producer = createProducer(connectionProperties())) {
                for (WeatherEvent event : WeatherEvent.getSampleEvents()) {
                    String message = event.toString();
                    ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, message);
                    producer.send(record);
                    System.out.println("[PRODUCER] Sent: " + message);
                }
            }
            System.out.println("[PRODUCER] Produced all messages");
        });
    }

    private static Future<?> startConsumer() {
        return EXECUTOR.submit(() -> {
            int consumedMessages = 0;
            try (Consumer<String, String> consumer = createConsumer(connectionProperties())) {
                consumer.subscribe(List.of(TOPIC_NAME));
                while (consumedMessages < TOTAL_RECORDS) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(3));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println("[CONSUMER] Received: " + record.value());
                    }
                    consumer.commitSync();
                    consumedMessages += records.count();
                }
            }
            System.out.println("[CONSUMER] Consumed all messages");
        });
    }

    public static void main(String... args) throws Exception {
        createTopic();
        System.out.println("[MAIN] Started consumer");
        Future<?> consumerTask = startConsumer();
        System.out.println("[MAIN] Started producer");
        Future<?> producerTask = startProducer();

        producerTask.get();
        consumerTask.get();
        System.out.println("[MAIN] Done!");
    }
}

The app expects a Kafka broker listening on localhost:9092. To run the app locally, you can start an Apache Kafka container like so:

docker run -p 9092:9092 apache/kafka:4.0.0

Then, from the kafka-app directory, start the app using Maven (Java 21+ required):

mvn clean compile exec:java

You should see a printout displaying the messages produced and consumed.
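The output, as produced by the app’s print statements, should look something like this, abbreviated (the exact payloads come from WeatherEvent.toString(), and producer/consumer lines may interleave differently between runs):

[ADMIN] Created topic: test_topic
[MAIN] Started consumer
[MAIN] Started producer
[PRODUCER] Sent: ...
[CONSUMER] Received: ...
...
[PRODUCER] Produced all messages
[CONSUMER] Consumed all messages
[MAIN] Done!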

Now, let’s apply the changes to use TxEventQ

First, let’s replace the kafka-clients dependency in the pom.xml with Oracle’s Kafka Java Client library for TxEventQ (OKafka). This library provides Java implementations of the Apache Kafka client interfaces for Oracle Database, allowing developers to use the kafka-clients APIs with TxEventQ.

<!-- OKafka All-in-one -->
<dependency>
    <groupId>com.oracle.database.messaging</groupId>
    <artifactId>okafka</artifactId>
    <version>23.6.0.0</version>
</dependency>

Next, let’s update the Kafka client classes to their OKafka-specific versions. These classes implement the org.apache.kafka.clients interfaces with API compatibility for methods like producer.send and consumer.poll:

// Use org.oracle.okafka.clients implementations for kafka-clients
import org.oracle.okafka.clients.admin.AdminClient;
import org.oracle.okafka.clients.consumer.KafkaConsumer;
import org.oracle.okafka.clients.producer.KafkaProducer;
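Swapping imports is enough for the producer and consumer, which the app constructs directly with new KafkaProducer<>(...) and new KafkaConsumer<>(...). The Admin, however, was created through the Kafka interface’s static Admin.create method, so its factory changes to use OKafka’s AdminClient:

private static Admin createAdmin(Properties props) {
    // org.oracle.okafka.clients.admin.AdminClient implements the
    // org.apache.kafka.clients.admin.Admin interface
    return AdminClient.create(props);
}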

Finally, we need to update our connection parameters to point at Oracle Database instead of a Kafka broker. Note that the oracle.net.tns_admin property should point to the directory containing the Oracle Database wallet. If you’re not using a wallet, this directory should instead contain an ojdbc.properties file with the user and password used to connect to the database.

private static Properties connectionProperties() {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:1521");
    props.setProperty("security.protocol", "PLAINTEXT");
    // Database service name / TNS Alias
    props.put("oracle.service.name", "freepdb1");
    // If using Oracle Database wallet, pass wallet directory
    String resourcesDir = new File(TxEventQApp.class.getClassLoader().getResource("").getFile())
            .getAbsolutePath();
    props.put("oracle.net.tns_admin", resourcesDir);
    return props;
}

The ojdbc.properties file should look something like this, and can be placed in the app’s src/main/resources directory, where the app is configured to pick it up:

user = testuser
password = testpwd

However, if you’re using SSL as the security protocol instead of PLAINTEXT, use the following connection code for Oracle Database using a database wallet (TLS or mTLS enabled):

private static Properties connectionProperties() {
    Properties props = new Properties();
    props.setProperty("security.protocol", "SSL");
    // TNS Alias
    props.put("oracle.service.name", "mydb_tp");
    // If using Oracle Database wallet, pass wallet directory
    String resourcesDir = new File(TxEventQApp.class.getClassLoader().getResource("").getFile())
            .getAbsolutePath();
    props.put("oracle.net.tns_admin", resourcesDir);
    return props;
}
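As the comment notes, with SSL the oracle.service.name value is a TNS alias, which must match an entry in the tnsnames.ora file inside the wallet directory. A typical entry looks something like this (hypothetical host and service name):

mydb_tp = (description=
  (address=(protocol=tcps)(port=1522)(host=adb.us-ashburn-1.oraclecloud.com))
  (connect_data=(service_name=abc123_mydb_tp.adb.oraclecloud.com))
  (security=(ssl_server_dn_match=yes)))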

To run the migrated app, start an Oracle Database Free container:

docker run --name oracledb -d -p 1521:1521 -e ORACLE_PASSWORD=testpwd gvenzl/oracle-free:23.26.0-slim-faststart

Next, run the testuser.sql script against the database as sysdba, connecting to the freepdb1 pluggable database. This script creates a database user for the app with all necessary grants to create topics and produce/consume messages.
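One way to run the script is with SQL*Plus inside the container, assuming the container name and password from the docker run command above and testuser.sql in your working directory:

docker cp testuser.sql oracledb:/tmp/testuser.sql
docker exec -it oracledb sqlplus sys/testpwd@localhost:1521/freepdb1 as sysdba @/tmp/testuser.sql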

Then, from the kafka-app directory (or the kafka-app-step-1 directory, which contains already migrated code), start the app using Maven:

mvn clean compile exec:java

If all goes well, you should see output similar (or identical) to what you saw when you ran the Kafka app! If you got stuck, refer to the kafka-app-step-1 module for the complete, migrated code.

Congratulations, you just migrated an Apache Kafka application to Oracle Database Transactional Event Queues!

What’s next?

In Part II, we’ll add OSON support as the producer-consumer serialization format. Part III (TBD) will add Transactional Messaging and JSON Relational Duality Views.

More Code Samples and Demo Apps

Life is better with code samples — I maintain a number of Java demo apps in the oracle-database-java-samples GitHub repo, including modules with sample code for developing Java event-streaming applications using Oracle Database Transactional Event Queues.

Looking for help? Leave a comment or reach out directly on LinkedIn and I’ll be happy to help you out.
