forked from lvgl/lv_micropython
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
examples/bluetooth: Add basic BLE peripheral examples.
Consisting of: - ble_advertising.py -- helper to generate advertising payload. - ble_temperature.py -- simple temperature device. - ble_uart_periperhal.py -- BLE UART wrapper. - ble_uart_repl.py -- dupterm-compatible uart.
- Loading branch information
Showing
4 changed files
with
271 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
# Helpers for generating BLE advertising payloads. | ||
|
||
from micropython import const | ||
import struct | ||
|
||
# Advertising payloads are repeated packets of the following form: | ||
# 1 byte data length (N + 1) | ||
# 1 byte type (see constants below) | ||
# N bytes type-specific data | ||
|
||
_ADV_TYPE_FLAGS = const(0x01) | ||
_ADV_TYPE_NAME = const(0x09) | ||
_ADV_TYPE_UUID16_COMPLETE = const(0x3) | ||
_ADV_TYPE_APPEARANCE = const(0x19) | ||
|
||
# Generate a payload to be passed to gap_advertise(adv_data=...). | ||
def advertising_payload(limited_disc=False, br_edr=False, name=None, services=None, appearance=0): | ||
payload = bytearray() | ||
|
||
def _append(adv_type, value): | ||
nonlocal payload | ||
payload += struct.pack('BB', len(value) + 1, adv_type) + value | ||
|
||
_append(_ADV_TYPE_FLAGS, struct.pack('B', (0x01 if limited_disc else 0x02) + (0x00 if br_edr else 0x04))) | ||
|
||
if name: | ||
_append(_ADV_TYPE_NAME, name) | ||
|
||
if services: | ||
for uuid in services: | ||
# TODO: Support bluetooth.UUID class. | ||
_append(_ADV_TYPE_UUID16_COMPLETE, struct.pack('<h', uuid)) | ||
|
||
# See org.bluetooth.characteristic.gap.appearance.xml | ||
_append(_ADV_TYPE_APPEARANCE, struct.pack('<h', appearance)) | ||
|
||
return payload |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
# This example demonstrates a simple temperature sensor peripheral. | ||
# | ||
# The sensor's local value updates every second, and it will notify | ||
# any connected central every 10 seconds. | ||
|
||
import bluetooth | ||
import random | ||
import struct | ||
import time | ||
from ble_advertising import advertising_payload | ||
|
||
from micropython import const | ||
_IRQ_CENTRAL_CONNECT = const(1 << 0) | ||
_IRQ_CENTRAL_DISCONNECT = const(1 << 1) | ||
|
||
# org.bluetooth.service.environmental_sensing | ||
_ENV_SENSE_UUID = bluetooth.UUID(0x181A) | ||
# org.bluetooth.characteristic.temperature | ||
_TEMP_CHAR = (bluetooth.UUID(0x2A6E), bluetooth.FLAG_READ|bluetooth.FLAG_NOTIFY,) | ||
_ENV_SENSE_SERVICE = (_ENV_SENSE_UUID, (_TEMP_CHAR,),) | ||
|
||
# org.bluetooth.characteristic.gap.appearance.xml | ||
_ADV_APPEARANCE_GENERIC_THERMOMETER = const(768) | ||
|
||
class BLETemperature: | ||
def __init__(self, ble, name='mpy-temp'): | ||
self._ble = ble | ||
self._ble.active(True) | ||
self._ble.irq(handler=self._irq) | ||
((self._handle,),) = self._ble.gatts_register_services((_ENV_SENSE_SERVICE,)) | ||
self._connections = set() | ||
self._payload = advertising_payload(name=name, services=[0x181A], appearance=_ADV_APPEARANCE_GENERIC_THERMOMETER) | ||
self._advertise() | ||
|
||
def _irq(self, event, data): | ||
# Track connections so we can send notifications. | ||
if event == _IRQ_CENTRAL_CONNECT: | ||
conn_handle, _, _, = data | ||
self._connections.add(conn_handle) | ||
elif event == _IRQ_CENTRAL_DISCONNECT: | ||
conn_handle, _, _, = data | ||
self._connections.remove(conn_handle) | ||
# Start advertising again to allow a new connection. | ||
self._advertise() | ||
|
||
def set_temperature(self, temp_deg_c, notify=False): | ||
# Data is sint16 in degrees Celsius with a resolution of 0.01 degrees Celsius. | ||
# Write the local value, ready for a central to read. | ||
self._ble.gatts_write(self._handle, struct.pack('<h', int(temp_deg_c * 100))) | ||
if notify: | ||
for conn_handle in self._connections: | ||
# Notify connected centrals to issue a read. | ||
self._ble.gatts_notify(conn_handle, self._handle) | ||
|
||
def _advertise(self, interval_us=500000): | ||
self._ble.gap_advertise(interval_us, adv_data=self._payload) | ||
|
||
|
||
def demo(): | ||
ble = bluetooth.BLE() | ||
temp = BLETemperature(ble) | ||
|
||
t = 25 | ||
i = 0 | ||
|
||
while True: | ||
# Write every second, notify every 10 seconds. | ||
i = (i + 1) % 10 | ||
temp.set_temperature(t, notify=i == 0) | ||
# Random walk the temperature. | ||
t += random.uniform(-0.5, 0.5) | ||
time.sleep_ms(1000) | ||
|
||
|
||
if __name__ == '__main__': | ||
demo() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
# This example demonstrates a peripheral implementing the Nordic UART Service (NUS). | ||
|
||
import bluetooth | ||
from ble_advertising import advertising_payload | ||
|
||
from micropython import const | ||
_IRQ_CENTRAL_CONNECT = const(1 << 0) | ||
_IRQ_CENTRAL_DISCONNECT = const(1 << 1) | ||
_IRQ_GATTS_WRITE = const(1 << 2) | ||
|
||
_UART_UUID = bluetooth.UUID('6E400001-B5A3-F393-E0A9-E50E24DCCA9E') | ||
_UART_TX = (bluetooth.UUID('6E400003-B5A3-F393-E0A9-E50E24DCCA9E'), bluetooth.FLAG_NOTIFY,) | ||
_UART_RX = (bluetooth.UUID('6E400002-B5A3-F393-E0A9-E50E24DCCA9E'), bluetooth.FLAG_WRITE,) | ||
_UART_SERVICE = (_UART_UUID, (_UART_TX, _UART_RX,),) | ||
|
||
# org.bluetooth.characteristic.gap.appearance.xml | ||
_ADV_APPEARANCE_GENERIC_COMPUTER = const(128) | ||
|
||
class BLEUART: | ||
def __init__(self, ble, name='mpy-uart', rxbuf=100): | ||
self._ble = ble | ||
self._ble.active(True) | ||
self._ble.irq(handler=self._irq) | ||
((self._tx_handle, self._rx_handle,),) = self._ble.gatts_register_services((_UART_SERVICE,)) | ||
# Increase the size of the rx buffer. | ||
self._ble.gatts_write(self._rx_handle, bytes(rxbuf)) | ||
self._connections = set() | ||
self._rx_buffer = bytearray() | ||
self._handler = None | ||
self._payload = advertising_payload(name=name, appearance=_ADV_APPEARANCE_GENERIC_COMPUTER) | ||
self._advertise() | ||
|
||
def irq(self, handler): | ||
self._handler = handler | ||
|
||
def _irq(self, event, data): | ||
# Track connections so we can send notifications. | ||
if event == _IRQ_CENTRAL_CONNECT: | ||
conn_handle, _, _, = data | ||
self._connections.add(conn_handle) | ||
elif event == _IRQ_CENTRAL_DISCONNECT: | ||
conn_handle, _, _, = data | ||
if conn_handle in self._connections: | ||
self._connections.remove(conn_handle) | ||
# Start advertising again to allow a new connection. | ||
self._advertise() | ||
elif event == _IRQ_GATTS_WRITE: | ||
conn_handle, value_handle, = data | ||
if conn_handle in self._connections and value_handle == self._rx_handle: | ||
self._rx_buffer += self._ble.gatts_read(self._rx_handle) | ||
if self._handler: | ||
self._handler() | ||
|
||
def any(self): | ||
return len(self._rx_buffer) | ||
|
||
def read(self, sz=None): | ||
if not sz: | ||
sz = len(self._rx_buffer) | ||
result = self._rx_buffer[0:sz] | ||
self._rx_buffer = self._rx_buffer[sz:] | ||
return result | ||
|
||
def write(self, data): | ||
for conn_handle in self._connections: | ||
self._ble.gatts_notify(conn_handle, self._tx_handle, data) | ||
|
||
def close(self): | ||
for conn_handle in self._connections: | ||
self._ble.gap_disconnect(conn_handle) | ||
self._connections.clear() | ||
|
||
def _advertise(self, interval_us=500000): | ||
self._ble.gap_advertise(interval_us, adv_data=self._payload) | ||
|
||
|
||
# ble = bluetooth.BLE() | ||
# uart = BLEUART(ble) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
# Proof-of-concept of a REPL over BLE UART. | ||
# | ||
# Tested with the Adafruit Bluefruit app on Android. | ||
# Set the EoL characters to \r\n. | ||
|
||
import bluetooth | ||
import io | ||
import os | ||
import micropython | ||
import machine | ||
|
||
from ble_uart_peripheral import BLEUART | ||
|
||
_MP_STREAM_POLL = const(3) | ||
_MP_STREAM_POLL_RD = const(0x0001) | ||
|
||
# TODO: Remove this when STM32 gets machine.Timer. | ||
if hasattr(machine, 'Timer'): | ||
_timer = machine.Timer(-1) | ||
else: | ||
_timer = None | ||
|
||
# Batch writes into 50ms intervals. | ||
def schedule_in(handler, delay_ms): | ||
def _wrap(_arg): | ||
handler() | ||
if _timer: | ||
_timer.init(mode=machine.Timer.ONE_SHOT, period=delay_ms, callback=_wrap) | ||
else: | ||
micropython.schedule(_wrap, None) | ||
|
||
# Simple buffering stream to support the dupterm requirements. | ||
class BLEUARTStream(io.IOBase): | ||
def __init__(self, uart): | ||
self._uart = uart | ||
self._tx_buf = bytearray() | ||
self._uart.irq(self._on_rx) | ||
|
||
def _on_rx(self): | ||
# Needed for ESP32. | ||
if hasattr(os, 'dupterm_notify'): | ||
os.dupterm_notify(None) | ||
|
||
def read(self, sz=None): | ||
return self._uart.read(sz) | ||
|
||
def readinto(self, buf): | ||
avail = self._uart.read(len(buf)) | ||
if not avail: | ||
return None | ||
for i in range(len(avail)): | ||
buf[i] = avail[i] | ||
return len(avail) | ||
|
||
def ioctl(self, op, arg): | ||
if op == _MP_STREAM_POLL: | ||
if self._uart.any(): | ||
return _MP_STREAM_POLL_RD | ||
return 0 | ||
|
||
def _flush(self): | ||
data = self._tx_buf[0:100] | ||
self._tx_buf = self._tx_buf[100:] | ||
self._uart.write(data) | ||
if self._tx_buf: | ||
schedule_in(self._flush, 50) | ||
|
||
def write(self, buf): | ||
empty = not self._tx_buf | ||
self._tx_buf += buf | ||
if empty: | ||
schedule_in(self._flush, 50) | ||
|
||
|
||
def start(): | ||
ble = bluetooth.BLE() | ||
uart = BLEUART(ble, name='mpy-repl') | ||
stream = BLEUARTStream(uart) | ||
|
||
os.dupterm(stream) |