-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy path_resultsdb.py
305 lines (265 loc) · 9.8 KB
/
_resultsdb.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
#
# Results DB module.
#
# This file is part of Pints Functional Testing.
# Copyright (c) 2017-2019, University of Oxford.
# For licensing information, see the LICENSE file distributed with the Pints
# functional testing software package.
#
from collections import defaultdict
import json
import sqlite3
import time
import pfunk
def _ensure_database_schema(connection):
"""
Given a connection to a sqlite3 database, create a table (if needed)
containing the appropriate columns for our test results.
"""
query = """ create table if not exists test_results(
identifier integer primary key asc,
name varchar,
date date,
status varchar,
python varchar,
pints varchar,
pints_commit varchar,
pfunk_commit varchar,
pints_authored_date date,
pints_committed_date date,
pints_commit_msg varchar,
pfunk_authored_date date,
pfunk_committed_date date,
pfunk_commit_msg varchar,
seed integer,
json varchar
)"""
connection.execute(query)
connection.commit()
class ResultsDatabaseSchemaClient(object):
    """
    Abstract parent for database readers and writers, keeping track of the
    database columns and how to interpret the JSON column.

    :meth:`json_values` reads ``self._connection`` (an open sqlite3
    connection) and ``self._row`` (the ``identifier`` of the row of
    interest); subclasses have to provide both.
    """

    # Column(s) forming the primary key of the test_results table.
    primary_columns = ['identifier']

    # Columns stored directly in the table. Any key not listed here (or in
    # ``primary_columns``) is kept inside the ``json`` column instead.
    columns = [
        'name',
        'date',
        'status',
        'python',
        'pints',
        'pints_commit',
        'pfunk_commit',
        'pints_authored_date',
        'pints_committed_date',
        'pints_commit_msg',
        'pfunk_authored_date',
        'pfunk_committed_date',
        'pfunk_commit_msg',
        'seed',
    ]

    def json_values(self):
        """
        Interpret the json column in the test results table as a dictionary
        and return it.

        :return: The dictionary retrieved from the json, or the empty dict if
            the field is empty (``NULL`` or an empty string).
        :raises KeyError: If no row with identifier ``self._row`` exists.
        """
        result = self._connection.execute(
            'select json from test_results where identifier = ?', [self._row])
        database_row = result.fetchone()
        if database_row is None:
            # Fail with a clear message instead of a TypeError from
            # subscripting None.
            raise KeyError(
                f'row_id {self._row} is not present in the database')
        json_field = database_row[0]
        if json_field is None or json_field == '':
            return {}
        return json.loads(json_field)
