Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

- reworked handling of bluetooth responses to handle more than 2 13-b… #29

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@ build/
dist/
dalybms.egg-info/
dalybms/__pycache__/
venv/
.idea/
17 changes: 9 additions & 8 deletions dalybms/daly_bms.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ def get_status(self, response_data=None):
self.status = data
return data

def _calc_num_responses(self, status_field, num_per_frame):
def _calc_num_responses(self, status_field, num_per_frame=3):
if not self.status:
self.logger.error("get_status has to be called at least once before calling get_cell_voltages")
return False
Expand All @@ -250,9 +250,11 @@ def _calc_num_responses(self, status_field, num_per_frame):
if self.address == 8:
# via Bluetooth the BMS returns all frames, even when they don't have data
if status_field == 'cell_voltages':
max_responses = 16
# TODO Not too sure about this. My BMS returns 12 frames of 3 voltages each
max_responses = 12
elif status_field == 'temperatures':
max_responses = 3
# TODO Not too sure about this. My BMS returns 2 frames of 3 temperatures each
max_responses = 2
else:
self.logger.error("unkonwn status_field %s" % status_field)
return False
Expand Down Expand Up @@ -315,11 +317,10 @@ def get_balancing_status(self, response_data=None):
bits = bin(int(response_data.hex(), base=16))[2:].zfill(48)
self.logger.info(bits)
cells = {}
for cell in range(1, self.status["cells"] + 1):
cells[cell] = bool(int(bits[cell * -1]))
self.logger.info(cells)
# todo: get sample data and verify result
return {"error": "not implemented"}
for cell in range(0, self.status["cells"]):
cells[cell+1] = bool(int(bits[cell]))
self.logger.debug(cells)
return cells

def get_errors(self, response_data=None):
# Battery failure status
Expand Down
35 changes: 22 additions & 13 deletions dalybms/daly_bms_bluetooth.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,17 @@ async def _read(self, command, max_responses=1):
return result

def _notification_callback(self, handle, data):
self.logger.debug("%s %s %s" % (handle, repr(data), len(data)))
self.logger.debug(f'Notify callback. Handle: {handle}, Data Length: {len(data)}, Data: {data}')
responses = []
if len(data) == 13:
responses.append(data)
elif len(data) == 26:
responses.append(data[0:13])
responses.append(data[13:])
else:
self.logger.error(len(data), "bytes received, not 13 or 26, not implemented")

num_frames = int(len(data) / 13)
if num_frames == 0:
self.logger.debug(f"{len(data)} bytes received, not enough for a data frame, discarding")

for frame_start in range(0, num_frames*13, 13):
# check that the data frame is properly formatted:
if data[frame_start] == 0xa5 and data[frame_start + 1] == 0x01 and data[frame_start + 3] == 0x08:
responses.append(data[frame_start:frame_start+13])

for response_bytes in responses:
command = response_bytes[2:3].hex()
Expand Down Expand Up @@ -134,7 +136,7 @@ async def get_cell_voltage_range(self, response_data=None):

async def get_max_min_temperature(self, response_data=None):
response_data = await self._read_request("92")
return super().get_max_min_temperature(response_data=response_data)
return super().get_temperature_range(response_data=response_data)

async def get_mosfet_status(self, response_data=None):
response_data = await self._read_request("93")
Expand All @@ -145,25 +147,32 @@ async def get_status(self, response_data=None):
return super().get_status(response_data=response_data)

async def get_cell_voltages(self, response_data=None):
# required to get the number of cells
if not self.status:
await self.get_status()
max_responses = self._calc_cell_voltage_responses()

max_responses = self._calc_num_responses(status_field="cell_voltages")
if not max_responses:
return
response_data = await self._read_request("95", max_responses=max_responses)

return super().get_cell_voltages(response_data=response_data)

async def get_temperatures(self, response_data=None):
response_data = await self._read_request("95")

max_responses = self._calc_num_responses(status_field="temperatures")
if not max_responses:
return

response_data = await self._read_request("96", max_responses=max_responses)
return super().get_temperatures(response_data=response_data)

async def get_balancing_status(self, response_data=None):
response_data = await self._read_request("96")
response_data = await self._read_request("97")
return super().get_balancing_status(response_data=response_data)

