Skip to content

Commit 1d8f592

Browse files
author
Adam Rudd
committed
First commit - very messy, not all packet types implemented, not very extensively tested
0 parents  commit 1d8f592

File tree

2 files changed

+267
-0
lines changed

2 files changed

+267
-0
lines changed

MQTT.py

+267
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,267 @@
1+
from twisted.internet.protocol import Protocol
2+
3+
class MQTTProtocol(Protocol):
4+
buffer = bytearray()
5+
type = None
6+
qos = None
7+
length = None
8+
messageID = 1
9+
10+
def dataReceived(self, data):
11+
self._accumulatePacket(data)
12+
13+
def _accumulatePacket(self, data):
14+
self.buffer.extend(data)
15+
16+
while len(self.buffer):
17+
if self.length is None:
18+
# Start on a new packet
19+
20+
# Haven't got enough data to start a new packet,
21+
# wait for some more
22+
if len(self.buffer) < 2: break
23+
24+
lenLen = 1
25+
while lenLen < len(self.buffer):
26+
if not self.buffer[lenLen] & 0x80: break
27+
lenLen += 1
28+
29+
# We still haven't got all of the remaining length field
30+
if lenLen < len(self.buffer) and self.buffer[lenLen] & 0x80: return
31+
32+
self.type = self.buffer[0] & 0xF0
33+
self.qos = self.buffer[0] & 0x06
34+
self.length = self._decodeLength(self.buffer[1:1+lenLen])
35+
self.buffer = self.buffer[lenLen+1:]
36+
37+
if len(self.buffer) >= self.length:
38+
chunk = self.buffer[:self.length]
39+
self._processPacket(chunk)
40+
self.buffer = self.buffer[self.length:]
41+
self.length = None
42+
self.type = None
43+
self.qos = None
44+
else:
45+
break
46+
47+
def _processPacket(self, packet):
48+
if self.type == 0x01 << 4:
49+
# Connect
50+
pass
51+
elif self.type == 0x02 << 4:
52+
# Connack
53+
status = packet[1]
54+
self.connackReceived(status)
55+
elif self.type == 0x03 << 4:
56+
# Publish
57+
topicLen = packet[0] * 256 + packet[1]
58+
topic = self._decodeString(packet)
59+
packet = packet[2:]
60+
packet = packet[topicLen:]
61+
62+
if self.qos == 0x00 << 1:
63+
payload = bytearray(packet)
64+
else:
65+
payload = bytearray(packet[:-2])
66+
67+
self.publishReceived(str(topic), str(payload))
68+
69+
elif self.type == 0x04 << 4:
70+
# Puback
71+
pass
72+
73+
elif self.type == 0x05 << 4:
74+
# Pubrec
75+
pass
76+
77+
elif self.type == 0x06 << 4:
78+
# Pubrel
79+
pass
80+
81+
elif self.type == 0x07 << 4:
82+
# Pubcomp
83+
pass
84+
85+
elif self.type == 0x08 << 4:
86+
# Subscribe
87+
pass
88+
89+
elif self.type == 0x09 << 4:
90+
# Suback
91+
pass
92+
93+
elif self.type == 0x0A << 4:
94+
# Unsubscribe
95+
pass
96+
97+
elif self.type == 0x0B <<4:
98+
# Unsuback
99+
pass
100+
101+
elif self.type == 0x0C << 4:
102+
# Pingreq
103+
self.pingReqReceived()
104+
105+
elif self.type == 0x0D << 4:
106+
# Pingresp
107+
pass
108+
109+
elif self.type == 0x0E << 4:
110+
pass
111+
# Disconnect
112+
113+
def connectionMade(self):
114+
pass
115+
116+
def connectionLost(self, reason):
117+
pass
118+
119+
def connackReceived(self, status):
120+
pass
121+
122+
def publishReceived(self, topic, message):
123+
pass
124+
125+
def pingReqReceived(self):
126+
pass
127+
128+
def connect(self, clientID, keepalive = 3000, willTopic = None,
129+
willMessage = None, willQoS = 0, willRetain = False):
130+
header = bytearray()
131+
varHeader = bytearray()
132+
payload = bytearray()
133+
134+
varHeader.extend( self._encodeString("MQIsdp") )
135+
varHeader.append(3)
136+
137+
if willMessage is None or willTopic is None:
138+
# Clean start, no will message
139+
varHeader.append( 0 << 2 | 1 << 1 )
140+
else:
141+
varHeader.append(willRetain << 5 | willQoS << 3 | 1 << 2 | 1 << 1)
142+
143+
varHeader.extend(self._encodeValue(keepalive/1000))
144+
145+
payload.extend(self._encodeString(clientID))
146+
if willMessage is not None and willTopic is not None:
147+
payload.extend(self._encodeString(willTopic))
148+
payload.extend(self._encodeString(willMessage))
149+
150+
header.append(0x01 << 4)
151+
header.extend(self._encodeLength(len(varHeader) + len(payload)))
152+
153+
self.transport.write(str(header))
154+
self.transport.write(str(varHeader))
155+
self.transport.write(str(payload))
156+
157+
def publish(self, topic, message, qosLevel = 0):
158+
"""
159+
Only supports QoS level 0 publishes
160+
"""
161+
header = bytearray()
162+
varHeader = bytearray()
163+
payload = bytearray()
164+
165+
# Type = publish, QoS = 0
166+
header.append(0x03 << 4 | 0x00 << 1)
167+
168+
varHeader.extend(self._encodeString(topic))
169+
payload.extend(message)
170+
171+
header.extend(self._encodeLength(len(varHeader) + len(payload)))
172+
173+
self.transport.write(str(header))
174+
self.transport.write(str(varHeader))
175+
self.transport.write(str(payload))
176+
177+
def subscribe(self, topic, requestedQoS = 0):
178+
179+
"""
180+
Only supports QoS = 0 subscribes
181+
Only supports one subscription per message
182+
"""
183+
header = bytearray()
184+
varHeader = bytearray()
185+
payload = bytearray()
186+
187+
# Type = subscribe, QoS = 1
188+
header.append(0x08 << 4 | 0x01 << 1)
189+
190+
varHeader.extend(self._encodeValue(self.messageID))
191+
self.messageID += 1
192+
193+
payload.extend(self._encodeString(topic))
194+
payload.append(0)
195+
196+
header.extend(self._encodeLength(len(varHeader) + len(payload)))
197+
198+
self.transport.write(str(header))
199+
self.transport.write(str(varHeader))
200+
self.transport.write(str(payload))
201+
202+
def unsubscribe(self, topic):
203+
header = bytearray()
204+
varHeader = bytearray()
205+
payload = bytearray
206+
207+
header.append(0x0A << 4 | 0x01 < 1)
208+
209+
varHeader.extend(self._encodeValue(self.messageID))
210+
self.messageID += 1
211+
212+
payload.extend(self._encodeString(topic))
213+
214+
header.extend(self._encodeLength(len(payload) + len(varHeader)))
215+
216+
self.transport.write(str(header))
217+
self.transport.write(str(varHeader))
218+
self.transport.write(str(payload))
219+
220+
def pingRequest(self):
221+
self.transport.write(str(0x0C << 4))
222+
223+
def disconnect(self):
224+
self.transport.write(str(0x0E << 4))
225+
226+
def _encodeString(self, string):
227+
encoded = bytearray()
228+
encoded.append(len(string) >> 8)
229+
encoded.append(len(string) & 0xFF)
230+
for i in string:
231+
encoded.append(i)
232+
233+
return encoded
234+
235+
def _decodeString(self, encodedString):
236+
length = 256 * encodedString[0] + encodedString[1]
237+
return str(encodedString[2:2+length])
238+
239+
def _encodeLength(self, length):
240+
encoded = bytearray()
241+
while True:
242+
digit = length % 128
243+
length //= 128
244+
if length > 0:
245+
digit |= 128
246+
247+
encoded.append(digit)
248+
if length <= 0: break
249+
250+
return encoded
251+
252+
def _encodeValue(self, value):
253+
encoded = bytearray()
254+
encoded.append(value >> 8)
255+
encoded.append(value & 0xFF)
256+
257+
return encoded
258+
259+
def _decodeLength(self, lengthArray):
260+
length = 0
261+
multiplier = 1
262+
for i in lengthArray:
263+
length += (i & 0x7F) * multiplier
264+
multiplier *= 0x80
265+
266+
return length
267+

README

Whitespace-only changes.

0 commit comments

Comments
 (0)