🎥 Check out the video tutorial here: https://rmoff.dev/kafka-elasticsearch-video
This demo uses Docker and Docker Compose to provision the stack, but all you actually need for getting data from Kafka to Elasticsearch is Apache Kafka and the Kafka Connect Elasticsearch Sink connector. It also uses ksqlDB as an easy interface for producing/consuming from Kafka topics, and creating Kafka Connect connectors - but you don’t have to use it in order to use Kafka Connect.
-
Bring the Docker Compose up
docker-compose up -d
-
Make sure everything is up and running
➜ docker-compose ps Name Command State Ports -------------------------------------------------------------------------------------------------- broker /etc/confluent/docker/run Up 0.0.0.0:9092->9092/tcp elasticsearch /usr/local/bin/docker-entr ... Up 0.0.0.0:9200->9200/tcp, 9300/tcp kafka-connect bash -c echo "Installing c ... Up (healthy) 0.0.0.0:8083->8083/tcp, 9092/tcp kafkacat /bin/sh -c apk add jq; Up wh ... kibana /usr/local/bin/dumb-init - ... Up 0.0.0.0:5601->5601/tcp ksqldb /usr/bin/docker/run Up 0.0.0.0:8088->8088/tcp schema-registry /etc/confluent/docker/run Up 0.0.0.0:8081->8081/tcp zookeeper /etc/confluent/docker/run Up 2181/tcp, 2888/tcp, 3888/tcp
Wait for ksqlDB and Kafka Connect
echo -e "\n\n=============\nWaiting for Kafka Connect to start listening on localhost ⏳\n=============\n" while [ $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) -ne 200 ] ; do echo -e "\t" $(date) " Kafka Connect listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) " (waiting for 200)" sleep 5 done echo -e $(date) "\n\n--------------\n\o/ Kafka Connect is ready! \n--------------\n" docker exec -it ksqldb bash -c 'echo -e "\n\n⏳ Waiting for ksqlDB to be available before launching CLI\n"; while : ; do curl_status=$(curl -s -o /dev/null -w %{http_code} http://ksqldb:8088/info) ; echo -e $(date) " ksqlDB server listener HTTP state: " $curl_status " (waiting for 200)" ; if [ $curl_status -eq 200 ] ; then break ; fi ; sleep 5 ; done ; ksql http://ksqldb:8088'
-
Create test topic + data using ksqlDB (but it’s still just a Kafka topic under the covers)
docker exec -it ksqldb ksql http://ksqldb:8088
CREATE STREAM TEST01 (COL1 INT, COL2 VARCHAR) WITH (KAFKA_TOPIC='test01', PARTITIONS=1, VALUE_FORMAT='AVRO');
INSERT INTO TEST01 (ROWKEY, COL1, COL2) VALUES ('X',1,'FOO'); INSERT INTO TEST01 (ROWKEY, COL1, COL2) VALUES ('Y',2,'BAR');
SHOW TOPICS; PRINT test01 FROM BEGINNING LIMIT 2;
Kafka Topic | Partitions | Partition Replicas ------------------------------------------------------------------------- confluent_rmoff_01ksql_processing_log | 1 | 1 test01 | 1 | 1 ------------------------------------------------------------------------- Key format: KAFKA_STRING Value format: AVRO or KAFKA_STRING rowtime: 5/4/20 10:02:51 AM UTC, key: X, value: {"COL1": 1, "COL2": "FOO"} rowtime: 5/4/20 10:02:51 AM UTC, key: Y, value: {"COL1": 2, "COL2": "BAR"} Topic printing ceased ksql>
-
Stream the data to Elasticsearch with Kafka Connect
I’m using ksqlDB to create the connector but you can use the Kafka Connect REST API directly if you want to. Kafka Connect is part of Apache Kafka and you don’t have to use ksqlDB to use Kafka Connect.
CREATE SINK CONNECTOR SINK_ELASTIC_TEST_01 WITH ( 'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector', 'connection.url' = 'http://elasticsearch:9200', 'key.converter' = 'org.apache.kafka.connect.storage.StringConverter', 'type.name' = '_doc', 'topics' = 'test01', 'key.ignore' = 'true', 'schema.ignore' = 'false' );
-
Check the data in Elasticsearch
curl -s http://localhost:9200/test01/_search \ -H 'content-type: application/json' \ -d '{ "size": 42 }' | jq -c '.hits.hits[]'
{"_index":"test01","_type":"_doc","_id":"test01+0+1","_score":1,"_source":{"COL1":2,"COL2":"BAR"}} {"_index":"test01","_type":"_doc","_id":"test01+0+0","_score":1,"_source":{"COL1":1,"COL2":"FOO"}}
Check the mapping
curl -s http://localhost:9200/test01/_mapping | jq '.'
{ "test01": { "mappings": { "properties": { "COL1": { "type": "integer" }, "COL2": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } } } } } }
-
But where did our
ROWKEY
go? And what happens if we insert new data against the same key and a new one?-- New key ('Z') INSERT INTO TEST01 (ROWKEY, COL1, COL2) VALUES ('Z',1,'WOO'); -- New value for existing key ('Y') INSERT INTO TEST01 (ROWKEY, COL1, COL2) VALUES ('Y',4,'PFF');
Elasticsearch:
curl -s http://localhost:9200/test01/_search \ -H 'content-type: application/json' \ -d '{ "size": 42 }' | jq -c '.hits.hits[]'
{"_index":"test01","_type":"_doc","_id":"test01+0+1","_score":1,"_source":{"COL1":2,"COL2":"BAR"}} {"_index":"test01","_type":"_doc","_id":"test01+0+0","_score":1,"_source":{"COL1":1,"COL2":"FOO"}} {"_index":"test01","_type":"_doc","_id":"test01+0+3","_score":1,"_source":{"COL1":4,"COL2":"PFF"}} {"_index":"test01","_type":"_doc","_id":"test01+0+2","_score":1,"_source":{"COL1":1,"COL2":"WOO"}}
Note that the
_id
is made up of<topic><partition><offset>
, which we can prove with kafkacat:docker exec kafkacat kafkacat \ -b broker:29092 \ -r http://schema-registry:8081 -s key=s -s value=avro \ -C -o beginning -e -q \ -t test01 \ -f 'Topic+Partition+Offset: %t+%p+%o\tKey: %k\tValue: %s\n'
Topic+Partition+Offset: test01+0+0 Key: X Value: {"COL1": {"int": 1}, "COL2": {"string": "FOO"}} Topic+Partition+Offset: test01+0+1 Key: Y Value: {"COL1": {"int": 2}, "COL2": {"string": "BAR"}} Topic+Partition+Offset: test01+0+2 Key: Z Value: {"COL1": {"int": 1}, "COL2": {"string": "WOO"}} Topic+Partition+Offset: test01+0+3 Key: Y Value: {"COL1": {"int": 4}, "COL2": {"string": "PFF"}}
-
Let’s recreate the connector and use the Kafka message key as the document ID to enable updates & deletes against existing documents.
-
ksqlDB - drop the connector
-
DROP CONNECTOR SINK_ELASTIC_TEST_01;
-
-
bash - delete the existing index in Elasticsearch (drop the connector first otherwise you’ll see the index get recreated)
-
docker exec elasticsearch curl -s -XDELETE "http://localhost:9200/test01"
-
-
In ksqlDB create the connector as before but with
key.ignore=false
.NoteThe connector is given a new name. If you give it the same as before then Kafka Connect will assume it’s the same connector and not re-send any of the existing records. CREATE SINK CONNECTOR SINK_ELASTIC_TEST_02 WITH ( 'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector', 'connection.url' = 'http://elasticsearch:9200', 'key.converter' = 'org.apache.kafka.connect.storage.StringConverter', 'type.name' = '_doc', 'topics' = 'test01', 'key.ignore' = 'false', 'schema.ignore' = 'false' );
Check the new data in Elasticsearch:
curl -s http://localhost:9200/test01/_search \ -H 'content-type: application/json' \ -d '{ "size": 42 }' | jq -c '.hits.hits[]'
{"_index":"test01","_type":"_doc","_id":"X","_score":1,"_source":{"COL1":1,"COL2":"FOO"}} {"_index":"test01","_type":"_doc","_id":"Y","_score":1,"_source":{"COL1":4,"COL2":"PFF"}} {"_index":"test01","_type":"_doc","_id":"Z","_score":1,"_source":{"COL1":1,"COL2":"WOO"}}
Note that
_id
now maps the key of the Kafka message, and that the value for message key/document idY
has been updated in place. Here’s the data in the Kafka topic in ksqlDB:ksql> SET 'auto.offset.reset' = 'earliest'; ksql> SELECT ROWKEY, COL1, COL2 FROM TEST01 EMIT CHANGES LIMIT 4;
+-------+------+-----+ |ROWKEY |COL1 |COL2 | +-------+------+-----+ |X |1 |FOO | |Y |2 |BAR | |Z |1 |WOO | |Y |4 |PFF |
-
What about deletes? We can do those too, using tombstone (null value) messages. By default the connector will ignore these but there’s an option to process them as deletes - behavior.on.null.values
.
-
ksqlDB - drop the connector
DROP CONNECTOR SINK_ELASTIC_TEST_02;
-
bash - delete the existing index in Elasticsearch (drop the connector first otherwise you’ll see the index get recreated)
docker exec elasticsearch curl -s -XDELETE "http://localhost:9200/test01"
In ksqlDB create the connector as before but with behavior.on.null.values=delete
.
Note
|
The connector is given a new name. If you give it the same as before then Kafka Connect will assume it’s the same connector and not re-send any of the existing records. |
CREATE SINK CONNECTOR SINK_ELASTIC_TEST_03 WITH (
'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
'connection.url' = 'http://elasticsearch:9200',
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
'type.name' = '_doc',
'topics' = 'test01',
'key.ignore' = 'false',
'schema.ignore' = 'false',
'behavior.on.null.values' = 'delete'
);
Remind ourselves of source data in ksqlDB:
PRINT test01 FROM BEGINNING;
rowtime: 4/30/20 4:24:12 PM UTC, key: X, value: {"COL1": 1, "COL2": "FOO"}
rowtime: 4/30/20 4:24:12 PM UTC, key: Y, value: {"COL1": 2, "COL2": "BAR"}
rowtime: 4/30/20 4:24:19 PM UTC, key: Z, value: {"COL1": 1, "COL2": "WOO"}
rowtime: 4/30/20 4:24:19 PM UTC, key: Y, value: {"COL1": 4, "COL2": "PFF"}
Current Elasticsearch state:
curl -s http://localhost:9200/test01/_search \
-H 'content-type: application/json' \
-d '{ "size": 42 }' | jq -c '.hits.hits[]'
{"_index":"test01","_type":"_doc","_id":"X","_score":1,"_source":{"COL1":1,"COL2":"FOO"}}
{"_index":"test01","_type":"_doc","_id":"Y","_score":1,"_source":{"COL1":4,"COL2":"PFF"}}
{"_index":"test01","_type":"_doc","_id":"Z","_score":1,"_source":{"COL1":1,"COL2":"WOO"}}
Now send a tombstone message, using kafkacat (-Z
to send empty value as tombstone, -K
to specify key separator):
echo 'Y:' | \
docker exec -i kafkacat kafkacat \
-b broker:29092 \
-P -Z -K: \
-t test01
Check the topic in ksqlDB:
PRINT test01 FROM BEGINNING;
rowtime: 4/30/20 4:24:12 PM UTC, key: X, value: {"COL1": 1, "COL2": "FOO"}
rowtime: 4/30/20 4:24:12 PM UTC, key: Y, value: {"COL1": 2, "COL2": "BAR"}
rowtime: 4/30/20 4:24:19 PM UTC, key: Z, value: {"COL1": 1, "COL2": "WOO"}
rowtime: 4/30/20 4:24:19 PM UTC, key: Y, value: {"COL1": 4, "COL2": "PFF"}
rowtime: 4/30/20 4:27:50 PM UTC, key: Y, value: <null>
Check Elasticsearch to see that document with key Y
has been deleted:
curl -s http://localhost:9200/test01/_search \
-H 'content-type: application/json' \
-d '{ "size": 42 }' | jq -c '.hits.hits[]'
{"_index":"test01","_type":"_doc","_id":"X","_score":1,"_source":{"COL1":1,"COL2":"FOO"}}
{"_index":"test01","_type":"_doc","_id":"Z","_score":1,"_source":{"COL1":1,"COL2":"WOO"}}
-
schemas.ignore=false
means that Kafka Connect will define the index mapping based on the schema of the source data-
If you use this it is mandatory to have a source schema (e.g. Avro, Protobuf, etc — NOT plain JSON)
-
-
schemas.ignore=true
means Kafka Connect will just send the values and let Elasticsearch figure out how to map them using dynamic field mapping and optionally dynamic templates that you define in advance.
Set up some JSON data in a topic:
CREATE STREAM TEST_JSON (COL1 INT, COL2 VARCHAR) WITH (KAFKA_TOPIC='TEST_JSON', PARTITIONS=1, VALUE_FORMAT='JSON');
INSERT INTO TEST_JSON (COL1, COL2) VALUES (1,'FOO');
INSERT INTO TEST_JSON (COL1, COL2) VALUES (2,'BAR');
Try streaming this JSON data to to Elasticsearch
CREATE SINK CONNECTOR SINK_ELASTIC_TEST_JSON_A WITH (
'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
'connection.url' = 'http://elasticsearch:9200',
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
'type.name' = '_doc',
'topics' = 'TEST_JSON',
'key.ignore' = 'true',
'schema.ignore' = 'false'
);
Connector fails. Why?
➜ curl -s http://localhost:8083/connectors/SINK_ELASTIC_TEST_JSON_A/status | jq '.'
{
"name": "SINK_ELASTIC_TEST_JSON_A",
"connector": {
"state": "RUNNING",
"worker_id": "kafka-connect:8083"
},
"tasks": [
{
"id": 0,
"state": "FAILED",
"worker_id": "kafka-connect:8083",
"trace": "org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:492)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:469)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:325)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic TEST_JSON to Avro: \n\tat io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:114)\n\tat org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$2(WorkerSinkTask.java:492)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)\n\t... 13 more\nCaused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!\n"
}
],
"type": "sink"
}
Error within this is:
org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic TEST_JSON to Avro:
…
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
We’re reading JSON data but using the Avro converter (as specified as the default converter for the worker) in the Docker Compose:
kafka-connect:
image: confluentinc/cp-kafka-connect-base:5.5.0
…
environment:
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
So recreate the connector and specify JSON converter (because we’re reading JSON data from the topic)
DROP CONNECTOR SINK_ELASTIC_TEST_JSON_A;
CREATE SINK CONNECTOR SINK_ELASTIC_TEST_JSON_A WITH (
'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
'connection.url' = 'http://elasticsearch:9200',
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
'value.converter' = 'org.apache.kafka.connect.json.JsonConverter',
'value.converter.schemas.enable' = 'true',
'type.name' = '_doc',
'topics' = 'TEST_JSON',
'key.ignore' = 'true',
'schema.ignore' = 'false'
);
Fails
➜ curl -s http://localhost:8083/connectors/SINK_ELASTIC_TEST_JSON_A/status | jq '.'
{
"name": "SINK_ELASTIC_TEST_JSON_A",
"connector": {
"state": "RUNNING",
"worker_id": "kafka-connect:8083"
},
"tasks": [
{
"id": 0,
"state": "FAILED",
"worker_id": "kafka-connect:8083",
"trace": "org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:492)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:469)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:325)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.\n\tat org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:359)\n\tat org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$2(WorkerSinkTask.java:492)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)\n\t... 13 more\n"
}
],
"type": "sink"
}
Nested error:
org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
We’re reading JSON data but have told the converter to look for a schema (schemas.enable
) which we don’t have.
Recreate the connector and set the converter to not expect a schema embedded in the JSON data (value.converter.schemas.enable' = 'false'
):
DROP CONNECTOR SINK_ELASTIC_TEST_JSON_A;
CREATE SINK CONNECTOR SINK_ELASTIC_TEST_JSON_A WITH (
'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
'connection.url' = 'http://elasticsearch:9200',
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
'value.converter' = 'org.apache.kafka.connect.json.JsonConverter',
'value.converter.schemas.enable' = 'false',
'type.name' = '_doc',
'topics' = 'TEST_JSON',
'key.ignore' = 'true',
'schema.ignore' = 'false'
);
Connector fails
➜ curl -s http://localhost:8083/connectors/SINK_ELASTIC_TEST_JSON_A/status | jq '.'
{
"name": "SINK_ELASTIC_TEST_JSON_A",
"connector": {
"state": "RUNNING",
"worker_id": "kafka-connect:8083"
},
"tasks": [
{
"id": 0,
"state": "FAILED",
"worker_id": "kafka-connect:8083",
"trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:568)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.connect.errors.DataException: Cannot infer mapping without schema.\n\tat io.confluent.connect.elasticsearch.Mapping.inferMapping(Mapping.java:84)\n\tat io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.createMapping(JestElasticsearchClient.java:391)\n\tat io.confluent.connect.elasticsearch.Mapping.createMapping(Mapping.java:66)\n\tat io.confluent.connect.elasticsearch.ElasticsearchWriter.write(ElasticsearchWriter.java:265)\n\tat io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:174)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:546)\n\t... 10 more\n"
}
],
"type": "sink"
}
Nested error:
org.apache.kafka.connect.errors.DataException: Cannot infer mapping without schema.
The connector is being told that we will supply a schema with the data that will be used to create the Elasticsearch mapping:
'schema.ignore' = 'false'
BUT we do not have a declared schema in the data.
DROP CONNECTOR SINK_ELASTIC_TEST_JSON_A;
CREATE SINK CONNECTOR SINK_ELASTIC_TEST_JSON_A WITH (
'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
'connection.url' = 'http://elasticsearch:9200',
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
'value.converter' = 'org.apache.kafka.connect.json.JsonConverter',
'value.converter.schemas.enable' = 'false',
'type.name' = '_doc',
'topics' = 'TEST_JSON',
'key.ignore' = 'true',
'schema.ignore' = 'true'
);
➜ curl -s http://localhost:8083/connectors/SINK_ELASTIC_TEST_JSON_A/status | jq '.'
{
"name": "SINK_ELASTIC_TEST_JSON_A",
"connector": {
"state": "RUNNING",
"worker_id": "kafka-connect:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "kafka-connect:8083"
}
],
"type": "sink"
}
Data is in Elasticsearch:
➜ curl -s http://localhost:9200/test_json/_search \
-H 'content-type: application/json' \
-d '{ "size": 42 }' | jq -c '.hits.hits[]'
{"_index":"test_json","_type":"_doc","_id":"TEST_JSON+0+0","_score":1,"_source":{"COL2":"FOO","COL1":1}}
{"_index":"test_json","_type":"_doc","_id":"TEST_JSON+0+1","_score":1,"_source":{"COL2":"BAR","COL1":2}}
CREATE STREAM TEST02 (COL1 INT, ORDER_TS_EPOCH BIGINT, SHIP_TS_STR VARCHAR)
WITH (KAFKA_TOPIC='test02', PARTITIONS=1, VALUE_FORMAT='AVRO');
INSERT INTO TEST02 (ROWKEY, COL1, ORDER_TS_EPOCH, SHIP_TS_STR)
VALUES ('MY_KEY__X',
1,
STRINGTOTIMESTAMP('2020-02-17T15:22:00Z','yyyy-MM-dd''T''HH:mm:ssX'),
'2020-02-17T15:22:00Z');
INSERT INTO TEST02 (ROWKEY, COL1, ORDER_TS_EPOCH, SHIP_TS_STR)
VALUES ('MY_KEY__Y',
1,
STRINGTOTIMESTAMP('2020-02-17T15:26:00Z','yyyy-MM-dd''T''HH:mm:ssX'),
'2020-02-17T15:26:00Z');
PRINT test02 FROM BEGINNING;
Key format: HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: AVRO
rowtime: 5/4/20 10:24:46 AM UTC, key: [M@6439948753387347800/-], value: {"COL1": 1, "ORDER_TS_EPOCH": 1581952920000, "SHIP_TS_STR": "2020-02-17T15:22:00Z"}
rowtime: 5/4/20 10:24:47 AM UTC, key: [M@6439948753387347801/-], value: {"COL1": 1, "ORDER_TS_EPOCH": 1581953160000, "SHIP_TS_STR": "2020-02-17T15:26:00Z"}
CREATE SINK CONNECTOR SINK_ELASTIC_TEST_02_A WITH (
'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
'connection.url' = 'http://elasticsearch:9200',
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
'value.converter'= 'io.confluent.connect.avro.AvroConverter',
'value.converter.schema.registry.url'= 'http://schema-registry:8081',
'type.name' = '_doc',
'topics' = 'test02',
'key.ignore' = 'false',
'schema.ignore' = 'false'
);
Check we’ve got data:
curl -s http://localhost:9200/test02/_search \
-H 'content-type: application/json' \
-d '{ "size": 42 }' | jq -c '.hits.hits[]'
{"_index":"test02","_type":"_doc","_id":"MY_KEY__Y","_score":1,"_source":{"COL1":1,"ORDER_TS_EPOCH":1581953160000,"SHIP_TS_STR":"2020-02-17T15:26:00Z"}}
{"_index":"test02","_type":"_doc","_id":"MY_KEY__X","_score":1,"_source":{"COL1":1,"ORDER_TS_EPOCH":1581952920000,"SHIP_TS_STR":"2020-02-17T15:22:00Z"}}
Check the mappings - note neither of the timestamps are date
types
curl -s http://localhost:9200/test02/_mapping | jq '.'
{
"test02": {
"mappings": {
"properties": {
"COL1": {
"type": "integer"
},
"ORDER_TS_EPOCH": {
"type": "long"
},
"SHIP_TS_STR": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
}
}
}
}
Drop the connector
DROP CONNECTOR SINK_ELASTIC_TEST_02_A;
Drop the index
docker exec elasticsearch curl -s -XDELETE "http://localhost:9200/test02"
Ref: dynamic mapping
CREATE SINK CONNECTOR SINK_ELASTIC_TEST_02_B WITH (
'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
'connection.url' = 'http://elasticsearch:9200',
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
'value.converter'= 'io.confluent.connect.avro.AvroConverter',
'value.converter.schema.registry.url'= 'http://schema-registry:8081',
'type.name' = '_doc',
'topics' = 'test02',
'key.ignore' = 'false',
'schema.ignore' = 'true'
);
Picks up string (SHIP_TS_STR
) because it looks like one, but not the epoch (ORDER_TS_EPOCH
)
curl -s http://localhost:9200/test02/_mapping | jq '.'
{
"test02": {
"mappings": {
"properties": {
"COL1": {
"type": "long"
},
"ORDER_TS_EPOCH": {
"type": "long"
},
"SHIP_TS_STR": {
"type": "date"
}
}
}
}
}
Drop the connector
DROP CONNECTOR SINK_ELASTIC_TEST_02_B;
Drop the index
docker exec elasticsearch curl -s -XDELETE "http://localhost:9200/test02"
CREATE SINK CONNECTOR SINK_ELASTIC_TEST_02_C WITH (
'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
'connection.url' = 'http://elasticsearch:9200',
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
'value.converter'= 'io.confluent.connect.avro.AvroConverter',
'value.converter.schema.registry.url'= 'http://schema-registry:8081',
'type.name' = '_doc',
'topics' = 'test02',
'key.ignore' = 'false',
'schema.ignore' = 'false',
'transforms' = 'setTimestampType0',
'transforms.setTimestampType0.type' = 'org.apache.kafka.connect.transforms.TimestampConverter$Value',
'transforms.setTimestampType0.field' = 'ORDER_TS_EPOCH',
'transforms.setTimestampType0.target.type' = 'Timestamp'
);
curl -s http://localhost:9200/test02/_mapping | jq '.'
{
"test02": {
"mappings": {
"properties": {
"COL1": {
"type": "integer"
},
"ORDER_TS_EPOCH": {
"type": "date"
},
"SHIP_TS_STR": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
}
}
}
}
Drop the connector
DROP CONNECTOR SINK_ELASTIC_TEST_02_C;
Drop the index
docker exec elasticsearch curl -s -XDELETE "http://localhost:9200/test02"
Create dynamic template
curl -s -XPUT "http://localhost:9200/_template/rmoff/" -H 'Content-Type: application/json' -d'
{
"template": "*",
"mappings": { "dynamic_templates": [ { "dates": { "match": "*_TS_*", "mapping": { "type": "date" } } } ] }
}'
Create the connector
Note
|
schema.ignore is set to true , since we want Elasticsearch to use its dynamic field mapping and thus dynamic templates to determine the mapping types.
|
CREATE SINK CONNECTOR SINK_ELASTIC_TEST_02_D WITH (
'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
'connection.url' = 'http://elasticsearch:9200',
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
'value.converter'= 'io.confluent.connect.avro.AvroConverter',
'value.converter.schema.registry.url'= 'http://schema-registry:8081',
'type.name' = '_doc',
'topics' = 'test02',
'key.ignore' = 'false',
'schema.ignore' = 'true'
);
curl -s http://localhost:9200/test02/_mapping | jq '.'
{
"test02": {
"mappings": {
"dynamic_templates": [
{
"dates": {
"match": "*_TS_*",
"mapping": {
"type": "date"
}
}
}
],
"properties": {
"COL1": {
"type": "long"
},
"ORDER_TS_EPOCH": {
"type": "date"
},
"SHIP_TS_STR": {
"type": "date"
}
}
}
}
}
Drop connector :
DROP CONNECTOR SINK_ELASTIC_TEST_02_D;
Drop index
docker exec elasticsearch curl -s -XDELETE "http://localhost:9200/test02"
Drop dynamic template
docker exec elasticsearch curl -s -XDELETE "http://localhost:9200/_template/rmoff/"
What about if we want to use the Kafka message’s timestamp? Producer can set this, no point duplicating it in the message value itself.
CREATE SINK CONNECTOR SINK_ELASTIC_TEST_02_E WITH (
'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
'connection.url' = 'http://elasticsearch:9200',
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
'value.converter'= 'io.confluent.connect.avro.AvroConverter',
'value.converter.schema.registry.url'= 'http://schema-registry:8081',
'type.name' = '_doc',
'topics' = 'test02',
'key.ignore' = 'false',
'schema.ignore' = 'false',
'transforms' = 'ExtractTimestamp',
'transforms.ExtractTimestamp.type' = 'org.apache.kafka.connect.transforms.InsertField$Value',
'transforms.ExtractTimestamp.timestamp.field' = 'MSG_TS'
);
Elasticsearch data:
curl -s http://localhost:9200/test02/_search \
-H 'content-type: application/json' \
-d '{ "size": 42 }' | jq -c '.hits.hits[]'
{"_index":"test02","_type":"_doc","_id":"MY_KEY__X","_score":1,"_source":{"COL1":1,"ORDER_TS_EPOCH":1581952920000,"SHIP_TS_STR":"2020-02-17T15:22:00Z","MSG_TS":1588587886954}}
{"_index":"test02","_type":"_doc","_id":"MY_KEY__Y","_score":1,"_source":{"COL1":1,"ORDER_TS_EPOCH":1581953160000,"SHIP_TS_STR":"2020-02-17T15:26:00Z","MSG_TS":1588587887036}}
Mapping for MSG_TS
is date
but since dynamic mapping is in use and there’s no dynamic template the other two date fields are not seen as date
:
curl -s http://localhost:9200/test02/_mapping | jq '.'
{
"test02": {
"mappings": {
"properties": {
"COL1": {
"type": "integer"
},
"MSG_TS": {
"type": "date"
},
"ORDER_TS_EPOCH": {
"type": "long"
},
"SHIP_TS_STR": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
}
}
}
}
Alternatives include:
-
schema.ignore=false
and SMT to set timestamp types (`org.apache.kafka.connect.transforms.TimestampConverter) -
schema.ignore=true
and use a dynamic template -
schema.ignore=true
and SMT to forceMSG_TS
to string so that Elasticsearch can guess at it correctly - see below
Drop connector
DROP CONNECTOR SINK_ELASTIC_TEST_02_E;
Drop index
docker exec elasticsearch curl -s -XDELETE "http://localhost:9200/test02"
Create connector
CREATE SINK CONNECTOR SINK_ELASTIC_TEST_02_F WITH (
'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
'connection.url' = 'http://elasticsearch:9200',
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
'value.converter'= 'io.confluent.connect.avro.AvroConverter',
'value.converter.schema.registry.url'= 'http://schema-registry:8081',
'type.name' = '_doc',
'topics' = 'test02',
'key.ignore' = 'false',
'schema.ignore' = 'true',
'transforms' = 'ExtractTimestamp, setTimestampType',
'transforms.ExtractTimestamp.type' = 'org.apache.kafka.connect.transforms.InsertField$Value',
'transforms.ExtractTimestamp.timestamp.field' = 'MSG_TS',
'transforms.setTimestampType.type' = 'org.apache.kafka.connect.transforms.TimestampConverter$Value',
'transforms.setTimestampType.field' = 'MSG_TS',
'transforms.setTimestampType.target.type' = 'string',
'transforms.setTimestampType.format' = 'yyyy-MM-dd\''T\''HH:mm:ssX'
);
curl -s http://localhost:9200/test02/_mapping | jq '.'
{
"test02": {
"mappings": {
"properties": {
"COL1": {
"type": "long"
},
"MSG_TS": {
"type": "date"
},
"ORDER_TS_EPOCH": {
"type": "long"
},
"SHIP_TS_STR": {
"type": "date"
}
}
}
}
}
Index name by default is the topic name, forced to lowercase automagically if necessary:
docker exec elasticsearch curl -s "http://localhost:9200/_cat/indices/*?h=idx,docsCount" |grep -v '^\.'
test02 2
CREATE SINK CONNECTOR SINK_ELASTIC_TEST_04 WITH (
'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
'connection.url' = 'http://elasticsearch:9200',
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
'type.name' = '_doc',
'topics' = 'test02',
'key.ignore' = 'true',
'schema.ignore' = 'true',
'transforms' = 'changeIndexname',
'transforms.changeIndexname.type' = 'org.apache.kafka.connect.transforms.RegexRouter',
'transforms.changeIndexname.regex' = '(.*)02',
'transforms.changeIndexname.replacement' = 'foo-$1'
);
docker exec elasticsearch curl -s "http://localhost:9200/_cat/indices/*?h=idx,docsCount" |grep -v '^\.'
test02 2
foo-test 2
CREATE SINK CONNECTOR SINK_ELASTIC_TEST_05 WITH (
'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
'connection.url' = 'http://elasticsearch:9200',
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
'type.name' = '_doc',
'topics' = 'test02',
'key.ignore' = 'true',
'schema.ignore' = 'true',
'transforms' = 'appendTimestampToIX',
'transforms.appendTimestampToIX.type' = 'org.apache.kafka.connect.transforms.TimestampRouter',
'transforms.appendTimestampToIX.topic.format' = '${topic}-${timestamp}',
'transforms.appendTimestampToIX.timestamp.format' = 'yyyy-MM-dd'
);
docker exec elasticsearch curl -s "http://localhost:9200/_cat/indices/*?h=idx,docsCount" |grep -v '^\.'
test02 2
test02-2020-05-01 2
foo-test 2
CREATE SINK CONNECTOR SINK_ELASTIC_TEST_06 WITH (
'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
'connection.url' = 'http://elasticsearch:9200',
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
'type.name' = '_doc',
'topics' = 'test02',
'key.ignore' = 'true',
'schema.ignore' = 'true',
'transforms' = 'changeIndexname,appendTimestampToIX',
'transforms.changeIndexname.type' = 'org.apache.kafka.connect.transforms.RegexRouter',
'transforms.changeIndexname.regex' = '(.*)02',
'transforms.changeIndexname.replacement' = 'foo-$1',
'transforms.appendTimestampToIX.type' = 'org.apache.kafka.connect.transforms.TimestampRouter',
'transforms.appendTimestampToIX.topic.format' = '${topic}-${timestamp}',
'transforms.appendTimestampToIX.timestamp.format' = 'yyyy-MM-dd'
);
docker exec elasticsearch curl -s "http://localhost:9200/_cat/indices/*?h=idx,docsCount" |grep -v '^\.'
test02 2
test02-2020-05-01 2
foo-test 2
foo-test-2020-05-01 2
Note
|
This section also illustrates working with Kafka Connect using the REST API directly instead of the ksqlDB interface as shown above. |
Write to a topic:
echo '1:{"a":1}' | \ docker exec -i kafkacat kafkacat \ -b broker:29092 \ -P -t test03 -Z -K:
For info you can read from the topic if you want to:
docker exec kafkacat kafkacat \
-b broker:29092 \
-C -o beginning -u -q \
-t test03 \
-f 'Topic+Partition+Offset: %t+%p+%o\tKey: %k\tValue: %s\n'
Create the connector:
curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/sink-elastic-test03/config \
-d '{
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"key.converter" : "org.apache.kafka.connect.storage.StringConverter",
"value.converter" : "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable" : "false",
"topics" : "test03",
"connection.url" : "http://elasticsearch:9200",
"type.name" : "_doc",
"key.ignore" : "false",
"schema.ignore" : "true"
}'
Works as designed
curl -s http://localhost:9200/test03/_search \
-H 'content-type: application/json' \
-d '{ "size": 42 }' | jq -c '.hits.hits[]'
{"_index":"test03","_type":"_doc","_id":"1","_score":1,"_source":{"a":1}}
Now send a bad message (malformed JSON)
echo '1:{"fieldnamewithoutclosingquote:1}' | \
docker exec -i kafkacat kafkacat \
-b broker:29092 \
-P -t test03 -Z -K:
Check connector status
curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
column -s : -t| sed 's/\"//g'| sort
sink | sink-elastic-test03 | RUNNING | FAILED | io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
Check error
curl -s http://localhost:8083/connectors/sink-elastic-test03/status | jq -r '.tasks[].trace'
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
…
org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.io.JsonEOFException: Unexpected end-of-input in field name
at [Source: (byte[])"{"fieldnamewithoutclosingquote:1}"; line: 1, column: 34]
Add error handling
"errors.tolerance" : "all",
"errors.log.enable" : "true"
"errors.log.include.messages" : "true"
This uses a PUT
which creates the config if not there, and updates it if it is. Much easier than delete/create each time.
curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/sink-elastic-test03/config \
-d '{
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"key.converter" : "org.apache.kafka.connect.storage.StringConverter",
"value.converter" : "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable" : "false",
"topics" : "test03",
"connection.url" : "http://elasticsearch:9200",
"type.name" : "_doc",
"key.ignore" : "false",
"schema.ignore" : "true",
"errors.tolerance" : "all",
"errors.log.enable" : "true",
"errors.log.include.messages" : "true"
}'
Connector runs:
curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
column -s : -t| sed 's/\"//g'| sort
sink | sink-elastic-test03 | RUNNING | RUNNING | io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
Logs an message for the malformed message:
docker logs kafka-connect
Validate that the pipeline is running by sending a good message
echo '3:{"a":3}' | \
docker exec -i kafkacat kafkacat \
-b broker:29092 \
-P -t test03 -Z -K:
Verify it’s present in Elasticsearch:
curl -s http://localhost:9200/test03/_search \
-H 'content-type: application/json' \
-d '{ "size": 42 }' | jq -c '.hits.hits[]'
{"_index":"test03","_type":"_doc","_id":"1","_score":1,"_source":{"a":1}}
{"_index":"test03","_type":"_doc","_id":"3","_score":1,"_source":{"a":3}}
curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/sink-elastic-test03/config \
-d '{
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"key.converter" : "org.apache.kafka.connect.storage.StringConverter",
"value.converter" : "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable" : "false",
"topics" : "test03",
"connection.url" : "http://elasticsearch:9200",
"type.name" : "_doc",
"key.ignore" : "false",
"schema.ignore" : "true",
"errors.tolerance" : "all",
"errors.log.enable" : "true",
"errors.log.include.messages" : "true",
"errors.deadletterqueue.topic.name":"dlq_sink-elastic-test03",
"errors.deadletterqueue.topic.replication.factor": 1,
"errors.deadletterqueue.context.headers.enable":true
}'
Send a badly-formed message
echo '4:{never gonna give you up}' | \
docker exec -i kafkacat kafkacat \
-b broker:29092 \
-P -t test03 -Z -K:
Look at the dead letter queue topic:
docker exec kafkacat kafkacat \
-b broker:29092 \
-C -o beginning -u -q \
-t dlq_sink-elastic-test03 \
-f '%t\tKey: %k\tValue: %s\nHeaders: %h\n'
dlq_sink-elastic-test03 Key: 4 Value: {never gonna give you up}
Headers: __connect.errors.topic=test03,__connect.errors.partition=0,__connect.errors.offset=3,__connect.errors.connector.name=sink-elastic-te
st03,__connect.errors.task.id=0,__connect.errors.stage=VALUE_CONVERTER,__connect.errors.class.name=org.apache.kafka.connect.json.JsonConverte
r,__connect.errors.exception.class.name=org.apache.kafka.connect.errors.DataException,__connect.errors.exception.message=Converting byte[] to
Kafka Connect data failed due to serialization error: ,__connect.errors.exception.stacktrace=org.apache.kafka.connect.errors.DataException:
Converting byte[] to Kafka Connect data failed due to serialization error:
…
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('n' (c
ode 110)): was expecting double-quote to start field name
at [Source: (byte[])"{never gonna give you up}"; line: 1, column: 3]
Note how the full stack trace for the error is available from the header of the Kafka message, along with details of its source message offset etc
Target mapping has field a
with type long
:
curl -s http://localhost:9200/test03/_mapping | jq '.'
{
"test03": {
"mappings": {
"properties": {
"a": {
"type": "long"
}
}
}
}
}
What if you send through a value that’s not long
?
echo '5:{"a":"this is valid JSON but is string content"}' | \
docker exec -i kafkacat kafkacat \
-b broker:29092 \
-P -t test03 -Z -K:
Message doesn’t arrive in Elasticsearch:
➜ curl -s http://localhost:9200/test03/_search \
-H 'content-type: application/json' \
-d '{ "size": 42 }' | jq -c '.hits.hits[]'
{"_index":"test03","_type":"_doc","_id":"1","_score":1,"_source":{"a":1}}
{"_index":"test03","_type":"_doc","_id":"3","_score":1,"_source":{"a":3}}
Check connector status
curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
column -s : -t| sed 's/\"//g'| sort
sink | sink-elastic-test03 | RUNNING | FAILED | io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
Why’s it crashed?
curl -s http://localhost:8083/connectors/sink-elastic-test03/status | jq -r '.tasks[].trace'
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:568)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: Bulk request failed: [{"type":"mapper_parsing_exception","reason":"failed to parse field [a] of type [long] in document with id '5'. Preview of field's value: 'this is valid JSON but is string content'","caused_by":{"type":"illegal_argument_exception","reason":"For input string: \"this is valid JSON but is string content\""}}]
…
Set "behavior.on.malformed.documents" : "warn"
:
curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/sink-elastic-test03/config \
-d '{
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"key.converter" : "org.apache.kafka.connect.storage.StringConverter",
"value.converter" : "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable" : "false",
"topics" : "test03",
"connection.url" : "http://elasticsearch:9200",
"type.name" : "_doc",
"key.ignore" : "false",
"schema.ignore" : "true",
"errors.tolerance" : "all",
"errors.log.enable" : "true",
"errors.log.include.messages" : "true",
"errors.deadletterqueue.topic.name":"dlq_sink-elastic-test03",
"errors.deadletterqueue.topic.replication.factor": 1,
"errors.deadletterqueue.context.headers.enable":true,
"behavior.on.malformed.documents" : "warn"
}'
Send some more data through
echo '6:{"a":42}' | \
docker exec -i kafkacat kafkacat \
-b broker:29092 \
-P -t test03 -Z -K:
Pipeline is working
curl -s http://localhost:9200/test03/_search \
-H 'content-type: application/json' \
-d '{ "size": 42 }' | jq -c '.hits.hits[]'
{"_index":"test03","_type":"_doc","_id":"1","_score":1,"_source":{"a":1}}
{"_index":"test03","_type":"_doc","_id":"3","_score":1,"_source":{"a":3}}
{"_index":"test03","_type":"_doc","_id":"6","_score":1,"_source":{"a":42}}
🎥 Check out the video tutorial here: https://rmoff.dev/kafka-elasticsearch-video
-
Kafka Connect Deep Dive – Error Handling and Dead Letter Queues
-
TimestampConverter Single Message Transform
-
TimestampRouter Single Message Transform
-
RegExRouter Single Message Transform