Skip to content

Commit

Permalink
Add influxdb notes
Browse files Browse the repository at this point in the history
  • Loading branch information
rmoff committed Jan 23, 2020
1 parent 74eb519 commit 54559e8
Show file tree
Hide file tree
Showing 2 changed files with 346 additions and 0 deletions.
136 changes: 136 additions & 0 deletions influxdb-and-kafka/README.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
= Getting data into InfluxDB from Kafka with Kafka Connect

See blog post:

== JSON, using embedded Kafka Connect schema and a `map` field type for `tags`.

Load test data:

[source,bash]
----
docker exec -i kafkacat kafkacat -b kafka-1:39092 -P -t json_01 <<EOF
{ "schema": { "type": "struct", "fields": [ { "field": "tags" , "type": "map", "keys": { "type": "string", "optional": false }, "values": { "type": "string", "optional": false }, "optional": false}, { "field": "stock", "type": "double", "optional": true } ], "optional": false, "version": 1 }, "payload": { "tags": { "host": "FOO", "product": "wibble" }, "stock": 500.0 } }
EOF
----

Check data is there:

[source,bash]
----
docker exec kafkacat kafkacat -b kafka-1:39092 -C -u -t json_01
----

Create the connector:

[source,bash]
----
curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/sink_influx_json_01/config \
-d '{
"connector.class" : "io.confluent.influxdb.InfluxDBSinkConnector",
"value.converter" : "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
"key.converter" : "org.apache.kafka.connect.storage.StringConverter",
"topics" : "json_01",
"influxdb.url" : "http://influxdb:8086",
"influxdb.db" : "my_db",
"measurement.name.format" : "${topic}"
}'
----
{{< /highlight >}}

Make sure that the connector's running

[source,bash]
----
$ curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
column -s : -t| sed 's/\"//g'| sort
sink | sink_influx_json_01 | RUNNING | RUNNING | io.confluent.influxdb.InfluxDBSinkConnector
----

Check that the data's made it to InfluxDB:

[source,bash]
-----
$ docker exec -it influxdb influx -execute 'show measurements on "my_db"'
name: measurements
name
----
json_01

$ docker exec -it influxdb influx -execute 'show tag keys on "my_db"'
name: json_01
tagKey
------
host
product
$ docker exec -it influxdb influx -execute 'SELECT * FROM json_01 GROUP BY host, product;' -database "my_db"
name: json_01
tags: host=FOO, product=wibble
time stock
---- -----
1579779810974000000 500
1579779820028000000 500
1579779833143000000 500
-----

== Avro

Load test data:

[source,bash]
----
docker exec -i schema-registry /usr/bin/kafka-avro-console-producer --broker-list kafka-1:39092 --topic avro_01 --property value.schema='{ "type": "record", "name": "myrecord", "fields": [ { "name": "tags", "type": { "type": "map", "values": "string" } }, { "name": "stock", "type": "double" } ] }' <<EOF
{ "tags": { "host": "FOO", "product": "wibble" }, "stock": 500.0 }
EOF
----

Check the data's there (I'm using kafkacat just to be contrary; you use use `kafka-avro-console-consumer` too):

[source,bash]
----
$ docker exec -i kafkacat kafkacat -b kafka-1:39092 -C -t avro_01 -r http://schema-registry:8081 -s avro
{"tags": {"host": "FOO", "product": "wibble"}, "stock": 500.0}
----

Create the connector:

[source,bash]
----
curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/sink_influx_avro_01/config \
-d '{
"connector.class" : "io.confluent.influxdb.InfluxDBSinkConnector",
"value.converter" : "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"key.converter" : "org.apache.kafka.connect.storage.StringConverter",
"topics" : "avro_01",
"influxdb.url" : "http://influxdb:8086",
"influxdb.db" : "my_db",
"measurement.name.format" : "${topic}"
}'
----

Make sure that the connector's running

[source,bash]
----
$ curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
column -s : -t| sed 's/\"//g'| sort
sink | sink_influx_avro_01 | RUNNING | RUNNING | io.confluent.influxdb.InfluxDBSinkConnector
----

Check that the data's made it to InfluxDB

[source,bash]
-----
$ docker exec -it influxdb influx -execute 'SELECT * FROM avro_01 GROUP BY host, product;' -database "my_db"
name: avro_01
tags: host=FOO, product=wibble
time stock
---- -----
1579781680622000000 500
-----
210 changes: 210 additions & 0 deletions influxdb-and-kafka/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
---
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:5.4.0
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
# volumes:
# - ./data/container_data/zk-data:/var/lib/zookeeper/data
# - ./data/container_data/zk-txn-logs:/var/lib/zookeeper/log

kafka-1:
image: confluentinc/cp-enterprise-kafka:5.4.0
container_name: kafka-1
depends_on:
- zookeeper
ports:
- 9092:9092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:39092,HOST://0.0.0.0:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:39092,HOST://localhost:9092
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100
# volumes:
# - ./data/container_data/kafka-1-data:/var/lib/kafka/data

kafka-2:
image: confluentinc/cp-enterprise-kafka:5.4.0
container_name: kafka-2
depends_on:
- zookeeper
ports:
- 19092:19092
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:39092,HOST://0.0.0.0:19092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:39092,HOST://localhost:19092
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100
# volumes:
# - ./data/container_data/kafka-2-data:/var/lib/kafka/data

