-
Notifications
You must be signed in to change notification settings - Fork 0
/
__main__.py
141 lines (118 loc) · 4.42 KB
/
__main__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
import paho.mqtt.client as mqtt
from graphviz import Digraph
from time import sleep, time
import os
import logging
from typing import Dict, Optional
from pathlib import Path
# Root logging setup: timestamped records at INFO level and above.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-level logger used by the code in this file.
logger = logging.getLogger(__name__)
class MQTTMindMap:
    """Live mind map of an MQTT broker's topic tree, rendered with Graphviz.

    The client subscribes to every topic (``#``), keeps the most recent
    payload seen on each one, and re-renders a PNG of the topic hierarchy
    when a message arrives, at most once per ``update_interval`` seconds.
    """

    def __init__(self,
                 host: str = 'localhost',
                 port: int = 1884,
                 update_interval: float = 1.0,
                 output_dir: str = 'output'):
        """
        Initialize the MQTT Mind Map generator.

        Args:
            host: MQTT broker host
            port: MQTT broker port
            update_interval: Minimum time between graph updates in seconds
            output_dir: Directory to save the generated mind maps
        """
        self.host = host
        self.port = port
        self.update_interval = update_interval
        self.output_dir = Path(output_dir)
        # parents=True so a nested directory such as "out/maps" can be
        # created in one go instead of raising FileNotFoundError.
        self.output_dir.mkdir(parents=True, exist_ok=True)

        # Initialize graph and state
        self.dot = Digraph(comment='MQTT Mind Map')
        self._apply_layout()
        # Latest decoded payload per full topic name.
        self.topic_values: Dict[str, str] = {}
        # Timestamp (time()) of the last render; 0 forces a render on the
        # first message.
        self.last_update = 0
        self.running = False

        # Setup MQTT client
        self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1)
        self.client.on_message = self._on_message
        self.client.on_connect = self._on_connect
        self.client.on_disconnect = self._on_disconnect

    def _apply_layout(self):
        """Add the graph-level layout statement to the current graph body."""
        self.dot.attr(rankdir='LR')  # Left to right layout

    @staticmethod
    def _unique_edges(topics) -> list:
        """Return the parent -> child edges implied by *topics*, without repeats.

        Each topic is split on ``/`` and every prefix is linked to the next
        longer prefix, e.g. ``a/b/c`` yields ``('a', 'a/b')`` and
        ``('a/b', 'a/b/c')``. Topics sharing a prefix would otherwise emit
        the same edge once per topic, which a non-strict digraph draws as
        parallel arrows, so an edge is kept only the first time it is seen.

        Args:
            topics: Iterable of full topic names.

        Returns:
            List of ``(parent, child)`` tuples in first-seen order.
        """
        seen = set()
        edges = []
        for topic in topics:
            parts = topic.split('/')
            for depth in range(1, len(parts)):
                edge = ('/'.join(parts[:depth]), '/'.join(parts[:depth + 1]))
                if edge not in seen:
                    seen.add(edge)
                    edges.append(edge)
        return edges

    def _on_connect(self, client, userdata, flags, rc):
        """Callback for when the client connects to the broker."""
        if rc == 0:
            logger.info("Connected to MQTT broker")
            # Subscribing from the connect callback means the wildcard
            # subscription is issued every time this callback fires.
            self.client.subscribe("#")
        else:
            logger.error(f"Failed to connect to MQTT broker with code: {rc}")

    def _on_disconnect(self, client, userdata, rc):
        """Callback for when the client disconnects from the broker."""
        logger.warning(f"Disconnected from MQTT broker with code: {rc}")

    def _on_message(self, client, userdata, message):
        """Handle incoming MQTT messages.

        Stores the payload as the latest value for its topic and re-renders
        the map if at least ``update_interval`` seconds have passed since the
        previous render.

        NOTE: a message that arrives inside the interval is stored but only
        becomes visible once a later message triggers a render.
        """
        try:
            topic = message.topic
            # MQTT payloads are arbitrary bytes; replace undecodable bytes
            # instead of raising, so binary topics still appear on the map.
            value = message.payload.decode('utf-8', errors='replace')
            self.topic_values[topic] = value

            # Rate limit graph updates
            current_time = time()
            if current_time - self.last_update >= self.update_interval:
                self.update_mind_map()
                self.last_update = current_time
        except Exception as e:
            # Callback boundary: log and continue so one bad message does
            # not propagate out of the handler.
            logger.error(f"Error processing message: {e}")

    def update_mind_map(self):
        """Update and render the mind map.

        Rebuilds the graph from ``topic_values`` and writes
        ``dynamic_mqtt_mind_map.png`` into ``output_dir``. Errors are logged,
        not raised.
        """
        try:
            # Clear previous graph. clear() empties the body, which also
            # removes the rankdir statement, so the layout is re-applied.
            self.dot.clear()
            self._apply_layout()

            # One edge per distinct parent/child pair across all topics.
            for parent, child in self._unique_edges(self.topic_values):
                self.dot.edge(parent, child)

            # Label each topic that carried a message with its latest value.
            for topic, value in self.topic_values.items():
                leaf = topic.split('/')[-1]
                self.dot.node(topic, label=f"{leaf}: {value}")

            # Render the updated mind map
            output_path = self.output_dir / 'dynamic_mqtt_mind_map'
            self.dot.render(str(output_path), format='png', cleanup=True)
        except Exception as e:
            logger.error(f"Error updating mind map: {e}")

    def start(self):
        """Start the MQTT mind map generator.

        Connects to the broker, starts the client's background network loop
        and then blocks, polling ``running`` every 0.1 s, until ``stop()`` is
        called or the user interrupts with Ctrl+C. ``stop()`` is always
        invoked on the way out.
        """
        try:
            self.running = True
            self.client.connect(self.host, self.port)
            self.client.loop_start()

            while self.running:
                sleep(0.1)
        except KeyboardInterrupt:
            logger.info("Shutting down...")
        except Exception as e:
            logger.error(f"Error in main loop: {e}")
        finally:
            self.stop()

    def stop(self):
        """Stop the MQTT mind map generator."""
        self.running = False
        self.client.loop_stop()
        self.client.disconnect()
        logger.info("Shutdown complete")
def main():
    """Entry point: build a mind map generator from the environment and run it.

    The broker host comes from ``AWSIP2`` (default ``localhost``) and the
    port from ``AWSPORT`` (default 1884). The call blocks inside
    ``MQTTMindMap.start()`` until the generator stops.
    """
    broker_port = int(os.environ.get('AWSPORT', 1884))
    broker_host = os.environ.get('AWSIP2', 'localhost')

    generator = MQTTMindMap(host=broker_host, port=broker_port, update_interval=1.0)
    generator.start()
# Run the generator only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()