Kai Waehner <[email protected]> 10 Sept 2018
This script assumes that all components (Zookeeper, Kafka, Connect, MQTT Broker) use default values.
Configure MQTT Connector properties file (for Kafka Connect standalone mode) /Users/kai.waehner/confluent-5.0.0/share/confluent-hub-components/confluentinc-kafka-connect-mqtt/etc/source-anonymous.properties
with your values for
-
MQTT Broker URL
-
MQTT Topic(s) to consume from (comma-separated list)
-
Kafka Topic Mapping (prefix)
For example, if you want to consume the MQTT topics temperature
and humidity
, and you want the Kafka topics to be mqtt.temperature
and mqtt.humidity
, then do
"mqtt.topics" : "temperature,humidity",
"kafka.topics" : "mqtt."
This needs to be done before you start Kafka Connect. Example:
name=mqtt-source
tasks.max=1
connector.class=io.confluent.connect.mqtt.MqttSourceConnector
mqtt.server.uri=tcp://127.0.0.1:32790
mqtt.topics=temperature
kafka.topics=mqtt.
Start the MQTT Broker and test publish / subscribe with 'dummy' topic:
brew services start mosquitto
mosquitto_sub -h 127.0.0.1 -t dummy
mosquitto_pub -h 127.0.0.1 -t dummy -m "Hello world"
Make sure to have Confluent folder on PATH. Otherwise, go to $CONFLUENT_INSTALL/bin
to execute commands these commands.
Start Kafka Connect and dependencies (Kafka, Zookeeper, Schema Registry):
confluent start connect
confluent load mqtt-source -d /Users/kai.waehner/confluent-5.0.0/share/confluent-hub-components/confluentinc-kafka-connect-mqtt/etc/source-anonymous.properties
If you want to run Kafka Connect in distributed mode, you can do the following REST call without creating a property file (only possible when Connect is running):
curl -s -X POST -H 'Content-Type: application/json' http://localhost:8083/connectors -d '{ "name" : "mqtt-source", "config" : { "connector.class" : "io.confluent.connect.mqtt.MqttSourceConnector", "tasks.max" : "1", "mqtt.server.uri" : "tcp://127.0.0.1:1883", "mqtt.topics" : "temperature", "kafka.topics" : "mqtt." } }'
Check if connector is loaded and status is 'RUNNING':
// REST
curl -s "http://localhost:8083/connectors"
curl -s "http://localhost:8083/connectors/mqtt-source/status"
curl -s -X DELETE localhost:8083/connectors/mqtt-source
// Confluent CLI
confluent status connectors
confluent status mqtt-source
Create a Kafka Topic for consuming the MQTT messages via the MQTT Connector (needs to be the same as in the Connector Config):
TODO: There was a bug in MQTT Connector. If topic mqtt.temperature
does not receive any messages, the mapping does not work and always use the default value mqtt.
. Then use mqtt.
for topic creation AND the kafka-consumer below instead!
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mqtt.temperature
Use a CLI tool such as mosquitto_sub
from the shell prompt to consume from MQTT topic (to ensure the MQTT infrastructure works):
$ mosquitto_sub -h 127.0.0.1 -t temperature
99999,4.193955593608823
99999,5.363750640274894
99999,7.292092517069437
[...]
Use a CLI tool such as kafka-console-consumer
from the shell prompt to consume from Kafka topic:
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic mqtt.temperature --from-beginning
99999,4.193955593608823
99999,5.363750640274894
99999,7.292092517069437
[...]
Send a MQTT message to the MQTT Broker, using mosquitto_pub
:
mosquitto_pub -h 127.0.0.1 -p 1883 -t temperature -q 2 -m "99999,2.10#"
mosquitto_pub -h 127.0.0.1 -p 1883 -t temperature -q 2 -m "99999,2.10# 2.13# 2.19# 2.28# 2.44# 2.62# 2.80# 3.04# 3.36# 3.69# 3.97# 4.24# 4.53#4.80# 5.02# 5.21# 5.40# 5.57# 5.71# 5.79# 5.86# 5.92# 5.98# 6.02# 6.06# 6.08# 6.14# 6.18# 6.22# 6.27#6.32# 6.35# 6.38# 6.45# 6.49# 6.53# 6.57# 6.64# 6.70# 6.73# 6.78# 6.83# 6.88# 6.92# 6.94# 6.98# 7.01#7.03# 7.05# 7.06# 7.07# 7.08# 7.06# 7.04# 7.03# 6.99# 6.94# 6.88# 6.83# 6.77# 6.69# 6.60# 6.53# 6.45#6.36# 6.27# 6.19# 6.11# 6.03# 5.94# 5.88# 5.81# 5.75# 5.68# 5.62# 5.61# 5.54# 5.49# 5.45# 5.42# 5.38#5.34# 5.31# 5.30# 5.29# 5.26# 5.23# 5.23# 5.22# 5.20# 5.19# 5.18# 5.19# 5.17# 5.15# 5.14# 5.17# 5.16#5.15# 5.15# 5.15# 5.14# 5.14# 5.14# 5.15# 5.14# 5.14# 5.13# 5.15# 5.15# 5.15# 5.14# 5.16# 5.15# 5.15#5.14# 5.14# 5.15# 5.15# 5.14# 5.13# 5.14# 5.14# 5.11# 5.12# 5.12# 5.12# 5.09# 5.09# 5.09# 5.10# 5.08# 5.08# 5.08# 5.08# 5.06# 5.05# 5.06# 5.07# 5.05# 5.03# 5.03# 5.04# 5.03# 5.01# 5.01# 5.02# 5.01# 5.01#5.00# 5.00# 5.02# 5.01# 4.98# 5.00# 5.00# 5.00# 4.99# 5.00# 5.01# 5.02# 5.01# 5.03# 5.03# 5.02# 5.02#5.04# 5.04# 5.04# 5.02# 5.02# 5.01# 4.99# 4.98# 4.96# 4.96# 4.96# 4.94# 4.93# 4.93# 4.93# 4.93# 4.93# 5.02# 5.27# 5.80# 5.94# 5.58# 5.39# 5.32# 5.25# 5.21# 5.13# 4.97# 4.71# 4.39# 4.05# 3.69# 3.32# 3.05#2.99# 2.74# 2.61# 2.47# 2.35# 2.26# 2.20# 2.15# 2.10# 2.08"
Now run a script to generate and publish a continuous stream of MQTT messages:
./sensor_generator.sh
Stop the sensor_generator script with Control-C.
Stop the Kafka and MQTT consumers with Control-C.
Finally, stop the backend services:
brew services stop mosquitto
confluent destroy
Here are some helpful commands if you have problems starting mosquitto or finding out if it is running and on which port:
// If brew does not work (or you are not on Mac):
// Start Mosquitto
/usr/local/sbin/mosquitto
// Start Moquitto (being in PATH) with a specific config file
mosquitto -c /usr/local/etc/mosquitto/mosquitto.conf
// Mosquitto Broker running? => Find process:
ps -ef | grep mosquitto
// Which port?
lsof -n -P -i4|grep LISTEN
Logging is a worker-level configuration, so there’s no way to set it per connector instance.
To adjust the log levels you modify the connect-log4j.properties
file and restart your worker.
Default log level is ERROR. You can change to INFO or DEBUG.
vi /Users/kai.waehner/confluent-5.0.0/etc/kafka/connect-log4j.properties
// => Change log level
// Restart Kafka Connect worker
confluent stop connect
confluent start connect