-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathconnectors.py
142 lines (111 loc) · 4.29 KB
/
connectors.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import abc
import io
import json as libjson
import socket
from urllib.parse import unquote
import bddrest
from .helpers import encode_multipart_data, querystring_encode
from .response import Response
class WSGIResponse(Response):
    """Response that accumulates the body chunks produced by a WSGI app."""

    def write(self, data):
        """Append ``data`` to the response body.

        :param data: a chunk of the body; ``str`` chunks are encoded with
                     the default codec of ``str.encode`` before being
                     appended, anything else is appended unchanged.
        """
        # The body starts out as ``None``; switch to bytes on first write.
        if self.body is None:
            self.body = b''

        chunk = data.encode() if isinstance(data, str) else data
        self.body += chunk
class Connector(metaclass=abc.ABCMeta):
    """Abstract base class of the objects which deliver a request.

    :meth:`request` normalises the payload and the entity headers, then
    delegates the actual delivery to :meth:`_send_request`, which every
    subclass has to implement.
    """

    def request(self, verb='GET', path='/', form=None, multipart=None,
                json=None, environ=None, headers=None, body=None,
                content_type=None, content_length=None, **kw):
        """Prepare the payload and headers, then send the request.

        When ``body`` is ``None`` the payload is built from the first
        truthy/applicable source in this order: ``multipart``, ``json``,
        ``form`` (only when it is a ``dict``). A falsy ``json`` value
        (e.g. ``{}``) is ignored. When ``body`` is given it is used as is
        and the other payload sources are not consulted.

        :param verb: HTTP verb, forwarded untouched.
        :param path: request path, forwarded untouched.
        :param form: ``dict`` to be sent url-encoded.
        :param multipart: fields to be sent as multipart data.
        :param json: object to be serialised with ``json.dumps``.
        :param environ: forwarded untouched to ``_send_request``.
        :param headers: sequence of ``(name, value)`` pairs. It is copied,
                        never modified in place.
        :param body: explicit payload: ``str`` (will be encoded), a sized
                     bytes-like object, or a file-like object such as
                     ``io.BytesIO``.
        :param content_type: value of the ``Content-Type`` header; replaced
                             when the payload is built from ``multipart``,
                             ``json`` or ``form``.
        :param content_length: value of the ``Content-Length`` header; only
                               honoured when the size of ``body`` cannot be
                               determined with ``len()``.
        :param kw: extra keyword arguments forwarded to ``_send_request``.
        :return: whatever ``_send_request`` returns.
        """
        # Copy, so the Content-* headers appended below never leak into
        # the list owned by the caller (which may be reused between calls).
        headers = list(headers) if headers else []

        if body is None:
            if multipart:
                content_type, body, content_length = \
                    encode_multipart_data(multipart)
            elif json:
                body = libjson.dumps(json)
                # Media type parameters are written as ``name=value``.
                content_type = 'application/json;charset=utf-8'
                content_length = len(body)
            elif isinstance(form, dict):
                body = querystring_encode(form)
                content_length = len(body)
                content_type = 'application/x-www-form-urlencoded'
        else:
            try:
                content_length = len(body)
            except TypeError:
                # File-like payloads have no len(). Keep the length given
                # by the caller, or measure the unread part of a BytesIO.
                if content_length is None and isinstance(body, io.BytesIO):
                    content_length = body.getbuffer().nbytes - body.tell()

        if isinstance(body, str):
            # The length must be counted in bytes, not in characters.
            body = body.encode()
            content_length = len(body)

        if content_type is not None:
            headers.append(('Content-Type', content_type))

        if content_length is not None:
            headers.append(('Content-Length', str(content_length)))

        return self._send_request(verb, path, environ, headers, body, **kw)

    @abc.abstractmethod
    def _send_request(self, verb, path, environ, headers, body=None, **kw):
        """Deliver the prepared request; must be implemented by subclasses.

        :param verb: HTTP verb.
        :param path: request path.
        :param environ: value of the ``environ`` argument of ``request``.
        :param headers: list of ``(name, value)`` pairs, including the
                        ``Content-Type`` and ``Content-Length`` entries
                        added by ``request``.
        :param body: the payload prepared by ``request`` or ``None``.
        :param kw: extra keyword arguments given to ``request``.
        """
        pass
