Skip to content

Commit

Permalink
rewrite connection pool
Browse files Browse the repository at this point in the history
  • Loading branch information
xxnet committed Aug 14, 2015
1 parent a2db438 commit 89119c1
Show file tree
Hide file tree
Showing 13 changed files with 3,121 additions and 183 deletions.
3 changes: 0 additions & 3 deletions gae_proxy/local/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,6 @@ def load(self):

self.USE_IPV6 = self.CONFIG.getint('google_ip', 'use_ipv6')

# change to False when require http://127.0.0.1:8084/quit
# then GoAgent will quit
self.keep_run = True

# change to True when finished import CA cert to browser
# launcher will wait import ready then open browser to show status, check update etc
Expand Down
19 changes: 19 additions & 0 deletions gae_proxy/local/connect_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,32 @@

import xlog

# change to False when exit: system tray exit menu, or Ctrl+C in console
# then GoAgent will quit
# Every long running thread should check it and exit when False
keep_running = True

connect_allow_time = 0
connect_fail_time = 0
scan_allow_time = 0

block_delay = 10 # (60 * 5)
scan_sleep_time = 600 # Need examination
last_request_time = 0

def touch_active():
global last_request_time
last_request_time = time.time()

def inactive_time():
global last_request_time
return time.time() - last_request_time

def is_active(timeout=60):
if inactive_time() < timeout:
return True
else:
return False

def allow_connect():
global connect_allow_time
Expand Down
135 changes: 73 additions & 62 deletions gae_proxy/local/connect_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ def qsize(self):

def put(self, item):
handshake_time, sock = item
sock.last_use_time = time.time()
self.not_empty.acquire()
try:
self.pool[sock] = handshake_time
Expand Down Expand Up @@ -111,16 +112,16 @@ def get_slowest(self):
if not self.qsize():
raise ValueError("no item")

slowest_time = 0
slowest_handshake_time = 0
slowest_sock = None
for sock in self.pool:
time = self.pool[sock]
if time > slowest_time:
slowest_time = time
handshake_time = self.pool[sock]
if handshake_time > slowest_handshake_time:
slowest_handshake_time = handshake_time
slowest_sock = sock

self.pool.pop(slowest_sock)
return (slowest_time, slowest_sock)
return (slowest_handshake_time, slowest_sock)
finally:
self.not_empty.release()

Expand All @@ -133,8 +134,7 @@ def get_need_keep_alive(self, maxtime=200):
inactive_time = time.time() -sock.last_use_time
#logging.debug("inactive_time:%d", inactive_time * 1000)
if inactive_time >= maxtime:
if inactive_time <= 230:
return_list.append(sock)
return_list.append(sock)
del self.pool[sock]

return return_list
Expand All @@ -149,7 +149,7 @@ def to_string(self):
i = 0
for item in pool:
sock,t = item
str += "%d \t %s handshake:%d create:%d\r\n" % (i, sock.ip, t, time.time() -sock.last_use_time)
str += "%d \t %s handshake:%d not_active_time:%d\r\n" % (i, sock.ip, t, time.time() -sock.last_use_time)
i += 1
finally:
self.pool_lock.release()
Expand All @@ -159,9 +159,7 @@ def to_string(self):

def random_hostname():
return False
#return "cache.google.com"
word = ''.join(random.choice(('bcdfghjklmnpqrstvwxyz', 'aeiou')[x&1]) for x in xrange(random.randint(6, 10)))
#return "%s.appspot.com" % word
gltd = random.choice(['org', 'com', 'net', 'gov'])
return 'www.%s.%s' % (word, gltd)

Expand All @@ -184,7 +182,6 @@ def __init__(self):

if self.keep_alive:
p = threading.Thread(target = self.keep_alive_thread)
p.daemon = True
p.start()

def load_config(self):
Expand Down Expand Up @@ -218,7 +215,7 @@ def head_request(self, ssl_sock):
# public appid don't keep alive, for quota limit.
if ssl_sock.appid.startswith("xxnet-") and ssl_sock.appid[7:].isdigit():
#logging.info("public appid don't keep alive")
self.keep_alive = 0
#self.keep_alive = 0
return False

