Jan 23, 2020
1122693 · Jan 23, 2020

This branch is 20 commits behind confluentinc/demo-scene:master.

Getting data into InfluxDB from Kafka with Kafka Connect

JSON, using embedded Kafka Connect schema and a map field type for tags.

Load test data:

docker exec -i kafkacat kafkacat -b kafka-1:39092 -P -t json_01 <<EOF
{ "schema": { "type": "struct", "fields": [ { "field": "tags" , "type": "map", "keys": { "type": "string", "optional": false }, "values": { "type": "string", "optional": false }, "optional": false}, { "field": "stock", "type": "double", "optional": true } ], "optional": false, "version": 1 }, "payload": { "tags": { "host": "FOO", "product": "wibble" }, "stock": 500.0 } }
EOF
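
To load more than one data point, the same schema envelope can be generated rather than typed out each time. The following is a minimal sketch, not part of the original demo: `make_message` is a hypothetical helper, the extra stock values are made up for illustration, and the arguments are not JSON-escaped, so it assumes plain values without quotes or backslashes.

```shell
#!/usr/bin/env bash
# Hypothetical helper: wraps a payload in the same embedded Kafka Connect
# schema as the message above, so only host, product and stock vary.
make_message() {
  local host="$1" product="$2" stock="$3"
  printf '{ "schema": { "type": "struct", "fields": [ { "field": "tags", "type": "map", "keys": { "type": "string", "optional": false }, "values": { "type": "string", "optional": false }, "optional": false }, { "field": "stock", "type": "double", "optional": true } ], "optional": false, "version": 1 }, "payload": { "tags": { "host": "%s", "product": "%s" }, "stock": %s } }\n' \
    "$host" "$product" "$stock"
}

# Print three messages, one per line (illustrative stock values).
for stock in 500.0 480.5 475.0; do
  make_message "FOO" "wibble" "$stock"
done
```

Since kafkacat in producer mode reads one message per line by default, the output can be piped straight into the `docker exec -i kafkacat kafkacat -b kafka-1:39092 -P -t json_01` command used above.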

Check that the data is there:

docker exec kafkacat kafkacat -b kafka-1:39092 -C -u -t json_01

Create the connector:

curl -i -X PUT -H "Content-Type:application/json" \
        http://localhost:8083/connectors/sink_influx_json_01/config \
        -d '{
            "connector.class"               : "io.confluent.influxdb.InfluxDBSinkConnector",
            "value.converter"               : "org.apache.kafka.connect.json.JsonConverter",
            "value.converter.schemas.enable": "true",
            "key.converter"                 : "org.apache.kafka.connect.storage.StringConverter",
            "topics"                        : "json_01",
            "influxdb.url"                  : "http://influxdb:8086",
            "influxdb.db"                   : "my_db",
            "measurement.name.format"       : "${topic}"
        }'

Make sure that the connector’s running:

$ curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
           jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
           column -s : -t| sed 's/\"//g'| sort
sink  |  sink_influx_json_01  |  RUNNING  |  RUNNING  |  io.confluent.influxdb.InfluxDBSinkConnector
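
When scripting this check, the table can be filtered so that only unhealthy connectors are reported. This is a rough sketch under the assumption that the output keeps the layout shown above (type, name, connector state, one column per task state, connector class); `not_running` is a hypothetical helper name.

```shell
#!/usr/bin/env bash
# Reads the "type | name | connector state | task state(s) | class" table on
# stdin and prints the name of every connector that has a non-RUNNING state.
not_running() {
  awk -F'|' '{
    # Fields 3 .. NF-1 hold the connector state and each task state.
    for (i = 3; i < NF; i++) {
      state = $i
      gsub(/[ \t]/, "", state)
      if (state != "RUNNING") {
        name = $2
        gsub(/[ \t]/, "", name)
        print name
        break
      }
    }
  }'
}

# Example with the line shown above: prints nothing, because all states are RUNNING.
printf 'sink  |  sink_influx_json_01  |  RUNNING  |  RUNNING  |  io.confluent.influxdb.InfluxDBSinkConnector\n' | not_running
```

Appending `| not_running` to the status pipeline above would then list only the connectors that need attention.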

Check that the data’s made it to InfluxDB:

$ docker exec -it influxdb influx -execute 'show measurements on "my_db"'
name: measurements
name
----
json_01

$ docker exec -it influxdb influx -execute 'show tag keys on "my_db"'
name: json_01
tagKey
------
host
product

$ docker exec -it influxdb influx -execute 'SELECT * FROM json_01 GROUP BY host, product;' -database "my_db"
name: json_01
tags: host=FOO, product=wibble
time                stock
----                -----
1579779810974000000 500
1579779820028000000 500
1579779833143000000 500
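
The `time` column is printed as nanoseconds since the Unix epoch. As a small sketch for reading those values, the hypothetical helper `ns_to_utc` below converts one to a UTC timestamp; it assumes GNU `date` (for `-d @SECONDS`) and a shell with 64-bit integer arithmetic, and it drops the sub-second part.

```shell
#!/usr/bin/env bash
# Convert an InfluxDB nanosecond epoch timestamp to an RFC 3339 UTC string.
# Assumes GNU date; the fractional seconds are truncated by the division.
ns_to_utc() {
  date -u -d "@$(( $1 / 1000000000 ))" +%Y-%m-%dT%H:%M:%SZ
}

ns_to_utc 1579779810974000000   # prints 2020-01-23T11:43:30Z
```

The `influx` CLI can also format timestamps itself if `-precision rfc3339` is added to the command, which avoids the conversion altogether.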

Avro

Load test data:

docker exec -i schema-registry /usr/bin/kafka-avro-console-producer --broker-list kafka-1:39092 --topic avro_01 --property value.schema='{ "type": "record", "name": "myrecord", "fields": [ { "name": "tags", "type": { "type": "map", "values": "string" } }, { "name": "stock", "type": "double" } ] }' <<EOF
{ "tags": { "host": "FOO", "product": "wibble" }, "stock": 500.0 }
EOF
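
The inline schema is hard to read on one line. One possible arrangement, sketched here with the hypothetical variable names `SCHEMA` and `SCHEMA_ONE_LINE`, is to keep the same Avro schema in a readable heredoc and collapse it before handing it to the producer.

```shell
#!/usr/bin/env bash
# Same Avro schema as in the command above, laid out for readability.
SCHEMA=$(cat <<'EOF'
{
  "type": "record",
  "name": "myrecord",
  "fields": [
    { "name": "tags",  "type": { "type": "map", "values": "string" } },
    { "name": "stock", "type": "double" }
  ]
}
EOF
)

# Strip the newlines so the schema is passed as a single-line property value.
SCHEMA_ONE_LINE=$(printf '%s' "$SCHEMA" | tr -d '\n')
printf '%s\n' "$SCHEMA_ONE_LINE"
```

The producer command above would then take `--property value.schema="$SCHEMA_ONE_LINE"` in place of the inline JSON.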

Check the data’s there (I’m using kafkacat just to be contrary; you can use kafka-avro-console-consumer too):

$ docker exec -i kafkacat kafkacat -b kafka-1:39092 -C -t avro_01 -r http://schema-registry:8081 -s avro
{"tags": {"host": "FOO", "product": "wibble"}, "stock": 500.0}

Create the connector:

curl -i -X PUT -H  "Content-Type:application/json" \
        http://localhost:8083/connectors/sink_influx_avro_01/config \
        -d '{
            "connector.class"                    : "io.confluent.influxdb.InfluxDBSinkConnector",
            "value.converter"                    : "io.confluent.connect.avro.AvroConverter",
            "value.converter.schema.registry.url": "http://schema-registry:8081",
            "key.converter"                      : "org.apache.kafka.connect.storage.StringConverter",
            "topics"                             : "avro_01",
            "influxdb.url"                       : "http://influxdb:8086",
            "influxdb.db"                        : "my_db",
            "measurement.name.format"            : "${topic}"
        }'

Make sure that the connector’s running:

$ curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
           jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
           column -s : -t| sed 's/\"//g'| sort
sink  |  sink_influx_avro_01  |  RUNNING  |  RUNNING  |  io.confluent.influxdb.InfluxDBSinkConnector

Check that the data’s made it to InfluxDB:

$ docker exec -it influxdb influx -execute 'SELECT * FROM avro_01 GROUP BY host, product;' -database "my_db"
name: avro_01
tags: host=FOO, product=wibble
time                stock
----                -----
1579781680622000000 500