async def get_errors(self, response_data=None):
response_data = await self._read_request("97")
response_data = await self._read_request("98")
return super().get_errors(response_data=response_data)

async def get_all(self):
Expand Down
8 changes: 4 additions & 4 deletions dalybms/error_codes.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@

ERROR_CODES = {
0: [
"one stage warning of unit over voltage",
"one stage warning of unit over voltage",
"one stage warning of unit over voltage",
"two stage warning of unit over voltage",
"cell voltage is too high level 1 alarm",
"cell voltage is too high level 2 alarm",
"cell voltage is too low level 1 alarm",
"cell voltage is too low level 2 alarm",
"Total voltage is too high One alarm",
"Total voltage is too high Level two alarm",
"Total voltage is too low One alarm",
Expand Down
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pyserial==3.5
# bleak==0.11.0
# bleak==0.11.0
pytest
238 changes: 238 additions & 0 deletions tests/test_bluetooth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
import asyncio
import pytest
from unittest.mock import patch, AsyncMock
from dalybms import DalyBMSBluetooth


class TestBluetoothCommands:
notification_handler = None
scenario_id = None

async def write_gatt_char_mock(self, char, data):
assert (self.notification_handler is not None)

# SoC command
if data == b'\xa5\x80\x90\x08\x00\x00\x00\x00\x00\x00\x00\x00\xbd':
# {'total_voltage': 53.0, 'current': -5.9, 'soc_percent': 92.2}
self.notification_handler('90', b'\xa5\x01\x90\x08\x02\x12\x00\x00t\xf5\x03\x9aX')

# Cell Voltage Range command
if data == b'\xa5\x80\x91\x08\x00\x00\x00\x00\x00\x00\x00\x00\xbe':
# {'highest_voltage': 3.315, 'highest_cell': 10, 'lowest_voltage': 3.308, 'lowest_cell': 1}
self.notification_handler('91', b'\xa5\x01\x91\x08\x0c\xf3\n\x0c\xec\x01\x03\x9a\xde')

# Temp range command
if data == b'\xa5\x80\x92\x08\x00\x00\x00\x00\x00\x00\x00\x00\xbf':
self.notification_handler('92', b'\xa5\x01\x92\x08=\x01=\x01\x00\x00\x00\xadi')

# Mosfet status command
if data == b'\xa5\x80\x93\x08\x00\x00\x00\x00\x00\x00\x00\x00\xc0':
if self.scenario_id == 1:
# {'mode': 'discharging', 'charging_mosfet': True, 'discharging_mosfet': True, 'capacity_ah': 256.76}
self.notification_handler('93', b'\xa5\x01\x93\x08\x02\x01\x01\x9e\x00\x03\xea\xf8\xc8')

if self.scenario_id == 2:
# {'mode': 'charging', 'charging_mosfet': False, 'discharging_mosfet': True, 'capacity_ah': 280.0}
self.notification_handler('93', b'\xa5\x01\x93\x08\x01\x00\x015\x00\x04E\xc0\x81')

self.scenario_id = None

# Status command
if data == b'\xa5\x80\x94\x08\x00\x00\x00\x00\x00\x00\x00\x00\xc1':
self.notification_handler('94', b'\xa5\x01\x94\x08\x10\x01\x00\x00\x02\x00\x0b\x9b\xfb')

# Cell Voltages command
if data == b'\xa5\x80\x95\x08\x00\x00\x00\x00\x00\x00\x00\x00\xc2':
self.notification_handler('95',
b'\xa5\x01\x95\x08\x01\r7\r6\r4\x9b\xa7\xa5\x01\x95\x08\x02\r5\r5\r5\x9b\xa6'
b'\xa5\x01\x95\x08\x03\r4\r>\r6\x9b\xb0\xa5\x01\x95\x08\x04\r7\r6\r7\x9b\xad'
b'\xa5\x01\x95\x08\x05\r4\r5\r4\x9b\xa7\xa5\x01\x95\x08\x06\r7\x00\x00\x00\x00'
b'\x9b(\xa5\x01\x95\x08\x07\x00\x00\x00\x00\x00\x00\x9b\xe5\xa5\x01\x95\x08\x08'
b'\x00\x00\x00\x00\x00\x00\x9b\xe6\xa5\x01\x95\x08\t\x00\x00\x00\x00\x00\x00'
b'\x9b\xe7\xa5\x01\x95\x08\n\x00\x00\x00\x00\x00\x00\x9b\xe8\xa5\x01\x95\x08'
b'\x0b\x00\x00\x00\x00\x00\x00\x9b\xe9\xa5\xa8\x00@\x00p$@\x00\r0\x00\x00p$'
b'@\x008\xa7\x00\x00m2\x00\x00p$@\x00S1\x00\x00\x00\x00\x00\x00\xa8\x00\xa5\x01'
b'\x95\x08\x0f\x00\x00\x00\x00\x00\x00\x9b\xed\xa5\x01\x95\x08\x10')

# Temperatures command
if data == b'\xa5\x80\x96\x08\x00\x00\x00\x00\x00\x00\x00\x00\xc3':
self.notification_handler('96',
b'\xa5\x01\x96\x08\x01=\x00\x00\x00\x00\x00\x00\x82\xa5\x01\x96\x08\x02\x00\x00\x00\x00\x00\x00\x00F')

# Balancing status command
if data == b'\xa5\x80\x97\x08\x00\x00\x00\x00\x00\x00\x00\x00\xc4':
if self.scenario_id == 1:
self.notification_handler('97', b'\xa5\x01\x97\x08\x01\x01\x00\x00\x00\x00\x00\x00G')

if self.scenario_id == 2:
self.notification_handler('97', b'\xa5\x01\x97\x08\x00\x00\x00\x00\x00\x00\x00\x00E')

# get errors command
if data == b'\xa5\x80\x98\x08\x00\x00\x00\x00\x00\x00\x00\x00\xc5':
if self.scenario_id == 1:
# []
self.notification_handler('98', b'\xa5\x01\x98\x08\x00\x00\x00\x00\x00\x00\x00\x00F')
if self.scenario_id == 2:
# ['cell voltage is too high level 2 alarm']
self.notification_handler('98', b'\xa5\x01\x98\x08\x02\x00\x00\x00\x00\x00\x00\x00H')

async def start_notify_mock(self, char_uuid, notification_handler):
self.notification_handler = notification_handler

def setup_bleak_mock(self, bleak_mock):
instance = bleak_mock.return_value
instance.connect = AsyncMock(return_value=True)
instance.start_notify = self.start_notify_mock
instance.write_gatt_char = self.write_gatt_char_mock

@pytest.mark.asyncio
async def test_get_soc(self):
with patch('dalybms.daly_bms_bluetooth.BleakClient') as bleak_mock:
self.setup_bleak_mock(bleak_mock)

daly = DalyBMSBluetooth()
await daly.connect('99:99:99:99:99:99')

response = await daly.get_soc()

# {'total_voltage': 53.0, 'current': -5.9, 'soc_percent': 92.2}
assert (response['total_voltage'] == 53.0)
assert (response['current'] == -5.9)
assert (response['soc_percent'] == 92.2)

@pytest.mark.asyncio
async def test_get_cell_voltage_range(self):
with patch('dalybms.daly_bms_bluetooth.BleakClient') as bleak_mock:
self.setup_bleak_mock(bleak_mock)

daly = DalyBMSBluetooth()
await daly.connect('99:99:99:99:99:99')

response = await daly.get_cell_voltage_range()

# {'highest_voltage': 3.315, 'highest_cell': 10, 'lowest_voltage': 3.308, 'lowest_cell': 1}
assert (response['highest_voltage'] == 3.315)
assert (response['highest_cell'] == 10)
assert (response['lowest_voltage'] == 3.308)
assert (response['lowest_cell'] == 1)

@pytest.mark.asyncio
async def test_get_max_min_temperature(self):
with patch('dalybms.daly_bms_bluetooth.BleakClient') as bleak_mock:
self.setup_bleak_mock(bleak_mock)

daly = DalyBMSBluetooth()
await daly.connect('99:99:99:99:99:99')

response = await daly.get_max_min_temperature()

assert (response['highest_temperature'] == 21)
assert (response['highest_sensor'] == 1)
assert (response['lowest_temperature'] == 21)
assert (response['lowest_sensor'] == 1)

@pytest.mark.asyncio
@pytest.mark.parametrize('scenario_id, expected', [
(1, {'mode': 'discharging', 'charging_mosfet': True, 'discharging_mosfet': True, 'capacity_ah': 256.76}),
(2, {'mode': 'charging', 'charging_mosfet': False, 'discharging_mosfet': True, 'capacity_ah': 280.0})])
async def test_get_mosfet_status(self, scenario_id, expected):
with patch('dalybms.daly_bms_bluetooth.BleakClient') as bleak_mock:
self.setup_bleak_mock(bleak_mock)
self.scenario_id = scenario_id
daly = DalyBMSBluetooth()
await daly.connect('99:99:99:99:99:99')

response = await daly.get_mosfet_status()

for k, v in expected.items():
assert (response[k] == v)

@pytest.mark.asyncio
async def test_status(self):
with patch('dalybms.daly_bms_bluetooth.BleakClient') as bleak_mock:
self.setup_bleak_mock(bleak_mock)

daly = DalyBMSBluetooth()
await daly.connect('99:99:99:99:99:99')

response = await daly.get_status()

assert (response['cells'] == 16)
assert (response['cycles'] == 11)
assert (response['temperature_sensors'] == 1)

@pytest.mark.asyncio
async def test_get_cell_voltages(self):
with patch('dalybms.daly_bms_bluetooth.BleakClient') as bleak_mock:
self.setup_bleak_mock(bleak_mock)

daly = DalyBMSBluetooth()
await daly.connect('99:99:99:99:99:99')

daly.status = {"cells": 16}

response = await daly.get_cell_voltages()

assert (response[1] == 3.383)
assert (response[2] == 3.382)
assert (response[3] == 3.380)
assert (response[4] == 3.381)
assert (response[5] == 3.381)
assert (response[6] == 3.381)
assert (response[7] == 3.380)
assert (response[8] == 3.390)
assert (response[9] == 3.382)
assert (response[10] == 3.383)
assert (response[11] == 3.382)
assert (response[12] == 3.383)
assert (response[13] == 3.380)
assert (response[14] == 3.381)
assert (response[15] == 3.380)
assert (response[16] == 3.383)

@pytest.mark.asyncio
async def test_get_temperatures(self):
with patch('dalybms.daly_bms_bluetooth.BleakClient') as bleak_mock:
self.setup_bleak_mock(bleak_mock)

daly = DalyBMSBluetooth()
await daly.connect('99:99:99:99:99:99')
await daly.get_status()

response = await daly.get_temperatures()

assert (response[1] == 21)

@pytest.mark.asyncio
@pytest.mark.parametrize('scenario_id, expected', [
(1, {1: True, 2: False, 3: False, 4: False, 5: False, 6: False, 7: False, 8: False, 9: True, 10: False, 11: False, 12: False, 13: False, 14: False, 15: False, 16: False}),
(2, {1: False, 2: False, 3: False, 4: False, 5: False, 6: False, 7: False, 8: False, 9: False, 10: False, 11: False, 12: False, 13: False, 14: False, 15: False, 16: False})])
async def test_get_balancing_status(self, scenario_id, expected):
with patch('dalybms.daly_bms_bluetooth.BleakClient') as bleak_mock:
self.setup_bleak_mock(bleak_mock)
self.scenario_id = scenario_id

daly = DalyBMSBluetooth()
await daly.connect('99:99:99:99:99:99')
await daly.get_status()

response = await daly.get_balancing_status()

for k, v in expected.items():
assert (response[k] == v)

@pytest.mark.asyncio
@pytest.mark.parametrize('scenario_id, expected', [
(1, []),
(2, ['cell voltage is too high level 2 alarm'])])
async def test_get_errors(self, scenario_id, expected):
with patch('dalybms.daly_bms_bluetooth.BleakClient') as bleak_mock:
self.setup_bleak_mock(bleak_mock)
self.scenario_id = scenario_id

daly = DalyBMSBluetooth()
await daly.connect('99:99:99:99:99:99')

response = await daly.get_errors()

assert (response == expected)