forked from micropython/micropython
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathble_bonding_peripheral.py
194 lines (170 loc) · 6.65 KB
/
ble_bonding_peripheral.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
# This example demonstrates a simple temperature sensor peripheral.
#
# The sensor's local value updates every second, and it will notify
# any connected central every 10 seconds.
#
# Work-in-progress demo of implementing bonding and passkey auth.
import bluetooth
import random
import struct
import time
import json
import binascii
from ble_advertising import advertising_payload
from micropython import const
# Event codes that BLETemperature._irq compares its `event` argument against.
# Connection lifecycle events:
_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
# Reported with (conn_handle, value_handle, status); currently only unpacked.
_IRQ_GATTS_INDICATE_DONE = const(20)
# Pairing / bonding related events:
_IRQ_ENCRYPTION_UPDATE = const(28)
_IRQ_PASSKEY_ACTION = const(31)
# Secret storage events: the handler looks up / stores entries in its
# in-memory secrets dictionary when it receives these.
_IRQ_GET_SECRET = const(29)
_IRQ_SET_SECRET = const(30)
# Characteristic flag bits; they are OR-ed together when _TEMP_CHAR is declared.
_FLAG_READ = const(0x0002)
_FLAG_NOTIFY = const(0x0010)
_FLAG_INDICATE = const(0x0020)
_FLAG_READ_ENCRYPTED = const(0x0200)
# org.bluetooth.service.environmental_sensing
_ENV_SENSE_UUID = bluetooth.UUID(0x181A)
# org.bluetooth.characteristic.temperature
# Characteristic description as a (UUID, flags) pair: readable, notifiable,
# indicatable, and carrying the READ_ENCRYPTED flag.
_TEMP_CHAR = (
bluetooth.UUID(0x2A6E),
_FLAG_READ | _FLAG_NOTIFY | _FLAG_INDICATE | _FLAG_READ_ENCRYPTED,
)
# Service description as (service UUID, tuple of characteristics). This is the
# value BLETemperature.__init__ hands to gatts_register_services().
_ENV_SENSE_SERVICE = (
_ENV_SENSE_UUID,
(_TEMP_CHAR,),
)
# org.bluetooth.characteristic.gap.appearance.xml
# Passed as the `appearance` argument when the advertising payload is built.
_ADV_APPEARANCE_GENERIC_THERMOMETER = const(768)
# Values for the `io` setting passed to BLE.config(); this example selects
# _IO_CAPABILITY_DISPLAY_YESNO, the others are listed for reference.
_IO_CAPABILITY_DISPLAY_ONLY = const(0)
_IO_CAPABILITY_DISPLAY_YESNO = const(1)
_IO_CAPABILITY_KEYBOARD_ONLY = const(2)
_IO_CAPABILITY_NO_INPUT_OUTPUT = const(3)
_IO_CAPABILITY_KEYBOARD_DISPLAY = const(4)
# Values of the `action` field delivered with _IRQ_PASSKEY_ACTION; the IRQ
# handler branches on these to decide how to answer via gap_passkey().
_PASSKEY_ACTION_INPUT = const(2)
_PASSKEY_ACTION_DISP = const(3)
_PASSKEY_ACTION_NUMCMP = const(4)
class BLETemperature:
    """BLE temperature-sensor peripheral with bonding and passkey pairing.

    Registers the environmental-sensing service (one temperature
    characteristic), advertises it, and tracks connected centrals so they can
    be notified or indicated.  Secrets handed over by the BLE stack through
    the IRQ handler are kept in ``self._secrets``, written to
    ``secrets.json`` on every disconnect and reloaded at construction.
    """

    def __init__(self, ble, name="mpy-temp"):
        """Configure *ble*, register the service and start advertising.

        Args:
            ble: BLE interface object (``demo`` passes ``bluetooth.BLE()``).
            name: Device name placed in the advertising payload.
        """
        self._ble = ble
        # Load stored secrets before installing the IRQ handler that serves them.
        self._load_secrets()
        self._ble.irq(self._irq)
        self._ble.config(bond=True)
        self._ble.config(le_secure=True)
        self._ble.config(mitm=True)
        self._ble.config(io=_IO_CAPABILITY_DISPLAY_YESNO)
        self._ble.active(True)
        # NOTE(review): addr_mode=2 is documented by MicroPython as a
        # resolvable private address -- confirm against the port in use.
        self._ble.config(addr_mode=2)
        ((self._handle,),) = self._ble.gatts_register_services((_ENV_SENSE_SERVICE,))
        self._connections = set()
        self._payload = advertising_payload(
            name=name, services=[_ENV_SENSE_UUID], appearance=_ADV_APPEARANCE_GENERIC_THERMOMETER
        )
        self._advertise()

    def _irq(self, event, data):
        """Handle a BLE event.

        Args:
            event: One of the ``_IRQ_*`` event codes.
            data: Event-specific tuple, unpacked per branch below.

        Returns:
            ``True``/``False`` for ``_IRQ_SET_SECRET`` (stored or deleted /
            nothing to delete), the secret bytes or ``None`` for
            ``_IRQ_GET_SECRET``, and ``None`` for every other event.
        """
        # Track connections so we can send notifications.
        if event == _IRQ_CENTRAL_CONNECT:
            conn_handle, _, _ = data
            self._connections.add(conn_handle)
        elif event == _IRQ_CENTRAL_DISCONNECT:
            conn_handle, _, _ = data
            # discard() rather than remove(): an unknown handle must not raise
            # here, otherwise the save and the re-advertise below are skipped.
            self._connections.discard(conn_handle)
            self._save_secrets()
            # Start advertising again to allow a new connection.
            self._advertise()
        elif event == _IRQ_ENCRYPTION_UPDATE:
            conn_handle, encrypted, authenticated, bonded, key_size = data
            print("encryption update", conn_handle, encrypted, authenticated, bonded, key_size)
        elif event == _IRQ_PASSKEY_ACTION:
            conn_handle, action, passkey = data
            print("passkey action", conn_handle, action, passkey)
            # NOTE: input() waits for console input inside the event handler;
            # that is only acceptable because this is an interactive demo.
            if action == _PASSKEY_ACTION_NUMCMP:
                accept = int(input("accept? "))
                self._ble.gap_passkey(conn_handle, action, accept)
            elif action == _PASSKEY_ACTION_DISP:
                # Fixed demo passkey.
                print("displaying 123456")
                self._ble.gap_passkey(conn_handle, action, 123456)
            elif action == _PASSKEY_ACTION_INPUT:
                print("prompting for passkey")
                passkey = int(input("passkey? "))
                self._ble.gap_passkey(conn_handle, action, passkey)
            else:
                print("unknown action")
        elif event == _IRQ_GATTS_INDICATE_DONE:
            # Nothing is done with the result yet; the tuple is only unpacked.
            conn_handle, value_handle, status = data
        elif event == _IRQ_SET_SECRET:
            sec_type, key, value = data
            # Copy into bytes so the entries can be used as dict keys/values.
            key = sec_type, bytes(key)
            value = bytes(value) if value else None
            print("set secret:", key, value)
            if value is None:
                # An empty value is a request to delete the entry.
                if key in self._secrets:
                    del self._secrets[key]
                    return True
                return False
            self._secrets[key] = value
            return True
        elif event == _IRQ_GET_SECRET:
            sec_type, index, key = data
            print("get secret:", sec_type, index, bytes(key) if key else None)
            if key is None:
                # No key given: return the index-th stored secret of this type.
                i = 0
                for (t, _key), value in self._secrets.items():
                    if t == sec_type:
                        if i == index:
                            return value
                        i += 1
                return None
            key = sec_type, bytes(key)
            return self._secrets.get(key, None)

    def set_temperature(self, temp_deg_c, notify=False, indicate=False):
        """Publish a new temperature reading.

        Args:
            temp_deg_c: Temperature in degrees Celsius.
            notify: If true, notify every tracked connection.
            indicate: If true, indicate every tracked connection.
        """
        # Data is sint16 in degrees Celsius with a resolution of 0.01 degrees Celsius.
        # Write the local value, ready for a central to read.
        self._ble.gatts_write(self._handle, struct.pack("<h", int(temp_deg_c * 100)))
        if notify or indicate:
            for conn_handle in self._connections:
                if notify:
                    # Notify connected centrals.
                    self._ble.gatts_notify(conn_handle, self._handle)
                if indicate:
                    # Indicate connected centrals.
                    self._ble.gatts_indicate(conn_handle, self._handle)

    def _advertise(self, interval_us=500000):
        """Start advertising the prepared payload every *interval_us* microseconds."""
        # The address mode is re-applied before each advertisement.
        self._ble.config(addr_mode=2)
        self._ble.gap_advertise(interval_us, adv_data=self._payload)

    @staticmethod
    def _b64_text(data):
        """Return *data* base64-encoded as text, without the trailing newline."""
        return binascii.b2a_base64(data).decode().strip()

    def _load_secrets(self):
        """Populate ``self._secrets`` from ``secrets.json``.

        The file holds a list of ``[sec_type, key, value]`` entries whose key
        and value are base64 text.  Any failure (missing file, malformed
        content) is reported on the console; entries read before the failure
        are kept.
        """
        self._secrets = {}
        try:
            with open("secrets.json", "r") as f:
                entries = json.load(f)
            for sec_type, key, value in entries:
                self._secrets[sec_type, binascii.a2b_base64(key)] = binascii.a2b_base64(value)
        except Exception:
            # Best effort, but do not swallow KeyboardInterrupt/SystemExit.
            print("no secrets available")

    def _save_secrets(self):
        """Write ``self._secrets`` to ``secrets.json`` as base64 text entries.

        Failures are reported on the console and never raised, because this
        runs from the disconnect branch of the IRQ handler.
        """
        try:
            # Build the payload BEFORE opening the file for writing, so that a
            # serialisation problem cannot leave a truncated secrets file.
            # Keys and values are converted to text explicitly because not
            # every JSON encoder accepts bytes objects.
            json_secrets = [
                [sec_type, self._b64_text(key), self._b64_text(value)]
                for (sec_type, key), value in self._secrets.items()
            ]
            with open("secrets.json", "w") as f:
                json.dump(json_secrets, f)
        except Exception:
            # Best effort, but do not swallow KeyboardInterrupt/SystemExit.
            print("failed to save secrets")
def demo():
    """Run the sensor forever with a randomly drifting temperature.

    The local value is rewritten once per second; every tenth update also
    notifies the connected centrals.  Indications are never sent.
    """
    sensor = BLETemperature(bluetooth.BLE())
    reading = 25
    tick = 0
    while True:
        # Counter wraps to 0 on every tenth pass; that pass triggers a notify.
        tick = (tick + 1) % 10
        should_notify = tick == 0
        sensor.set_temperature(reading, notify=should_notify, indicate=False)
        # Drift the reading by up to half a degree in either direction.
        reading += random.uniform(-0.5, 0.5)
        time.sleep_ms(1000)
# Start the demo loop only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    demo()