Skip to content

Commit

Permalink
Working demo for code.talks Hamburg 2019
Browse files Browse the repository at this point in the history
  • Loading branch information
rmoff committed Oct 23, 2019
1 parent 737721f commit 179b92c
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 79 deletions.
6 changes: 5 additions & 1 deletion no-more-silos/data/mysql/a00_create_debezium_user.sql
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium' IDENTIFIED BY 'dbz';
CREATE USER 'debezium'@'%' IDENTIFIED WITH mysql_native_password BY 'dbz';
CREATE USER 'replicator'@'%' IDENTIFIED BY 'replpass';

GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium';
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'replicator';
2 changes: 1 addition & 1 deletion no-more-silos/data/mysql/b00_create_db_demo.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
create database demo;

GRANT SELECT, INSERT, UPDATE, DELETE ON demo.* TO connect_user;
GRANT SELECT, INSERT, UPDATE, DELETE ON demo.* TO connect_user;
GRANT ALL PRIVILEGES ON demo.* TO 'debezium'@'%';
171 changes: 95 additions & 76 deletions no-more-silos/demo_no-more-silos.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,18 @@ echo -e $(date) " Kafka Connect is ready! Listener HTTP state: " $(curl -s -o /d
'
----

3. Make sure the connector plugins are available
+
[source,bash]
----
curl -s localhost:8083/connector-plugins|jq '.[].class'|egrep 'debezium.*mysql|JdbcSink'
----
+
[source,bash]
----
"io.confluent.connect.jdbc.JdbcSinkConnector"
"io.debezium.connector.mysql.MySqlConnector"
----
3. Launch MySQL CLI
+
[source,bash]
Expand Down Expand Up @@ -102,18 +114,19 @@ curl -s "http://localhost:8083/connectors?expand=info&expand=status" |
+
[source,bash]
----
source | source-jdbc-mysql-00 | RUNNING | RUNNING | io.confluent.connect.jdbc.JdbcSourceConnector
source | source-datagen-item_details_01 | RUNNING | RUNNING | io.confluent.kafka.connect.datagen.DatagenConnector
source | source-jdbc-mysql-00 | RUNNING | RUNNING | io.confluent.connect.jdbc.JdbcSourceConnector
----

4. Examine the data
+
[source,bash]
----
docker run --net host --rm edenhill/kafkacat:1.5.0
-b localhost:9092
-r http://localhost:8081\
-s avro
-t mysql-00-customers
docker run --net host --rm edenhill/kafkacat:1.5.0 \
-b localhost:9092 \
-r http://localhost:8081 \
-s avro \
-t mysql-00-customers \
-C -o beginning -u -q | jq -c '.'
----

Expand Down Expand Up @@ -166,23 +179,21 @@ SELECT ID, FIRST_NAME, LAST_NAME, EMAIL, UPDATE_TS FROM customers;
+
[source,bash]
----
curl -i -X POST -H "Accept:application/json"
-H "Content-Type:application/json" http://localhost:8083/connectors/
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/source-debezium-mysql-00/config \
-d '{
"name": "source-debezium-mysql-00",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "42",
"database.server.name": "asgard",
"table.whitelist": "demo.customers",
"database.history.kafka.bootstrap.servers": "kafka:29092",
"database.history.kafka.topic": "asgard.dbhistory.demo" ,
"include.schema.changes": "true"
}
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "42",
"database.allowPublicKeyRetrieval":"true",
"database.server.name": "asgard",
"table.whitelist": "demo.customers",
"database.history.kafka.bootstrap.servers": "kafka:29092",
"database.history.kafka.topic": "asgard.dbhistory.demo" ,
"include.schema.changes": "true"
}'
----

Expand All @@ -197,80 +208,89 @@ curl -s "http://localhost:8083/connectors?expand=info&expand=status" |
+
[source,bash]
----
source | source-debezium-mysql-00 | RUNNING | RUNNING | io.debezium.connector.mysql.MySqlConnector
source | source-jdbc-mysql-00 | RUNNING | RUNNING | io.confluent.connect.jdbc.JdbcSourceConnector
source | source-datagen-item_details_01 | RUNNING | RUNNING | io.confluent.kafka.connect.datagen.DatagenConnector
source | source-debezium-mysql-00 | RUNNING | RUNNING | io.debezium.connector.mysql.MySqlConnector
source | source-jdbc-mysql-00 | RUNNING | RUNNING | io.confluent.connect.jdbc.JdbcSourceConnector
----