kafka-3:
image: confluentinc/cp-enterprise-kafka:5.4.0
container_name: kafka-3
depends_on:
- zookeeper
ports:
- 29092:29092
environment:
KAFKA_BROKER_ID: 3
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:39092,HOST://0.0.0.0:29092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-3:39092,HOST://localhost:29092
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100
# volumes:
# - ./data/container_data/kafka-3-data:/var/lib/kafka/data

schema-registry:
image: confluentinc/cp-schema-registry:5.4.0
ports:
- 8081:8081
container_name: schema-registry
depends_on:
- zookeeper
- kafka-1
- kafka-2
- kafka-3
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka-1:39092,PLAINTEXT://kafka-2:39092,PLAINTEXT://kafka-3:39092
SCHEMA_REGISTRY_CUB_KAFKA_TIMEOUT: 300

ksqldb-server:
image: rmoff/ksqldb-server:master-20200121-f608d84cc
hostname: ksqldb-server
container_name: ksqldb-server
depends_on:
- kafka-1
- kafka-connect-01
ports:
- "8088:8088"
environment:
KSQL_LISTENERS: http://0.0.0.0:8088
KSQL_BOOTSTRAP_SERVERS: kafka-1:39092,kafka-2:39092,kafka-3:39092
KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
KSQL_KSQL_CONNECT_URL: http://kafka-connect-01:8083
KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schema-registry:8081

ksqldb-cli:
image: rmoff/ksqldb-cli:master-20200121-f608d84cc
container_name: ksqldb-cli
depends_on:
- ksqldb-server
entrypoint: /bin/sh
tty: true
# volumes:
# - ./data:/data

kafka-connect-01:
image: confluentinc/cp-kafka-connect:5.4.0
container_name: kafka-connect-01
depends_on:
- kafka-1
- kafka-2
- kafka-3
- schema-registry
ports:
- 8083:8083
environment:
CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
CONNECT_CUB_KAFKA_TIMEOUT: 300
CONNECT_BOOTSTRAP_SERVERS: "kafka-1:39092,kafka-2:39092,kafka-3:39092"
CONNECT_REST_ADVERTISED_HOST_NAME: 'kafka-connect-01'
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: kafka-connect-group-01
CONNECT_CONFIG_STORAGE_TOPIC: _kafka-connect-group-01-configs
CONNECT_OFFSET_STORAGE_TOPIC: _kafka-connect-group-01-offsets
CONNECT_STATUS_STORAGE_TOPIC: _kafka-connect-group-01-status
CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
CONNECT_INTERNAL_KEY_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
CONNECT_INTERNAL_VALUE_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
CONNECT_LOG4J_ROOT_LOGLEVEL: 'INFO'
CONNECT_LOG4J_LOGGERS: 'org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR'
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '1'
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '1'
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '1'
CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components/'
# External secrets config
# See https://docs.confluent.io/current/connect/security.html#externalizing-secrets
CONNECT_CONFIG_PROVIDERS: 'file'
CONNECT_CONFIG_PROVIDERS_FILE_CLASS: 'org.apache.kafka.common.config.provider.FileConfigProvider'
command:
# In the command section, $ are replaced with $$ to avoid the error 'Invalid interpolation format for "command" option'
- bash
- -c
- |
echo "Installing connector plugins"
confluent-hub install --no-prompt confluentinc/kafka-connect-influxdb:1.1.2
#
echo "Launching Kafka Connect worker"
/etc/confluent/docker/run &
#
echo "Waiting for Kafka Connect to start listening on $$CONNECT_REST_ADVERTISED_HOST_NAME:$$CONNECT_REST_PORT ⏳"
while : ; do
curl_status=$$(curl -s -o /dev/null -w %{http_code} http://$$CONNECT_REST_ADVERTISED_HOST_NAME:$$CONNECT_REST_PORT/connectors)
echo -e $$(date) " Kafka Connect listener HTTP state: " $$curl_status " (waiting for 200)"
if [ $$curl_status -eq 200 ] ; then
break
fi
sleep 5
done
#
echo "Waiting for Schema Registry to start listening on schema-registry:8081 ⏳"
while [ $$(curl -s -o /dev/null -w %{http_code} http://schema-registry:8081) -eq 000 ] ; do
echo -e $$(date) " Schema Registry listener HTTP state: " $$(curl -s -o /dev/null -w %{http_code} http://schema-registry:8081) " (waiting for != 000)"
sleep 5
done
#
sleep infinity
influxdb:
image: influxdb:1.7.9
container_name: influxdb
ports:
- 8086:8086

chronograf:
image: chronograf:1.7
container_name: chronograf
ports:
- 8888:8888

postgres:
# *-----------------------------*
# To connect to the DB:
# docker exec -it postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB'
# *-----------------------------*
image: postgres:11
container_name: postgres
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres

kafkacat:
image: edenhill/kafkacat:1.5.0
container_name: kafkacat
entrypoint:
- /bin/sh
- -c
- |
apk add jq;
while [ 1 -eq 1 ];do sleep 60;done

0 comments on commit 54559e8

Please sign in to comment.