Skip to content

Commit

Permalink
Brought back the object super class, remove and refactored symbol typ…
Browse files Browse the repository at this point in the history
…e methods, added Symbol.value as buffer
  • Loading branch information
RobertoRoos committed Dec 22, 2020
1 parent 570e0cc commit 0ab7248
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 47 deletions.
2 changes: 1 addition & 1 deletion pyads/ads.py
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ def bytes_from_dict(
return byte_list


class Connection:
class Connection(object):
"""Class for managing the connection to an ADS device.
:ivar str ams_net_id: AMS net id of the remote device
Expand Down
4 changes: 2 additions & 2 deletions pyads/structs.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class SAmsAddr(Structure):
_fields_ = [("netId", SAmsNetId), ("port", c_uint16)]


class AmsAddr:
class AmsAddr(object):
"""Wrapper for SAmsAddr-structure to address an ADS device.
:type _ams_addr: SAmsAddr
Expand Down Expand Up @@ -155,7 +155,7 @@ def __repr__(self):
return "<AmsAddress {}:{}>".format(self.netid, self.port)


class NotificationAttrib:
class NotificationAttrib(object):
"""Notification Attribute."""

def __init__(
Expand Down
59 changes: 28 additions & 31 deletions pyads/symbol.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ class AdsSymbol:
e.g. "LREAL")
:ivar plc_type: ctypes type of variable (from constants.PLCTYPE_*)
:ivar comment: Comment of symbol
:ivar value: Buffered value, i.e. the most recently read or written
value for this symbol
"""

def __init__(
Expand Down Expand Up @@ -85,6 +87,8 @@ def __init__(
self.symbol_type = symbol_type
self.comment = comment

self.value = None # type: Any

if do_lookup:
self.create_symbol_from_info() # Perform remote lookup

Expand All @@ -94,7 +98,7 @@ def __init__(

self.plc_type: Optional[Any] = None
if self.symbol_type is not None:
self.plc_type = self.get_type_from_str(self.symbol_type)
self.plc_type = self._get_type_from_str(self.symbol_type)

def create_symbol_from_info(self) -> None:
"""Look up remaining info from the remote
Expand Down Expand Up @@ -126,7 +130,7 @@ def read_write_check(self):

if not self._plc or not self._plc.is_open:
raise ValueError(
"Cannot read or write data with missing or " "unopened Connection"
"Cannot read or write data with missing or closed Connection"
)

if not isinstance(self.index_group, int) or not isinstance(
Expand All @@ -138,27 +142,32 @@ def read_write_check(self):
)

def read(self) -> Any:
"""Read the current value of this symbol"""
"""Read the current value of this symbol.
The new read value is also saved in the buffer.
"""
self.read_write_check()
return self._plc.read(self.index_group, self.index_offset, self.plc_type)
self.value = self._plc.read(self.index_group, self.index_offset,
self.plc_type)
return self.value

def write(self, new_value: Optional[Any] = None):
"""Write a new value or the buffered value to the symbol.
def write(self, new_value: Any):
"""Write a new value to the symbol"""
When a new value was written, the buffer is updated.
:param new_value Value to be written to symbol (if None,
the buffered value is send instead)
"""
self.read_write_check()
return self._plc.write(
if new_value is None:
new_value = self.value # Send buffered value instead
else:
self.value = new_value # Update buffer with new value
self._plc.write(
self.index_group, self.index_offset, new_value, self.plc_type
)

@property
def value(self):
"""Equivalent to AdsSymbol.read()"""
return self.read()

@value.setter
def value(self, new_value):
"""Equivalent to AdsSymbol.write()"""
self.write(new_value)

def __repr__(self):
"""Debug string"""
t = type(self)
Expand Down Expand Up @@ -211,8 +220,7 @@ def del_device_notification(self, handles: Tuple[int, int]):
self._plc.del_device_notification(*handles)
self._handles_list.remove(handles)

@staticmethod
def get_type_from_str(type_str: str) -> Any:
def _get_type_from_str(self, type_str: str) -> Any:
"""Get PLCTYPE_* from PLC name string
If PLC name could not be mapped, return None. This is done on
Expand All @@ -236,7 +244,7 @@ def get_type_from_str(type_str: str) -> Any:
scalar_type_str = groups[2]

# Find scalar type
scalar_type = AdsSymbol.get_type_from_str(scalar_type_str)
scalar_type = self._get_type_from_str(scalar_type_str)

if scalar_type:
return scalar_type * size
Expand All @@ -258,14 +266,3 @@ def get_type_from_str(type_str: str) -> Any:
# error when they are being addressed

return None

@staticmethod
def get_type_from_int(type_int: int) -> Any:
"""Get PLCTYPE_* from a number
Also see `get_type_from_string()`
"""
if type_int in constants.ads_type_to_ctype:
return constants.ads_type_to_ctype[type_int]

return None
47 changes: 34 additions & 13 deletions tests/test_symbol.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@
import time
import unittest
import pyads
from pyads.testserver import AdsTestServer, AmsPacket, AdvancedHandler, \
PLCVariable
from pyads.testserver import AdsTestServer, AdvancedHandler, PLCVariable
from pyads import constants, AdsSymbol

# These are pretty arbitrary
Expand All @@ -25,7 +24,7 @@ class AdsSymbolTestCase(unittest.TestCase):
@classmethod
def setUpClass(cls):
# type: () -> None
"""Setup the ADS testserver."""
"""Setup the ADS test server."""
cls.handler = AdvancedHandler()
cls.test_server = AdsTestServer(handler=cls.handler, logging=False)
cls.test_server.start()
Expand All @@ -36,15 +35,15 @@ def setUpClass(cls):
@classmethod
def tearDownClass(cls):
# type: () -> None
"""Tear down the testserver."""
"""Tear down the test server."""
cls.test_server.stop()

# wait a bit for server to shutdown
time.sleep(1)

def setUp(self):
# type: () -> None
"""Establish connection to the testserver."""
"""Establish connection to the test server."""

# Clear test server and handler
self.test_server.request_history = []
Expand Down Expand Up @@ -114,7 +113,7 @@ def test_init_by_name_array(self):
self.assertEqual(var.symbol_type, symbol.symbol_type)
self.assertIsNone(symbol.comment)

my_list = symbol.value
my_list = symbol.read()

self.assertIsInstance(my_list, list)
self.assertEqual(5, len(my_list))
Expand Down Expand Up @@ -228,7 +227,7 @@ def test_init_manual(self):
self.test_var.index_group, self.test_var.index_offset, 12.3,
self.test_var.plc_type)

self.assertEqual(12.3, symbol.value)
self.assertEqual(12.3, symbol.read())

self.assertAdsRequestsCount(2) # Only a WRITE followed by a READ

Expand Down Expand Up @@ -269,7 +268,7 @@ def test_read_write_errors(self):

with self.assertRaises(ValueError) as cm:
symbol.read() # Cannot read with unopened Connection
self.assertIn('missing or unopened Connection', str(cm.exception))
self.assertIn('missing or closed Connection', str(cm.exception))

self.plc.open()

Expand All @@ -290,7 +289,7 @@ def test_read(self):

symbol = AdsSymbol(self.plc, name=self.test_var.name)

self.assertEqual(420.0, symbol.value)
self.assertEqual(420.0, symbol.read())

self.assertAdsRequestsCount(3) # WRITE, READWRITE for info and
# final read
Expand All @@ -301,7 +300,7 @@ def test_write(self):

symbol = AdsSymbol(self.plc, name=self.test_var.name)

symbol.value = 3.14 # Write
symbol.write(3.14) # Write

r_value = self.plc.read(
self.test_var.index_group, self.test_var.index_offset,
Expand All @@ -312,6 +311,29 @@ def test_write(self):
self.assertAdsRequestsCount(3) # READWRITE for info, WRITE and
# test read

def test_value(self):
"""Test the buffer property"""

with self.plc:
symbol = AdsSymbol(self.plc, name=self.test_var.name)

symbol.value = 420.0 # Shouldn't change anything yet

self.assertAdsRequestsCount(1) # Only a READWRITE for info

symbol.write()

self.assertAdsRequestsCount(2) # Written from buffer

symbol.read()

for i in range(10):
custom_buffer = symbol.value

self.assertEqual(420.0, symbol.value)

self.assertAdsRequestsCount(3) # Read only once

def test_get_symbol(self):
"""Test symbol by Connection method"""
with self.plc:
Expand All @@ -326,7 +348,7 @@ def test_get_symbol(self):
def test_add_notification(self):
"""Test notification registering"""

def my_callback(*args):
def my_callback(*_):
return

with self.plc:
Expand All @@ -341,7 +363,7 @@ def my_callback(*args):
def test_add_notification_delete(self):
"""Test notification registering"""

def my_callback(*args):
def my_callback(*_):
return

self.plc.open()
Expand All @@ -359,4 +381,3 @@ def my_callback(*args):

if __name__ == "__main__":
unittest.main()

0 comments on commit 0ab7248

Please sign in to comment.