You need to build the Oracle Docker image first (🤞 maybe one day it’ll be available from a Docker repository like Docker Hub without any login required 🤞).
Start Oracle:
docker-compose up -d oracle
Wait for the Oracle DB to be up (it takes several minutes to instantiate):
grep -q "DATABASE IS READY TO USE!" <(docker logs -f oracle)
echo -e "$(date) Installing rlwrap on Oracle container"
docker exec -it -u root oracle bash -c "rpm -Uvh https://dl.fedoraproject.org/pub/epel/epel-release-latest-7.noarch.rpm && yum install -y rlwrap"
Verify that the XStream capture process is running:
docker exec -it oracle /opt/oracle/scripts/startup/04_check_capture.sh
Expected output:
                                            Session                   XStream
                                             Serial Operating System  Program
XStream Component              Session ID    Number Process ID        Name
------------------------------ ---------- --------- ----------------- -------
DBZXOUT - Apply Reader                  1     40349 60836             AS02
DBZXOUT - Apply Coordinator           380     31272 60834             AP01
DBZXOUT - Propagation Send/Rcv        619     10409 55847             CX02
DBZXOUT - Apply Server                631     17119 60838             AS00
CAP$_DBZXOUT_1 - Capture              750     18993 55845             CP01
If you don’t see these running (particularly the Capture process), then refer to https://github.com/confluentinc/demo-scene/blob/master/no-more-silos-oracle/debezium-xstream-system-output.adoc
Start the rest of the stack
docker-compose up -d
Check that Kafka Connect is running:
bash -c ' \
echo -e "\n\n=============\nWaiting for Kafka Connect to start listening on localhost ⏳\n=============\n"
while [ $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) -ne 200 ] ; do
echo -e "\t" $(date) " Kafka Connect listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) " (waiting for 200)"
sleep 5
done
echo -e $(date) "\n\n--------------\n\o/ Kafka Connect is ready! Listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) "\n--------------\n"
'
Check that required connectors are loaded
curl -s localhost:8083/connector-plugins|jq '.[].class'|egrep 'OracleConnector|JdbcSinkConnector|DatagenConnector'
"io.confluent.connect.jdbc.JdbcSinkConnector"
"io.confluent.kafka.connect.datagen.DatagenConnector"
"io.debezium.connector.oracle.OracleConnector"
Start sqlplus prompt
docker exec -it oracle bash -c 'sleep 1;rlwrap sqlplus Debezium/dbz@localhost:1521/ORCLPDB1'
COL FIRST_NAME FOR A15
COL LAST_NAME FOR A15
COL ID FOR 999
COL CREATE_TS FOR A29
COL UPDATE_TS FOR A29
SET LINESIZE 200
SELECT ID, FIRST_NAME, LAST_NAME, CREATE_TS, UPDATE_TS FROM CUSTOMERS;
ID FIRST_NAME LAST_NAME CREATE_TS UPDATE_TS
---- --------------- --------------- ----------------------------- -----------------------------
1 Rica Blaisdell 04-DEC-18 08.22.32.933376 PM 04-DEC-18 08.22.32.000000 PM
2 Ruthie Brockherst 04-DEC-18 08.22.32.953342 PM 04-DEC-18 08.22.32.000000 PM
3 Mariejeanne Cocci 04-DEC-18 08.22.32.965713 PM 04-DEC-18 08.22.32.000000 PM
4 Hashim Rumke 04-DEC-18 08.22.32.977417 PM 04-DEC-18 08.22.32.000000 PM
5 Hansiain Coda 04-DEC-18 08.22.32.979967 PM 04-DEC-18 08.22.32.000000 PM
There are two approaches to change data capture (CDC): query-based and log-based. Let’s try both and examine the differences below.
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/source-oracle-jdbc-02/config/ \
-d '{
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:oracle:thin:@oracle:1521/ORCLPDB1",
"connection.user":"Debezium",
"connection.password":"dbz",
"numeric.mapping":"best_fit",
"mode":"timestamp",
"poll.interval.ms":"1000",
"validate.non.null":"false",
"table.whitelist":"CUSTOMERS",
"timestamp.column.name":"UPDATE_TS",
"topic.prefix":"ora-",
"transforms": "addTopicSuffix,InsertTopic,InsertSourceDetails,copyFieldToKey,extractValuefromStruct",
"transforms.InsertTopic.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertTopic.topic.field":"messagetopic",
"transforms.InsertSourceDetails.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertSourceDetails.static.field":"messagesource",
"transforms.InsertSourceDetails.static.value":"JDBC Source Connector from Oracle on asgard",
"transforms.addTopicSuffix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.addTopicSuffix.regex":"(.*)",
"transforms.addTopicSuffix.replacement":"$1-jdbc-02",
"transforms.copyFieldToKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.copyFieldToKey.fields":"ID",
"transforms.extractValuefromStruct.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractValuefromStruct.field":"ID"
}'
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/source-oracle-dbz-xstream-00/config \
-d '{
"connector.class": "io.debezium.connector.oracle.OracleConnector",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"database.server.name" : "asgard",
"database.hostname" : "oracle",
"database.port" : "1521",
"database.user" : "c##xstrm",
"database.password" : "xs",
"database.dbname" : "ORCLCDB",
"database.pdb.name" : "ORCLPDB1",
"database.out.server.name" : "dbzxout",
"database.history.kafka.bootstrap.servers" : "kafka:29092",
"database.history.kafka.topic": "schema-changes.inventory",
"include.schema.changes": "true",
"table.blacklist":"ORCLPDB1.AUDSYS.*"
}'
curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
column -s : -t| sed 's/\"//g'| sort
source | source-oracle-dbz-xstream-00 | RUNNING | RUNNING | io.debezium.connector.oracle.OracleConnector
source | source-oracle-jdbc-02 | RUNNING | RUNNING | io.confluent.connect.jdbc.JdbcSourceConnector
Run these two kafkacat commands side by side, in windows separate from the sqlplus session:
- Query-based CDC data in Kafka:
docker exec kafkacat kafkacat -b kafka:29092 -t ora-CUSTOMERS-jdbc-02 -C -u -q -o-1 -r http://schema-registry:8081 -s key=s -s value=avro -J |jq '.'
- Log-based CDC data in Kafka:
docker exec kafkacat kafkacat -b kafka:29092 -t asgard.DEBEZIUM.CUSTOMERS -C -u -q -o-1 -r http://schema-registry:8081 -s key=avro -s value=avro -J | jq '.'
Run these commands individually and examine the different payloads that you get for each change type. Note DBZ-1018, which means you might see a lag with the log-based approach (this is an issue with the implementation, not the concept).
- Insert
SET AUTOCOMMIT ON; INSERT INTO CUSTOMERS (FIRST_NAME,LAST_NAME,CLUB_STATUS) VALUES ('Rick','Astley','Bronze');
- Update
UPDATE CUSTOMERS SET CLUB_STATUS = 'Platinum' where ID=42;
Note that Debezium output includes the prior state of the record too
- Delete
DELETE FROM CUSTOMERS WHERE ID=1;
Note that query-based CDC cannot capture deletes (without some kind of manual workaround with Flashback, etc.), whilst log-based CDC captures them along with the prior state of the record.
curl -X PUT http://localhost:8083/connectors/sink-postgres-02/config \
-H "Content-Type: application/json" \
-d '{
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:postgresql://postgres:5432/",
"connection.user": "postgres",
"connection.password": "postgres",
"tasks.max": "1",
"topics": "ora-CUSTOMERS-jdbc-02",
"auto.create": "true",
"auto.evolve":"true",
"pk.mode":"record_key",
"pk.fields":"ID",
"insert.mode": "upsert",
"table.name.format":"customers"
}'
Load up Postgres:
docker exec -it postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB'
Show the data:
SELECT * FROM customers;
Set up a data generator:
curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/source-datagen-pageviews/config \
-d '{
"connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
"kafka.topic": "pageviews",
"quickstart": "pageviews",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"max.interval": 500,
"tasks.max": "1"
}'
Launch ksqlDB:
docker exec -it ksqldb ksql http://ksqldb:8088
SET 'auto.offset.reset' = 'earliest';
CREATE STREAM PAGEVIEWS WITH (KAFKA_TOPIC='pageviews', VALUE_FORMAT='AVRO');
CREATE TABLE CUSTOMERS WITH (KAFKA_TOPIC='ora-CUSTOMERS-jdbc-02', VALUE_FORMAT='AVRO');
CREATE STREAM PV_ENRICHED AS
SELECT P.USERID,
C.FIRST_NAME + ' ' + C.LAST_NAME AS CUSTOMER_NAME,
C.CLUB_STATUS,
PAGEID
FROM PAGEVIEWS P
INNER JOIN
CUSTOMERS C
ON REPLACE(USERID,'User_','')=C.ROWKEY
EMIT CHANGES;
PRINT PV_ENRICHED;
CREATE SINK CONNECTOR SINK_POSTGRES WITH (
'connector.class' = 'io.confluent.connect.jdbc.JdbcSinkConnector',
'connection.url' = 'jdbc:postgresql://postgres:5432/',
'connection.user' = 'postgres',
'connection.password' = 'postgres',
'topics' = 'PV_ENRICHED',
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
'auto.create' = 'true'
);
CREATE TABLE PV_AGG AS
SELECT P.USERID,
C.FIRST_NAME + ' ' + C.LAST_NAME AS CUSTOMER_NAME,
C.CLUB_STATUS,
PAGEID, COUNT(*) AS PV_COUNT
FROM PAGEVIEWS P
INNER JOIN
CUSTOMERS C
ON REPLACE(USERID,'User_','')=C.ROWKEY
GROUP BY USERID
, C.FIRST_NAME + ' ' + C.LAST_NAME
, CLUB_STATUS
, PAGEID
EMIT CHANGES;
CREATE SINK CONNECTOR SINK_POSTGRES WITH (
'connector.class' = 'io.confluent.connect.jdbc.JdbcSinkConnector',
'connection.url' = 'jdbc:postgresql://postgres:5432/',
'connection.user' = 'postgres',
'connection.password' = 'postgres',
'topics' = 'PV_AGG',
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
'auto.create' = 'true',
'insert.mode' = 'upsert',
'pk.mode' = 'record_value',
'pk.fields' = 'USERID,PAGEID'
);
SELECT * FROM "PV_AGG" WHERE "PAGEID"='Page_22';
CREATE STREAM CUSTOMER_EVENTS WITH (KAFKA_TOPIC='ora-CUSTOMERS-jdbc-02', VALUE_FORMAT='AVRO');
CREATE TABLE CUSTOMERS_MV AS SELECT LATEST_BY_OFFSET(FIRST_NAME) + ' ' + LATEST_BY_OFFSET(LAST_NAME) AS FULLNAME, LATEST_BY_OFFSET(CLUB_STATUS) AS CLUB_STATUS FROM CUSTOMER_EVENTS GROUP BY ID;
-- "Push" query
SELECT FULLNAME, CLUB_STATUS FROM CUSTOMERS_MV WHERE ROWKEY=42 EMIT CHANGES;
-- "Pull" query
SELECT FULLNAME, CLUB_STATUS FROM CUSTOMERS_MV WHERE ROWKEY=42;
Query with REST API:
curl -s --location --request POST 'http://localhost:8088/query' \
--header 'Content-Type: application/json' \
--data-raw '{"ksql":"SELECT ROWKEY AS ID, FULLNAME, CLUB_STATUS FROM CUSTOMERS_MV WHERE ROWKEY=42;","streamsProperties": {"ksql.streams.auto.offset.reset": "earliest"
}}' | jq '.'
If you want to try out this alternative Debezium configuration, you need to either:
- Drop the existing connector:
curl -i -X DELETE http://localhost:8083/connectors/source-oracle-dbz-xstream-00
or
- Add a second capture server to Oracle:
(If you use this option, make sure you update database.out.server.name to dbzxout2 in the REST API call below.)
docker exec -it oracle bash -c 'sleep 1;rlwrap sqlplus c##xstrmadmin/xsa@//localhost:1521/ORCLCDB'
BEGIN DBMS_XSTREAM_ADM.CREATE_OUTBOUND( server_name => 'dbzxout2', schema_names => 'debezium', connect_user => 'c##xstrm'); END; /
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/source-oracle-dbz-xstream-flat-00/config \
-d '{
"connector.class": "io.debezium.connector.oracle.OracleConnector",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"database.server.name" : "asgard",
"database.hostname" : "oracle",
"database.port" : "1521",
"database.user" : "c##xstrm",
"database.password" : "xs",
"database.dbname" : "ORCLCDB",
"database.pdb.name" : "ORCLPDB1",
"database.out.server.name" : "dbzxout",
"database.history.kafka.bootstrap.servers" : "kafka:29092",
"database.history.kafka.topic": "schema-changes.inventory",
"include.schema.changes": "true",
"table.blacklist":"ORCLPDB1.AUDSYS.*",
"transforms":"addTopicSuffix,unwrap",
"transforms.addTopicSuffix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.addTopicSuffix.regex":"(.*)",
"transforms.addTopicSuffix.replacement":"$1-flat",
"transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones":"false"
}'
docker exec kafkacat kafkacat -b kafka:29092 -t asgard.DEBEZIUM.CUSTOMERS-flat -C -u -q -o-1 -r http://schema-registry:8081 -s key=avro -s value=avro -J|jq '.'
curl -X PUT http://localhost:8083/connectors/sink-postgres-dbz-flat-00/config \
-H "Content-Type: application/json" \
-d '{
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"connection.url": "jdbc:postgresql://postgres:5432/",
"connection.user": "postgres",
"connection.password": "postgres",
"tasks.max": "1",
"topics": "asgard.DEBEZIUM.CUSTOMERS-flat",
"auto.create": "true",
"auto.evolve":"true",
"pk.mode":"record_key",
"insert.mode": "upsert",
"delete.enabled":"true",
"table.name.format":"customers_dbz"
}'