forked from pymodbus-dev/pymodbus
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathclient_sync.py
executable file
·146 lines (125 loc) · 4.08 KB
/
client_sync.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
#!/usr/bin/env python3
"""Pymodbus Synchronous Client Example.
An example of a single threaded synchronous client.
usage::
client_sync.py [-h] [-c {tcp,udp,serial,tls}]
[-f {ascii,rtu,socket,tls}]
[-l {critical,error,warning,info,debug}] [-p PORT]
[--baudrate BAUDRATE] [--host HOST]
-h, --help
show this help message and exit
-c, --comm {tcp,udp,serial,tls}
set communication, default is tcp
-f, --framer {ascii,rtu,socket,tls}
set framer, default depends on --comm
-l, --log {critical,error,warning,info,debug}
set log level, default is info
-p, --port PORT
set port
--baudrate BAUDRATE
set serial device baud rate
--host HOST
set host, default is 127.0.0.1
The corresponding server must be started first, e.g. with:
python3 server_sync.py
"""
import logging
import sys
try:
import helper
except ImportError:
print("*** ERROR --> THIS EXAMPLE needs the example directory, please see \n\
https://pymodbus.readthedocs.io/en/latest/source/examples.html\n\
for more information.")
sys.exit(-1)
import pymodbus.client as modbusClient
from pymodbus import ModbusException
# Module-level logger, named after this file's path; its threshold is set
# to DEBUG so every message logged through it passes the logger-level filter.
_logger = logging.getLogger(__file__)
_logger.setLevel("DEBUG")
def setup_sync_client(description=None, cmdline=None):
    """Build a synchronous Modbus client from command-line options.

    The options are parsed by ``helper.get_commandline`` (called with
    ``server=False``) and ``args.comm`` selects which client class from
    ``pymodbus.client`` is instantiated.

    :param description: text forwarded to the command-line parser.
    :param cmdline: argument list forwarded to the command-line parser.
        NOTE(review): presumably ``None`` means "parse sys.argv" - confirm
        in ``helper.get_commandline``.
    :returns: the client object for ``"tcp"``, ``"udp"``, ``"serial"`` or
        ``"tls"``; ``None`` when ``args.comm`` is none of these.
    """
    args = helper.get_commandline(
        server=False,
        description=description,
        cmdline=cmdline,
    )
    _logger.info("### Create client object")
    client = None
    if args.comm == "tcp":
        client = modbusClient.ModbusTcpClient(
            args.host,
            port=args.port,
            # Common optional parameters:
            framer=args.framer,
            timeout=args.timeout,
            #    retries=3,
            # TCP setup parameters
            #    source_address=("localhost", 0),
        )
    elif args.comm == "udp":
        client = modbusClient.ModbusUdpClient(
            args.host,
            port=args.port,
            # Common optional parameters:
            framer=args.framer,
            timeout=args.timeout,
            #    retries=3,
            # UDP setup parameters
            #    source_address=None,
        )
    elif args.comm == "serial":
        client = modbusClient.ModbusSerialClient(
            port=args.port,  # serial port
            # Common optional parameters:
            # Pass the framer chosen on the command line, exactly as the
            # tcp/udp/tls branches do; otherwise --framer is silently ignored
            # for serial connections.
            framer=args.framer,
            timeout=args.timeout,
            #    retries=3,
            # Serial setup parameters
            baudrate=args.baudrate,
            #    bytesize=8,
            #    parity="N",
            #    stopbits=1,
            #    handle_local_echo=False,
        )
    elif args.comm == "tls":
        client = modbusClient.ModbusTlsClient(
            args.host,
            port=args.port,
            # Common optional parameters:
            framer=args.framer,
            timeout=args.timeout,
            #    retries=3,
            # TLS setup parameters
            sslctx=modbusClient.ModbusTlsClient.generate_ssl(
                certfile=helper.get_certificate("crt"),
                keyfile=helper.get_certificate("key"),
                #    password=None,
            ),
        )
    return client
def run_sync_client(client, modbus_calls=None):
    """Connect the client, run the optional calls, and always close it.

    :param client: client object exposing ``connect()`` and ``close()``.
    :param modbus_calls: optional callable invoked as
        ``modbus_calls(client)`` after connecting; skipped when falsy.
    :raises Exception: whatever ``modbus_calls`` raises is propagated after
        the client has been closed.
    """
    _logger.info("### Client starting")
    client.connect()
    try:
        if modbus_calls:
            modbus_calls(client)
    finally:
        # Release the connection even when a call raises or an assert fails.
        client.close()
    _logger.info("### End of Program")
def run_a_few_calls(client):
    """Read one coil and two holding registers and verify the replies.

    :param client: connected client offering ``read_coils`` and
        ``read_holding_registers``.
    :raises ModbusException: when a request raises it or a reply reports an
        error via ``isError()``.
    :raises AssertionError: when a reply does not hold the expected values.
    """
    try:
        rr = client.read_coils(32, 1, slave=1)
        # An error reply carries no ``bits``; fail with a clear message
        # instead of an AttributeError further down.
        if rr.isError():
            raise ModbusException(f"read_coils failed: {rr}")
        # One coil is requested but eight bits are expected back
        # (presumably the reply is padded to a whole byte - confirm).
        assert len(rr.bits) == 8

        rr = client.read_holding_registers(4, 2, slave=1)
        if rr.isError():
            raise ModbusException(f"read_holding_registers failed: {rr}")
        # NOTE(review): 17 is presumably the value served by the matching
        # example server - confirm against server_sync.py.
        assert rr.registers[0] == 17
        assert rr.registers[1] == 17
    except ModbusException as exc:
        _logger.error("### Modbus call failed: %s", exc)
        # Bare raise keeps the original traceback intact.
        raise
def main(cmdline=None):
    """Create the client from the command line and run the sample calls.

    :param cmdline: argument list handed on to ``setup_sync_client``.
    """
    sync_client = setup_sync_client(
        cmdline=cmdline,
        description="Run synchronous client.",
    )
    run_sync_client(sync_client, modbus_calls=run_a_few_calls)
# Run the example only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()