forked from openSUSE/osc
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcommon.py
210 lines (175 loc) · 7.53 KB
/
common.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
import unittest
import osc.core
import shutil
import tempfile
import os
import sys
from xml.etree import cElementTree as ET
EXPECTED_REQUESTS = []
if sys.version_info[0:2] in ((2, 6), (2, 7)):
bytes = lambda x, *args: x
try:
#python 2.x
from cStringIO import StringIO
from urllib2 import HTTPHandler, addinfourl, build_opener
from urlparse import urlparse, parse_qs
except ImportError:
from io import StringIO
from urllib.request import HTTPHandler, addinfourl, build_opener
from urllib.parse import urlparse, parse_qs
def urlcompare(url, *args):
"""compare all components of url except query string - it is converted to
dict, therefor different ordering does not makes url's different, as well
as quoting of a query string"""
components = urlparse(url)
query_args = parse_qs(components.query)
components = components._replace(query=None)
if not args:
return False
for url in args:
components2 = urlparse(url)
query_args2 = parse_qs(components2.query)
components2 = components2._replace(query=None)
if components != components2 or \
query_args != query_args2:
return False
return True
class RequestWrongOrder(Exception):
"""raised if an unexpected request is issued to urllib2"""
def __init__(self, url, exp_url, method, exp_method):
Exception.__init__(self)
self.url = url
self.exp_url = exp_url
self.method = method
self.exp_method = exp_method
def __str__(self):
return '%s, %s, %s, %s' % (self.url, self.exp_url, self.method, self.exp_method)
class RequestDataMismatch(Exception):
"""raised if POSTed or PUTed data doesn't match with the expected data"""
def __init__(self, url, got, exp):
self.url = url
self.got = got
self.exp = exp
def __str__(self):
return '%s, %s, %s' % (self.url, self.got, self.exp)
class MyHTTPHandler(HTTPHandler):
def __init__(self, exp_requests, fixtures_dir):
HTTPHandler.__init__(self)
self.__exp_requests = exp_requests
self.__fixtures_dir = fixtures_dir
def http_open(self, req):
r = self.__exp_requests.pop(0)
if not urlcompare(req.get_full_url(), r[1]) or req.get_method() != r[0]:
raise RequestWrongOrder(req.get_full_url(), r[1], req.get_method(), r[0])
if req.get_method() in ('GET', 'DELETE'):
return self.__mock_GET(r[1], **r[2])
elif req.get_method() in ('PUT', 'POST'):
return self.__mock_PUT(req, **r[2])
def __mock_GET(self, fullurl, **kwargs):
return self.__get_response(fullurl, **kwargs)
def __mock_PUT(self, req, **kwargs):
exp = kwargs.get('exp', None)
if exp is not None and 'expfile' in kwargs:
raise RuntimeError('either specify exp or expfile')
elif 'expfile' in kwargs:
exp = open(os.path.join(self.__fixtures_dir, kwargs['expfile']), 'r').read()
elif exp is None:
raise RuntimeError('exp or expfile required')
if exp is not None:
# use req.data instead of req.get_data() for python3 compatiblity
data = req.data
if hasattr(data, 'read'):
data = data.read()
if data != bytes(exp, "utf-8"):
raise RequestDataMismatch(req.get_full_url(), repr(data), repr(exp))
return self.__get_response(req.get_full_url(), **kwargs)
def __get_response(self, url, **kwargs):
f = None
if 'exception' in kwargs:
raise kwargs['exception']
if 'text' not in kwargs and 'file' in kwargs:
f = StringIO(open(os.path.join(self.__fixtures_dir, kwargs['file']), 'r').read())
elif 'text' in kwargs and 'file' not in kwargs:
f = StringIO(kwargs['text'])
else:
raise RuntimeError('either specify text or file')
resp = addinfourl(f, {}, url)
resp.code = kwargs.get('code', 200)
resp.msg = ''
return resp
def urldecorator(method, fullurl, **kwargs):
def decorate(test_method):
def wrapped_test_method(*args):
addExpectedRequest(method, fullurl, **kwargs)
test_method(*args)
# "rename" method otherwise we cannot specify a TestCaseClass.testName
# cmdline arg when using unittest.main()
wrapped_test_method.__name__ = test_method.__name__
return wrapped_test_method
return decorate
def GET(fullurl, **kwargs):
return urldecorator('GET', fullurl, **kwargs)
def PUT(fullurl, **kwargs):
return urldecorator('PUT', fullurl, **kwargs)
def POST(fullurl, **kwargs):
return urldecorator('POST', fullurl, **kwargs)
def DELETE(fullurl, **kwargs):
return urldecorator('DELETE', fullurl, **kwargs)
def addExpectedRequest(method, url, **kwargs):
global EXPECTED_REQUESTS
EXPECTED_REQUESTS.append((method, url, kwargs))
class OscTestCase(unittest.TestCase):
def setUp(self, copytree=True):
oscrc = os.path.join(self._get_fixtures_dir(), 'oscrc')
osc.core.conf.get_config(override_conffile=oscrc,
override_no_keyring=True, override_no_gnome_keyring=True)
os.environ['OSC_CONFIG'] = oscrc
self.tmpdir = tempfile.mkdtemp(prefix='osc_test')
if copytree:
shutil.copytree(os.path.join(self._get_fixtures_dir(), 'osctest'), os.path.join(self.tmpdir, 'osctest'))
global EXPECTED_REQUESTS
EXPECTED_REQUESTS = []
osc.core.conf._build_opener = lambda u: build_opener(MyHTTPHandler(EXPECTED_REQUESTS, self._get_fixtures_dir()))
self.stdout = sys.stdout
sys.stdout = StringIO()
def tearDown(self):
self.assertTrue(len(EXPECTED_REQUESTS) == 0)
sys.stdout = self.stdout
try:
shutil.rmtree(self.tmpdir)
except:
pass
def _get_fixtures_dir(self):
raise NotImplementedError('subclasses should implement this method')
def _change_to_pkg(self, name):
os.chdir(os.path.join(self.tmpdir, 'osctest', name))
def _check_list(self, fname, exp):
fname = os.path.join('.osc', fname)
self.assertTrue(os.path.exists(fname))
self.assertEqual(open(fname, 'r').read(), exp)
def _check_addlist(self, exp):
self._check_list('_to_be_added', exp)
def _check_deletelist(self, exp):
self._check_list('_to_be_deleted', exp)
def _check_conflictlist(self, exp):
self._check_list('_in_conflict', exp)
def _check_status(self, p, fname, exp):
self.assertEqual(p.status(fname), exp)
def _check_digests(self, fname, *skipfiles):
fname = os.path.join(self._get_fixtures_dir(), fname)
self.assertEqual(open(os.path.join('.osc', '_files'), 'r').read(), open(fname, 'r').read())
root = ET.parse(fname).getroot()
for i in root.findall('entry'):
if i.get('name') in skipfiles:
continue
self.assertTrue(os.path.exists(os.path.join('.osc', i.get('name'))))
self.assertEqual(osc.core.dgst(os.path.join('.osc', i.get('name'))), i.get('md5'))
def assertEqualMultiline(self, got, exp):
if (got + exp).find('\n') == -1:
self.assertEqual(got, exp)
else:
start_delim = "\n" + (" 8< ".join(["-----"] * 8)) + "\n"
end_delim = "\n" + (" >8 ".join(["-----"] * 8)) + "\n\n"
self.assertEqual(got, exp,
"got:" + start_delim + got + end_delim +
"expected:" + start_delim + exp + end_delim)