This demo uses Docker Compose to run the Kafka Connect worker, along with Kafka, ZooKeeper, Schema Registry, ksqlDB, MySQL, and kafkacat.
- Bring the Docker Compose stack up:

```bash
docker-compose up -d
```
- Make sure everything is up and running:

```bash
➜ docker-compose ps
     Name                  Command                   State                    Ports
---------------------------------------------------------------------------------------------------
broker            /etc/confluent/docker/run        Up             0.0.0.0:9092->9092/tcp
kafka-connect     bash -c cd /usr/share/conf ...   Up (healthy)   0.0.0.0:8083->8083/tcp, 9092/tcp
kafkacat          /bin/sh -c apk add jq; wh ...    Up
ksqldb            /usr/bin/docker/run              Up             0.0.0.0:8088->8088/tcp
mysql             docker-entrypoint.sh mysqld      Up             0.0.0.0:3306->3306/tcp, 33060/tcp
schema-registry   /etc/confluent/docker/run        Up             0.0.0.0:8081->8081/tcp
zookeeper         /etc/confluent/docker/run        Up             2181/tcp, 2888/tcp, 3888/tcp
```
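The Kafka Connect worker can take a little while to become available even after its container shows as up. A minimal sketch, assuming `curl` is available on the host and using the `localhost:8083` port mapping shown above, that waits for the worker's REST API before continuing:

```bash
# Poll the Connect worker's REST API until it responds with HTTP 200
while [ "$(curl -s -o /dev/null -w '%{http_code}' http://localhost:8083/connectors)" -ne 200 ]; do
  echo "Waiting for Kafka Connect to start..."
  sleep 5
done
echo "Kafka Connect is ready"
```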
- Create a test topic + data using ksqlDB (but it's still just a Kafka topic under the covers):

```bash
docker exec -it ksqldb ksql http://ksqldb:8088
```

```sql
CREATE STREAM TEST01 (COL1 INT, COL2 VARCHAR)
  WITH (KAFKA_TOPIC='test01', PARTITIONS=1, VALUE_FORMAT='AVRO');

INSERT INTO TEST01 (ROWKEY, COL1, COL2) VALUES ('X',1,'FOO');
INSERT INTO TEST01 (ROWKEY, COL1, COL2) VALUES ('Y',2,'BAR');

SHOW TOPICS;

PRINT test01 FROM BEGINNING LIMIT 2;
```
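If you want to confirm from outside ksqlDB that the topic really was created, you can list its metadata with the kafkacat container (an optional check; the values are Avro-serialized, so the payloads themselves won't be human-readable this way):

```bash
# Show broker/partition metadata for the test01 topic created above
docker exec kafkacat kafkacat -b broker:29092 -L -t test01
```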
- Create the Sink connector:

```bash
curl -X PUT http://localhost:8083/connectors/sink-jdbc-mysql-01/config \
     -H "Content-Type: application/json" -d '{
        "connector.class"     : "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url"      : "jdbc:mysql://mysql:3306/demo",
        "topics"              : "test01",
        "key.converter"       : "org.apache.kafka.connect.storage.StringConverter",
        "connection.user"     : "connect_user",
        "connection.password" : "asgard",
        "auto.create"         : true,
        "auto.evolve"         : true,
        "insert.mode"         : "insert",
        "pk.mode"             : "record_key",
        "pk.fields"           : "MESSAGE_KEY"
    }'
```
Things to customise for your environment:

  - `topics` : the source topic(s) you want to send to the database
  - `connection.url`, `connection.user`, `connection.password` : to match your target database
  - `key.converter` : to match how the keys of your source messages are serialized
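Once created, it's worth checking that the connector and its task are in the `RUNNING` state, and that the target table has appeared in MySQL. A quick sanity check (`jq` is only used for pretty-printing; the MySQL query assumes the `connect_user`/`asgard` credentials from the connector config have SELECT rights on the `demo` database):

```bash
# Check connector and task state via the Connect REST API
curl -s http://localhost:8083/connectors/sink-jdbc-mysql-01/status | jq '.'

# Look at the table that auto.create built (the table name defaults to the topic name)
docker exec -i mysql mysql -u connect_user -pasgard demo -e 'SELECT * FROM test01;'
```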
- Insert some more data. Notice what happens if you re-use a key.

```sql
INSERT INTO TEST01 (ROWKEY, COL1, COL2) VALUES ('Z',1,'WOO');
INSERT INTO TEST01 (ROWKEY, COL1, COL2) VALUES ('Y',4,'PFF');
```
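Because the connector is in `insert` mode with `pk.mode=record_key`, the re-used key `Y` violates the primary key on the target table and the sink task fails. A sketch of how to inspect (and, after fixing the config below, restart) the task via the Connect REST API, assuming a single task with id 0:

```bash
# Inspect the failing task's state and error trace
curl -s http://localhost:8083/connectors/sink-jdbc-mysql-01/status | jq '.tasks[0]'

# After updating the connector to upsert mode (next step), restart the failed task
curl -s -X POST http://localhost:8083/connectors/sink-jdbc-mysql-01/tasks/0/restart
```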
- Update the Sink connector to use `upsert` mode:

```bash
curl -X PUT http://localhost:8083/connectors/sink-jdbc-mysql-01/config \
     -H "Content-Type: application/json" -d '{
        "connector.class"     : "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url"      : "jdbc:mysql://mysql:3306/demo",
        "topics"              : "test01",
        "key.converter"       : "org.apache.kafka.connect.storage.StringConverter",
        "connection.user"     : "connect_user",
        "connection.password" : "asgard",
        "auto.create"         : true,
        "auto.evolve"         : true,
        "insert.mode"         : "upsert",
        "pk.mode"             : "record_key",
        "pk.fields"           : "MESSAGE_KEY"
    }'
```
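With the task running again in upsert mode, the row for key `Y` should be updated in place rather than rejected. A quick check (same `connect_user` credential assumption as before):

```bash
# Key Y should now show COL1=4, COL2='PFF', alongside rows X and Z
docker exec -i mysql mysql -u connect_user -pasgard demo -e 'SELECT * FROM test01;'
```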
- Drop fields, add fields - note how the target schema evolves in place:

```bash
curl -X PUT http://localhost:8083/connectors/sink-jdbc-mysql-01/config \
     -H "Content-Type: application/json" -d '{
        "connector.class"                    : "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url"                     : "jdbc:mysql://mysql:3306/demo",
        "topics"                             : "test01",
        "key.converter"                      : "org.apache.kafka.connect.storage.StringConverter",
        "connection.user"                    : "connect_user",
        "connection.password"                : "asgard",
        "auto.create"                        : true,
        "auto.evolve"                        : true,
        "insert.mode"                        : "upsert",
        "pk.mode"                            : "record_key",
        "pk.fields"                          : "MESSAGE_KEY",
        "transforms"                         : "dropSome,addSome",
        "transforms.dropSome.type"           : "org.apache.kafka.connect.transforms.ReplaceField$Value",
        "transforms.dropSome.blacklist"      : "COL2",
        "transforms.addSome.type"            : "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.addSome.partition.field" : "_partition",
        "transforms.addSome.timestamp.field" : "RECORD_TS"
    }'
```
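After inserting another row through ksqlDB, you can see the effect of the SMTs on the target table: `auto.evolve` adds columns for the new `_partition` and `RECORD_TS` fields, while the dropped `COL2` column stays in the table but is no longer populated for new rows. A quick look at the evolved definition (same credential assumptions as above):

```bash
# Show the evolved table definition; new columns appear for the fields added by the SMT
docker exec -i mysql mysql -u connect_user -pasgard demo -e 'DESCRIBE test01;'
```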
- Write some CSV and JSON to new topics:

```bash
docker exec -i kafkacat kafkacat \
        -b broker:29092 -P \
        -t some_json_data <<EOF
{ "ID": 1, "Artist": "Rick Astley", "Song": "Never Gonna Give You Up" }
{ "ID": 2, "Artist": "asdfasd", "Song": "dsfjfghg" }
EOF

docker exec -i kafkacat kafkacat \
        -b broker:29092 -P \
        -t some_json_data_with_a_schema <<EOF
{ "schema": { "type": "struct", "optional": false, "version": 1, "fields": [ { "field": "ID", "type": "string", "optional": true }, { "field": "Artist", "type": "string", "optional": true }, { "field": "Song", "type": "string", "optional": true } ] }, "payload": { "ID": "1", "Artist": "Rick Astley", "Song": "Never Gonna Give You Up" } }
EOF

docker exec -i kafkacat kafkacat \
        -b broker:29092 -P \
        -t some_csv_data <<EOF
1,Rick Astley,Never Gonna Give You Up
EOF
```
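To sanity-check what was written, you can consume the messages straight back with kafkacat (an optional step; `-C` consumes and `-e` exits once it reaches the end of the topic):

```bash
# Read back the schemaless JSON messages just produced
docker exec kafkacat kafkacat -b broker:29092 -C -t some_json_data -e

# And the JSON message with the embedded schema
docker exec kafkacat kafkacat -b broker:29092 -C -t some_json_data_with_a_schema -e
```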
- Stream the JSON data that has a schema to the DB:

```bash
curl -X PUT http://localhost:8083/connectors/sink-jdbc-mysql-02-json/config \
     -H "Content-Type: application/json" -d '{
        "connector.class"                : "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url"                 : "jdbc:mysql://mysql:3306/demo",
        "topics"                         : "some_json_data_with_a_schema",
        "key.converter"                  : "org.apache.kafka.connect.storage.StringConverter",
        "value.converter"                : "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable" : "true",
        "connection.user"                : "connect_user",
        "connection.password"            : "asgard",
        "auto.create"                    : true,
        "auto.evolve"                    : true,
        "insert.mode"                    : "insert"
    }'
```
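With no `pk.mode` set, rows are simply appended to a table named after the topic. A quick check that the record arrived (same `connect_user` assumption as before):

```bash
# Rick Astley should have arrived in MySQL
docker exec -i mysql mysql -u connect_user -pasgard demo \
  -e 'SELECT * FROM some_json_data_with_a_schema;'
```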
- Use ksqlDB to apply a schema to the CSV and schemaless JSON, and show off `INSERT INTO` for merging two topics into one with a common schema:

```sql
CREATE STREAM SOME_JSON (ID INT, ARTIST VARCHAR, SONG VARCHAR)
  WITH (KAFKA_TOPIC='some_json_data', VALUE_FORMAT='JSON');

CREATE STREAM SOME_JSON_AS_AVRO WITH (VALUE_FORMAT='AVRO') AS
  SELECT * FROM SOME_JSON;

CREATE STREAM SOME_CSV (ID INT, ARTIST VARCHAR, SONG VARCHAR)
  WITH (KAFKA_TOPIC='some_csv_data', VALUE_FORMAT='DELIMITED');

INSERT INTO SOME_JSON_AS_AVRO SELECT * FROM SOME_CSV;
```
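The result is a new topic, `SOME_JSON_AS_AVRO`, carrying Avro-serialized records from both sources. You can confirm it exists from the kafkacat container (the payloads are Avro, so they won't be readable as plain text here):

```bash
# Confirm the reserialized topic was created
docker exec kafkacat kafkacat -b broker:29092 -L -t SOME_JSON_AS_AVRO
```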
- Create a sink for the reserialized data.

NOTE: The Kafka Connect worker configuration is set to use AvroConverter, which is why it is not specified here.

```bash
curl -X PUT http://localhost:8083/connectors/sink-jdbc-mysql-02-avro/config \
     -H "Content-Type: application/json" -d '{
        "connector.class"     : "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url"      : "jdbc:mysql://mysql:3306/demo",
        "topics"              : "SOME_JSON_AS_AVRO",
        "key.converter"       : "org.apache.kafka.connect.storage.StringConverter",
        "connection.user"     : "connect_user",
        "connection.password" : "asgard",
        "auto.create"         : true,
        "auto.evolve"         : true,
        "insert.mode"         : "insert"
    }'
```
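Finally, the merged data lands in a table named after the topic. A check of the result, which should contain the rows from both the JSON and CSV source topics (same credential assumptions as earlier):

```bash
# Rows from both some_json_data and some_csv_data, now in one table
docker exec -i mysql mysql -u connect_user -pasgard demo \
  -e 'SELECT * FROM SOME_JSON_AS_AVRO;'
```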