forked from favalex/modbus-cli
-
Notifications
You must be signed in to change notification settings - Fork 0
/
modbus_rtu.py
73 lines (59 loc) · 2.42 KB
/
modbus_rtu.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import logging
from .access import dump
class ModbusRtu:
    """Modbus RTU transport over a serial port.

    Stores the serial-port settings, opens the port in :meth:`connect` and
    exchanges raw RTU frames through :meth:`send` and :meth:`receive`.
    """

    def __init__(self, device, baud, parity, stop_bits, slave_id, timeout):
        """Remember the serial settings; the port itself is not opened here.

        Args:
            device: Serial port, passed as ``port`` to ``serial.Serial``.
            baud: Baud rate, passed as ``baudrate`` to ``serial.Serial``.
            parity: ``'e'``, ``'o'`` or ``'n'`` (even, odd, none).
            stop_bits: Passed as ``stopbits`` to ``serial.Serial``.
            slave_id: Slave address; ``None`` is replaced by ``1``.
            timeout: Read timeout in seconds, passed to ``serial.Serial``.

        Raises:
            KeyError: If ``parity`` is not one of ``'e'``, ``'o'``, ``'n'``.
        """
        # Imported here rather than at module level, so pyserial is only
        # needed once an RTU transport is actually instantiated.
        from serial import PARITY_EVEN, PARITY_ODD, PARITY_NONE

        parity_opts = {'e': PARITY_EVEN, 'o': PARITY_ODD, 'n': PARITY_NONE}

        self.device = device
        self.timeout = timeout
        self.baud = baud
        self.parity = parity_opts[parity]
        self.stop_bits = stop_bits

        if slave_id is None:
            slave_id = 1
        self.slave_id = slave_id

        # No port is open until connect() is called; close() relies on this.
        self.connection = None

        import umodbus.client.serial.rtu as modbus
        self.protocol = modbus

    def connect(self):
        """Open the serial port with the stored settings (8 data bits)."""
        from serial import Serial

        logging.debug(
            "Serial port %s. Parameters: %s baud, %s stop bit(s), parity: %s, timeout %ss.",
            self.device,
            self.baud,
            self.stop_bits,
            self.parity,
            self.timeout,
        )

        self.connection = Serial(
            port=self.device,
            baudrate=self.baud,
            parity=self.parity,
            stopbits=self.stop_bits,
            bytesize=8,
            timeout=self.timeout,
        )

    def send(self, request):
        """Write the raw request frame to the serial port."""
        self.connection.write(request)

    def receive(self, request):
        """Read one response frame and parse it against ``request``.

        The first two bytes (slave id, function code) decide how many more
        bytes belong to the frame.  Every read is checked for completeness,
        so a device that stops answering mid-frame is reported as a timeout
        instead of handing a truncated frame to the parser.

        Args:
            request: The request frame this response answers; forwarded to
                ``parse_response_adu``.

        Returns:
            Whatever ``self.protocol.parse_response_adu`` returns.

        Raises:
            RuntimeError: ``'timeout'`` if any read returns fewer bytes than
                requested.
            NotImplementedError: For function codes whose response length is
                not known here.
        """
        response = self.connection.read(2)
        if len(response) != 2:
            raise RuntimeError('timeout')
        function = response[1]

        try:
            if function in (1, 2, 3, 4):
                # Variable size: a byte count follows the function code.
                byte_count = self.connection.read(1)
                response += byte_count
                if len(byte_count) != 1:
                    # Without this check the function code would be
                    # misread as the byte count.
                    raise RuntimeError('timeout')
                # Payload plus the two trailing CRC bytes.
                remaining = byte_count[0] + 2
            elif function in (5, 6, 15, 16):
                # Fixed size: four data bytes plus two CRC bytes.
                remaining = 6
            elif function & 0x80:
                # Exception response: exception code plus two CRC bytes.
                remaining = 3
            else:
                # Unknown length: grab what is available for the debug log.
                response += self.connection.read(1024)
                raise NotImplementedError('RTU function {}'.format(function))

            tail = self.connection.read(remaining)
            response += tail
            if len(tail) != remaining:
                raise RuntimeError('timeout')
        finally:
            # Only build the hex dump when it is actually going to be logged.
            if logging.getLogger().isEnabledFor(logging.DEBUG):
                logging.debug('← < %s > %s bytes', dump(response), len(response))

        return self.protocol.parse_response_adu(response, request)

    def close(self):
        """Close the serial port; a no-op if the port was never opened."""
        if self.connection is not None:
            self.connection.close()

    def perform_accesses(self, accesses, definitions):
        """Run every access over this transport and print read results.

        Args:
            accesses: Iterable of objects providing ``perform(modbus)``, a
                ``write`` flag and ``print_values(definitions)``.
            definitions: Passed through to ``print_values``.

        Returns:
            ``self``.
        """
        for access in accesses:
            access.perform(self)
            if not access.write:
                access.print_values(definitions)
        return self