# problem.py -- forked from DMOJ/judge-server
import os
import subprocess
import zipfile
from functools import partial

import yaml
from yaml.parser import ParserError
from yaml.scanner import ScannerError

from dmoj import checkers
from dmoj.config import InvalidInitException, ConfigNode
from dmoj.generator import GeneratorManager
from dmoj.judgeenv import get_problem_root
from dmoj.utils.module import load_module_from_file


class Problem(object):
    def __init__(self, problem_id, time_limit, memory_limit, load_pretests_only=False):
        self.id = problem_id
        self.time_limit = time_limit
        self.memory_limit = memory_limit
        self.generator_manager = GeneratorManager()
        self.problem_data = ProblemDataManager(problem_id)

        # Checker modules must be stored in a dict for the duration of execution,
        # lest globals be deleted with the module.
        self._checkers = {}
        self._testcase_counter = 0
        self._batch_counter = 0

        try:
            doc = yaml.safe_load(self.problem_data['init.yml'])
            if not doc:
                raise InvalidInitException('I find your lack of content disturbing.')
            self.config = ConfigNode(doc, defaults={
                'output_prefix_length': 64,
                'output_limit_length': 25165824,
            })
        except (IOError, ParserError, ScannerError) as e:
            raise InvalidInitException(str(e))

        self.problem_data.archive = self._resolve_archive_files()
        self.is_pretested = load_pretests_only and 'pretest_test_cases' in self.config
        self.cases = self._resolve_testcases(self.config['pretest_test_cases' if self.is_pretested else 'test_cases'])

    def load_checker(self, name):
        if name in self._checkers:
            return self._checkers[name]
        self._checkers[name] = checker = load_module_from_file(os.path.join(get_problem_root(self.id), name))
        return checker

    def _resolve_archive_files(self):
        if self.config.archive:
            archive_path = os.path.join(get_problem_root(self.id), self.config.archive)
            if not os.path.exists(archive_path):
                raise InvalidInitException('archive file "%s" does not exist' % archive_path)
            try:
                archive = zipfile.ZipFile(archive_path, 'r')
            except zipfile.BadZipfile:
                raise InvalidInitException('bad archive: "%s"' % archive_path)
            return archive
        return None

    def _resolve_testcases(self, cfg, batch_no=0):
        cases = []
        for case_config in cfg:
            if 'batched' in case_config.raw_config:
                self._batch_counter += 1
                cases.append(BatchedTestCase(self._batch_counter, case_config, self))
            else:
                cases.append(TestCase(self._testcase_counter, batch_no, case_config, self))
                self._testcase_counter += 1
        return cases
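
# Illustrative sketch of an init.yml this loader would accept, inferred from the
# keys read above ('archive', 'test_cases', 'pretest_test_cases', 'batched',
# 'points', 'in', 'out'); the file names and point values are invented for the
# example and are not taken from any real problem:
#
#   archive: example.zip              # optional zip holding the test data
#   pretest_test_cases:               # used when load_pretests_only is set
#     - {in: example.p1.in, out: example.p1.out, points: 0}
#   test_cases:
#     - {in: example.1.in, out: example.1.out, points: 30}
#     - points: 70                    # a batch: has a nested 'batched' list
#       batched:
#         - {in: example.2a.in, out: example.2a.out}
#         - {in: example.2b.in, out: example.2b.out}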


class ProblemDataManager(dict):
    def __init__(self, problem_id, **kwargs):
        super(ProblemDataManager, self).__init__(**kwargs)
        self.problem_id = problem_id
        self.archive = None

    def __missing__(self, key):
        try:
            return open(os.path.join(get_problem_root(self.problem_id), key), 'r').read()
        except IOError:
            if self.archive:
                zipinfo = self.archive.getinfo(key)
                return self.archive.open(zipinfo).read()
            raise KeyError('file "%s" could not be found' % key)
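
    # Usage sketch (hypothetical file name): problem_data['example.in'] first
    # tries <problem_root>/example.in on disk and, failing that, falls back to
    # the attached archive; a miss in both places raises KeyError.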

    def __del__(self):
        if self.archive:
            self.archive.close()


class BatchedTestCase(object):
    def __init__(self, batch_no, config, problem):
        self.config = config
        self.batch_no = batch_no
        self.points = config.points
        self.batched_cases = problem._resolve_testcases(config['batched'], batch_no=batch_no)
        if any(isinstance(case, BatchedTestCase) for case in self.batched_cases):
            raise InvalidInitException("nested batches")
        self.problem = problem

    def __str__(self):
        return 'BatchedTestCase{cases=%s}' % str(self.batched_cases)


class TestCase(object):
    def __init__(self, count, batch_no, config, problem):
        self.position = count
        self.batch = batch_no
        self.config = config
        self.problem = problem
        self.points = config.points
        self.output_prefix_length = config.output_prefix_length
        self._generated = None

    def io_redirects(self):
        redirects = self.config.io_redirects

        if not redirects:
            return None

        # io_redirects:
        #   DATA01.in:
        #     fd: 0
        #     mode: "r"
        #   DATA01.out:
        #     fd: 1
        #     mode: "w"
        filtered_data = {}

        for redirect in redirects:
            mapping = redirects[redirect]
            if 'fd' not in mapping:
                raise InvalidInitException("no fd specified for redirect '%s'" % redirect)
            if 'mode' not in mapping:
                raise InvalidInitException("no mode specified for redirect '%s'" % redirect)
            if mapping.mode not in 'rw':
                raise InvalidInitException("invalid mode for redirect '%s': valid options are 'r', 'w'" % redirect)
            if isinstance(mapping.fd, str):
                mapped = {'stdin': 0, 'stdout': 1, 'stderr': 2}.get(mapping.fd, None)
                if mapped is None:
                    raise InvalidInitException("unknown named fd for redirect '%s'" % redirect)
                mapping.fd = mapped
            filtered_data[redirect] = (mapping.mode, mapping.fd)

        return filtered_data
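
    # For the io_redirects mapping sketched in the comment above, this method
    # would return {'DATA01.in': ('r', 0), 'DATA01.out': ('w', 1)}: each data
    # file name mapped to a (mode, numeric fd) pair.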

    def _normalize(self, data):
        # Normalize all newline formats (\r\n, \r, \n) to \n; otherwise data created
        # on Macs (\r newlines) breaks judged programs that assume \n.
        return data.replace('\r\n', '\r').replace('\r', '\n')
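
    # For example, _normalize('a\r\nb\rc\n') yields 'a\nb\nc\n': CRLF collapses
    # to CR first, then every remaining CR becomes LF.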

    def _run_generator(self, gen, args=None):
        flags = []
        args = args or []
        if isinstance(gen, str):
            filename = os.path.join(get_problem_root(self.problem.id), gen)
        else:
            filename = gen.source
            if gen.flags:
                flags += gen.flags
            if not args and gen.args:
                args += gen.args

        executor = self.problem.generator_manager.get_generator(filename, flags)

        # convert all args to str before launching; allows for smoother int passing
        proc = executor.launch_unsafe(*map(str, args), stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                                      stderr=subprocess.PIPE)

        try:
            input = self.problem.problem_data[self.config['in']] if self.config['in'] else None
        except KeyError:
            input = None

        self._generated = map(self._normalize, proc.communicate(input))
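
    # Sketch of the two generator spellings this method handles, with invented
    # file names; `gen` is either a bare path relative to the problem root:
    #
    #   generator: gen_example.py
    #   generator_args: [1, 1000]
    #
    # or a config node carrying its own source, flags, and default args:
    #
    #   generator:
    #     source: gen_example.cpp
    #     flags: [-O2]
    #     args: [1, 1000]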

    def input_data(self):
        gen = self.config.generator
        if gen:
            if self._generated is None:
                self._run_generator(gen, args=self.config.generator_args)
            if self._generated[0]:
                return self._generated[0]
        # in file is optional
        return self._normalize(self.problem.problem_data[self.config['in']]) if self.config['in'] else ''

    def output_data(self):
        if self.config.out:
            return self._normalize(self.problem.problem_data[self.config.out])
        gen = self.config.generator
        if gen:
            if self._generated is None:
                self._run_generator(gen, args=self.config.generator_args)
            return self._generated[1]

    def checker(self):
        try:
            name = self.config['checker'] or 'standard'
            if isinstance(name, ConfigNode):
                params = name['args'] or {}
                name = name['name']
            else:
                params = {}
            if '.' in name:
                try:
                    checker = self.problem.load_checker(name)
                except IOError:
                    raise InvalidInitException('checker module path does not exist: %s' % name)
            else:
                checker = getattr(checkers, name)
        except AttributeError as e:
            raise InvalidInitException('error loading checker: ' + e.message)

        if not hasattr(checker, 'check') or not callable(checker.check):
            raise InvalidInitException('malformed checker: no check method found')

        return partial(checker.check, **params)
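
    # The `checker` key may therefore be a plain name resolved against the
    # dmoj.checkers package (the default is 'standard'), or a node pointing at a
    # module file in the problem directory plus keyword args for its check();
    # the file name and args below are invented for illustration:
    #
    #   checker:
    #     name: custom_checker.py
    #     args: {precision: 6}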

    def free_data(self):
        self._generated = None

    def __str__(self):
        return 'TestCase{in=%s,out=%s,points=%s}' % (self.config['in'], self.config['out'], self.config['points'])