class ResultsDatabaseWriter(ResultsDatabaseSchemaClient):
    """
    Provides write access to a SQLite3 database containing test results.

    For compatibility with the interfaces supplied by
    ResultsWriter/ResultsReader, the flat-file equivalents, instances of this
    class accept a test name and date, and provide access to the values in a
    matching row. However, due to adding multiprocessing support, test name
    and date no longer uniquely identify a test invocation so a separate
    integer counter is used as the primary key.

    Constructing a writer makes sure the ``test_results`` table exists and,
    unless ``existing_row_id`` is given, inserts a new row for this result.
    Both steps use short-lived connections of their own.

    Values can only be assigned while the writer is used as a Context
    Manager, because the connection used for updates is opened on entry and
    closed on exit:

    >>> with ResultsDatabaseWriter("results.db", "a_test_name",
    ...                            "2019-01-01-12:34:56") as w:
    ...     w["status"] = "pending"

    :param filename: Path to the database file.
    :param test_name: Name of the test, stored in the ``name`` column.
    :param date: Date of the test run, stored in the ``date`` column.
    :param existing_row_id: Identifier of a row to write to. If ``None`` (the
        default) a new row is inserted.
    """

    def __init__(self, filename, test_name, date, existing_row_id=None):
        self._connection = None
        self._filename = filename
        self.__ensure_schema()
        self._name = test_name
        self._date = date
        if existing_row_id is not None:
            self._row = existing_row_id
        else:
            self.__ensure_row_exists()

    def __enter__(self):
        self._connection = connect_to_database(self.filename())
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._connection.close()

    def __ensure_schema(self):
        """
        Establish that a test_results table exists, and if it didn't before,
        that it has the correct schema. Note that this method uses a temporary
        connection so it can be called outside the Context Manager lifecycle.

        :return: None.
        """
        conn = connect_to_database(self.filename())
        try:
            _ensure_database_schema(conn)
            conn.commit()
        finally:
            # Release the temporary connection even if the statement failed.
            conn.close()

    def __ensure_row_exists(self):
        """
        Create a row in the table to represent the current result, and store
        its primary key.

        Note that this method uses a temporary connection so it can be called
        outside of the Context Manager lifecycle.

        :return: None
        """
        conn = connect_to_database(self.filename())
        try:
            cursor = conn.execute(
                'insert into test_results(name,date) values (?,?)',
                (self._name, self._date))
            # The cursor that ran the insert reports the new row's id.
            row_id = cursor.lastrowid
            conn.commit()
        finally:
            # Release the temporary connection even if the insert failed.
            conn.close()
        self._row = row_id

    def row_id(self):
        """
        Return the primary key for this writer's table row. Mostly for
        debugging.
        """
        return self._row

    def __setitem__(self, key, value):
        """
        Store ``value`` under ``key`` in this writer's row.

        Keys in ``primary_columns`` are silently ignored, keys in ``columns``
        update the column of that name, and any other key is stored inside
        the dictionary kept in the ``json`` column. Each update is committed
        immediately.
        """
        if key in self.primary_columns:
            # don't update these
            return

        if key in self.columns:
            # ``key`` is one of the known column names checked above, so it
            # is safe to place it in the statement text.
            self._connection.execute(
                f'update test_results set {key} = ? where identifier = ?',
                (value, self._row))
            self._connection.commit()
            return

        dictionary = self.json_values()
        # workaround: if we're given a numpy array, make a Python list
        if getattr(value, 'tolist', None) is not None:
            value = value.tolist()
        dictionary[key] = value
        json_field = json.dumps(dictionary)
        self._connection.execute(
            'update test_results set json = ? where identifier = ?',
            (json_field, self._row))
        self._connection.commit()

    def write(self):
        """
        Provides compatibility with the file-writer interface for writing test
        results. Does nothing: values are written as soon as they are
        assigned.

        :return: None.
        """
        pass

    def filename(self):
        """
        Provides compatibility with the file-writer interface for writing test
        results.

        :return: The path to the database.
        """
        return self._filename
class ResultsDatabaseReader(ResultsDatabaseSchemaClient):
    """
    Read-only, dictionary-style view of one row in the test results table.

    :param connection: An open sqlite3 connection to the results database.
    :param row_id: The ``identifier`` of the row to read from.
    """

    def __init__(self, connection, row_id):
        self._row = row_id
        self._connection = connection

    def __getitem__(self, item):
        """
        Look up ``item`` for this reader's row.

        Names listed in ``primary_columns`` or ``columns`` are read from the
        table column of that name; a ``KeyError`` is raised if the row does
        not exist. Every other name is looked up in the JSON column, and
        ``None`` is returned when it is not stored there.
        """
        is_table_column = (
            item in self.primary_columns or item in self.columns)
        if not is_table_column:
            # Not a real column: fall back to the JSON dictionary.
            return self.json_values().get(item)

        cursor = self._connection.execute(
            f'select {item} from test_results where identifier = ?',
            [self._row])
        record = cursor.fetchone()
        if record is None:
            raise KeyError(
                f'row_id {self._row} is not present in the database')
        return record[0]
