Oracle to Kafka — Playing with Confluent’s new Oracle CDC Source Connector in Docker

It’s really exciting to have a new option for streaming Oracle data into Kafka. With Confluent releasing their “Oracle CDC Source Premium Connector” there’s a new way to capture data that has been added to, updated, or deleted from Oracle RDBMS with Change Data Capture (CDC). Think of this as a low touch way to stream both the Oracle data and schema changes into Kafka and Schema Registry.

I put together some examples so you can play with this entire pipeline in Docker. Docker is an easy-way to run a an end to end pipeline on your machine. I hope this demo gives you a glimpse of how Oracle CDC to Kafka works.

⚠️ Some of these components are paid offerings 💰 for production use. Both the Oracle and Confluent license grant you free licence to play with this stuff as a developer for 30 days (disclaimer: I’m not a lawyer; nor do I play one on TV)

🛠 Code available at https://github.com/saubury/kafka-connect-oracle-cdc

Kafka Connect, CDC— What be this?

This connector uses the Oracle LogMiner interface to query online and archived redo log files. That’s an Oracle specific way to stream both the DML and DDL directly from the redo of the database. This is similar to how the Debezium project reads the Postgres write-ahead log or the MySQL binlog.

Why not just use the Oracle JDBC Source Connector?

The Oracle JDBC Connector is fine for getting a snapshot of a table. However, to identify changed or deleted rows with a JDBC connector is painful. Let’s just say you’ll quickly fall into a murky world of putting triggers on tables to determine what has changed … and no-one wants to do that.

The Oracle CDC Source Connector on the other hand is essentially listening to the recovery redo segment of the database. This allows for most DML (insert, update, delete) and some DDL (alter table) database events to be captured an Oracle database and produced into into Kafka and the Schema Registry.

Oracle Versions

Let’s Play

Clone this repo

git clone https://github.com/saubury/kafka-connect-oracle-cdccd kafka-connect-oracle-cdc

Get Oracle CDC Source Connector

Unzip to confluentinc-kafka-connect-oracle-cdc (and remove any trailing version numbers)

unzip ~/Downloads/confluentinc-kafka-connect-oracle-cdc-1.0.3.zipmv confluentinc-kafka-connect-oracle-cdc-1.0.3 confluentinc-kafka-connect-oracle-cdc

Get Oracle Docker

  • log into https://hub.docker.com/
  • search “oracle database”
  • click on “Oracle Database Enterprise Edition”
  • click on “Proceed to Checkout”
  • fill in your contact info on the left, check two boxes under “Developer Tier” on the right, click on “Get Content”

You should now be able to download an Oracle Docker image

docker login --username YourDockerUserName --password-stdin
<<Enter your password>>
docker pull store/oracle/database-enterprise:12.2.0.1

Docker Startup

docker-compose up -d

Setup Oracle Docker

First, ensure the database looks like it’s finished starting up (docker-compose logs -f oracle) and then run the following (for the curious, the SQL script is here )

docker-compose exec oracle /scripts/go_sqlplus.sh /scripts/oracle_setup_docker

Sample Data

The EMP table

Connector Configuration

curl -s -X GET -H 'Content-Type: application/json' http://localhost:8083/connector-plugins | jq '.'

And look for an occurrence of "class": "io.confluent.connect.oracle.cdc.OracleCdcSourceConnector"

All good — now it’s time to establish the SimpleOracleCDC connector. This configuration will initially snapshot the EMP table, and then listen to all redo activity (DML and DDL) against this table.

curl -s -X POST -H 'Content-Type: application/json' --data @SimpleOracleCDC.json http://localhost:8083/connectors | jq

Check the status of the connector. You may need to wait a minute or so for the status to show up

curl -s -X GET -H 'Content-Type: application/json' http://localhost:8083/connectors/SimpleOracleCDC/status | jq

Check topic

kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic ORCLCDB.C__MYUSER.EMP --from-beginning

Or if you don’t have the Kafka tools installed, you can launch kafka-avro-console-consumer via a container like (yes; this is all one line)

docker-compose exec kafka-connect kafka-avro-console-consumer --bootstrap-server kafka:29092 --property schema.registry.url="http://schema-registry:8081" --topic ORCLCDB.C__MYUSER.EMP --from-beginning

The (simplified) output of kafka-avro-console-consumer should look something like. Four rows initially from the table are four records in Kafka

{"I":"\u0001","NAME":{"string":"Bob"}}
{"I":"\u0002","NAME":{"string":"Jane"}}
{"I":"\u0003","NAME":{"string":"Mary"}}
{"I":"\u0004","NAME":{"string":"Alice"}}

Schema

curl -s -X GET http://localhost:8081/subjects/ORCLCDB.C__MYUSER.EMP-value/versions/1 | jq -r .schema | jq .

Amongst other things, you’ll see version 1 of the schema has been registered like this

"fields": [
{
"name": "I",
"type": {
"type": "bytes"
},
{
"name": "NAME",
"type": [
"string"
]
}

Insert, update and delete some data

Run docker-compose exec oracle /scripts/go_sqlplus.sh followed by this SQL

insert into C##MYUSER.emp (name) values ('Dale');
insert into C##MYUSER.emp (name) values ('Emma');
update C##MYUSER.emp set name = 'Robert' where name = 'Bob';
delete C##MYUSER.emp where name = 'Jane';
commit;
exit

Updated Sample Data

Updated EMP table

The (simplified) output of kafka-avro-console-consumer should look something like

{"I":"\u0005","NAME":{"string":"Dale"},"op_type":{"string":"I"}}
{"I":"\u0006","NAME":{"string":"Emma"},"op_type":{"string":"I"}}
{"I":"\u0001","NAME":{"string":"Robert"},"op_type":{"string":"U"}}
{"I":"\u0002","NAME":{"string":"Jane"},"op_type":{"string":"D"}

DDL

Run docker-compose exec oracle /scripts/go_sqlplus.sh followed by this SQL

ALTER TABLE C##MYUSER.EMP ADD (SURNAME VARCHAR2(100));insert into C##MYUSER.emp (name, surname) values ('Mickey', 'Mouse');
commit;

Updated Sample Data

DDL applied to EMP table

Schema mutation

curl -s -X GET http://localhost:8081/subjects/ORCLCDB.C__MYUSER.EMP-value/versions

Will show [1,2] — indicating two versions of the schema are available. We have data registered against version 1 and version 2 of the schema

curl -s -X GET http://localhost:8081/subjects/ORCLCDB.C__MYUSER.EMP-value/versions/2 | jq -r .schema | jq .

Note, schema version 2 has this addition

{
"name": "SURNAME",
"type": [
"null",
"string"
],
"default": null
}

Tear Down

docker-compose down

Conclusion

Oh, and Docker is awesome !

References

Day job: data steaming & system architecture. Night gig: IoT and random project hacking

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store