Skip to content

Commit

Permalink
enable concurrent connect control for win10
Browse files Browse the repository at this point in the history
  • Loading branch information
xxnet committed Aug 20, 2015
1 parent c0c2026 commit 1d2b6ff
Show file tree
Hide file tree
Showing 7 changed files with 148 additions and 20 deletions.
1 change: 1 addition & 0 deletions gae_proxy/local/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ def load(self):
self.LOVE_TIP = self.CONFIG.get('love', 'tip').encode('utf8').decode('unicode-escape').split('|')

self.USE_IPV6 = self.CONFIG.getint('google_ip', 'use_ipv6')
self.https_max_connect_thread = config.CONFIG.getint("connect_manager", "https_max_connect_thread")


# change to True when finished import CA cert to browser
Expand Down
118 changes: 111 additions & 7 deletions gae_proxy/local/connect_control.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,114 @@

import time

import threading
import xlog
import sys
from config import config

# change to False when exit: system tray exit menu, or Ctrl+C in console
# then GoAgent will quit
# Every long running thread should check it and exit when False
keep_running = True

connect_allow_time = 0
connect_fail_time = 0
scan_allow_time = 0
#=============================================
# concurrent connect control
# Windows10 will block sound when too many concurrent connect out in the same time.
# so when user request web, scan thread will stop to reduce concurrent action.
def check_win10():
if sys.platform != "win32":
return False

import ctypes
class OSVERSIONINFOEXW(ctypes.Structure):
_fields_ = [('dwOSVersionInfoSize', ctypes.c_ulong),
('dwMajorVersion', ctypes.c_ulong),
('dwMinorVersion', ctypes.c_ulong),
('dwBuildNumber', ctypes.c_ulong),
('dwPlatformId', ctypes.c_ulong),
('szCSDVersion', ctypes.c_wchar*128),
('wServicePackMajor', ctypes.c_ushort),
('wServicePackMinor', ctypes.c_ushort),
('wSuiteMask', ctypes.c_ushort),
('wProductType', ctypes.c_byte),
('wReserved', ctypes.c_byte)]

os_version = OSVERSIONINFOEXW()
os_version.dwOSVersionInfoSize = ctypes.sizeof(os_version)
retcode = ctypes.windll.Ntdll.RtlGetVersion(ctypes.byref(os_version))
if retcode != 0:
xlog.warn("Failed to get win32 OS version")
return False

if os_version.dwMajorVersion == 10:
xlog.info("detect Win10, enable connect concurent control.")
return True

return False

is_win10 = check_win10()

ccc_lock = threading.Lock()
high_prior_lock = []
low_prior_lock = []
high_prior_connecting_num = 0
low_prior_connecting_num = 0

def start_connect_register(high_prior=False):
global high_prior_connecting_num, low_prior_connecting_num
if not is_win10:
return

ccc_lock.acquire()
try:
if high_prior_connecting_num + low_prior_connecting_num > config.https_max_connect_thread:
atom_lock = threading.Lock()
atom_lock.acquire()
if high_prior:
high_prior_lock.append(atom_lock)
else:
low_prior_lock.append(atom_lock)
ccc_lock.release()
atom_lock.acquire()

ccc_lock.acquire()

if high_prior:
high_prior_connecting_num += 1
else:
low_prior_connecting_num += 1
finally:
ccc_lock.release()


def end_connect_register(high_prior=False):
global high_prior_connecting_num, low_prior_connecting_num
if not is_win10:
return

ccc_lock.acquire()
try:
if high_prior:
high_prior_connecting_num -= 1
else:
low_prior_connecting_num -= 1

if high_prior_connecting_num + low_prior_connecting_num < config.https_max_connect_thread:
if len(high_prior_lock):
atom_lock = high_prior_lock.pop()
atom_lock.release()
return

if len(low_prior_lock):
atom_lock = low_prior_lock.pop()
atom_lock.release()
return
finally:
ccc_lock.release()

#=============================================
# this design is for save resource when browser have no request for long time.
# when idle, connect pool will not maintain the connect ready link to save resources.

block_delay = 10 # (60 * 5)
scan_sleep_time = 600 # Need examination
last_request_time = 0

def touch_active():
Expand All @@ -29,6 +124,15 @@ def is_active(timeout=60):
return True
else:
return False
#==============================================
# honey pot is out of date, setup in 2015-05
# The code may be deleted in the future
connect_allow_time = 0
connect_fail_time = 0
scan_allow_time = 0

block_delay = 10 # (60 * 5)
scan_sleep_time = 600 # Need examination

def allow_connect():
global connect_allow_time
Expand Down Expand Up @@ -80,4 +184,4 @@ def block_stat():
return "Connect Blocked, %d seconds to wait." % wait_time
elif scan_time > 0:
return "Scan Blocked, %d seconds to wait." % scan_time

#=============================================
7 changes: 6 additions & 1 deletion gae_proxy/local/connect_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ def __init__(self):
p.start()

def load_config(self):
self.max_thread_num = config.CONFIG.getint("connect_manager", "https_max_connect_thread") #10
self.max_thread_num = config.https_max_connect_thread #10
self.connection_pool_max_num = config.CONFIG.getint("connect_manager", "https_connection_pool_max") #20/30
self.connection_pool_min_num = config.CONFIG.getint("connect_manager", "https_connection_pool_min") #20/30
self.keep_alive = config.CONFIG.getint("connect_manager", "https_keep_alive") #1
Expand Down Expand Up @@ -423,7 +423,9 @@ def create_connection_worker(self, type="gae"):

