forked from testerSunshine/12306
-
Notifications
You must be signed in to change notification settings - Fork 0
/
httpUtils.py
executable file
·175 lines (155 loc) · 6.04 KB
/
httpUtils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
# -*- coding: utf8 -*-
import json
import socket
from collections import OrderedDict
from time import sleep
import requests
from agency.agency_tools import proxy
from config import logger
def _set_header_default():
header_dict = OrderedDict()
# header_dict["Accept"] = "application/json, text/plain, */*"
header_dict["Accept-Encoding"] = "gzip, deflate"
header_dict[
"User-Agent"] = "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36"
header_dict["Content-Type"] = "application/x-www-form-urlencoded; charset=UTF-8"
header_dict["Origin"] = "https://kyfw.12306.cn"
header_dict["Connection"] = "keep-alive"
return header_dict
class HTTPClient(object):
    """Wrapper around a ``requests.Session`` driven by ``urls`` config dicts.

    The client keeps one session (cookies and headers persist between
    calls), an optional proxy mapping and an optional CDN host that can
    replace the configured ``Host`` when building the request URL.
    """

    def __init__(self, is_proxy):
        """Create the session and, if requested, configure a proxy.

        :param is_proxy: when equal to ``1`` a ``proxy()`` helper is created
            and the result of its ``setProxy()`` call is used as the
            ``proxies`` argument of every request; any other value means
            no proxy.
        """
        self.initS()
        self._cdn = None
        self._proxies = None
        # Compare by value: ``is 1`` tests object identity, which only works
        # by accident of CPython's small-int cache and triggers a
        # SyntaxWarning on Python 3.8+.
        if is_proxy == 1:
            self.proxy = proxy()
            self._proxies = self.proxy.setProxy()

    def initS(self):
        """(Re)create the underlying session with the default headers.

        :return: ``self`` so calls can be chained.
        """
        self._s = requests.Session()
        self._s.headers.update(_set_header_default())
        return self

    def set_cookies(self, kwargs):
        """Set cookies on the session.

        :param kwargs: an iterable of dicts; every ``name: value`` pair of
            every dict is stored as a cookie.
        :return: None
        """
        for cookie_map in kwargs:
            for name, value in cookie_map.items():
                self._s.cookies.set(name, value)

    def get_cookies(self):
        """Return the values of all cookies currently held by the session.

        :return: the result of ``cookies.values()`` on the session jar.
        """
        return self._s.cookies.values()

    def del_cookies(self):
        """Remove every cookie from the session.

        :return: None
        """
        self._s.cookies.clear()

    def del_cookies_by_key(self, key):
        """Remove the cookie named ``key`` from the session.

        :param key: name of the cookie to drop.
        :return: None
        """
        # requests' cookie jar treats ``set(name, None)`` as "remove this
        # cookie" rather than storing a ``None`` value.
        self._s.cookies.set(key, None)

    def setHeaders(self, headers):
        """Merge ``headers`` into the session headers.

        :param headers: mapping of header name to value.
        :return: ``self`` so calls can be chained.
        """
        self._s.headers.update(headers)
        return self

    def resetHeaders(self):
        """Drop all session headers and restore the defaults."""
        self._s.headers.clear()
        self._s.headers.update(_set_header_default())

    def getHeadersHost(self):
        """Return the session's ``Host`` header (``KeyError`` if unset)."""
        return self._s.headers["Host"]

    def setHeadersHost(self, host):
        """Set the session's ``Host`` header.

        :param host: value for the ``Host`` header.
        :return: ``self`` so calls can be chained.
        """
        self._s.headers.update({"Host": host})
        return self

    def getHeadersReferer(self):
        """Return the session's ``Referer`` header (``KeyError`` if unset)."""
        return self._s.headers["Referer"]

    def setHeadersReferer(self, referer):
        """Set the session's ``Referer`` header.

        :param referer: value for the ``Referer`` header.
        :return: ``self`` so calls can be chained.
        """
        self._s.headers.update({"Referer": referer})
        return self

    @property
    def cdn(self):
        """CDN host used in place of ``Host`` in the URL, or ``None``."""
        return self._cdn

    @cdn.setter
    def cdn(self, cdn):
        self._cdn = cdn

    def send(self, urls, data=None, **kwargs):
        """Send the request described by ``urls``, retrying on failure.

        The request is a POST when ``data`` is truthy and a GET otherwise
        (a GET first resets the session headers to the defaults).

        :param urls: dict describing the endpoint. Required keys:
            ``Host`` and ``Referer``. Optional keys: ``req_url`` (path,
            default ``""``), ``re_try`` (number of attempts, default ``0``,
            i.e. no request is made), ``s_time`` (seconds slept before each
            attempt, default ``0``), ``re_time`` (seconds slept after a
            response whose status is neither 200 nor 302, default ``0``),
            ``is_logger`` (log request/response), ``is_cdn`` (use the
            client's CDN host when one is set), ``is_test_cdn`` (always use
            the client's CDN host), ``httpType`` (URL scheme, default
            ``"https"``), ``not_decode`` (return raw bytes), ``is_json``
            (parse the body as JSON, default ``False``).
        :param data: request body; when truthy the request is a POST.
        :param kwargs: extra keyword arguments forwarded to
            ``Session.request``.
        :return: for a 200/302 response: the raw body when ``not_decode`` is
            set, the parsed JSON when ``is_json`` is set, otherwise the body
            decoded as UTF-8 with undecodable bytes dropped. When every
            attempt fails, the dict
            ``{"code": 99999, "message": u"重试次数达到上限"}``.
        :raises KeyError: if ``urls`` lacks ``Host`` or ``Referer``.
        :raises ValueError: if ``is_json`` is set and the body is not valid
            JSON (propagated from ``json.loads``).
        """
        allow_redirects = False
        is_logger = urls.get("is_logger", False)
        req_url = urls.get("req_url", "")
        re_try = urls.get("re_try", 0)
        s_time = urls.get("s_time", 0)
        # Optional like the other tuning keys; previously a missing key only
        # surfaced as a KeyError once a non-200 response was received.
        re_time = urls.get("re_time", 0)
        is_cdn = urls.get("is_cdn", False)
        is_test_cdn = urls.get("is_test_cdn", False)
        error_data = {"code": 99999, "message": u"重试次数达到上限"}

        if data:
            method = "post"
            # NOTE(review): requests normally computes Content-Length itself
            # when preparing the body, so this looks redundant -- kept to
            # preserve the session-header state callers may observe.
            self.setHeaders({"Content-Length": "{0}".format(len(data))})
        else:
            method = "get"
            self.resetHeaders()
        self.setHeadersReferer(urls["Referer"])
        if is_logger:
            logger.log(
                u"url: {0}\n入参: {1}\n请求方式: {2}\n".format(req_url, data, method, ))
        self.setHeadersHost(urls["Host"])

        # The Host header always carries the real host name; only the host
        # part of the URL is swapped for the CDN address.
        if is_test_cdn or (is_cdn and self._cdn):
            url_host = self._cdn
        else:
            url_host = urls["Host"]
        http = urls.get("httpType") or "https"

        # Requests are sent with verify=False, so silence urllib3's warning
        # once per call. Best effort: the attribute path depends on the
        # installed requests version.
        try:
            requests.packages.urllib3.disable_warnings()
        except Exception:
            pass

        for _ in range(re_try):
            try:
                sleep(s_time)
                response = self._s.request(method=method,
                                           timeout=2,
                                           proxies=self._proxies,
                                           url=http + "://" + url_host + req_url,
                                           data=data,
                                           allow_redirects=allow_redirects,
                                           verify=False,
                                           **kwargs)
                if response.status_code not in (200, 302):
                    sleep(re_time)
                    continue
                content = response.content
                if urls.get("not_decode", False):
                    return content
                if not content:
                    print(f"url: {req_url}返回参数为空, 接口状态码: {response.status_code}")
                    logger.log(
                        u"url: {} 返回参数为空".format(req_url))
                    continue
                if is_logger:
                    # Lenient decode: logging must never be the reason a
                    # request fails on a non-UTF-8 body.
                    logger.log(
                        u"出参:{0}".format(content.decode("utf8", "ignore")))
                if urls.get("is_json", False):
                    return json.loads(content.decode() if isinstance(content, bytes) else content)
                return content.decode("utf8", "ignore") if isinstance(content, bytes) else content
            except (requests.exceptions.Timeout,
                    requests.exceptions.ConnectionError,
                    socket.error):
                # Deliberate best effort: a transient network failure just
                # consumes one attempt and the loop moves on to the next.
                continue
        return error_data