- Bring up the stack
git clone https://github.com/confluentinc/demo-scene.git
cd demo-scene/kafka-connect-zero-to-hero
docker-compose up -d
This brings up the stack ready for use.
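To check that everything has started, you can list the containers (a quick sanity check; the service names come from the project’s docker-compose.yml):
docker-compose ps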
- Wait for Kafka Connect to be started
bash -c ' \
echo -e "\n\n=============\nWaiting for Kafka Connect to start listening on localhost ⏳\n=============\n"
while [ $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) -eq 000 ] ; do
  echo -e "\t" $(date) " Kafka Connect listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) " (waiting for 200)"
  sleep 10
done
echo -e $(date) "\n\n--------------\n\o/ Kafka Connect is ready! Listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) "\n--------------\n"
docker-compose logs -f kafka-connect-01
'
- Get a MySQL prompt
docker-compose exec mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
- Get a Postgres prompt
docker-compose exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB'
- Open browsers (Confluent Control Center, Kibana, and the Neo4j browser are used later in the demo)
- Open KSQL prompt
docker-compose exec ksql-cli bash -c 'echo -e "\n\n⏳ Waiting for KSQL to be available before launching CLI\n"; while [ $(curl -s -o /dev/null -w %{http_code} http://ksql-server:8088/) -eq 000 ] ; do echo -e $(date) "KSQL Server HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://ksql-server:8088/) " (waiting for 200)" ; sleep 5 ; done; ksql http://ksql-server:8088'
A MySQL database stores orders placed by an application.
- Kafka Connect with the Debezium MySQL connector streams them into a Kafka topic
- Kafka Connect with the Elasticsearch connector streams the orders from the Kafka topic to Elasticsearch
From the MySQL prompt, check the most recent order:
SELECT * FROM ORDERS ORDER BY CREATE_TS DESC LIMIT 1\G
Trigger the MySQL data generator with:
docker-compose exec mysql /data/02_populate_more_orders.sh
Look at the new rows!
watch -n 2 -x docker-compose exec mysql bash -c 'echo "SELECT * FROM ORDERS ORDER BY CREATE_TS DESC LIMIT 1 \G" | mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
Create the Debezium source connector to stream the ORDERS table into Kafka:
curl -i -X POST -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/ \
-d '{
"name": "source-debezium-orders-00",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "42",
"database.server.name": "asgard",
"table.whitelist": "demo.orders",
"database.history.kafka.bootstrap.servers": "kafka:29092",
"database.history.kafka.topic": "dbhistory.demo" ,
"decimal.handling.mode": "double",
"include.schema.changes": "true",
"transforms": "unwrap,addTopicPrefix",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"transforms.addTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.addTopicPrefix.regex":"(.*)",
"transforms.addTopicPrefix.replacement":"mysql-debezium-$1"
}
}'
Check the status of the connector
curl -s "http://localhost:8083/connectors"| jq '.[]'| xargs -I{connector_name} curl -s "http://localhost:8083/connectors/"{connector_name}"/status"| jq -c -M '[.name,.connector.state,.tasks[].state]|join(":|:")'| column -s : -t| sed 's/\"//g'| sort
Output should be
source-debezium-orders-00 | RUNNING | RUNNING
View the topic in Confluent Control Center or from the CLI:
docker-compose exec -T kafka-connect-01 \
kafka-avro-console-consumer \
--bootstrap-server kafka:29092 \
--property schema.registry.url=http://schema-registry:8081 \
--topic mysql-debezium-asgard.demo.ORDERS | jq '.'
Show Kafka Consumer and MySQL side by side.
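To see a change flow through end to end, you can also update an existing row from the MySQL prompt and watch it appear in the consumer. An illustrative statement (any column and row will do):
UPDATE ORDERS SET delivery_city = 'Leeds' ORDER BY CREATE_TS DESC LIMIT 1;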
Create the Elasticsearch sink connector:
curl -i -X POST -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/ \
-d '{
"name": "sink-elastic-orders-00",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"topics": "mysql-debezium-asgard.demo.ORDERS",
"connection.url": "http://elasticsearch:9200",
"type.name": "type.name=kafkaconnect",
"key.ignore": "true",
"schema.ignore": "true"
}
}'
Force Kibana to refresh its field list:
curl -s 'http://localhost:5601/api/saved_objects/_bulk_get' \
-H 'kbn-xsrf: nevergonnagiveyouup' \
-H 'Content-Type: application/json' \
-d '[{"id":"mysql-debezium-asgard.demo.orders","type":"index-pattern"}]'
Inspect the data in Kibana or from CLI:
echo '{ "size": 1, "sort": [ { "CREATE_TS": { "order": "desc" } } ] }' | \
http http://localhost:9200/mysql-debezium-asgard.demo.orders/_search | \
jq '.hits[]'
or
curl -s http://localhost:9200/mysql-debezium-asgard.demo.orders/_search \
-H 'content-type: application/json' \
-d '{ "size": 1, "sort": [ { "CREATE_TS": { "order": "desc" } } ] }' |\
jq '.'
Note: if you want the Elasticsearch document id to match the key of the source database record, use the following instead:
"key.ignore": "false",
…
"transforms": "extractKey",
"transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKey.field": "id"
Create the Neo4j sink connector:
curl -i -X POST -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/ \
-d '{
"name": "sink-neo4j-orders-00",
"config": {
"connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
"topics": "mysql-debezium-asgard.demo.ORDERS",
"neo4j.server.uri": "bolt://neo4j:7687",
"neo4j.authentication.basic.username": "neo4j",
"neo4j.authentication.basic.password": "connect",
"neo4j.topic.cypher.mysql-debezium-asgard.demo.ORDERS": "MERGE (city:city{city: event.delivery_city}) MERGE (customer:customer{id: event.customer_id, delivery_address: event.delivery_address, delivery_city: event.delivery_city, delivery_company: event.delivery_company}) MERGE (vehicle:vehicle{make: event.make, model:event.model}) MERGE (city)<-[:LIVES_IN]-(customer)-[:BOUGHT{order_total_usd:event.order_total_usd,order_id:event.order_id}]->(vehicle)"
}
} '
View in Neo4j browser
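You can also query the graph from the command line. A rough sketch (assuming cypher-shell is available in the neo4j container, and using the credentials from the connector config above):
docker-compose exec neo4j bash -c 'echo "MATCH (c:customer)-[b:BOUGHT]->(v:vehicle) RETURN c.id, v.make, v.model, b.order_total_usd LIMIT 5;" | cypher-shell -u neo4j -p connect'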
Some sinks will require a schema. An example of this is the JDBC Sink.
- Create source data, serialised in varying ways:
- JsonConverter, schemas.enable=false: a.k.a. throw away your schemas, I want to make life difficult for anyone using this data ;-)
curl -i -X POST -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/ \
-d '{
"name": "source-debezium-orders-01",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "43",
"database.server.name": "asgard",
"table.whitelist": "demo.orders",
"database.history.kafka.bootstrap.servers": "kafka:29092",
"database.history.kafka.topic": "dbhistory.demo",
"decimal.handling.mode": "double",
"include.schema.changes": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"transforms": "unwrap,addTopicPrefix",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"transforms.addTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.addTopicPrefix.regex":"(.*)",
"transforms.addTopicPrefix.replacement":"mysql-debezium-json-no-schema-$1"
}
}'
- JsonConverter, schemas.enable=true: a.k.a. I want to keep my schemas (but don’t care about bloated message size)
curl -i -X POST -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/ \
-d '{
"name": "source-debezium-orders-02",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "44",
"database.server.name": "asgard",
"table.whitelist": "demo.orders",
"database.history.kafka.bootstrap.servers": "kafka:29092",
"database.history.kafka.topic": "dbhistory.demo",
"decimal.handling.mode": "double",
"include.schema.changes": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"transforms": "unwrap,addTopicPrefix",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"transforms.addTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.addTopicPrefix.regex":"(.*)",
"transforms.addTopicPrefix.replacement":"mysql-debezium-json-with-schema-$1"
}
}'
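To see the difference in message size between the two serialisations, you can consume a single message from each topic. A sketch, assuming the plain kafka-console-consumer is available in the Connect worker container (kafka-avro-console-consumer is used from the same container earlier):
docker-compose exec -T kafka-connect-01 kafka-console-consumer --bootstrap-server kafka:29092 --topic mysql-debezium-json-no-schema-asgard.demo.ORDERS --from-beginning --max-messages 1
docker-compose exec -T kafka-connect-01 kafka-console-consumer --bootstrap-server kafka:29092 --topic mysql-debezium-json-with-schema-asgard.demo.ORDERS --from-beginning --max-messages 1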
- Create a JDBC sink
- Reading JSON data with no schema data (and schemas.enable=false, as is correct):
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "sink_postgres_00_json",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "mysql-debezium-json-no-schema-asgard.demo.ORDERS",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"connection.url": "jdbc:postgresql://postgres:5432/",
"connection.user": "postgres",
"connection.password": "postgres",
"auto.create": "true",
"auto.evolve":"true",
"pk.mode":"none",
"table.name.format": "sink_postgres_00_json"
}
}'
- Reading JSON data with schema data (and schemas.enable=true, as is correct):
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "sink_postgres_01",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "mysql-debezium-json-with-schema-asgard.demo.ORDERS",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"connection.url": "jdbc:postgresql://postgres:5432/",
"connection.user": "postgres",
"connection.password": "postgres",
"auto.create": "true",
"auto.evolve":"true",
"pk.mode":"none",
"errors.tolerance": "all",
"table.name.format": "sink_postgres_01"
}
}'
- Reading JSON data with no schema data (and schemas.enable=true, incorrectly):
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "sink_postgres_02",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "mysql-debezium-json-no-schema-asgard.demo.ORDERS",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"connection.url": "jdbc:postgresql://postgres:5432/",
"connection.user": "postgres",
"connection.password": "postgres",
"auto.create": "true",
"auto.evolve":"true",
"pk.mode":"none",
"table.name.format": "sink_postgres_02"
}
}'
- Two of the three Postgres sinks will be FAILED:
curl -s "http://localhost:8083/connectors"| jq '.[]'| xargs -I{connector_name} curl -s "http://localhost:8083/connectors/"{connector_name}"/status"| jq -c -M '[.name,.connector.state,.tasks[].state]|join(":|:")'| column -s : -t| sed 's/\"//g'| sort | grep sink
sink-neo4j-orders-00 | RUNNING | RUNNING
sink_postgres_00_json | RUNNING | FAILED
sink_postgres_01 | RUNNING | RUNNING
sink_postgres_02 | RUNNING | FAILED
- Sink 00 (reading JSON data with no schema data, and schemas.enable=false, as is correct):
curl -s "http://localhost:8083/connectors/sink_postgres_00_json/status" | \
    jq '.tasks[0].trace'
[...] org.apache.kafka.connect.errors.ConnectException: No fields found using key and value schemas for table: sink_postgres_00_json [...]
- Sink 02 (reading JSON data with no schema data, and schemas.enable=true, incorrectly):
org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration
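For reference, with schemas.enable=true the JsonConverter expects every message to carry its schema alongside the payload in an envelope of roughly this shape (field list abbreviated here for illustration):
{
  "schema": {
    "type": "struct",
    "fields": [
      { "type": "int32", "optional": false, "field": "id" },
      { "type": "double", "optional": true, "field": "order_total_usd" }
    ],
    "optional": false
  },
  "payload": { "id": 42, "order_total_usd": 1234.56 }
}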
Solutions? For both sinks we have no schema data.
- In the case of sink_00 we admit the fact (schemas.enable=false) and Kafka Connect complains that there is no schema.
- In the case of sink_02 we pretend that there is a schema (schemas.enable=true) and Kafka Connect spots our ruse and tells us that the JSON we’ve given it does not match what is required (schema/payload as top-level elements).
So what to do? Serialise the data with a schema. Either change the way the data is produced to include a schema (e.g. Avro, or with schemas.enable=true as per source-debezium-orders-02), OR use stream processing to apply a schema and reserialise the data.
- To apply a schema and reserialise the data for consumption by Kafka Connect, you can use KSQL:
docker-compose exec ksql-cli bash -c 'echo -e "\n\n⏳ Waiting for KSQL to be available before launching CLI\n"; while [ $(curl -s -o /dev/null -w %{http_code} http://ksql-server:8088/) -eq 000 ] ; do echo -e $(date) "KSQL Server HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://ksql-server:8088/) " (waiting for 200)" ; sleep 5 ; done; ksql http://ksql-server:8088'
CREATE STREAM ORDERS_JSON (
  id INT, order_id INT, customer_id INT, order_total_usd DOUBLE,
  make VARCHAR, model VARCHAR,
  delivery_city VARCHAR, delivery_company VARCHAR, delivery_address VARCHAR,
  CREATE_TS VARCHAR, UPDATE_TS VARCHAR
) WITH (
  KAFKA_TOPIC='mysql-debezium-json-no-schema-asgard.demo.ORDERS',
  VALUE_FORMAT='JSON'
);

SET 'auto.offset.reset' = 'earliest';

CREATE STREAM ORDERS_AVRO WITH (
  VALUE_FORMAT='AVRO',
  KAFKA_TOPIC='asgard.demo.ORDERS-avro'
) AS SELECT * FROM ORDERS_JSON;
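From the same KSQL prompt you can check the result (illustrative checks; press Ctrl-C to stop the PRINT):
DESCRIBE ORDERS_AVRO;
PRINT 'asgard.demo.ORDERS-avro' FROM BEGINNING;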
- Stream the reserialised and schema-enriched data:
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "sink_postgres_03_avro",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "asgard.demo.ORDERS-avro",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"connection.url": "jdbc:postgresql://postgres:5432/",
"connection.user": "postgres",
"connection.password": "postgres",
"auto.create": "true",
"auto.evolve":"true",
"pk.mode":"none",
"table.name.format": "sink_postgres_03_avro"
}
}'
docker-compose exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB'
\x
SELECT * FROM sink_postgres_03_avro ORDER BY "CREATE_TS" DESC LIMIT 2;
Single Message Transforms (SMTs) can be used to apply transformations to messages as they pass through Kafka Connect, including:
- Changing the topic name (n.b. often used by sinks to define the target object name)
- Dropping fields
- Renaming fields
- Renaming the topic
Here the example is on a sink connector, but SMTs are equally applicable to source connectors too.
- Remove the key from its struct: {"id":41739} becomes 41739
- Remove part of the topic name: mysql-debezium-asgard.demo.ORDERS becomes asgard.demo.ORDERS
- Append a timestamp to the topic name (useful for time-based indices in Elasticsearch etc): asgard.demo.ORDERS becomes asgard.demo.ORDERS-201905
- Rename a field: delivery_address becomes shipping_address
- Drop a field: CREATE_TS gets omitted from the target data
The following sink connector applies all of these transforms:
curl -i -X POST -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/ \
-d '{
"name": "sink-elastic-orders-01",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"topics": "mysql-debezium-asgard.demo.ORDERS",
"key.ignore": "false",
"schema.ignore": "true",
"type.name": "type.name=kafkaconnect",
"connection.url": "http://elasticsearch:9200",
"transforms": "dropTopicPrefix,extractKey,addDateToTopic,renameField,dropField",
"transforms.extractKey.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKey.field":"id",
"transforms.dropTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropTopicPrefix.regex":"mysql-debezium-(.*)",
"transforms.dropTopicPrefix.replacement":"$1",
"transforms.addDateToTopic.type": "org.apache.kafka.connect.transforms.TimestampRouter",
"transforms.addDateToTopic.topic.format": "${topic}-${timestamp}",
"transforms.addDateToTopic.timestamp.format": "YYYYMM",
"transforms.renameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.renameField.renames": "delivery_address:shipping_address",
"transforms.dropField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.dropField.blacklist": "CREATE_TS"
}
}'
Inspect the data in Elasticsearch:
curl -s http://localhost:9200/_cat/indices
green open .kibana_task_manager AhFACVWpRby6kZwYFwM68w 1 0 2 0 12.5kb 12.5kb
green open .kibana_1 xTC-RMxZSj-KcF22zmEoZA 1 0 5 0 22.9kb 22.9kb
yellow open asgard.demo.orders-201905 qzMvZH8DQWKkLjr1yFB-Bw 5 1 3338 0 1.3mb 1.3mb
yellow open mysql-debezium-asgard.demo.orders l5dwQAfjRkWfhTP7EZRFrw 5 1 0 0 1.2kb 1.2kb
echo '{ "size": 1, "sort": [ { "UPDATE_TS": { "order": "desc" } } ] }' |\
http http://localhost:9200/asgard.demo.orders-201905/_search
or
curl -s http://localhost:9200/asgard.demo.orders-201905/_search \
-H 'content-type: application/json' \
-d '{ "size": 1, "sort": [ { "UPDATE_TS": { "order": "desc" } } ] }' |\
jq '.'
Check the status of the connector
curl -s "http://localhost:8083/connectors"| jq '.[]'| xargs -I{connector_name} curl -s "http://localhost:8083/connectors/"{connector_name}"/status"| jq -c -M '[.name,.connector.state,.tasks[].state]|join(":|:")'| column -s : -t| sed 's/\"//g'| sort
Output should be similar to
sink-elastic-orders-01 | RUNNING | RUNNING
source-debezium-orders-00 | RUNNING | RUNNING
Force a failure:
$ docker-compose stop mysql
Stopping mysql ... done
Check the status of the connector again
curl -s "http://localhost:8083/connectors"| jq '.[]'| xargs -I{connector_name} curl -s "http://localhost:8083/connectors/"{connector_name}"/status"| jq -c -M '[.name,.connector.state,.tasks[].state]|join(":|:")'| column -s : -t| sed 's/\"//g'| sort
Output should be similar to
sink-elastic-orders-01 | RUNNING | RUNNING
source-debezium-orders-00 | RUNNING | FAILED
Now let’s see why. We could use the REST API, which may or may not give a useful trace:
curl -s "http://localhost:8083/connectors/source-debezium-orders-00/status" | \
jq '.tasks[0].trace'
"org.apache.kafka.connect.errors.ConnectException\n\tat io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)\n\tat io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:197)\n\tat io.debezium.connector.mysql.BinlogReader$ReaderThreadLifecycleListener.onCommunicationFailure(BinlogReader.java:1018)\n\tat com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:950)\n\tat com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:580)\n\tat com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:825)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: java.io.EOFException\n\tat com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.read(ByteArrayInputStream.java:190)\n\tat com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.readInteger(ByteArrayInputStream.java:46)\n\tat com.github.shyiko.mysql.binlog.event.deserialization.EventHeaderV4Deserializer.deserialize(EventHeaderV4Deserializer.java:35)\n\tat com.github.shyiko.mysql.binlog.event.deserialization.EventHeaderV4Deserializer.deserialize(EventHeaderV4Deserializer.java:27)\n\tat com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.nextEvent(EventDeserializer.java:212)\n\tat io.debezium.connector.mysql.BinlogReader$1.nextEvent(BinlogReader.java:224)\n\tat com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:922)\n\t... 3 more\n"
(it’s useful, but not so readable)
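Passing -r to jq makes it print the raw string, so the embedded \n escapes are rendered as real newlines and the stack trace becomes far easier to read:
curl -s "http://localhost:8083/connectors/source-debezium-orders-00/status" | \
    jq -r '.tasks[0].trace'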
Best is to crack open the Kafka Connect worker log:
docker-compose logs -f kafka-connect-01
Then search from the end of it in reverse (I use GNU Screen to make this very easy) and look for ERROR; a grep-based alternative is sketched after this list.
- First reverse hit will be the task dying:
Task is being killed and will not recover until manually restarted
- Second reverse hit will be the cause of the task dying, often a stack trace that you’ll need to pick through:
org.apache.kafka.connect.errors.ConnectException … Caused by: java.io.EOFException at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.read(ByteArrayInputStream.java:190)
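If you’re not using screen, a quick way to pull recent errors out of the worker log is something like this (a rough sketch; the grep pattern and line count are illustrative):
docker-compose logs kafka-connect-01 | grep -i error | tail -n 40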
Create a JDBC source connector with two tasks, to show how tasks are distributed across (and rebalance between) the Kafka Connect workers:
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_source_postgres_02",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://postgres:5432/",
"connection.user": "postgres",
"connection.password": "postgres",
"topic.prefix": "postgres-00-",
"poll.interval.ms": 1000,
"mode":"timestamp",
"tasks.max":2,
"table.whitelist" : "test1,test2",
"timestamp.column.name": "create_ts",
"validate.non.null": false
}
}'
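This connector expects test1 and test2 tables, each with a create_ts timestamp column (per timestamp.column.name), to exist in Postgres. If your environment doesn’t already provide them, something along these lines would do from the Postgres prompt (an illustrative schema only):
CREATE TABLE test1 (id SERIAL PRIMARY KEY, col1 VARCHAR(50), create_ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL);
CREATE TABLE test2 (id SERIAL PRIMARY KEY, col1 VARCHAR(50), create_ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL);
INSERT INTO test1 (col1) VALUES ('foo');
INSERT INTO test2 (col1) VALUES ('bar');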
$ curl -s "http://localhost:8083/connectors/jdbc_source_postgres_02/status" | \
jq '.tasks[]'
{
"id": 0,
"state": "RUNNING",
"worker_id": "kafka-connect-01:8083"
}
{
"id": 1,
"state": "RUNNING",
"worker_id": "kafka-connect-02:8083"
}
docker-compose stop kafka-connect-02
$ curl -s "http://localhost:8083/connectors/jdbc_source_postgres_02/status" | \
jq '.tasks[]'
{
"id": 0,
"state": "RUNNING",
"worker_id": "kafka-connect-01:8083"
}
{
"id": 1,
"state": "RUNNING",
"worker_id": "kafka-connect-01:8083"
}