Integrating Oracle and Kafka

Setup

You need to build the Oracle Docker image first (🤞maybe one day it’ll be available from a Docker repository like Docker Hub without any login required 🤞).
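A sketch of the build, assuming Oracle’s official docker-images repository and the 12.2.0.1 Enterprise Edition image (adjust the version to whatever your docker-compose.yml expects). You first have to download the installation binaries from Oracle, which is exactly the login the emoji is lamenting:

git clone https://github.com/oracle/docker-images.git
cd docker-images/OracleDatabase/SingleInstance/dockerfiles
# Place the 12.2.0.1 install zip downloaded from Oracle into the 12.2.0.1/ directory first
./buildDockerImage.sh -v 12.2.0.1 -e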

Start Oracle:

docker-compose up -d oracle

Wait for the Oracle DB to be up (it takes several minutes to instantiate):

grep -q "DATABASE IS READY TO USE!" <(docker logs -f oracle)
echo -e "$(date) Installing rlwrap on Oracle container"
docker exec -it -u root oracle bash -c "rpm -Uvh https://dl.fedoraproject.org/pub/epel/epel-release-latest-7.noarch.rpm &&  yum install -y rlwrap"

Verify that the XStream capture process is running:

docker exec -it oracle /opt/oracle/scripts/startup/04_check_capture.sh

Expected output:

                                            Session                   XStream
                                             Serial Operating System  Program
XStream Component              Session ID    Number Process ID        Name
------------------------------ ---------- --------- ----------------- -------
DBZXOUT - Apply Reader                  1     40349 60836             AS02
DBZXOUT - Apply Coordinator           380     31272 60834             AP01
DBZXOUT - Propagation Send/Rcv        619     10409 55847             CX02
DBZXOUT - Apply Server                631     17119 60838             AS00
CAP$_DBZXOUT_1 - Capture              750     18993 55845             CP01

If you don’t see these running (particularly the Capture), refer to https://github.com/confluentinc/demo-scene/blob/master/no-more-silos-oracle/debezium-xstream-system-output.adoc
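If you would rather not rely on the bundled script, you can also query the XStream state directly from sqlplus. A minimal sketch, assuming the standard V$XSTREAM_CAPTURE dynamic performance view (run as a suitably privileged user against the CDB):

-- Should show the capture process and its current state
SELECT capture_name, state FROM V$XSTREAM_CAPTURE;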

Start the rest of the stack

docker-compose up -d

Check that Kafka Connect is running:

bash -c ' \
echo -e "\n\n=============\nWaiting for Kafka Connect to start listening on localhost ⏳\n=============\n"
while [ $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) -ne 200 ] ; do
  echo -e "\t" $(date) " Kafka Connect listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) " (waiting for 200)"
  sleep 5
done
echo -e $(date) "\n\n--------------\n\o/ Kafka Connect is ready! Listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) "\n--------------\n"
'

Check that the required connector plugins are loaded:

curl -s localhost:8083/connector-plugins|jq '.[].class'|egrep 'OracleConnector|JdbcSinkConnector|DatagenConnector'
"io.confluent.connect.jdbc.JdbcSourceConnector"
"io.confluent.kafka.connect.datagen.DatagenConnector"
"io.debezium.connector.oracle.OracleConnector"

Start sqlplus prompt

docker exec -it oracle bash -c 'sleep 1;rlwrap sqlplus Debezium/dbz@localhost:1521/ORCLPDB1'

Show Oracle table + contents

COL FIRST_NAME FOR A15
COL LAST_NAME FOR A15
COL ID FOR 999
COL CREATE_TS FOR A29
COL UPDATE_TS FOR A29
SET LINESIZE 200
SELECT ID, FIRST_NAME, LAST_NAME, CREATE_TS, UPDATE_TS FROM CUSTOMERS;
  ID FIRST_NAME      LAST_NAME       CREATE_TS                     UPDATE_TS
---- --------------- --------------- ----------------------------- -----------------------------
   1 Rica            Blaisdell       04-DEC-18 08.22.32.933376 PM  04-DEC-18 08.22.32.000000 PM
   2 Ruthie          Brockherst      04-DEC-18 08.22.32.953342 PM  04-DEC-18 08.22.32.000000 PM
   3 Mariejeanne     Cocci           04-DEC-18 08.22.32.965713 PM  04-DEC-18 08.22.32.000000 PM
   4 Hashim          Rumke           04-DEC-18 08.22.32.977417 PM  04-DEC-18 08.22.32.000000 PM
   5 Hansiain        Coda            04-DEC-18 08.22.32.979967 PM  04-DEC-18 08.22.32.000000 PM

Getting the data from Oracle to Kafka

There are two approaches: query-based CDC and log-based CDC. Let’s try both and examine the differences below.

Query-based CDC (JDBC Source connector)

curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/source-oracle-jdbc-02/config/ \
    -d '{
            "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
            "connection.url": "jdbc:oracle:thin:@oracle:1521/ORCLPDB1",
            "connection.user":"Debezium",
            "connection.password":"dbz",
            "numeric.mapping":"best_fit",
            "mode":"timestamp",
            "poll.interval.ms":"1000",
            "validate.non.null":"false",
            "table.whitelist":"CUSTOMERS",
            "timestamp.column.name":"UPDATE_TS",
            "topic.prefix":"ora-",
            "transforms": "addTopicSuffix,InsertTopic,InsertSourceDetails,copyFieldToKey,extractValuefromStruct",
            "transforms.InsertTopic.type":"org.apache.kafka.connect.transforms.InsertField$Value",
            "transforms.InsertTopic.topic.field":"messagetopic",
            "transforms.InsertSourceDetails.type":"org.apache.kafka.connect.transforms.InsertField$Value",
            "transforms.InsertSourceDetails.static.field":"messagesource",
            "transforms.InsertSourceDetails.static.value":"JDBC Source Connector from Oracle on asgard",
            "transforms.addTopicSuffix.type":"org.apache.kafka.connect.transforms.RegexRouter",
            "transforms.addTopicSuffix.regex":"(.*)",
            "transforms.addTopicSuffix.replacement":"$1-jdbc-02",
            "transforms.copyFieldToKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
            "transforms.copyFieldToKey.fields":"ID",
            "transforms.extractValuefromStruct.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
            "transforms.extractValuefromStruct.field":"ID"
        }'
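To confirm that the connector and its task started cleanly, check its status with the Kafka Connect REST API:

curl -s http://localhost:8083/connectors/source-oracle-jdbc-02/status | jq '.'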

Log-based CDC (Debezium Oracle/XStream connector)

curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/source-oracle-dbz-xstream-00/config \
    -d '{
        "connector.class": "io.debezium.connector.oracle.OracleConnector",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://schema-registry:8081",
        "database.server.name" : "asgard",
        "database.hostname" : "oracle",
        "database.port" : "1521",
        "database.user" : "c##xstrm",
        "database.password" : "xs",
        "database.dbname" : "ORCLCDB",
        "database.pdb.name" : "ORCLPDB1",
        "database.out.server.name" : "dbzxout",
        "database.history.kafka.bootstrap.servers" : "kafka:29092",
        "database.history.kafka.topic": "schema-changes.inventory",
        "include.schema.changes": "true",
        "table.blacklist":"ORCLPDB1.AUDSYS.*"
        }'

Check the connectors

curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
       jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
       column -s : -t| sed 's/\"//g'| sort
source  |  source-oracle-dbz-xstream-00  |  RUNNING  |  RUNNING  |  io.debezium.connector.oracle.OracleConnector
source  |  source-oracle-jdbc-02         |  RUNNING  |  RUNNING  |  io.confluent.connect.jdbc.JdbcSourceConnector

Examine the data

Run these two kafkacat commands side by side in separate windows, alongside your sqlplus session:

  • Query-based CDC data in Kafka:

    docker exec kafkacat kafkacat -b kafka:29092 -t ora-CUSTOMERS-jdbc-02 -C -u -q -o-1 -r http://schema-registry:8081 -s key=s -s value=avro -J |jq '.'
  • Log-based CDC data in Kafka:

    docker exec kafkacat kafkacat -b kafka:29092 -t asgard.DEBEZIUM.CUSTOMERS -C -u -q -o-1 -r http://schema-registry:8081 -s key=avro -s value=avro -J | jq '.'

Run these commands individually and examine the different payloads that you get for each change type. Note DBZ-1018, which means you might see a lag in the log-based approach (this is an issue with the implementation, not the concept).

  • Insert

    SET AUTOCOMMIT ON;
    
    INSERT INTO CUSTOMERS (FIRST_NAME,LAST_NAME,CLUB_STATUS) VALUES ('Rick','Astley','Bronze');
  • Update

    UPDATE CUSTOMERS SET CLUB_STATUS = 'Platinum' where ID=42;

Note that the Debezium output includes the prior state of the record too.

  • Delete

    DELETE FROM CUSTOMERS WHERE ID=1;

    Note that query-based CDC cannot capture deletes (without some kind of manual workaround such as soft deletes or Flashback), whilst log-based CDC captures the delete along with the prior state of the record. A soft-delete sketch follows this list.
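A sketch of the soft-delete workaround for query-based CDC, assuming a hypothetical DELETED_FLAG column that is not part of this demo’s schema:

-- Hypothetical: first ALTER TABLE CUSTOMERS ADD (DELETED_FLAG CHAR(1) DEFAULT 'N');
-- Marking the row rather than deleting it means UPDATE_TS changes (assuming the
-- trigger that maintains UPDATE_TS fires), so the JDBC connector picks it up on
-- its next poll and downstream consumers can treat DELETED_FLAG='Y' as a delete.
UPDATE CUSTOMERS SET DELETED_FLAG = 'Y' WHERE ID = 1;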

Getting the data from Kafka to another database

curl -X PUT http://localhost:8083/connectors/sink-postgres-02/config \
    -H "Content-Type: application/json" \
    -d '{
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url": "jdbc:postgresql://postgres:5432/",
        "connection.user": "postgres",
        "connection.password": "postgres",
        "tasks.max": "1",
        "topics": "ora-CUSTOMERS-jdbc-02",
        "auto.create": "true",
        "auto.evolve":"true",
        "pk.mode":"record_key",
        "pk.fields":"ID",
        "insert.mode": "upsert",
        "table.name.format":"customers"
    }'

Load up Postgres:

docker exec -it postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB'

Show the data:

SELECT * FROM customers;
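Since the sink connector auto-created this table, it is worth inspecting the schema it derived from the Avro records, using psql’s \d meta-command:

\d customers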

Enriching streams & querying state with ksqlDB

Set up a data generator:

curl -i -X PUT -H  "Content-Type:application/json" \
    http://localhost:8083/connectors/source-datagen-pageviews/config \
    -d '{
      "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
      "kafka.topic": "pageviews",
      "quickstart": "pageviews",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "max.interval": 500,
      "tasks.max": "1"
    }'
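Before moving on, you can verify that events are flowing into the pageviews topic (same kafkacat container as before; the key is consumed as a plain string here because the connector uses the StringConverter):

docker exec kafkacat kafkacat -b kafka:29092 -t pageviews -C -u -q -o-1 -r http://schema-registry:8081 -s key=s -s value=avro -J | jq '.'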

Launch ksqlDB:

docker exec -it ksqldb ksql http://ksqldb:8088
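At the ksql prompt, a quick sanity check that the topics you need are visible:

SHOW TOPICS;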

Enriching a stream of data with a join to another topic

SET 'auto.offset.reset' = 'earliest';

CREATE STREAM PAGEVIEWS WITH (KAFKA_TOPIC='pageviews', VALUE_FORMAT='AVRO');

CREATE TABLE CUSTOMERS WITH (KAFKA_TOPIC='ora-CUSTOMERS-jdbc-02', VALUE_FORMAT='AVRO');
CREATE STREAM PV_ENRICHED AS
SELECT P.USERID,
       C.FIRST_NAME + ' ' + C.LAST_NAME AS CUSTOMER_NAME,
       C.CLUB_STATUS,
       PAGEID
  FROM PAGEVIEWS P
       INNER JOIN
       CUSTOMERS C
         ON REPLACE(USERID,'User_','')=C.ROWKEY
EMIT CHANGES;
PRINT PV_ENRICHED;

CREATE SINK CONNECTOR SINK_POSTGRES WITH (
    'connector.class'     = 'io.confluent.connect.jdbc.JdbcSinkConnector',
    'connection.url'      = 'jdbc:postgresql://postgres:5432/',
    'connection.user'     = 'postgres',
    'connection.password' = 'postgres',
    'topics'              = 'PV_ENRICHED',
    'key.converter'       = 'org.apache.kafka.connect.storage.StringConverter',
    'auto.create'         = 'true'
  );
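Back in psql you can confirm that the enriched stream is landing. The topic name is upper case and auto.create keeps it verbatim, so the table name needs quoting in Postgres:

SELECT * FROM "PV_ENRICHED" LIMIT 5;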

Calculate aggregates on streams of data, push to a database

CREATE TABLE PV_AGG AS
SELECT P.USERID,
       C.FIRST_NAME + ' ' + C.LAST_NAME AS CUSTOMER_NAME,
       C.CLUB_STATUS,
       PAGEID, COUNT(*) AS PV_COUNT
  FROM PAGEVIEWS P
       INNER JOIN
       CUSTOMERS C
         ON REPLACE(USERID,'User_','')=C.ROWKEY
         GROUP BY USERID
                , C.FIRST_NAME + ' ' + C.LAST_NAME
                , CLUB_STATUS
                , PAGEID
EMIT CHANGES;
CREATE SINK CONNECTOR SINK_POSTGRES_AGG WITH (
    'connector.class'     = 'io.confluent.connect.jdbc.JdbcSinkConnector',
    'connection.url'      = 'jdbc:postgresql://postgres:5432/',
    'connection.user'     = 'postgres',
    'connection.password' = 'postgres',
    'topics'              = 'PV_AGG',
    'key.converter'       = 'org.apache.kafka.connect.storage.StringConverter',
    'auto.create'         = 'true',
    'insert.mode'         = 'upsert',
    'pk.mode'             = 'record_value',
    'pk.fields'           = 'USERID,PAGEID'
  );
SELECT * FROM "PV_AGG" WHERE "PAGEID"='Page_22';

Materialise and query state

CREATE STREAM CUSTOMER_EVENTS WITH (KAFKA_TOPIC='ora-CUSTOMERS-jdbc-02', VALUE_FORMAT='AVRO');
CREATE TABLE CUSTOMERS_MV AS
  SELECT LATEST_BY_OFFSET(FIRST_NAME) + ' ' + LATEST_BY_OFFSET(LAST_NAME) AS FULLNAME,
         LATEST_BY_OFFSET(CLUB_STATUS) AS CLUB_STATUS
    FROM CUSTOMER_EVENTS
   GROUP BY ID;
-- "Push" query
SELECT FULLNAME, CLUB_STATUS FROM CUSTOMERS_MV WHERE ROWKEY=42 EMIT CHANGES;

-- "Pull" query
SELECT FULLNAME, CLUB_STATUS FROM CUSTOMERS_MV WHERE ROWKEY=42;

Query with REST API:

curl -s --location --request POST 'http://localhost:8088/query' \
--header 'Content-Type: application/json' \
--data-raw '{"ksql":"SELECT ROWKEY AS ID, FULLNAME, CLUB_STATUS FROM CUSTOMERS_MV WHERE ROWKEY=42;","streamsProperties": {"ksql.streams.auto.offset.reset": "earliest"
  }}' | jq '.'

Appendix [TODO]

Flattened record

If you want to try out this alternative Debezium configuration, you need to either:

  • Drop the existing connector

    curl -i -X DELETE http://localhost:8083/connectors/source-oracle-dbz-xstream-00

    or

  • Add a second capture server to Oracle:

    (If you use this option, make sure you update database.out.server.name in the REST API call below.)

    docker exec -it oracle bash -c 'sleep 1;rlwrap sqlplus c##xstrmadmin/xsa@//localhost:1521/ORCLCDB'
    BEGIN
      DBMS_XSTREAM_ADM.CREATE_OUTBOUND(
        server_name  => 'dbzxout2',
        schema_names => 'debezium',
        connect_user => 'c##xstrm');
    END;
    /
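You can verify that the new outbound server was registered. A minimal sketch, assuming the standard DBA_XSTREAM_OUTBOUND data dictionary view:

SELECT server_name, connect_user, capture_name FROM DBA_XSTREAM_OUTBOUND;

Then create the flattened connector: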
curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/source-oracle-dbz-xstream-flat-00/config \
    -d '{
        "connector.class": "io.debezium.connector.oracle.OracleConnector",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://schema-registry:8081",
        "database.server.name" : "asgard",
        "database.hostname" : "oracle",
        "database.port" : "1521",
        "database.user" : "c##xstrm",
        "database.password" : "xs",
        "database.dbname" : "ORCLCDB",
        "database.pdb.name" : "ORCLPDB1",
        "database.out.server.name" : "dbzxout",
        "database.history.kafka.bootstrap.servers" : "kafka:29092",
        "database.history.kafka.topic": "schema-changes.inventory",
        "include.schema.changes": "true",
        "table.blacklist":"ORCLPDB1.AUDSYS.*",
        "transforms":"addTopicSuffix,unwrap",
        "transforms.addTopicSuffix.type":"org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.addTopicSuffix.regex":"(.*)",
        "transforms.addTopicSuffix.replacement":"$1-flat",
        "transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones":"false"
        }'
Examine the flattened records:

docker exec kafkacat kafkacat -b kafka:29092 -t asgard.DEBEZIUM.CUSTOMERS-flat -C -u -q -o-1 -r http://schema-registry:8081 -s key=avro -s value=avro -J|jq '.'
Stream the flattened records to Postgres, with deletes enabled:

curl -X PUT http://localhost:8083/connectors/sink-postgres-dbz-flat-00/config \
    -H "Content-Type: application/json" \
    -d '{
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://schema-registry:8081",
        "connection.url": "jdbc:postgresql://postgres:5432/",
        "connection.user": "postgres",
        "connection.password": "postgres",
        "tasks.max": "1",
        "topics": "asgard.DEBEZIUM.CUSTOMERS-flat",
        "auto.create": "true",
        "auto.evolve":"true",
        "pk.mode":"record_key",
        "insert.mode": "upsert",
        "delete.enabled":"true",
        "table.name.format":"customers_dbz"
    }'
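With delete.enabled set, deleting a row in Oracle should now remove it from Postgres too. A quick check from the psql session:

SELECT * FROM customers_dbz;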