4. Examine the data with console consumer
4. Examine the data with kafkacat
+
[source,bash]
----
docker run --net host --rm edenhill/kafkacat:1.5.0
-b localhost:9092
-r http://localhost:8081\
-s avro
-t asgard.demo.customers
-C -o beginning -u -q | jq '.op, .before, .after'
docker run --net host --rm edenhill/kafkacat:1.5.0 \
-b localhost:9092 \
-r http://localhost:8081 \
-s avro \
-t asgard.demo.customers \
-C -o beginning -u -q | jq '.'
----
+
[source,bash]
----
"u"
{
"asgard.demo.customers.Value": {
"id": 42,
"first_name": {
"string": "Rick"
},
"last_name": {
"string": "Astley"
},
"email": {
"string": ""
},
"gender": {
"string": "Male"
},
"comments": {
"string": ""
},
"UPDATE_TS": "2019-04-01T23:53:31Z"
}
}
{
"asgard.demo.customers.Value": {
"id": 42,
"first_name": {
"string": "Rick"
"before": null,
"after": {
"Value": {
"id": 42,
"first_name": {
"string": "Rick"
},
"last_name": {
"string": "Astley"
},
"email": {
"string": "[email protected]"
},
"gender": {
"string": "Male"
},
"comments": {
"string": ""
},
"UPDATE_TS": {
"string": "2019-10-23T16:29:53Z"
}
}
},
"source": {
"version": "0.10.0.Final",
"connector": "mysql",
"name": "asgard",
"ts_ms": 0,
"snapshot": {
"string": "last"
},
"last_name": {
"string": "Astley"
"db": "demo",
"table": {
"string": "customers"
},
"email": {
"string": "[email protected]"
},
"gender": {
"string": "Male"
},
"comments": {
"string": ""
},
"UPDATE_TS": "2019-04-01T23:53:38Z"
"server_id": 0,
"gtid": null,
"file": "binlog.000002",
"pos": 873,
"row": 0,
"thread": null,
"query": null
},
"op": "c",
"ts_ms": {
"long": 1571848220368
}
}
----

5. Split the screen to show Kafka topic output along with MySQL.

4. Rerun the console consumer to show compact output
4. Rerun kafkacat to show compact output
+
[source,bash]
----
docker-compose exec -T kafka-connect
kafka-avro-console-consumer
--bootstrap-server kafka:29092
--property schema.registry.url=http://schema-registry:8081
--topic asgard.demo.customers --from-beginning | jq -c '.'
docker run --net host --rm edenhill/kafkacat:1.5.0 \
-b localhost:9092 \
-r http://localhost:8081 \
-s avro \
-t asgard.demo.customers \
-C -o beginning -u -q | jq '.op, .before, .after'
----


Expand Down Expand Up @@ -360,7 +380,6 @@ SELECT ID, FIRST_NAME, LAST_NAME, EMAIL FROM CUSTOMERS_STREAM WHERE ID=42 EMIT C
SET 'auto.offset.reset' = 'earliest';
SELECT OP, BEFORE->EMAIL, AFTER->EMAIL FROM CUSTOMERS_CDC_STREAM WHERE AFTER->ID=42 EMIT CHANGES;
[source,sql]
----
+
----
Expand Down
11 changes: 10 additions & 1 deletion no-more-silos/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ services:
environment:
CONNECT_BOOTSTRAP_SERVERS: "kafka:29092"
CONNECT_REST_PORT: 8083
CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect-01"
CONNECT_GROUP_ID: kafka-connect-01
CONNECT_CONFIG_STORAGE_TOPIC: _kafka-connect-01-configs
CONNECT_OFFSET_STORAGE_TOPIC: _kafka-connect-01-offsets
Expand Down Expand Up @@ -109,6 +109,15 @@ services:
sleep 5
done
#
echo "Creating connector"
curl -s -X PUT -H "Content-Type:application/json" http://localhost:8083/connectors/source-datagen-item_details_01/config \
-d '{
"connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"kafka.topic": "ratings",
"max.interval":250,
"quickstart": "ratings"
}'
# echo -e "\n--\n+> Creating Kafka Connect JDBC Source"
# curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
# "name": "jdbc_source_mysql_00",
Expand Down

0 comments on commit 179b92c

Please sign in to comment.