This is the second post in a three-part series about migrating Apache Kafka applications to Oracle Database Transactional Event Queues.
If you’re just getting started, read Part I, “Migrating an Apache Kafka Application to Oracle Database,” before continuing with this guide.
In this guide, we’ll add support for OSON to the application from Part I, using the migrated Oracle Database TxEventQ as a starting point. I recommend cloning this module, or following along on your own.
Working with OSON
OSON is Oracle’s optimized binary JSON format, used by the JSON data type and related database functionality. We’ll configure the Kafka app that we migrated to Oracle Database in Part I of this series to use OSON as the message format for records produced to and consumed from topics.
Add the JSON Collections Dependency
To implement OSON support, we’ll add the Oracle JSON collections dependency to our project. This package includes several classes to help leverage OSON in our Java apps, including the JSONB serialization utility.
Note that we exclude the Spring Boot starter dependency: we aren’t using Spring in this example, only the Oracle JSON bits.
<dependency>
    <groupId>com.oracle.database.spring</groupId>
    <artifactId>oracle-spring-boot-starter-json-collections</artifactId>
    <version>25.3.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </exclusion>
    </exclusions>
</dependency>
Updating the Producer to use OSON
Next, we’ll implement a Kafka OSON serializer, so our Kafka producer can write Java objects as OSON when sending messages to a topic. Our OSON serializer will use the JSONB utility class, which includes methods to convert Java objects both to and from OSON:
import com.oracle.spring.json.jsonb.JSONB;
import org.apache.kafka.common.serialization.Serializer;

public class OSONSerializer<T> implements Serializer<T> {
    // Shared JSONB utility that converts Java objects to and from OSON.
    private final JSONB jsonb;

    public OSONSerializer(JSONB jsonb) {
        this.jsonb = jsonb;
    }

    @Override
    public byte[] serialize(String topic, T obj) {
        // The topic name is not needed; every record value is encoded as OSON.
        return jsonb.toOSON(obj);
    }
}
Let’s update the createProducer method to use the new OSONSerializer, modifying our producer so WeatherEvent objects are written as OSON.
private static Producer<String, WeatherEvent> createProducer(Properties props) {
    props.put("enable.idempotence", "true");
    JSONB jsonb = JSONB.createDefault();
    Serializer<String> keySerializer = new StringSerializer();
    Serializer<WeatherEvent> valueSerializer = new OSONSerializer<>(jsonb);
    return new KafkaProducer<>(props, keySerializer, valueSerializer);
}
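One detail worth considering when wiring a custom serializer into the producer: a Kafka record may carry a null value, and StringSerializer, for example, returns null for null input rather than throwing. Whether JSONB.toOSON accepts null is not covered in this guide, so the following is only a minimal sketch of a null-tolerant wrapper. To stay runnable without the Kafka or Oracle JARs, it relies on stand-ins that are assumptions and not part of either library: TopicSerializer mirrors the shape of Kafka’s Serializer.serialize(String, T), and a UTF-8 encoder takes the place of jsonb::toOSON.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical stand-in for org.apache.kafka.common.serialization.Serializer,
// declared locally so the sketch compiles without the Kafka client.
interface TopicSerializer<T> {
    byte[] serialize(String topic, T data);
}

// Wraps any object-to-bytes encoder and passes null values through untouched.
final class NullSafeSerializer<T> implements TopicSerializer<T> {
    private final Function<T, byte[]> encoder;

    NullSafeSerializer(Function<T, byte[]> encoder) {
        this.encoder = encoder;
    }

    @Override
    public byte[] serialize(String topic, T data) {
        // A record with a null value has nothing to encode.
        return data == null ? null : encoder.apply(data);
    }
}

class NullSafeSerializerDemo {
    public static void main(String[] args) {
        // Assumption: UTF-8 text stands in for the OSON encoder in this demo.
        TopicSerializer<String> serializer =
                new NullSafeSerializer<>(s -> s.getBytes(StandardCharsets.UTF_8));

        System.out.println(serializer.serialize("weather", "sunny").length); // prints 5
        System.out.println(serializer.serialize("weather", null));           // prints null
    }
}
```

In the OSONSerializer above, the same guard would amount to a single null check before the call to jsonb.toOSON(obj).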
The startProducer method must also be updated to set WeatherEvent as the ProducerRecord value:
private static Future<?> startProducer() {
    return EXECUTOR.submit(() -> {
        try (Producer<String, WeatherEvent> producer = createProducer(connectionProperties())) {
            for (WeatherEvent event : WeatherEvent.getSampleEvents()) {
                ProducerRecord<String, WeatherEvent> record = new ProducerRecord<>(TOPIC_NAME, event);
                producer.send(record);
                System.out.println("[PRODUCER] Serializing: " + event.toString());
            }
        }
        System.out.println("[PRODUCER] Produced all messages");
    });
}
Updating the consumer to use OSON
Next, we update the createConsumer method to use a byte array instead of a string. This byte array will be the OSON representation of WeatherEvent objects sent by the producer.
We intentionally keep the event in binary format, as we’ll add Transactional Messaging support using OSON in the next post in the series.
private static Consumer<String, byte[]> createConsumer(Properties props) {
    props.put("group.id", "MY_CONSUMER_GROUP");
    props.put("enable.auto.commit", "false");
    props.put("auto.offset.reset", "earliest");
    props.put("max.poll.records", 50);
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    return new KafkaConsumer<>(props);
}
The startConsumer method is also updated to handle binary OSON data in byte array format. For example purposes, we still print each weather event’s string representation to the console.
private static Future<?> startConsumer() {
    JSONB jsonb = JSONB.createDefault();
    return EXECUTOR.submit(() -> {
        int consumedMessages = 0;
        try (Consumer<String, byte[]> consumer = createConsumer(connectionProperties())) {
            consumer.subscribe(List.of(TOPIC_NAME));
            while (consumedMessages < TOTAL_RECORDS) {
                ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(3));
                for (ConsumerRecord<String, byte[]> record : records) {
                    String val = jsonb.fromOSON(record.value(), WeatherEvent.class)
                            .toString();
                    System.out.println("[CONSUMER] Deserialized: " + val);
                }
                consumer.commitSync();
                consumedMessages += records.count();
            }
        } catch (IOException e) {
            System.out.println("[CONSUMER] Deserialization error: " + e.getMessage());
        }
        System.out.println("[CONSUMER] Consumed all messages");
    });
}
What’s next?
In the next post in this series, we’ll add Transactional Messaging to the app: the consumer will run database operations in the same atomic transaction that it uses to receive messages.
More Code Samples and Demo Apps
Life is better with code samples. I maintain a number of Java demo apps in the oracle-database-java-samples GitHub repo, and the following modules provide sample code for developing Java event-streaming applications using Oracle Database Transactional Event Queues:
- JDBC Producer/Consumer example
- JMS Queue/Topic examples
- Spring JMS example
- Oracle Kafka Java Client APIs
Have a use case that’s not already covered? Leave a comment or reach out directly on LinkedIn and I’ll be happy to help you out.
