-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathclient.py
185 lines (151 loc) · 5.9 KB
/
client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
import logging
from pexpect import fdpexpect, TIMEOUT, EOF
from openmath import encoder, decoder
from .scscp import SCSCPConnectionError, SCSCPQuit, SCSCPCancel, SCSCPProcedureMessage
from .processing_instruction import ProcessingInstruction as PI, OrderedProcessingInstruction as OPI
class TimeoutError(RuntimeError):
""" Client/Server timeout """
pass
INITIALIZED=0
CONNECTED=1
CLOSED=2
def _assert_status(status, msg=None):
def wrap(fun):
def wrapper(self, *args, **kwds):
if self.status != status:
raise RuntimeError(msg or "Bad status %d." % self.status)
return fun(self, *args, **kwds)
return wrapper
return wrap
_assert_connected = _assert_status(CONNECTED, "Not connected.")
class SCSCPPeer(object):
"""
Base class for SCSCP client and server
"""
def __init__(self, socket, timeout=30, logger=None, me='Client', you='Server'):
self.socket = socket
self.stream = fdpexpect.fdspawn(socket, timeout=timeout)
self.status = INITIALIZED
self.log = logger or logging.getLogger(__name__)
self.me, self.you = me, you
def _get_next_PI(self, expect=None, timeout=-1):
while True:
try:
self.stream.expect(PI.PI_regex, timeout=timeout)
except TIMEOUT:
raise TimeoutError("%s took too long to respond." % self.you)
except EOF:
raise ConnectionResetError("%s closed unexpectedly." % self.you)
try:
pi = PI.parse(self.stream.after)
except SCSCPConnectionError:
self.quit()
raise
self.log.debug("Received PI: %s" % pi)
if expect is not None and pi.key not in expect:
if pi.key == 'quit':
self.quit()
reason = pi.attrs.get('reason')
raise SCSCPQuit("%s closed session (reason: %s)." % (self.you, reason), reason)
if pi.key == '' and 'info' in pi.attrs:
self.log.info("SCSCP info: %s" % pi.attrs.get('info').decode())
continue
else:
raise SCSCPConnectionError("%s sent unexpected message: %s" % (self.you, pi.key), pi)
else:
return pi
def _send_PI(self, key='', **kwds):
pi = PI(key, **kwds)
self.log.debug("Sending PI: %s" % pi)
return self.socket.send(bytes(pi) + b'\n')
def _send_ordered_PI(self, key, attrs):
pi = OPI(key, attrs)
self.log.debug("Sending PI: %s" % pi)
return self.socket.send(bytes(pi) + b'\n')
@_assert_connected
def send(self, msg):
""" Send SCSCP message """
self._send_PI('start')
try:
self.log.debug(b'Sending message: %s' % msg)
self.socket.send(msg + b'\n')
except:
self._send_PI('cancel')
raise
else:
self._send_PI('end')
@_assert_connected
def receive(self, timeout=-1):
""" Receive SCSCP message """
pi = self._get_next_PI(['start'], timeout=timeout)
pi = self._get_next_PI(['end', 'cancel'], timeout=timeout)
if pi.key == 'cancel':
raise SCSCPCancel('%s canceled transmission' % self.you)
msg = self.stream.before
self.log.debug(b'Received message: %s' % msg)
return msg
@_assert_connected
def quit(self, reason=None):
""" Send SCSCP quit message """
kwds = {} if reason is None else { 'reason': None }
try:
self._send_PI('quit', **kwds)
self.socket.close()
except ConnectionError:
pass
finally:
self.status = CLOSED
@_assert_connected
def info(self, info):
""" Send SCSCP info message """
self._send_PI(info=info)
class SCSCPPeerOM(SCSCPPeer):
"""
Base class for SCSCP client and server understanding OpenMath
"""
def receive(self, timeout=-1):
msg = super(SCSCPPeerOM, self).receive(timeout)
return decoder.decode_bytes(msg)
def send(self, om):
return super(SCSCPPeerOM, self).send(encoder.encode_bytes(om))
class SCSCPClientBase(SCSCPPeer):
"""
A simple SCSCP synchronous client, with no understanding of OpenMath.
"""
def __init__(self, socket, timeout=30, logger=None):
super(SCSCPClientBase, self).__init__(socket, timeout, logger, me="Client", you="Server")
@_assert_status(INITIALIZED, "Session already opened.")
def connect(self, timeout=None):
""" SCSCP handshake """
pi = self._get_next_PI([''], timeout=timeout)
if ('scscp_versions' not in pi.attrs
or b'1.3' not in pi.attrs['scscp_versions'].split()):
self.quit()
raise SCSCPConnectionError("Unsupported SCSCP versions %s." % pi.attrs.get('scscp_versions'), pi)
self.service_info = pi.attrs
self._send_PI(version=b'1.3')
pi = self._get_next_PI([''], timeout=timeout)
if pi.attrs.get('version') != b'1.3':
self.quit()
raise SCSCPConnectionError("Server sent unexpected response.", pi)
self.status = CONNECTED
@_assert_connected
def terminate(self, id):
""" Send SCSCP terminate message """
self._send_PI('terminate', call_id=id)
class SCSCPClient(SCSCPClientBase, SCSCPPeerOM):
"""
A simple SCSCP synchronous client.
"""
def wait(self, timeout=-1):
return SCSCPProcedureMessage.from_om(self.receive(timeout))
def call(self, data, cookie=False, **opts):
if cookie:
opts['return_cookie'] = True
elif cookie is None:
opts['return_nothing'] = True
else:
opts['return_object'] = True
call = SCSCPProcedureMessage.call(data, id=None, **opts)
self.send(call.om())
return call