#logging.debug("head request %s", host)
Expand Down Expand Up @@ -252,75 +249,79 @@ def head_request(self, ssl_sock):
if response:
response.close()

def keep_alive_worker(self, sock):
if self.head_request(sock):
self.save_ssl_connection_for_reuse(sock)
else:
sock.close()

def start_keep_alive(self, sock):
work_thread = threading.Thread(target=self.keep_alive_worker, args=(sock,))
work_thread.start()

def keep_alive_thread(self):
while self.keep_alive:
time.sleep(1)
try:
sock_list = self.new_conn_pool.get_need_keep_alive(maxtime=self.keep_alive-3)
for ssl_sock in sock_list:
ssl_sock.close()

sock_list = self.gae_conn_pool.get_need_keep_alive(maxtime=self.keep_alive-3)
for ssl_sock in sock_list:
# only keep little alive link.
# if you have 25 appid, you can keep 5 alive link.
if self.gae_conn_pool.qsize() > max(1, len(appid_manager.working_appid_list)/2):
ssl_sock.close()
continue
while self.keep_alive and connect_control.keep_running:
if not connect_control.is_active():
time.sleep(1)
continue

#inactive_time = time.time() -ssl_sock.last_use_time
#logging.debug("inactive_time:%d", inactive_time)
if self.head_request(ssl_sock):
self.save_ssl_connection_for_reuse(ssl_sock)
else:
ssl_sock.close()
new_list = self.new_conn_pool.get_need_keep_alive(maxtime=self.keep_alive-3)
old_list = self.gae_conn_pool.get_need_keep_alive(maxtime=self.keep_alive-3)
to_keep_live_list = new_list + old_list

self.create_more_connection()
except Exception as e:
xlog.warn("keep alive except:%r", e)
for ssl_sock in to_keep_live_list:
inactive_time = time.time() - ssl_sock.last_use_time
if inactive_time > self.keep_alive:
ssl_sock.close()
else:
self.start_keep_alive(ssl_sock)

time.sleep(1)

def save_ssl_connection_for_reuse(self, ssl_sock, host=None):
ssl_sock.last_use_time = time.time()

if host:
if host not in self.host_conn_pool:
self.host_conn_pool[host] = Connect_pool()

self.host_conn_pool[host].put( (ssl_sock.handshake_time, ssl_sock) )
return

self.gae_conn_pool.put( (ssl_sock.handshake_time, ssl_sock) )
else:
self.gae_conn_pool.put( (ssl_sock.handshake_time, ssl_sock) )

while self.gae_conn_pool.qsize() > self.connection_pool_max_num:
t, ssl_sock = self.gae_conn_pool.get_slowest()
while self.gae_conn_pool.qsize() > self.connection_pool_max_num:
handshake_time, ssl_sock = self.gae_conn_pool.get_slowest()

if t < self.keep_alive:
self.gae_conn_pool.put( (ssl_sock.handshake_time, ssl_sock) )
#ssl_sock.close()
return
else:
ssl_sock.close()
if handshake_time < self.keep_alive:
self.gae_conn_pool.put( (ssl_sock.handshake_time, ssl_sock) )
return
else:
ssl_sock.close()


def create_more_connection(self):
#need_conn_num = self.connection_pool_min_num - self.new_conn_pool.qsize()
def create_more_connection(self, type="gae"):
if type == "gae":
need_conn_num = self.connection_pool_min_num - self.new_conn_pool.qsize() - self.gae_conn_pool.qsize()
else:
need_conn_num = self.connection_pool_min_num - self.new_conn_pool.qsize()

#target_thread_num = min(self.max_thread_num, need_conn_num)
while self.thread_num < self.max_thread_num and self.new_conn_pool.qsize() < self.connection_pool_min_num:
target_thread_num = min(self.max_thread_num, need_conn_num)
#while self.thread_num < self.max_thread_num and connect_control.keep_running:
for i in range(0, target_thread_num):
if not connect_control.allow_connect():
break

