
Add telegram bot code and blog article
rmoff committed Apr 23, 2020
1 parent 448b807 commit 0bda0ac
Showing 5 changed files with 692 additions and 6 deletions.
123 changes: 123 additions & 0 deletions wifi-fun/bot.py
@@ -0,0 +1,123 @@
# Code heavily inspired (lifted pretty much verbatim) from https://djangostars.com/blog/how-to-create-and-deploy-a-telegram-bot/
# Changes made by @rmoff to add a call out to ksqlDB
#
# To run this:
#
# 1. Sign up for ngrok and run it:
# ./ngrok http 8080
# 2. Note the external URL provided by ngrok and set it as the webhook for your Telegram bot
# curl -L http://api.telegram.org/botXXXXXYYYYYYYY/setWebHook?url=https://xxxyyy12345.ngrok.io
# 3. Run the bot
# python bot.py
#
# Don't forget to also update the hardcoded values in the bot code
# (Telegram bot auth token, ksqlDB connection details, etc.).
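#
# To confirm that the webhook registration took, the Telegram Bot API's
# getWebhookInfo method should report back the ngrok URL set in step 2:
#   curl -L https://api.telegram.org/botXXXXXYYYYYYYY/getWebhookInfo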

import datetime
import json

import requests
from bottle import Bottle, response, request as bottle_request


class BotHandlerMixin:
    BOT_URL = None

    def get_chat_id(self, data):
        """
        Method to extract chat id from telegram request.
        """
        chat_id = data['message']['chat']['id']

        return chat_id

    def get_message(self, data):
        """
        Method to extract message text from telegram request.
        """
        message_text = data['message']['text']

        return message_text

    def send_message(self, prepared_data):
        """
        Prepared data should be JSON which includes at least `chat_id` and `text`
        """
        message_url = self.BOT_URL + 'sendMessage'
        requests.post(message_url, json=prepared_data)
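
    # For reference, the JSON that send_message() posts to the Telegram
    # sendMessage endpoint is just a chat id plus the reply text; an
    # illustrative payload (values made up):
    #
    #   {"chat_id": 123456789,
    #    "text": "📡 Wi-Fi probe stats for aa:bb:cc:dd:ee:ff ..."}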


class TelegramBot(BotHandlerMixin, Bottle):
    BOT_URL = 'https://api.telegram.org/botXXXXXXXXYYYYYY/'

    def __init__(self, *args, **kwargs):
        super(TelegramBot, self).__init__()
        self.route('/', callback=self.post_handler, method="POST")

    def lookup_last_probe_enriched(self, device):
        ksqldb_url = "http://localhost:8088/query"
        headers = {'Content-Type': 'application/vnd.ksql.v1+json; charset=utf-8'}
        query = {'ksql': 'SELECT PROBE_COUNT, FIRST_PROBE, LAST_PROBE, UNIQUE_SSIDS_PROBED, SSIDS_PROBED FROM PCAP_STATS_ENRICHED_01 WHERE ROWKEY = \'' + device + '\';'}

        r = requests.post(ksqldb_url, data=json.dumps(query), headers=headers)

        if r.status_code == 200:
            result = r.json()
            if len(result) == 2:
                probe_count = result[1]['row']['columns'][0]
                probe_first = datetime.datetime.fromtimestamp(float(result[1]['row']['columns'][1]) / 1000).strftime("%Y-%m-%d %H:%M:%S")
                probe_last = datetime.datetime.fromtimestamp(float(result[1]['row']['columns'][2]) / 1000).strftime("%Y-%m-%d %H:%M:%S")
                unique_ssids = result[1]['row']['columns'][3]
                probed_ssids = result[1]['row']['columns'][4]

                return ('📡 Wi-Fi probe stats for %s\n\tEarliest probe : %s\n\tLatest probe : %s\n\tProbe count : %d\n\tUnique SSIDs : %s' % (device, probe_first, probe_last, probe_count, probed_ssids))
            else:
                return ('🛎 No result found for device %s' % (device))
        else:
            return ('❌ Query failed (%s %s)\n%s' % (r.status_code, r.reason, r.text))
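
    # The ksqlDB /query REST endpoint returns a JSON array: for a pull query that
    # matches a single key this is (roughly) a header/schema element followed by
    # one row element, which is why the code checks len(result) == 2 and reads
    # result[1]['row']['columns']. An illustrative response (values made up):
    #
    #   [{"header": {"queryId": "...", "schema": "`PROBE_COUNT` BIGINT, ..."}},
    #    {"row": {"columns": [4, 1587209000000, 1587287300000, 2, "[ssid-a, ssid-b]"]}}]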

    def lookup_last_probe(self, device):
        ksqldb_url = "http://localhost:8088/query"
        headers = {'Content-Type': 'application/vnd.ksql.v1+json; charset=utf-8'}
        query = {'ksql': 'SELECT PROBE_COUNT, FIRST_PROBE, LAST_PROBE, UNIQUE_SSIDS_PROBED, SSIDS_PROBED FROM PCAP_STATS_01 WHERE ROWKEY = \'' + device + '\';'}

        r = requests.post(ksqldb_url, data=json.dumps(query), headers=headers)

        if r.status_code == 200:
            result = r.json()
            if len(result) == 2:
                probe_count = result[1]['row']['columns'][0]
                probe_first = result[1]['row']['columns'][1]
                probe_last = result[1]['row']['columns'][2]
                unique_ssids = result[1]['row']['columns'][3]
                probed_ssids = result[1]['row']['columns'][4]

                return ('📡 Wi-Fi probe stats for %s\n\tEarliest probe : %s\n\tLatest probe : %s\n\tProbe count : %d\n\tUnique SSIDs : %d (%s)' % (device, probe_first, probe_last, probe_count, unique_ssids, probed_ssids))
            else:
                return ('🛎 No result found for device %s' % (device))
        else:
            return ('❌ Query failed (%s %s)\n%s' % (r.status_code, r.reason, r.text))

    def prepare_data_for_answer(self, data):
        message = self.get_message(data)
        print('👉 Received message sent to us:\n\t%s' % (message))
        answer = self.lookup_last_probe_enriched(message)
        print('👈 Returning message back to sender:\n\t%s' % (answer))
        chat_id = self.get_chat_id(data)
        json_data = {
            "chat_id": chat_id,
            "text": answer,
        }

        return json_data

    def post_handler(self):
        data = bottle_request.json
        answer_data = self.prepare_data_for_answer(data)
        self.send_message(answer_data)

        return response


if __name__ == '__main__':
    app = TelegramBot()
    app.run(host='localhost', port=8080)
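
# For a quick local test that bypasses Telegram entirely, POST a minimal update of
# the shape the handlers above expect (the chat id and MAC address are made up;
# the reply will still be sent to the Telegram API, so expect that call to fail
# unless the bot token and chat id are real):
#
#   curl -X POST http://localhost:8080/ \
#        -H "Content-Type: application/json" \
#        -d '{"message": {"chat": {"id": 123}, "text": "aa:bb:cc:dd:ee:ff"}}'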
6 changes: 3 additions & 3 deletions wifi-fun/create_replicator_source.sh
@@ -21,18 +21,18 @@ curl -s -X PUT -H "Accept:application/json" \
"key.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
"value.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
"header.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
"src.kafka.bootstrap.servers": "'$CCLOUD_BROKER_HOST':9092",
"src.kafka.bootstrap.servers": "'$CCLOUD_BROKER_HOST'",
"src.kafka.security.protocol": "SASL_SSL",
"src.kafka.sasl.mechanism": "PLAIN",
"src.kafka.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"'$CCLOUD_API_KEY'\" password=\"'$CCLOUD_API_SECRET'\";",
"src.consumer.group.id": "replicator-'$epoch'",
"src.kafka.auto.offset.reset": "latest",
"dest.kafka.bootstrap.servers": "kafka-1:39092,kafka-2:49092,kafka-3:59092",
"topic.whitelist": "pcap",
"topic.rename.format":"${topic}",
"topic.rename.format":"${topic}-ccloud-'$epoch'",
"confluent.license":"",
"confluent.topic.bootstrap.servers":"kafka-1:39092,kafka-2:49092,kafka-3:59092",
"confluent.topic.replication.factor":1,
"offset.start":"consumer"
}' | jq '.'

# "topic.rename.format":"${topic}-ccloud-'$epoch'",
6 changes: 3 additions & 3 deletions wifi-fun/docker-compose.yml
@@ -122,8 +122,8 @@ services:
echo "Installing connector plugins"
confluent-hub install --no-prompt --component-dir /usr/share/confluent-hub-components/ confluentinc/kafka-connect-elasticsearch:5.4.1
confluent-hub install --no-prompt --component-dir /usr/share/confluent-hub-components/ confluentinc/kafka-connect-jdbc:5.4.1
-confluent-hub install --no-prompt --component-dir /usr/share/confluent-hub-components/ neo4j/kafka-connect-neo4j:1.0.2
-confluent-hub install --no-prompt --component-dir /usr/share/confluent-hub-components/ debezium/debezium-connector-mongodb:0.10.0
+confluent-hub install --no-prompt --component-dir /usr/share/confluent-hub-components/ neo4j/kafka-connect-neo4j:1.0.7
+confluent-hub install --no-prompt --component-dir /usr/share/confluent-hub-components/ debezium/debezium-connector-mongodb:1.1.0
#
echo "Launching Kafka Connect worker"
/etc/confluent/docker/run &
@@ -310,7 +310,7 @@ services:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
volumes:
-- ./data/postgres:/docker-entrypoint-initdb.d/
+# - ./data/postgres:/docker-entrypoint-initdb.d/
- ./data/container_data/postgres:/var/lib/postgresql/data

kafkacat:
