Apache Kafka / Kafka Connect / MQTT / Mosquitto Live Demo

Kai Waehner <[email protected]> 10 Sept 2018

This script assumes that all components (Zookeeper, Kafka, Connect, MQTT Broker) use default values.

Configuration

Configure the MQTT connector properties file (for Kafka Connect standalone mode) at /Users/kai.waehner/confluent-5.0.0/share/confluent-hub-components/confluentinc-kafka-connect-mqtt/etc/source-anonymous.properties with your values for:

  • MQTT Broker URL

  • MQTT Topic(s) to consume from (comma-separated list)

  • Kafka Topic Mapping (prefix)

For example, to consume the MQTT topics temperature and humidity and write them to the Kafka topics mqtt.temperature and mqtt.humidity, set:

    "mqtt.topics" : "temperature,humidity",
    "kafka.topics" : "mqtt."

This needs to be done before you start Kafka Connect. Example:

name=mqtt-source
tasks.max=1
connector.class=io.confluent.connect.mqtt.MqttSourceConnector
mqtt.server.uri=tcp://127.0.0.1:1883
mqtt.topics=temperature
kafka.topics=mqtt.
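To double-check which Kafka topics to expect, the prefix mapping described above can be sketched in a few lines of shell. This only illustrates the naming rule as stated in this script (prefix followed by the MQTT topic name). The function name kafka_topic_names is made up for this example and is not part of any tool.

```shell
# Hypothetical helper: print the Kafka topic name expected for each MQTT topic,
# assuming the mapping is simply "<prefix><mqtt topic>" as described above.
kafka_topic_names() {
  # $1 = kafka.topics prefix (e.g. "mqtt."), $2 = mqtt.topics (comma-separated)
  printf '%s\n' "$2" | tr ',' '\n' | while IFS= read -r topic; do
    # Skip empty entries (e.g. from a trailing comma).
    if [ -n "$topic" ]; then
      printf '%s%s\n' "$1" "$topic"
    fi
  done
}
```

Calling kafka_topic_names "mqtt." "temperature,humidity" prints mqtt.temperature and mqtt.humidity, one per line.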

Starting backend services

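Start the MQTT Broker and test publish / subscribe with a 'dummy' topic (run mosquitto_sub and mosquitto_pub in separate terminals, because mosquitto_sub blocks while it waits for messages):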

brew services start mosquitto
mosquitto_sub -h 127.0.0.1 -t dummy
mosquitto_pub -h 127.0.0.1 -t dummy -m "Hello world"
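If you prefer a single terminal, the same publish / subscribe test can be sketched as one function. This is a minimal sketch, not part of the original demo: the function name mqtt_roundtrip is hypothetical, and it assumes a mosquitto_sub version that supports -C (exit after N messages) and -W (timeout in seconds).

```shell
# Hypothetical helper: publish one message and check that a subscriber receives it.
mqtt_roundtrip() {
  host="${1:-127.0.0.1}"
  topic="${2:-dummy}"
  msg="${3:-Hello world}"
  out_file="$(mktemp)"
  # Subscribe in the background; exit after one message or after 5 seconds.
  mosquitto_sub -h "$host" -t "$topic" -C 1 -W 5 > "$out_file" &
  sub_pid=$!
  sleep 1   # give the subscriber time to connect before publishing
  mosquitto_pub -h "$host" -t "$topic" -m "$msg"
  wait "$sub_pid"
  received="$(cat "$out_file")"
  rm -f "$out_file"
  # Succeed only if the received payload equals the published one.
  [ "$received" = "$msg" ]
}
```

For example, mqtt_roundtrip 127.0.0.1 dummy "Hello world" && echo "MQTT broker OK".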

Make sure the Confluent bin folder is on your PATH. Otherwise, go to $CONFLUENT_INSTALL/bin to execute these commands.

Start Kafka Connect and dependencies (Kafka, Zookeeper, Schema Registry):

confluent start connect

confluent load mqtt-source -d /Users/kai.waehner/confluent-5.0.0/share/confluent-hub-components/confluentinc-kafka-connect-mqtt/etc/source-anonymous.properties

If you want to run Kafka Connect in distributed mode, you can create the connector with the following REST call instead of a properties file (this only works while Connect is running):

curl -s -X POST -H 'Content-Type: application/json' http://localhost:8083/connectors -d '{
    "name" : "mqtt-source",
    "config" : {
        "connector.class" : "io.confluent.connect.mqtt.MqttSourceConnector",
        "tasks.max" : "1",
        "mqtt.server.uri" : "tcp://127.0.0.1:1883",
        "mqtt.topics" : "temperature",
        "kafka.topics" : "mqtt."
    }
}'
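When the broker URL or topic list changes often, it can help to generate the JSON body of the REST call above instead of editing it by hand. The following is a rough sketch under that assumption. The function name connector_json is hypothetical, and the config keys are copied unchanged from the call above.

```shell
# Hypothetical helper: print the connector JSON for a given broker URI and topic list.
connector_json() {
  # $1 = MQTT broker URI, $2 = comma-separated MQTT topics, $3 = Kafka topic prefix
  cat <<EOF
{
  "name": "mqtt-source",
  "config": {
    "connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
    "tasks.max": "1",
    "mqtt.server.uri": "$1",
    "mqtt.topics": "$2",
    "kafka.topics": "$3"
  }
}
EOF
}
```

The output can be piped straight into curl, which reads the request body from stdin with -d @-:

connector_json tcp://127.0.0.1:1883 temperature,humidity mqtt. | curl -s -X POST -H 'Content-Type: application/json' http://localhost:8083/connectors -d @-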

Check if connector is loaded and status is 'RUNNING':

// REST
curl -s "http://localhost:8083/connectors"
curl -s "http://localhost:8083/connectors/mqtt-source/status"
// Delete the connector again (only when you want to remove it)
curl -s -X DELETE "http://localhost:8083/connectors/mqtt-source"

// Confluent CLI
confluent status connectors
confluent status mqtt-source
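Right after loading, the connector may need a moment to start, so a single status call can come too early. A small polling loop around the REST status call is one way to wait for it. This is a sketch only: the function name wait_for_connector is hypothetical, and the check is a plain text match on the status JSON rather than a real JSON parse.

```shell
# Hypothetical helper: poll the Connect REST API until the connector reports RUNNING.
wait_for_connector() {
  name="${1:-mqtt-source}"
  tries="${2:-30}"          # number of attempts, one per second
  url="http://localhost:8083/connectors/$name/status"
  while [ "$tries" -gt 0 ]; do
    # Text match on "state":"RUNNING"; this matches the connector state or any
    # task state, so a stricter check would parse the JSON properly.
    if curl -s "$url" | grep -q '"state" *: *"RUNNING"'; then
      echo "$name is RUNNING"
      return 0
    fi
    tries=$((tries - 1))
    sleep 1
  done
  echo "$name did not reach RUNNING" >&2
  return 1
}
```

For example, wait_for_connector mqtt-source 30 waits up to roughly 30 seconds before giving up.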

Create the Kafka topic that the MQTT connector writes to (the name must match the connector config):

TODO: There was a bug in the MQTT connector. If the topic mqtt.temperature does not receive any messages, the mapping does not work and the connector always uses the default value mqtt. as the topic name. In that case, use mqtt. for topic creation AND for the kafka-console-consumer below instead!

kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mqtt.temperature

Starting Client services (Kafka Consumer and MQTT Publisher)

Use a CLI tool such as mosquitto_sub from the shell prompt to consume from the MQTT topic (to ensure the MQTT infrastructure works):

$ mosquitto_sub -h 127.0.0.1 -t temperature
99999,4.193955593608823
99999,5.363750640274894
99999,7.292092517069437
[...]

Use a CLI tool such as kafka-console-consumer from the shell prompt to consume from the Kafka topic:

$ kafka-console-consumer --bootstrap-server localhost:9092 --topic mqtt.temperature --from-beginning
99999,4.193955593608823
99999,5.363750640274894
99999,7.292092517069437
[...]

Send an MQTT message to the MQTT Broker, using mosquitto_pub:

mosquitto_pub -h 127.0.0.1 -p 1883 -t temperature -q 2 -m "99999,2.10#"

mosquitto_pub -h 127.0.0.1 -p 1883 -t temperature -q 2 -m "99999,2.10# 2.13# 2.19# 2.28# 2.44# 2.62# 2.80# 3.04# 3.36# 3.69# 3.97# 4.24# 4.53#4.80# 5.02# 5.21# 5.40# 5.57# 5.71# 5.79# 5.86# 5.92# 5.98# 6.02# 6.06# 6.08# 6.14# 6.18# 6.22# 6.27#6.32# 6.35# 6.38# 6.45# 6.49# 6.53# 6.57# 6.64# 6.70# 6.73# 6.78# 6.83# 6.88# 6.92# 6.94# 6.98# 7.01#7.03# 7.05# 7.06# 7.07# 7.08# 7.06# 7.04# 7.03# 6.99# 6.94# 6.88# 6.83# 6.77# 6.69# 6.60# 6.53# 6.45#6.36# 6.27# 6.19# 6.11# 6.03# 5.94# 5.88# 5.81# 5.75# 5.68# 5.62# 5.61# 5.54# 5.49# 5.45# 5.42# 5.38#5.34# 5.31# 5.30# 5.29# 5.26# 5.23# 5.23# 5.22# 5.20# 5.19# 5.18# 5.19# 5.17# 5.15# 5.14# 5.17# 5.16#5.15# 5.15# 5.15# 5.14# 5.14# 5.14# 5.15# 5.14# 5.14# 5.13# 5.15# 5.15# 5.15# 5.14# 5.16# 5.15# 5.15#5.14# 5.14# 5.15# 5.15# 5.14# 5.13# 5.14# 5.14# 5.11# 5.12# 5.12# 5.12# 5.09# 5.09# 5.09# 5.10# 5.08# 5.08# 5.08# 5.08# 5.06# 5.05# 5.06# 5.07# 5.05# 5.03# 5.03# 5.04# 5.03# 5.01# 5.01# 5.02# 5.01# 5.01#5.00# 5.00# 5.02# 5.01# 4.98# 5.00# 5.00# 5.00# 4.99# 5.00# 5.01# 5.02# 5.01# 5.03# 5.03# 5.02# 5.02#5.04# 5.04# 5.04# 5.02# 5.02# 5.01# 4.99# 4.98# 4.96# 4.96# 4.96# 4.94# 4.93# 4.93# 4.93# 4.93# 4.93# 5.02# 5.27# 5.80# 5.94# 5.58# 5.39# 5.32# 5.25# 5.21# 5.13# 4.97# 4.71# 4.39# 4.05# 3.69# 3.32# 3.05#2.99# 2.74# 2.61# 2.47# 2.35# 2.26# 2.20# 2.15# 2.10# 2.08"

Now run a script to generate and publish a continuous stream of MQTT messages:

./sensor_generator.sh
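The contents of sensor_generator.sh are not shown in this script. If you do not have it at hand, a stand-in along the following lines may be enough for the demo. It is purely an assumption about what such a generator could look like: the function names, the value range and the one-second interval are made up, and only the "<sensor id>,<value>" message format is taken from the consumer output shown earlier.

```shell
# Hypothetical stand-in for sensor_generator.sh (NOT the original script).
# Prints one reading in the "<sensor id>,<value>" format.
generate_reading() {
  # $1 = sensor id (default 99999); the value is a pseudo-random number in [0, 10)
  seed="$(( $(date +%s) + $$ ))"
  awk -v id="${1:-99999}" -v seed="$seed" \
    'BEGIN { srand(seed); printf "%s,%.6f\n", id, rand() * 10 }'
}

# Publish a number of readings, one per second, to the MQTT topic "temperature".
publish_readings() {
  count="${1:-10}"
  while [ "$count" -gt 0 ]; do
    mosquitto_pub -h 127.0.0.1 -p 1883 -t temperature -q 2 -m "$(generate_reading)"
    count=$((count - 1))
    sleep 1
  done
}
```

For example, publish_readings 60 sends one reading per second for about a minute.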

Stop services and destroy test data

Stop the sensor_generator script with Control-C.

Stop the Kafka and MQTT consumers with Control-C.

Finally, stop the backend services:

brew services stop mosquitto
confluent destroy

Errors? Problems?

Here are some helpful commands if you have problems starting mosquitto or finding out if it is running and on which port:

Find the system process and port of MQTT Broker

// If brew does not work (or you are not on Mac):
// Start Mosquitto
/usr/local/sbin/mosquitto

// Start Mosquitto (if it is on your PATH) with a specific config file
mosquitto -c /usr/local/etc/mosquitto/mosquitto.conf

// Mosquitto Broker running? => Find process:
ps -ef | grep mosquitto

// Which port?
lsof -n -P -i4|grep LISTEN
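To check one specific port instead of scanning the whole list, the lsof call can be narrowed down. A minimal sketch, assuming the default MQTT port 1883 (the function name port_listening is hypothetical):

```shell
# Hypothetical helper: report whether something is listening on the given TCP port.
port_listening() {
  port="${1:-1883}"   # 1883 is the default MQTT port
  # -n/-P keep addresses and ports numeric; -sTCP:LISTEN keeps only listening sockets.
  # lsof exits with a non-zero status when nothing matches.
  lsof -n -P -iTCP:"$port" -sTCP:LISTEN >/dev/null 2>&1
}
```

For example: port_listening 1883 && echo "MQTT port is open" || echo "nothing listening on 1883".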

Change log level of Kafka Connect

Logging is a worker-level configuration, so there’s no way to set it per connector instance. To adjust the log levels, modify the connect-log4j.properties file and restart the worker.

Default log level is ERROR. You can change to INFO or DEBUG.

vi /Users/kai.waehner/confluent-5.0.0/etc/kafka/connect-log4j.properties
// => Change log level

// Restart Kafka Connect worker
confluent stop connect
confluent start connect
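Instead of editing the file in vi, the log level change can also be scripted. The sketch below assumes the file contains a line of the form log4j.rootLogger=<LEVEL>, <appenders>; check your connect-log4j.properties before relying on it. The function name set_root_log_level is hypothetical.

```shell
# Hypothetical helper: set the level on the log4j.rootLogger line of a properties file.
# Assumes a line of the form "log4j.rootLogger=<LEVEL>, <appenders>".
set_root_log_level() {
  file="$1"
  level="$2"   # e.g. INFO or DEBUG
  tmp="$(mktemp)"
  # Replace only the level and keep the appender list. The result goes through
  # a temp file because "sed -i" behaves differently on macOS and Linux.
  sed "s/^log4j\.rootLogger=[A-Za-z]*/log4j.rootLogger=$level/" "$file" > "$tmp" \
    && cat "$tmp" > "$file"
  rm -f "$tmp"
}
```

For example, run set_root_log_level /Users/kai.waehner/confluent-5.0.0/etc/kafka/connect-log4j.properties DEBUG and then restart the Kafka Connect worker so the new level takes effect.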