WIP (do not merge)

rmoff committed Sep 27, 2019
1 parent be72fe2 commit 766059b
Showing 8 changed files with 734 additions and 75 deletions.
3 changes: 1 addition & 2 deletions .gitignore
@@ -25,5 +25,4 @@ delta_configs
.project
.settings/
.DS_Store


mqtt-tracker/.env
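
The newly ignored mqtt-tracker/.env holds per-environment settings. A hypothetical example, using the variable names referenced by create_mqtt_source.sh and docker-compose.yml in this commit, with placeholder values:

[source,bash]
----
CONFLUENTPLATFORM_VERSION=5.3.1
CONNECT_REST_ADVERTISED_HOST_NAME=localhost
CONNECT_REST_PORT=8083
CCLOUD_BROKER_HOST=pkc-xxxxx.europe-west1.gcp.confluent.cloud
CCLOUD_API_KEY=CCLOUD-API-KEY-PLACEHOLDER
CCLOUD_API_SECRET=CCLOUD-API-SECRET-PLACEHOLDER
----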
35 changes: 30 additions & 5 deletions mqtt-tracker/README.adoc
@@ -2,6 +2,8 @@

**Built based on https://github.com/saubury/race-mapper[Simon Aubury's excellent example], all credit and thanks to Simon for this.**

image::images/mqtt_kafka_01.png[]

* Install Owntracks on your phone
* Owntracks sends https://owntracks.org/booklet/tech/json/#_typelocation[data] to MQTT server
* Kafka Connect ingests MQTT into Kafka topic
@@ -32,22 +34,30 @@ Tip: https://mqtt-explorer.com/[MQTT Explorer] is a useful tool.
1. Install Docker and Docker Compose
2. Allocate >=8GB to Docker
3. `docker-compose up -d`
4. Configure KSQL
** Run KSQL CLI
=== Kafka Connect MQTT Source
This is deployed automagically as part of the Docker Compose.
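
You can confirm the connector is up by querying the Kafka Connect REST API — a quick check, assuming port 8083 is published to the host as in the Docker Compose:

[source,bash]
----
# Check the status of the MQTT source connector created by the stack
curl -s http://localhost:8083/connectors/source-mqtt-01/status | jq '.'
----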
=== KSQL
* Run KSQL CLI
+
[source,bash]
----
docker exec -it ksql-cli bash -c 'echo -e "\n\n⏳ Waiting for KSQL to be available before launching CLI\n"; while : ; do curl_status=$(curl -s -o /dev/null -w %{http_code} http://ksql-server:8088/info) ; echo -e $(date) " KSQL server listener HTTP state: " $curl_status " (waiting for 200)" ; if [ $curl_status -eq 200 ] ; then break ; fi ; sleep 5 ; done ; ksql http://ksql-server:8088'
----
** Run script
* Run script
+
[source,sql]
----
RUN SCRIPT '/data/mqtt.ksql';
----
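* (Optional) Verify the objects were created — a sketch assuming the KSQL server's port 8088 is published to the host:
+
[source,bash]
----
# List streams via the KSQL server's REST endpoint
curl -s -X POST http://localhost:8088/ksql \
     -H "Content-Type: application/vnd.ksql.v1+json" \
     -d '{"ksql": "SHOW STREAMS;", "streamsProperties": {}}' | jq '.'
----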
5. Configure Elasticsearch sink
+
=== Kafka Connect Elasticsearch sink
NOTE: The Docker Compose stack automagically deploys an Elasticsearch dynamic mapping template so that geo_point fields and dates are correctly picked up.
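
You can inspect the deployed template first (assuming Elasticsearch's port 9200 is published to the host):

[source,bash]
----
# Show the dynamic mapping template installed by the Compose stack
curl -s "http://localhost:9200/_template/kafkaconnect" | jq '.'
----

Then create the sink connector: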
[source,bash]
----
curl -i -X PUT -H "Content-Type:application/json" \
@@ -83,3 +93,18 @@ curl -i -X PUT -H "Content-Type:application/json" \
}'
----
Check connector status:
[source,bash]
----
curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
column -s : -t| sed 's/\"//g'| sort
----
[source,bash]
----
sink | sink-elastic-runner_location-00 | RUNNING | RUNNING | io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
sink | sink-elastic-runner_status-00 | RUNNING | RUNNING | io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
source | source-mqtt-01 | RUNNING | RUNNING | io.confluent.connect.mqtt.MqttSourceConnector
----
33 changes: 33 additions & 0 deletions mqtt-tracker/create_mqtt_source.sh
@@ -0,0 +1,33 @@
#!/usr/bin/env bash
# Pull in environment settings (Connect host/port, Confluent Cloud credentials)
source .env

echo "Waiting for Kafka Connect to start listening on $CONNECT_REST_ADVERTISED_HOST_NAME:$CONNECT_REST_PORT"
while : ; do
curl_status=$(curl -s -o /dev/null -w %{http_code} http://$CONNECT_REST_ADVERTISED_HOST_NAME:$CONNECT_REST_PORT/connectors)
echo -e $(date) " Kafka Connect listener HTTP state: " $curl_status " (waiting for 200)"
if [ $curl_status -eq 200 ] ; then
break
fi
sleep 5
done
#
echo -e "\n--\n+> Creating Kafka Connect MQTT source"
curl -X PUT -H "Content-Type:application/json" http://$CONNECT_REST_ADVERTISED_HOST_NAME:$CONNECT_REST_PORT/connectors/source-mqtt-01/config \
-d '{
"connector.class" : "io.confluent.connect.mqtt.MqttSourceConnector",
"mqtt.server.uri" : "${file:/data/mqtt.credentials:MQTT_URL}",
"mqtt.password" : "${file:/data/mqtt.credentials:MQTT_PASSWORD}",
"mqtt.username" : "${file:/data/mqtt.credentials:MQTT_USERNAME}",
"mqtt.topics" : "owntracks/#",
"kafka.topic" : "data_mqtt",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"tasks.max" : "1",
"confluent.topic.bootstrap.servers" : "${CCLOUD_BROKER_HOST}:9092",
"confluent.topic.sasl.jaas.config" : "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${CCLOUD_API_KEY}\" password=\"${CCLOUD_API_SECRET}\";",
"confluent.topic.security.protocol": "SASL_SSL",
"confluent.topic.ssl.endpoint.identification.algorithm": "https",
"confluent.topic.sasl.mechanism": "PLAIN",
"confluent.topic.request.timeout.ms": "20000",
"confluent.topic.retry.backoff.ms": "500"
}'
#
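# Hypothetical helper: create the credentials file consumed above via
# Connect's FileConfigProvider references (${file:/data/mqtt.credentials:KEY}).
# Key names must match those references; the values here are placeholders,
# and the Connect worker is assumed to have a 'file' config provider enabled.
cat > data/mqtt.credentials <<'EOF'
MQTT_URL=tcp://mqtt.example.com:1883
MQTT_USERNAME=owntracks-user
MQTT_PASSWORD=changeme
EOF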
83 changes: 71 additions & 12 deletions mqtt-tracker/data/mqtt.ksql
@@ -1,4 +1,4 @@
CREATE STREAM REALTIME_RUNNER_LOCATION_STREAM
CREATE STREAM MQTT_RAW
(TID VARCHAR,
BATT INTEGER,
LON DOUBLE,
@@ -7,30 +7,89 @@ CREATE STREAM REALTIME_RUNNER_LOCATION_STREAM
ALT INTEGER,
COG INTEGER,
VEL INTEGER,
P DOUBLE,
BS INTEGER,
CONN VARCHAR)
CONN VARCHAR,
ACC INTEGER,
T VARCHAR,
VAC INTEGER,
INREGIONS VARCHAR,
TYPE VARCHAR)
WITH (KAFKA_TOPIC = 'data_mqtt', VALUE_FORMAT='JSON');

CREATE STREAM RUNNER_LOCATION_00
SELECT ROWKEY, SPLIT(ROWKEY, '/')[2] AS WHO FROM MQTT_RAW LIMIT 5;

SELECT LAT, LON, CAST(LAT AS VARCHAR) ||','||CAST(LON AS VARCHAR) AS LOCATION FROM MQTT_RAW LIMIT 5;

SELECT BS FROM MQTT_RAW;

SELECT BS, CASE WHEN BS=0 THEN 'Unknown'
WHEN BS=1 THEN 'Unplugged'
WHEN BS=2 THEN 'Charging'
WHEN BS=3 THEN 'Full'
ELSE '[unknown]'
END AS BATTERY_STATUS, BATT FROM MQTT_RAW;

CREATE STREAM RUNNER_LOCATION_00
WITH (VALUE_FORMAT='AVRO') AS
SELECT SPLIT(ROWKEY, '/')[2] AS WHO
, TST * 1000 AS EVENT_TIME
, CAST(LAT AS VARCHAR) ||','||CAST(LON AS VARCHAR) AS LOCATION
, *
FROM REALTIME_RUNNER_LOCATION_STREAM;
, TST * 1000 AS EVENT_TIME
, CASE WHEN LAT IS NULL OR LON IS NULL THEN CAST(NULL AS VARCHAR)
ELSE CAST(LAT AS VARCHAR) ||','||CAST(LON AS VARCHAR)
END AS LOCATION
, ACC AS LOCATION_ACCURACY_M
, ALT AS ALTITUDE_M
, BATT AS BATTERY_PCT
, CASE WHEN BS=0 THEN 'Unknown'
WHEN BS=1 THEN 'Unplugged'
WHEN BS=2 THEN 'Charging'
WHEN BS=3 THEN 'Full'
ELSE '[unknown]'
END AS BATTERY_STATUS
, COG AS COURSE_OVER_GROUND
, CASE WHEN T='p' THEN 'ping issued randomly by background task'
WHEN T='c' THEN 'circular region enter/leave event'
WHEN T='b' THEN 'beacon region enter/leave event'
WHEN T='r' THEN 'response to a reportLocation cmd message'
WHEN T='u' THEN 'manual publish requested by the user'
WHEN T='t' THEN 'timer-based publish in move mode'
WHEN T='v' THEN 'updated by Settings/Privacy/Location Services/System Services/Frequent Locations monitoring'
ELSE '[unknown]'
END AS REPORT_TRIGGER
, TID AS TRACKER_ID
, VAC AS VERTICAL_ACCURACY_M
, VEL AS VELOCITY_KMH
, P AS PRESSURE_KPA
, CASE WHEN CONN='w' THEN 'WiFI'
WHEN CONN='o' THEN 'Offline'
WHEN CONN='m' THEN 'Mobile'
ELSE '[unknown]'
END AS CONNECTIVITY_STATUS
, INREGIONS AS REGIONS
, LAT, LON
FROM MQTT_RAW;

SELECT ROWTIME, EVENT_TIME FROM RUNNER_LOCATION_00;

SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss'), TIMESTAMPTOSTRING(EVENT_TIME, 'yyyy-MM-dd HH:mm:ss') FROM RUNNER_LOCATION_00;

CREATE STREAM RUNNER_LOCATION
WITH (TIMESTAMP='EVENT_TIME') AS
SELECT *
FROM RUNNER_LOCATION_00;
FROM RUNNER_LOCATION_00
PARTITION BY WHO;

SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss'), TIMESTAMPTOSTRING(EVENT_TIME, 'yyyy-MM-dd HH:mm:ss') FROM RUNNER_LOCATION;



CREATE TABLE RUNNER_STATUS AS
SELECT WHO,
MIN(VEL) AS MIN_SPEED,
MAX(VEL) AS MAX_SPEED,
MIN(GEO_DISTANCE(LAT, LON, 53.925915, -1.823168, 'KM')) AS DIST_TO_ILKLEY,
MIN(VELOCITY_KMH) AS MIN_SPEED,
MAX(VELOCITY_KMH) AS MAX_SPEED,
COUNT(*) AS NUM_EVENTS,
MAX(ROWTIME) AS LAST_EVENT_TS
MAX(ROWTIME) AS LAST_EVENT_TS,
MIN(GEO_DISTANCE(LAT, LON, 53.925915, -1.823168, 'KM')) AS DIST_TO_ILKLEY
FROM RUNNER_LOCATION
WINDOW TUMBLING (SIZE 5 MINUTE)
GROUP BY WHO;
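
-- A sketch of querying the aggregate (a push query in the KSQL CLI; it
-- assumes location events are flowing through RUNNER_LOCATION):
SELECT WHO,
       MIN_SPEED,
       MAX_SPEED,
       NUM_EVENTS,
       TIMESTAMPTOSTRING(LAST_EVENT_TS, 'yyyy-MM-dd HH:mm:ss') AS LAST_EVENT,
       DIST_TO_ILKLEY
FROM RUNNER_STATUS;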
116 changes: 60 additions & 56 deletions mqtt-tracker/docker-compose.yml
@@ -96,7 +96,6 @@ services:
- ./data:/data

kafka-connect-01:
# Ingest
image: confluentinc/cp-kafka-connect:${CONFLUENTPLATFORM_VERSION}
container_name: kafka-connect-01
depends_on:
@@ -106,21 +105,22 @@
ports:
- 8083:8083
environment:
CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
CONNECT_CUB_KAFKA_TIMEOUT: 300
CONNECT_BOOTSTRAP_SERVERS: 'kafka:29092'
CONNECT_BOOTSTRAP_SERVERS: "kafka:29092"
CONNECT_REST_ADVERTISED_HOST_NAME: 'kafka-connect-01'
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: compose-connect-group
CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
CONNECT_GROUP_ID: kafka-connect-group-01
CONNECT_CONFIG_STORAGE_TOPIC: _kafka-connect-group-01-configs
CONNECT_OFFSET_STORAGE_TOPIC: _kafka-connect-group-01-offsets
CONNECT_STATUS_STORAGE_TOPIC: _kafka-connect-group-01-status
CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
CONNECT_INTERNAL_KEY_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
CONNECT_INTERNAL_VALUE_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
CONNECT_LOG4J_ROOT_LOGLEVEL: 'INFO'
CONNECT_LOG4J_ROOT_LOGLEVEL: 'TRACE'
CONNECT_LOG4J_LOGGERS: 'org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR'
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '1'
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '1'
@@ -194,44 +194,44 @@ services:
sleep 5
done
echo -e "\n--\n+> Creating Elasticsearch dynamic mapping"
curl -XPUT "http://localhost:9200/_template/kafkaconnect/" -H 'Content-Type: application/json' -d'
curl -XPUT "http://localhost:9200/_template/kafkaconnect/?include_type_name=true" -H 'Content-Type: application/json' -d'
{
"template": "*",
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0
},
"mappings": {
"_default_": {
"dynamic_templates": [
{
"dates": {
"match": "*_TS",
"mapping": {
"type": "date"
"template": "*",
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0
},
"mappings": {
"_default_" : {
"dynamic_templates": [
{
"dates": {
"match": "*_TS",
"mapping": {
"type": "date"
}
}
},
{
"heights": {
"match": "HEIGHT",
"mapping": {
"type": "float"
}
}
},
{
"locations": {
"match": "LOCATION",
"mapping": {
"type": "geo_point"
}
}
}
]
}
}
}
},
{
"heights": {
"match": "HEIGHT",
"mapping": {
"type": "float"
}
}
},
{
"locations": {
"match": "LOCATION",
"mapping": {
"type": "geo_point"
}
}
}
]
}
}
}'
}'
sleep infinity
@@ -299,14 +299,18 @@
sleep infinity
kafkacat:
image: edenhill/kafkacat:1.5.0
container_name: kafkacat
depends_on:
- kafka
command:
- bash
- -c
- |
echo '{"batt":100,"lon":-1.8029050435887677,"acc":30,"p":98.641334533691406,"bs":1,"vel":0,"vac":12,"lat":53.925261804884229,"t":"t","conn":"w","tst":1569319041,"alt":97,"_type":"location","tid":"FF"}' | \
kafkacat -b kafka:29092 -P -t data_mqtt
# kafkacat:
# image: edenhill/kafkacat:1.5.0
# container_name: kafkacat
# # depends_on:
# # - kafka
# command:
# - bash
# - -c
# - |
# echo '{"batt":100,"lon":-1.8029050435887677,"acc":30,"p":98.641334533691406,"bs":1,"vel":0,"vac":12,"lat":53.925261804884229,"t":"t","conn":"w","tst":1569319041,"alt":97,"_type":"location","tid":"FF"}' | \
# kafkacat -b ${CCLOUD_BROKER_HOST}
# -X security.protocol=SASL_SSL -X sasl.mechanisms=PLAIN \
# -X sasl.username="${CCLOUD_API_KEY}" -X sasl.password="${CCLOUD_API_SECRET}" \
# -X ssl.ca.location=/usr/local/etc/openssl/cert.pem -X api.version.request=true
# -P -t data_mqtt
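
To check that messages are landing in the topic, you can consume a few records with kafkacat from the host — a sketch, assuming the broker publishes a listener on localhost:9092:

[source,bash]
----
# Read the last five records from data_mqtt and exit
kafkacat -b localhost:9092 -C -t data_mqtt -o -5 -e
----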