class ResultsDatabaseResultsSet(object):
    """
    A collection of result rows with keyed access to their fields.

    Indexing always returns a list with one entry per requested key, where
    each entry is the list of that key's values over all rows. So
    ``results['foo']`` returns ``[[foo_0, foo_1, ...]]`` and
    ``results['foo', 'bar']`` returns ``[[foo_0, ...], [bar_0, ...]]``.

    :param result_rows: The rows; each must support ``row[key]`` lookups.
    """

    def __init__(self, result_rows):
        self._rows = result_rows

    def get_single_item(self, item):
        """
        Return the list of ``row[item]`` values, one per row.
        """
        return [row[item] for row in self._rows]

    def __getitem__(self, item):
        # A lone key is handled as a one-element tuple of keys, so both
        # forms share the same return shape.
        keys = item if type(item) is tuple else (item,)
        return list(map(self.get_single_item, keys))
def find_test_results(name, database, ignore_incomplete=True):
    """
    Fetches a set of all results for a test with the given name in the
    database, ordered by pints and then pfunk commit date.

    The connection opened here is handed to the returned readers, which use
    it to look up values on demand, so it is deliberately not closed.

    :param name: The name of the test to find results for (matched with SQL
        ``like``).
    :param database: A path to a pfunk test results database.
    :param ignore_incomplete: If ``True`` (default), only results whose
        status is ``done`` are returned; if ``False``, results with any
        status are returned.
    :return: A ResultsDatabaseResultsSet with all of the relevant test results.
    """
    connection = connect_to_database(database)

    # Build the statement and its parameters together, so the number of
    # placeholders always matches the number of bound values.
    query = 'select identifier from test_results where name like ?'
    parameters = [name]
    if ignore_incomplete:
        query += ' AND status like ?'
        parameters.append('done')
    query += ' order by pints_committed_date, pfunk_committed_date'

    results = connection.execute(query, parameters)
    row_ids = [r[0] for r in results.fetchall()]
    row_readers = [
        ResultsDatabaseReader(connection, row_id) for row_id in row_ids]
    return ResultsDatabaseResultsSet(row_readers)
def connect_to_database(database):
    """
    Open a sqlite3 connection to a test results database.

    The connection is opened with a timeout of 30 (seconds) and yields
    ``sqlite3.Row`` objects, so columns can be read by index or by name.

    :param database: A path to a pfunk test results database.
    :return: An open sqlite3 connection to the database.
    """
    conn = sqlite3.connect(database, timeout=30)
    conn.row_factory = sqlite3.Row
    return conn
def find_test_dates(database, ignore_unknown=True):
    """
    Returns a dict mapping test names to the time (a ``time.struct_time``) when
    they were last run.

    Dates in the database are expected in the format ``%Y-%m-%d-%H:%M:%S``.
    If a test defined in ``pfunk`` has not been run, it is given a placeholder
    time whose fields are all zero (``time.struct_time([0] * 9)``).

    The ``test_results`` table is created first if the database does not have
    one yet.

    :param database: A path to a pfunk test results database.
    :param ignore_unknown: If ``True`` (default), only tests that are
        currently defined in ``pfunk`` are returned. Use
        ``ignore_unknown=False`` to also get tests that appear in the database
        but are no longer defined.
    :return: A dict mapping test names to ``time.struct_time`` objects.
    """
    # Fetch test names and dates
    connection = connect_to_database(database)
    try:
        _ensure_database_schema(connection)
        names_and_dates = connection.execute(
            'select name, max(date) as "most_recent" from test_results'
            ' group by name'
        ).fetchall()
    finally:
        # Release the connection even if the query failed.
        connection.close()

    name_date_map = {
        t[0]: time.strptime(t[1], '%Y-%m-%d-%H:%M:%S') for t in names_and_dates
    }

    # Add date for known tests that are not mentioned in the db
    known_tests = pfunk.tests.tests()
    for test in known_tests:
        if test not in name_date_map:
            name_date_map[test] = time.struct_time([0] * 9)

    # Remove unknown tests that are mentioned in the db, unless the caller
    # asked for the full list
    if ignore_unknown:
        unknown_tests = set(name_date_map) - set(known_tests)
        for test in unknown_tests:
            del name_date_map[test]

    return name_date_map