Authenticate to your Oracle Database like it’s a Kafka cluster

There’s a Kafka interface for Oracle Database, and it works just like the kafka-clients Java API many developers are familiar with. The difference? You don’t manage or pay for a separate Kafka cluster. You use the database’s built-in pub/sub capabilities.

Kafka Java Client for Oracle Database Transactional Event Queues (okafka)

The okafka library implements Kafka Java APIs for Oracle Database Transactional Event Queues (TxEventQ), an event streaming system that’s built into the database. okafka uses the same Kafka Java interfaces as kafka-clients, but connects to Oracle Database TxEventQ instead of a Kafka cluster.

And what’s Oracle Database TxEventQ? TxEventQ is the evolution of queuing in the database, supporting high-throughput event-driven workflows. Expect the same features you’re used to in other event streaming implementations, such as multiple consumers and delivery guarantees, plus additional database features like SQL interoperability and transactional messaging.

okafka is available as part of Oracle Database 23ai!

Setting up a Java project for okafka

Add the Kafka Java Client for Oracle Database Transactional Event Queues dependency to your project. If you’re using Maven:

<dependency>
    <groupId>com.oracle.database.messaging</groupId>
    <artifactId>okafka</artifactId>
    <version>23.7.0.0</version>
</dependency>

Or, if you’re using Gradle:

implementation "com.oracle.database.messaging:okafka:23.7.0.0"

Next, set up database permissions for your user and TxEventQ. The following permissions are recommended for a user who needs to manage topics and to produce and consume messages:

-- okafka permissions
-- AQ User role to be able to use AQ
grant aq_user_role to TESTUSER;
-- To be able to invoke operations from AQ-JMS
grant execute on dbms_aq to TESTUSER;
-- To be able to create transactional event queue and subscriber
grant execute on dbms_aqadm to TESTUSER;
-- To be able to discover other RAC nodes of the database
grant select on sys.gv_$session to TESTUSER;
grant select on sys.v_$session to TESTUSER;
grant select on sys.gv_$instance to TESTUSER;
grant select on sys.gv_$listener_network to TESTUSER;
grant select on SYS.DBA_RSRC_PLAN_DIRECTIVES to TESTUSER;
grant select on sys.gv_$pdbs to TESTUSER;
-- Remove the user_queue_partition_assignment_table grant if on ADB
grant select on user_queue_partition_assignment_table to TESTUSER;
exec dbms_aqadm.GRANT_PRIV_FOR_RM_PLAN('TESTUSER');
commit;

Authenticating from Java code

We can authenticate to Oracle Database using a Properties object, similar to Kafka. The following snippet from AuthenticationExample.java instantiates a Properties object with Oracle Database-specific connection parameters. It applies to plaintext, TLS, and mTLS connections:

// Just like kafka-clients, we can use a Java Properties object to configure connection parameters.
Properties props = new Properties();

// oracle.service.name is a custom property to configure the Database service name.
props.put("oracle.service.name", serviceName);
// oracle.net.tns_admin is a custom property to configure the directory containing Oracle Database connection files.
// If you are using mTLS authentication, client certificates must be present in this directory.
props.put("oracle.net.tns_admin", walletDir);
// security.protocol is a standard Kafka property, set to PLAINTEXT or SSL for Oracle Database.
// (SASL is not supported with Oracle Database).
props.put("security.protocol", securityProtocol);
if (securityProtocol.equals("SSL")) {
    // For SSL authentication, pass the TNS alias (such as "mydb_tp") to be used from the tnsnames.ora file
    // found in the walletDir directory.
    props.put("tns.alias", tnsAlias);
} else {
    // For PLAINTEXT authentication, we provide the database URL in the format
    // HOSTNAME:PORT as the bootstrap.servers property.
    props.put("bootstrap.servers", bootstrapServers);
}
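
If you build these properties in more than one place, it may help to wrap the branching above in a small helper that fails fast on missing or unsupported values. The following is a minimal sketch using only java.util.Properties. The class name OkafkaConnectionProps, the build method, and its parameter names are hypothetical and not part of okafka; only the property keys come from the snippet above.

```java
import java.util.Objects;
import java.util.Properties;

// Hypothetical helper (not part of okafka): assembles the connection
// properties described above and rejects unsupported protocols early.
final class OkafkaConnectionProps {
    private OkafkaConnectionProps() {}

    static Properties build(String serviceName, String walletDir, String securityProtocol,
                            String tnsAlias, String bootstrapServers) {
        Properties props = new Properties();
        props.put("oracle.service.name", Objects.requireNonNull(serviceName, "serviceName"));
        props.put("oracle.net.tns_admin", Objects.requireNonNull(walletDir, "walletDir"));

        Objects.requireNonNull(securityProtocol, "securityProtocol");
        switch (securityProtocol) {
            case "SSL":
                // SSL connections resolve the database through a TNS alias in tnsnames.ora.
                props.put("tns.alias", Objects.requireNonNull(tnsAlias, "tnsAlias"));
                break;
            case "PLAINTEXT":
                // PLAINTEXT connections use HOSTNAME:PORT as bootstrap.servers.
                props.put("bootstrap.servers",
                        Objects.requireNonNull(bootstrapServers, "bootstrapServers"));
                break;
            default:
                // The article lists only PLAINTEXT and SSL as supported (no SASL).
                throw new IllegalArgumentException(
                        "Unsupported security.protocol: " + securityProtocol);
        }
        props.put("security.protocol", securityProtocol);
        return props;
    }
}
```

A call such as OkafkaConnectionProps.build("mydb_svc", "/path/to/wallet", "SSL", "mydb_tp", null) would return a Properties object containing tns.alias and no bootstrap.servers entry; the service name and path here are placeholders.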

You can use this properties object with an AdminClient, KafkaProducer, or KafkaConsumer from the okafka library to connect to Oracle Database. Here’s an example that creates a topic in Oracle Database. Note the fully qualified type names, which show that we’re using okafka for the concrete implementation while relying on the Kafka Java interfaces:

import java.util.List;
import org.apache.kafka.clients.admin.NewTopic;

// Using our connection properties, let's create a Kafka admin client connected to Oracle Database.
// The fully qualified types are provided for illustrative purposes:
// The org.oracle.okafka.clients.admin.AdminClient.create method creates a KafkaAdminClient for
// Oracle Database implementing the org.apache.kafka.clients.admin.Admin interface.
try (org.apache.kafka.clients.admin.Admin admin = org.oracle.okafka.clients.admin.AdminClient.create(props)) {
    // Note that the replication factor is 0. Replication of topic data in Oracle Database is handled
    // by external database controls, not the message broker.
    NewTopic topic = new NewTopic("authentication_example", 5, (short) 0);
    // Create the topic using standard kafka-clients APIs.
    admin.createTopics(List.of(topic))
            .all()
            .get();
}

okafka and Oracle Wallet

okafka uses Oracle Wallet with JDBC to authenticate to the database. Oracle wallets are secure containers for credentials and connection metadata, and they’re commonly used to support SSL and mTLS connections to the database. This means that, at minimum, a plaintext connection needs a directory containing an ojdbc.properties file with your database username and password:

user = testuser
password = Welcome12345
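
For local testing, you could generate this file from code rather than by hand. The sketch below writes a minimal ojdbc.properties into a wallet directory using only the Java standard library. The class name OjdbcPropertiesWriter and its write method are hypothetical, and the sketch assumes the file is read as a standard Java properties file (java.util.Properties.store escapes characters such as = and : in values accordingly).

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

// Hypothetical helper (not part of okafka or the Oracle JDBC driver):
// writes a minimal ojdbc.properties file for plaintext connections.
final class OjdbcPropertiesWriter {
    private OjdbcPropertiesWriter() {}

    static Path write(Path walletDir, String user, String password) throws IOException {
        // Create the wallet directory if it does not exist yet.
        Files.createDirectories(walletDir);

        Properties credentials = new Properties();
        credentials.setProperty("user", user);
        credentials.setProperty("password", password);

        Path file = walletDir.resolve("ojdbc.properties");
        try (OutputStream out = Files.newOutputStream(file)) {
            // store() writes key=value lines in standard properties format.
            credentials.store(out, "Database credentials for okafka");
        }
        return file;
    }
}
```

The returned path's parent directory is the value you would pass as oracle.net.tns_admin.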

If you’re using mTLS, this directory must contain additional Oracle wallet files, like client certificates and keys stored in JKS (Java KeyStore) format. Typically, you download these files from your database instance (such as ADB) or generate them using Oracle PKI tools. An ojdbc.properties file for mTLS using JKS will look something like this:

# Connection property while using Oracle wallets.
user=<database username>
password=<database password>
#oracle.net.wallet_location=(SOURCE=(METHOD=FILE)(METHOD_DATA=(DIRECTORY=${TNS_ADMIN})))
# FOLLOW THESE STEPS FOR USING JKS
# (1) Uncomment the following properties to use JKS.
# (2) Comment out the oracle.net.wallet_location property above
# (3) Set the correct password for both trustStorePassword and keyStorePassword.
# It's the password you specified when downloading the wallet from OCI Console or the Service Console.
javax.net.ssl.trustStore=${TNS_ADMIN}/truststore.jks
javax.net.ssl.trustStorePassword=<wallet password>
javax.net.ssl.keyStore=${TNS_ADMIN}/keystore.jks
javax.net.ssl.keyStorePassword=<wallet password>
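
A missing wallet file tends to surface as a connection error at runtime, so a quick pre-flight check of the directory can save some debugging. The following sketch lists which expected files are absent. The class name WalletDirCheck and its method are hypothetical, and the expected file list is an assumption based on the JKS setup described in this article (ojdbc.properties, tnsnames.ora, truststore.jks, keystore.jks); other wallet layouts will use different files.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical pre-flight check (not part of okafka): reports which files
// this article's setup expects but the wallet directory does not contain.
final class WalletDirCheck {
    private WalletDirCheck() {}

    static List<String> missingFiles(Path walletDir, boolean mtlsWithJks) {
        List<String> expected = new ArrayList<>();
        // Needed for every connection type: database username and password.
        expected.add("ojdbc.properties");
        if (mtlsWithJks) {
            // Assumed file names, taken from the JKS example above.
            expected.add("tnsnames.ora");
            expected.add("truststore.jks");
            expected.add("keystore.jks");
        }

        List<String> missing = new ArrayList<>();
        for (String name : expected) {
            if (!Files.isRegularFile(walletDir.resolve(name))) {
                missing.add(name);
            }
        }
        return missing;
    }
}
```

An empty result means every expected file is present; otherwise the returned names can go straight into an error message before any client is created.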

If you’re new to Oracle wallet/mTLS, I suggest reading 🔐 What’s an Oracle wallet, anyway?

Looking for resources to get started?

You can also browse this site using the TxEventQ tag to find more samples.

Got questions? Write a comment or drop me a line on LinkedIn.