port = 443
#logging.debug("create ssl conn %s", ip_str)
connect_control.start_connect_register(True)
ssl_sock = self._create_ssl_connection( (ip_str, port) )
connect_control.end_connect_register(True)
if ssl_sock:
ssl_sock.last_use_time = time.time()
self.new_conn_pool.put((ssl_sock.handshake_time, ssl_sock))
Expand Down Expand Up @@ -510,6 +512,8 @@ def create_connection(self, host="", port=443, sock_life=5):
return None

def _create_connection(ip_port, delay=0):

connect_control.start_connect_register(True)
time.sleep(delay)
ip = ip_port[0]
sock = None
Expand Down Expand Up @@ -555,6 +559,7 @@ def _create_connection(ip_port, delay=0):
self.thread_num_lock.acquire()
self.thread_num -= 1
self.thread_num_lock.release()
connect_control.end_connect_register(True)


if host != "appengine.google.com":
Expand Down
2 changes: 1 addition & 1 deletion gae_proxy/local/direct_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def handler(method, host, url, headers, body, wfile):
response = fetch(method, host, url, headers, body)
if response:
break
except OpenSSL.SysCallError as e:
except OpenSSL.SSL.SysCallError as e:
errors.append(e)
xlog.warn("direct_handler.handler err:%r %s/%s", e, host, url)
except Exception as e:
Expand Down
11 changes: 10 additions & 1 deletion gae_proxy/local/google_ip.py
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,9 @@ def scan_ip_worker(self):
if self.is_bad_ip(ip_str):
continue

connect_control.start_connect_register()
result = check_ip.test_gws(ip_str)
connect_control.end_connect_register()
if not result:
continue

Expand All @@ -535,7 +537,14 @@ def scan_ip_worker(self):
xlog.info("scan_ip_worker exit")

def search_more_google_ip(self):
while self.searching_thread_count < self.scan_ip_thread_num:
if config.USE_IPV6:
return

new_thread_num = self.scan_ip_thread_num - self.searching_thread_count
if new_thread_num < 1:
return

for i in range(0, new_thread_num):
self.ncount_lock.acquire()
self.searching_thread_count += 1
self.ncount_lock.release()
Expand Down
2 changes: 1 addition & 1 deletion gae_proxy/local/proxy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ max_good_ip_num = 3000
ip_connect_interval = 10

[connect_manager]
https_max_connect_thread = 50
https_max_connect_thread = 30
https_connection_pool_min = 20
https_connection_pool_max = 60

Expand Down
27 changes: 18 additions & 9 deletions gae_proxy/local/web_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def __init__(self):
self.proxy_passwd = ""

self.host_appengine_mode = "gae"
self.ip_connect_interval = ""
self.ip_connect_interval = 10
self.auto_adjust_scan_ip_thread_num = 1
self.scan_ip_thread_num = 0
self.use_ipv6 = 0
Expand All @@ -80,20 +80,23 @@ def __init__(self):
def load(self):
ConfigParser.RawConfigParser.OPTCRE = re.compile(r'(?P<option>[^=\s][^=]*)\s*(?P<vi>[=])\s*(?P<value>.*)$')

self.USER_CONFIG = ConfigParser.ConfigParser()
CONFIG_USER_FILENAME = os.path.abspath( os.path.join(root_path, 'data', 'gae_proxy', 'config.ini'))

self.DEFAULT_CONFIG = ConfigParser.ConfigParser()
DEFAULT_CONFIG_FILENAME = os.path.abspath( os.path.join(current_path, 'proxy.ini'))


self.USER_CONFIG = ConfigParser.ConfigParser()
CONFIG_USER_FILENAME = os.path.abspath( os.path.join(root_path, 'data', 'gae_proxy', 'config.ini'))

try:
if os.path.isfile(CONFIG_USER_FILENAME):
self.USER_CONFIG.read(CONFIG_USER_FILENAME)
if os.path.isfile(DEFAULT_CONFIG_FILENAME):
self.DEFAULT_CONFIG.read(DEFAULT_CONFIG_FILENAME)
self.user_special.scan_ip_thread_num = self.DEFAULT_CONFIG.getint('google_ip', 'max_scan_ip_thread_num')
self.ip_connect_interval = self.DEFAULT_CONFIG.getint('google_ip', 'ip_connect_interval')
else:
return

if os.path.isfile(DEFAULT_CONFIG_FILENAME):
self.DEFAULT_CONFIG.read(DEFAULT_CONFIG_FILENAME)
if os.path.isfile(CONFIG_USER_FILENAME):
self.USER_CONFIG.read(CONFIG_USER_FILENAME)
else:
return

Expand Down Expand Up @@ -279,6 +282,8 @@ def do_GET(self):

if path == '/deploy':
return self.req_deploy_handler()
elif path == "/config":
return self.req_config_handler()
elif path == "/ip_list":
return self.req_ip_list_handler()
elif path == "/scan_ip":
Expand Down Expand Up @@ -507,8 +512,12 @@ def req_status_handler(self):
"ip_handshake_100":google_ip.ip_handshake_th(100),
"block_stat":connect_control.block_stat(),
"use_ipv6":config.CONFIG.getint("google_ip", "use_ipv6"),
"high_prior_connecting_num":connect_control.high_prior_connecting_num,
"low_prior_connecting_num":connect_control.low_prior_connecting_num,
"high_prior_lock":len(connect_control.high_prior_lock),
"low_prior_lock":len(connect_control.low_prior_lock),
}
data = json.dumps(res_arr)
data = json.dumps(res_arr, indent=0, sort_keys=True)
self.send_response('text/html', data)

def req_config_handler(self):
Expand Down

0 comments on commit 1d2b6ff

Please sign in to comment.