self.thread_num_lock.acquire()
self.thread_num += 1
self.thread_num_lock.release()
p = threading.Thread(target = self.connect_thread)
p.daemon = True
p = threading.Thread(target = self.create_connection_worker, args=(type,))
p.start()
time.sleep(0.3)


#time.sleep(0.3)

def _create_ssl_connection(self, ip_port):
if not connect_control.allow_connect():
time.sleep(1)
return False

sock = None
Expand Down Expand Up @@ -405,9 +406,16 @@ def verify_SSL_certificate_issuer(ssl_sock):
return False


def connect_thread(self):
def create_connection_worker(self, type="gae"):
try:
while self.new_conn_pool.qsize() < self.connection_pool_min_num:
while connect_control.keep_running:
if type == "gae":
if (self.new_conn_pool.qsize() + self.gae_conn_pool.qsize()) >= self.connection_pool_min_num:
break
else:
if self.new_conn_pool.qsize() >= self.connection_pool_min_num:
break

ip_str = google_ip.get_gws_ip()
if not ip_str:
xlog.warning("no gws ip")
Expand All @@ -427,7 +435,7 @@ def connect_thread(self):
self.thread_num -= 1
self.thread_num_lock.release()

def create_ssl_connection(self, host=''):
def get_ssl_connection(self, host=''):
ssl_sock = None
if host:
if host in self.host_conn_pool:
Expand All @@ -439,9 +447,8 @@ def create_ssl_connection(self, host=''):
ssl_sock = None
break

if time.time() - ssl_sock.last_use_time < self.keep_alive+1: # gws ssl connection can keep for 230s after created
if time.time() - ssl_sock.last_use_time < self.keep_alive: # gws ssl connection can keep for 60s after created
xlog.debug("host_conn_pool %s get:%s handshake:%d", host, ssl_sock.ip, handshake_time)
return ssl_sock
break
else:
ssl_sock.close()
Expand All @@ -455,20 +462,25 @@ def create_ssl_connection(self, host=''):
ssl_sock = None
break

if time.time() - ssl_sock.last_use_time < self.keep_alive+1: # gws ssl connection can keep for 230s after created
if time.time() - ssl_sock.last_use_time < self.keep_alive: # gws ssl connection can keep for 60s after created
xlog.debug("ssl_pool.get:%s handshake:%d", ssl_sock.ip, handshake_time)
break
else:
ssl_sock.close()
continue

self.create_more_connection()
if host:
self.create_more_connection(type="host")
else:
self.create_more_connection(type="gae")

if ssl_sock:
return ssl_sock
else:
#xlog.debug("get_ssl wait")
ret = self.new_conn_pool.get(True, self.max_timeout)
if ret:
#xlog.debug("get_ssl ok")
handshake_time, ssl_sock = ret
return ssl_sock
else:
Expand Down Expand Up @@ -573,7 +585,6 @@ def _create_connection(ip_port, delay=0):
self.thread_num += 1
self.thread_num_lock.release()
p = threading.Thread(target=_create_connection, args=(addr,))
p.daemon = True
p.start()

try:
Expand Down
2 changes: 1 addition & 1 deletion gae_proxy/local/direct_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def fetch(method, host, path, headers, payload, bufsize=8192):
request_data += ''.join('%s: %s\r\n' % (k, v) for k, v in headers.items())
request_data += '\r\n'

ssl_sock = https_manager.create_ssl_connection(host)
ssl_sock = https_manager.get_ssl_connection(host)
if not ssl_sock:
return

Expand Down
2 changes: 1 addition & 1 deletion gae_proxy/local/gae_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ def request(headers={}, payload=None):
for i in range(max_retry):
ssl_sock = None
try:
ssl_sock = https_manager.create_ssl_connection()
ssl_sock = https_manager.get_ssl_connection()
if not ssl_sock:
xlog.debug('create_ssl_connection fail')
continue
Expand Down
Loading

0 comments on commit 89119c1

Please sign in to comment.