Streaming data from Kafka to a Database using Kafka Connect JDBC Sink

This demo uses Docker Compose to run the whole stack: a Kafka broker (with ZooKeeper and Schema Registry), the Kafka Connect worker, ksqlDB, kafkacat, and MySQL.

  1. Bring the Docker Compose up

    docker-compose up -d
  2. Make sure everything is up and running

    ➜ docker-compose ps
         Name                    Command                  State                     Ports
    ---------------------------------------------------------------------------------------------------
    broker            /etc/confluent/docker/run        Up             0.0.0.0:9092->9092/tcp
    kafka-connect     bash -c cd /usr/share/conf ...   Up (healthy)   0.0.0.0:8083->8083/tcp, 9092/tcp
    kafkacat          /bin/sh -c apk add jq;           Up
                      wh ...
    ksqldb            /usr/bin/docker/run              Up             0.0.0.0:8088->8088/tcp
    mysql             docker-entrypoint.sh mysqld      Up             0.0.0.0:3306->3306/tcp, 33060/tcp
    schema-registry   /etc/confluent/docker/run        Up             0.0.0.0:8081->8081/tcp
    zookeeper         /etc/confluent/docker/run        Up             2181/tcp, 2888/tcp, 3888/tcp
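
    If you want to double-check that the Connect worker is ready before continuing, one option (assuming curl is available on the host and the 8083 port mapping shown above) is to list the installed connector plugins and look for the JDBC sink:

    # The JdbcSinkConnector class should appear in the plugin list
    curl -s http://localhost:8083/connector-plugins | grep -i jdbcsink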
  3. Create test topic + data using ksqlDB (but it’s still just a Kafka topic under the covers)

    docker exec -it ksqldb ksql http://ksqldb:8088
    CREATE STREAM TEST01 (COL1 INT, COL2 VARCHAR)
      WITH (KAFKA_TOPIC='test01', PARTITIONS=1, VALUE_FORMAT='AVRO');
    INSERT INTO TEST01 (ROWKEY, COL1, COL2) VALUES ('X',1,'FOO');
    INSERT INTO TEST01 (ROWKEY, COL1, COL2) VALUES ('Y',2,'BAR');
    SHOW TOPICS;
    PRINT test01 FROM BEGINNING LIMIT 2;
  4. Create the Sink connector

    curl -X PUT http://localhost:8083/connectors/sink-jdbc-mysql-01/config \
         -H "Content-Type: application/json" -d '{
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url": "jdbc:mysql://mysql:3306/demo",
        "topics": "test01",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "connection.user": "connect_user",
        "connection.password": "asgard",
        "auto.create": true,
        "auto.evolve": true,
        "insert.mode": "insert",
        "pk.mode": "record_key",
        "pk.fields": "MESSAGE_KEY"
    }'

    Things to customise for your environment:

    • topics : the source topic(s) you want to send to the database

    • connection.url / connection.user / connection.password : your database connection details

    • key.converter : match the serialisation of your source data's keys

    • value.converter : match the serialisation of your source data's values (not set here, so the worker's default of AvroConverter is used; see the note in step 11)

    • transforms : not needed for this first connector; step 7 below shows how to use transforms to drop and add fields
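
    Once the connector is created you can check that it started cleanly and that the two rows arrived. This is a sketch that assumes curl on the host and that the connect_user account from the config above can also read the demo database (if not, use your MySQL admin credentials):

    # Connector and task state should both be RUNNING
    curl -s http://localhost:8083/connectors/sink-jdbc-mysql-01/status

    # auto.create makes a table named after the topic, i.e. test01
    docker exec -i mysql mysql -u connect_user -pasgard demo -e 'SELECT * FROM test01;'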

  5. Insert some more data. Notice what happens if you re-use a key.

    INSERT INTO TEST01 (ROWKEY, COL1, COL2) VALUES ('Z',1,'WOO');
    INSERT INTO TEST01 (ROWKEY, COL1, COL2) VALUES ('Y',4,'PFF');
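
    Because the connector is in insert mode with the message key as the primary key, re-using key Y should hit a duplicate-key error, and the sink task is expected to fail. One way to see this (again assuming curl on the host) is to inspect the connector status:

    # Expect the task to report FAILED with a duplicate-key exception in its trace
    curl -s http://localhost:8083/connectors/sink-jdbc-mysql-01/status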
  6. Update the Sink connector to use UPSERT mode

    curl -X PUT http://localhost:8083/connectors/sink-jdbc-mysql-01/config \
         -H "Content-Type: application/json" -d '{
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url": "jdbc:mysql://mysql:3306/demo",
        "topics": "test01",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "connection.user": "connect_user",
        "connection.password": "asgard",
        "auto.create": true,
        "auto.evolve": true,
        "insert.mode": "upsert",
        "pk.mode": "record_key",
        "pk.fields": "MESSAGE_KEY"
    }'
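
    Updating the configuration does not necessarily revive a failed task, so you may need to restart it; the sketch below assumes the task id is 0. Once it is running again, key Y's row should be updated in place and key Z inserted:

    # Restart the failed sink task so it picks up the new upsert config
    curl -s -X POST http://localhost:8083/connectors/sink-jdbc-mysql-01/tasks/0/restart

    # Key 'Y' should now show COL1=4, COL2='PFF'; 'Z' appears as a new row
    docker exec -i mysql mysql -u connect_user -pasgard demo -e 'SELECT * FROM test01;'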
  7. Drop fields, add fields - note how the target schema evolves in-place

    curl -X PUT http://localhost:8083/connectors/sink-jdbc-mysql-01/config \
         -H "Content-Type: application/json" -d '{
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url": "jdbc:mysql://mysql:3306/demo",
        "topics": "test01",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "connection.user": "connect_user",
        "connection.password": "asgard",
        "auto.create": true,
        "auto.evolve": true,
        "insert.mode": "upsert",
        "pk.mode": "record_key",
        "pk.fields": "MESSAGE_KEY",
        "transforms": "dropSome,addSome",
        "transforms.dropSome.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
        "transforms.dropSome.blacklist": "COL2",
        "transforms.addSome.type":"org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.addSome.partition.field": "_partition",
        "transforms.addSome.timestamp.field" : "RECORD_TS"
    }'
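
    The schema change only shows up once a record flows through with the new shape, so insert another row as in step 5 and then describe the target table. With auto.evolve enabled you should see RECORD_TS and _partition columns added, while COL2 remains in the table and is simply null for new rows (same read-access assumption for connect_user as before):

    # Expect new RECORD_TS and _partition columns alongside the original ones
    docker exec -i mysql mysql -u connect_user -pasgard demo -e 'DESCRIBE test01;'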
  8. Write some CSV and JSON to new topics

    docker exec -i kafkacat kafkacat \
            -b broker:29092 -P \
            -t some_json_data <<EOF
    { "ID": 1, "Artist": "Rick Astley", "Song": "Never Gonna Give You Up" }
    { "ID": 2, "Artist": "asdfasd", "Song": "dsfjfghg" }
    EOF
    
    docker exec -i kafkacat kafkacat \
            -b broker:29092 -P \
            -t some_json_data_with_a_schema <<EOF
    { "schema": { "type": "struct", "optional": false, "version": 1, "fields": [ { "field": "ID", "type": "string", "optional": true }, { "field": "Artist", "type": "string", "optional": true }, { "field": "Song", "type": "string", "optional": true } ] }, "payload": { "ID": "1", "Artist": "Rick Astley", "Song": "Never Gonna Give You Up" } }
    EOF
    
    docker exec -i kafkacat kafkacat \
            -b broker:29092 -P \
            -t some_csv_data <<EOF
    1,Rick Astley,Never Gonna Give You Up
    EOF
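
    If you want to confirm what was written, the same kafkacat container can consume the topics back over the internal listener used above; -e makes it exit once it reaches the end of the topic:

    # Read back the schemaless JSON topic
    docker exec -i kafkacat kafkacat -b broker:29092 -C -t some_json_data -e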
  9. Stream the JSON data that has a schema to the database:

    curl -X PUT http://localhost:8083/connectors/sink-jdbc-mysql-02-json/config \
         -H "Content-Type: application/json" -d '{
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url": "jdbc:mysql://mysql:3306/demo",
        "topics": "some_json_data_with_a_schema",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "true",
        "connection.user": "connect_user",
        "connection.password": "asgard",
        "auto.create": true,
        "auto.evolve": true,
        "insert.mode": "insert"
    }'
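
    Because the messages carry an embedded schema, the sink can auto-create a table named after the topic with ID, Artist, and Song columns. A quick check (same assumption that connect_user can read the demo database):

    # The embedded schema drives the table definition
    docker exec -i mysql mysql -u connect_user -pasgard demo -e 'SELECT * FROM some_json_data_with_a_schema;'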
  10. Use ksqlDB to apply a schema to the CSV and schemaless-JSON, and show off INSERT INTO for merging two topics into one with a common schema

    CREATE STREAM SOME_JSON (ID INT, ARTIST VARCHAR, SONG VARCHAR)
      WITH (KAFKA_TOPIC='some_json_data', VALUE_FORMAT='JSON');
    
    CREATE STREAM SOME_JSON_AS_AVRO
      WITH (VALUE_FORMAT='AVRO') AS
        SELECT * FROM SOME_JSON;
    
    CREATE STREAM SOME_CSV (ID INT, ARTIST VARCHAR, SONG VARCHAR)
      WITH (KAFKA_TOPIC='some_csv_data', VALUE_FORMAT='DELIMITED');
    
    INSERT INTO SOME_JSON_AS_AVRO SELECT * FROM SOME_CSV;
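
    To confirm that rows from both the JSON and the CSV sources landed in the merged stream, print its backing topic. The sketch below pipes the statement into the ksql CLI non-interactively, assuming the CLI accepts statements on stdin; you can equally run the PRINT inside the interactive session opened in step 3. If nothing comes back, the query above may have started from the latest offsets, in which case run SET 'auto.offset.reset'='earliest'; before the CREATE STREAM ... AS SELECT statements.

    docker exec -i ksqldb ksql http://ksqldb:8088 <<EOF
    PRINT SOME_JSON_AS_AVRO FROM BEGINNING LIMIT 3;
    EOF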
  11. Create a sink for the reserialized data

    Note
    The Kafka Connect worker configuration is set to use AvroConverter, which is why value.converter is not specified here.
    curl -X PUT http://localhost:8083/connectors/sink-jdbc-mysql-02-avro/config \
         -H "Content-Type: application/json" -d '{
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url": "jdbc:mysql://mysql:3306/demo",
        "topics": "SOME_JSON_AS_AVRO",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "connection.user": "connect_user",
        "connection.password": "asgard",
        "auto.create": true,
        "auto.evolve": true,
        "insert.mode": "insert"
    }'
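
    Finally, check that rows originating from both the schemaless JSON topic and the CSV topic have ended up in the one target table (same read-access assumption for connect_user):

    # Rows from both some_json_data and some_csv_data should appear here
    docker exec -i mysql mysql -u connect_user -pasgard demo -e 'SELECT * FROM SOME_JSON_AS_AVRO;'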

References