class WSGIConnector(Connector):
    """Connector which calls a WSGI application directly, in-process.

    A WSGI environ is built for every request, the application is invoked
    with it and the produced chunks are collected into a ``WSGIResponse``.
    """

    def __init__(self, application, environ=None):
        """
        :param application: the WSGI callable, invoked as
                            ``application(environ, start_response)``.
        :param environ: optional mapping with base environ values; it is
                        copied for every request.
        """
        self.application = application
        self.environ = environ

    def _prepare_environ(self, verb, path, headers, payload=None,
                         extra_environ=None, https=False):
        """Create the WSGI environ dictionary of a single request.

        Precedence, lowest to highest: the base environ given to the
        constructor, the values set here, ``extra_environ``, and finally
        the request ``headers``.

        :param verb: value of ``REQUEST_METHOD``.
        :param path: request path, optionally followed by ``?query``.
        :param headers: iterable of ``(name, value)`` pairs.
        :param payload: request body: an ``io.BytesIO`` (used as is), a
                        bytes-like object, or a falsy value for no body.
        :param extra_environ: optional mapping merged into the environ.
        :param https: when true, the request is marked as HTTPS.
        :return: the environ ``dict``.
        """
        if isinstance(payload, io.BytesIO):
            input_file = payload
        elif payload:
            # A new BytesIO is already positioned at the beginning.
            input_file = io.BytesIO(payload)
        else:
            input_file = io.BytesIO()

        error_file = io.StringIO()

        environ = self.environ.copy() if self.environ else {}
        environ['wsgi.input'] = input_file
        environ['wsgi.errors'] = error_file
        environ['wsgi.version'] = (1, 0)
        environ['wsgi.multithread'] = False
        environ['wsgi.multiprocess'] = False
        environ['wsgi.run_once'] = True
        environ['wsgi.url_scheme'] = 'https' if https else 'http'
        environ['REQUEST_METHOD'] = verb
        environ['SERVER_NAME'] = socket.gethostname()
        # SERVER_PORT is required by PEP 3333 and must be a string.
        # setdefault() keeps a port supplied through the base environ.
        environ.setdefault('SERVER_PORT', '443' if https else '80')
        environ['HTTP_HOST'] = 'bddrest-interceptor'
        # Non-standard key, kept for backward compatibility.
        environ['HTTP_PORT'] = 80
        # Only the protocol token belongs here, without a line terminator.
        environ['SERVER_PROTOCOL'] = 'HTTP/1.1'
        environ['REMOTE_ADDR'] = '127.0.0.1'
        environ['HTTP_USER_AGENT'] = f'Python bddrest/{bddrest.__version__}'

        if https:
            environ['HTTPS'] = 'yes'

        if '?' in path:
            path, query = path.split('?', 1)
            environ['QUERY_STRING'] = query

        environ['PATH_INFO'] = unquote(path, 'iso-8859-1')

        if extra_environ:
            environ.update(extra_environ)

        for k, v in headers:
            key = k.upper().replace('-', '_')
            # Content-Type and Content-Length are the only request headers
            # stored without the ``HTTP_`` prefix.
            if key not in ['CONTENT_TYPE', 'CONTENT_LENGTH']:
                key = 'HTTP_' + key
            environ[key] = v

        return environ

    def _send_request(self, verb, path, environ, headers, body=None,
                      https=False, **kw):
        """Invoke the WSGI application and collect its response.

        :param verb: HTTP verb.
        :param path: request path, optionally followed by ``?query``.
        :param environ: optional mapping merged into the WSGI environ.
        :param headers: iterable of ``(name, value)`` pairs.
        :param body: request payload, see ``_prepare_environ``.
        :param https: when true, the request is marked as HTTPS.
        :param kw: accepted and ignored.
        :return: the ``WSGIResponse`` created by ``start_response``, or
                 ``None`` if the application never called it.
        :raises: the exception in ``exc_info`` when the application passes
                 one to ``start_response``.
        """
        environ_ = self._prepare_environ(verb, path, headers, body, environ,
                                         https)

        response = None

        def start_response(status, headers, exc_info=None):
            nonlocal response
            if exc_info:
                raise exc_info[1]

            response = WSGIResponse(status, headers)
            return response.write

        result = self.application(
            environ_,
            start_response
        )

        try:
            for i in result:
                response.write(i)
        finally:
            # The iterable must be closed, even when iterating it failed.
            if hasattr(result, 'close'):
                result.close()

        return response