From 9497699edaee026e529a87fc0cce5f88a6be9827 Mon Sep 17 00:00:00 2001 From: dequis <dx@dxzone.com.ar> Date: Mon, 22 Nov 2021 17:37:12 +0100 Subject: [PATCH] Add mixed endian writing support --- modbus_cli/access.py | 10 +++++++++- tests/test_access.py | 42 ++++++++++++++++++++++++++++++++++-------- 2 files changed, 43 insertions(+), 9 deletions(-) diff --git a/modbus_cli/access.py b/modbus_cli/access.py index c1366d7..c0f0bce 100644 --- a/modbus_cli/access.py +++ b/modbus_cli/access.py @@ -177,6 +177,11 @@ def write_registers_send(self, modbus): else: words = [] + if self.byte_order == 'mixed': + register_fmt = '<H' + else: + register_fmt = '>H' + for pack_type, value in zip(self.pack_types, self.values_to_write): n_bytes = struct.calcsize(pack_type) assert n_bytes % 2 == 0 @@ -186,7 +191,10 @@ def write_registers_send(self, modbus): else: value = int(value, 0) - words.extend([h << 8 | l for h, l in grouper(struct.pack(pack_type, value), 2)]) + words.extend([ + struct.unpack(register_fmt, bytes(byte_pair))[0] + for byte_pair in grouper(struct.pack(pack_type, value), 2) + ]) if len(words) == 1: message = modbus.protocol.write_single_register(modbus.slave_id, self.address(), words[0]) diff --git a/tests/test_access.py b/tests/test_access.py index 1d82217..78acfa6 100644 --- a/tests/test_access.py +++ b/tests/test_access.py @@ -121,28 +121,54 @@ def test_write_coils(self): def test_presenter(self): pass - def test_endianness(self): + def test_endianness_read(self): modbus = mocked_modbus() + + # given these two 16 bit registers, what do we interpret? modbus.receive = Mock(return_value=[0x1122, 0x3344]) - def perform(byte_order, *fields): + def perform(byte_order, fields): addresses = list(range(len(fields))) access = Access('h', addresses, fields, byte_order=byte_order) access.perform(modbus) return access.values # big endian 16/32 bit fields - self.assertEqual(perform('be', '>H', '>H'), [(0x1122, ), (0x3344, )]) - self.assertEqual(perform('be', '>I'), [(0x11223344, )]) + self.assertEqual(perform('be', ['>H', '>H']), [(0x1122, ), (0x3344, )]) + self.assertEqual(perform('be', ['>I']), [(0x11223344, )]) # little endian 16/32 bit fields - self.assertEqual(perform('le', '<H', '<H'), [(0x2211, ), (0x4433, )]) - self.assertEqual(perform('le', '<I'), [(0x44332211, )]) + self.assertEqual(perform('le', ['<H', '<H']), [(0x2211, ), (0x4433, )]) + self.assertEqual(perform('le', ['<I']), [(0x44332211, )]) # mixed endian 16/32 bit fields - self.assertEqual(perform('mixed', '<H', '<H'), [(0x1122, ), (0x3344, )]) - self.assertEqual(perform('mixed', '<I'), [(0x33441122, )]) + self.assertEqual(perform('mixed', ['<H', '<H']), [(0x1122, ), (0x3344, )]) + self.assertEqual(perform('mixed', ['<I']), [(0x33441122, )]) + + def test_endianness_write(self): + modbus = mocked_modbus() + + # given these user inputs, what 16 bit registers are actually committed? + values16 = ["0x1122", "0x3344"] + values32 = ["0x11223344"] + + def perform(byte_order, fields, values): + addresses = list(range(len(fields))) + access = Access('h', addresses, fields, values=values, byte_order=byte_order) + access.perform(modbus) + return modbus.protocol.write_multiple_registers.call_args.args[2] + + # big endian 16/32 bit fields + self.assertEqual(perform('be', ['>H', '>H'], values16), [0x1122, 0x3344]) + self.assertEqual(perform('be', ['>I'], values32), [0x1122, 0x3344]) + # little endian 16/32 bit fields + self.assertEqual(perform('le', ['<H', '<H'], values16), [0x2211, 0x4433]) + self.assertEqual(perform('le', ['<I'], values32), [0x4433, 0x2211]) + + # mixed endian 16/32 bit fields + self.assertEqual(perform('mixed', ['<H', '<H'], values16), [0x1122, 0x3344]) + self.assertEqual(perform('mixed', ['<I'], values32), [0x3344, 0x1122]) if __name__ == '__main__':