forked from pablin87/rtb_exchange_sim
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconnection.py
167 lines (150 loc) · 5.7 KB
/
connection.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
import pyev
import os
import sys
import logging
import socket
import signal
import errno
import weakref
# errno values that mean "nothing to do right now, try again later" on a
# non-blocking socket; they are not treated as connection failures.
NONBLOCKING = (errno.EAGAIN, errno.EWOULDBLOCK)


class Connection(object):
    '''
    Client connection to an rtb server.

    Drives one non-blocking TCP socket from a pyev loop, alternating
    between writing a request and reading its response:

    * ``request_cb(conn)`` is called when the socket is writable and no
      request is pending; its return value is appended to the send buffer.
    * ``response_cb(read_buf, conn)`` is called with everything read so
      far; a falsy return value means the response is complete, a truthy
      one means more data is expected.
    * ``error_cb(conn)`` is called from handle_error() after the
      connection has been closed, unless the state is STATE_CONNECTING.
    * ``connect_cb`` is used as the watcher callback; when it is not given
      io_cb() is used instead.

    ``current_qps`` counts the complete responses seen since the last timer
    tick and ``last_qps`` holds the count of the previous tick.
    '''

    STATE_NOT_CONNECTED = 'NOT_CONNECTED'
    STATE_CONNECTING = 'CONNECTING'
    STATE_CONNECTED = 'CONNECTED'
    STATE_ERROR = 'ERROR'
    # Never assigned inside this class; handle_read() only checks it to
    # decide whether to send another request after a full response.
    STATE_IDLE = 'IDLE'

    # Class-wide counter used to hand out a distinct id to each instance.
    _id = 1

    def __init__(self, address, loop,
                 request_cb, response_cb, error_cb, connect_cb=None):
        '''
        Store the callbacks and set up an unconnected connection.

        address is the (host, port) pair passed to the socket, loop is the
        pyev loop the watchers are attached to. No socket is created until
        connect() is called.
        '''
        self.request_cb = request_cb
        self.response_cb = response_cb
        self.error_cb = error_cb
        self.connect_cb = connect_cb
        self.last_qps = 0
        self.current_qps = 0
        self.sock = None
        self.watcher = None
        # Created by connect(); defined here so close() is always safe.
        self.timer = None
        self.address = address
        self.buf = ''
        self.read_buf = ''
        self.state = Connection.STATE_NOT_CONNECTED
        self.loop = loop
        self.id = Connection._id
        Connection._id += 1
        logging.debug("{0}: ready".format(self))

    def __del__(self):
        # getattr: the instance may be only partially initialised.
        logging.info('------------------> ex __del__ conn %d' %
                     getattr(self, 'id', -1))

    def connect(self):
        '''
        Start a non-blocking connect to self.address.

        Returns the resulting state: STATE_CONNECTING when the connect was
        started (the watcher and the qps timer are armed), or STATE_ERROR
        when it was rejected right away (the socket is closed again).
        '''
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setblocking(0)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        logging.info('connecting to %s:%d' %
                     (self.address[0], self.address[1]))
        res = self.sock.connect_ex(self.address)
        # 0 means the connect completed immediately, which is as good as
        # EINPROGRESS: the socket simply becomes writable right away.
        if res not in (0, errno.EINPROGRESS):
            logging.error('unable to connect to %s:%d' %
                          (self.address[0], self.address[1]))
            # Do not leak the descriptor of a socket that will not be used.
            self.sock.close()
            self.state = Connection.STATE_ERROR
            return self.state

        if not self.connect_cb:
            self.connect_cb = self.io_cb
        self.state = Connection.STATE_CONNECTING
        self.watcher = pyev.Io(
            self.sock,
            pyev.EV_WRITE,
            self.loop,
            self.connect_cb)
        self.watcher.start()
        # start the timer
        self.timer = pyev.Timer(1, 1, self.loop, self.set_qps)
        self.timer.start()
        return self.state

    def send_buffer(self, buf):
        '''
        Replace the pending request with buf and wait for a write event.
        '''
        logging.debug('conn.send_buffer')
        # set the buffer and wait the write event
        self.state = Connection.STATE_CONNECTED
        self.buf = buf
        self.reset(pyev.EV_WRITE)

    def set_qps(self, watcher, revents):
        '''
        Timer callback: publish the finished window and start a new one.
        '''
        self.last_qps = self.current_qps
        self.current_qps = 0

    def reset(self, events):
        '''
        Re-arm the io watcher so that it waits for events on the socket.
        '''
        self.watcher.stop()
        self.watcher.set(self.sock, events)
        self.watcher.start()

    def handle_error(self, msg, level=logging.ERROR, exc_info=True):
        '''
        Log msg at level, close the connection and notify error_cb.

        error_cb is skipped while the state is STATE_CONNECTING. The state
        is STATE_ERROR afterwards.
        '''
        logging.log(level, "{0}: {1} --> closing".format(self, msg),
                    exc_info=exc_info)
        self.close()
        if self.state != Connection.STATE_CONNECTING:
            self.error_cb(self)
        self.state = Connection.STATE_ERROR

    def handle_read(self):
        '''
        Read what is available and hand the accumulated data to
        response_cb.

        A "try again" error leaves the connection untouched; any other
        socket error, or an empty read (peer closed), goes through
        handle_error() exactly once.
        '''
        logging.debug('handling read %d' % self.id)
        try:
            chunk = self.sock.recv(1024)
        except socket.error as err:
            if err.args[0] not in NONBLOCKING:
                self.handle_error("error reading from {0}".format(self.sock))
            else:
                logging.error('NONBLOCKING event on read')
            # Nothing was read: either the connection is closed now or
            # the watcher will simply fire again later.
            return

        if not chunk:
            # recv() returning no data is the end of the stream, even when
            # part of a response is already buffered.
            self.handle_error("connection closed by peer", logging.DEBUG, False)
            return

        logging.debug('reading -------------->%s<------------------' % chunk)
        self.read_buf += chunk
        logging.debug('buffer -------------->%s<------------------' % self.read_buf)

        buf = self.response_cb(self.read_buf, self)
        # was it a full response ?
        if not buf:
            # we got a full response
            self.current_qps += 1
            self.read_buf = ''
            if not self.state == Connection.STATE_IDLE:
                self.reset(pyev.EV_WRITE)
        else:
            # we got a partial response keep on reading
            logging.debug('partial buffer received %s' % self.read_buf)
            self.reset(pyev.EV_READ)

    def handle_write(self):
        '''
        Send the pending request, asking request_cb for one if needed.

        Once the whole buffer is out the watcher switches to reading;
        otherwise it keeps waiting for the socket to become writable.
        '''
        try:
            logging.debug('handling write %d' % self.id)
            # The state leaves STATE_CONNECTING before send() is tried, so
            # a send() failure here is always reported through error_cb.
            self.state = Connection.STATE_CONNECTED
            if not self.buf:
                self.buf += self.request_cb(self)
            sent = self.sock.send(self.buf)
        except socket.error as err:
            logging.error('handle_write ex')
            if err.args[0] not in NONBLOCKING:
                self.handle_error("error writing to {0}".format(self.sock))
            else:
                logging.error('NONBLOCKING event on write')
        else:
            self.buf = self.buf[sent:]
            if not self.buf:
                # all the request buffer was sent,
                # let's wait for the response
                self.reset(pyev.EV_READ)
            else:
                # there is still some buffer left,
                # wait for the write event again
                logging.info('partial buffer sent')
                self.reset(pyev.EV_WRITE)

    def io_cb(self, watcher, revents):
        '''
        Default watcher callback: dispatch to handle_read or handle_write.
        '''
        if revents & pyev.EV_READ:
            self.handle_read()
        else:
            self.handle_write()

    def close(self):
        '''
        Close the socket and stop the watcher and the timer.

        Safe to call on a connection that was never connected and safe to
        call more than once.
        '''
        if self.sock is not None:
            self.sock.close()
        if self.watcher is not None:
            self.watcher.stop()
            self.watcher = None
        if self.timer is not None:
            self.timer.stop()
            self.timer = None
        logging.debug("{0}: closed".format(self))