#!/usr/bin/env python
# -----------------------------------------------------------------------------
# Copyright (c) 2014--, The Qiita Development Team.
#
# Distributed under the terms of the BSD 3-clause License.
#
# The full license is in the file LICENSE, distributed with this software.
# -----------------------------------------------------------------------------
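"""Reference implementation of the steps we use to recover stuck jobs.

This script is not meant to be executed directly (see the guard at the
bottom of the file); it documents how to recover jobs stuck in 'queued' or
'running' status: flush the queues by command (complete_job, then Validate,
then release_validators) and resubmit whatever is left.
"""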
from subprocess import check_output
from time import sleep
from datetime import timedelta
from math import ceil

import pandas as pd

from qiita_db.sql_connection import TRN
from qiita_db.processing_job import ProcessingJob

QIITA_QUEUE_LOG = '/home/qiita/qiita-queues-logs-DONT-DELETE.log'
SLEEP_TIME = 6
CHANCES = 3
SQL = """SELECT processing_job_id
FROM qiita.processing_job
JOIN qiita.processing_job_status
USING (processing_job_status_id)
WHERE processing_job_status = %s"""


def _submit_jobs(jids_to_recover, recover_type):
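    """Resubmit the given job ids, pausing between submissions.

    recover_type is only used in the progress messages.
    """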
    # split SLEEP_TIME into CHANCES short naps so the user can ctrl-c
    # between submissions ... just in case
    st = int(ceil(SLEEP_TIME / CHANCES))
    len_jids_to_recover = len(jids_to_recover)
    for i, j in enumerate(jids_to_recover):
        print('recovering %s: %d/%d' % (recover_type, i + 1,
                                        len_jids_to_recover))
job = ProcessingJob(j)
job._set_status('in_construction')
job.submit()
for i in range(CHANCES):
print('You can ctrl-c now, iteration %d' % i)
sleep(st)


def _retrieve_queue_jobs():
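    """Return the set of Qiita job ids currently in the queue.

    This parses `qstat` output and assumes a Torque/PBS-style listing
    where the first column is the queue job id and the job name embeds
    the Qiita job id.
    """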
    # check_output returns bytes, so decode before splitting into lines;
    # keep only 'qiita' jobs, ignoring ipython workers ([]), private jobs
    # and interactive (STDIN) jobs
    qstat_out = check_output("qstat").decode('utf-8')
    qiita_jobs = [line.split()[0] for line in qstat_out.split("\n")
                  if 'qiita' in line and '[]' not in line and
                  'private' not in line and 'STDIN' not in line]
qiita_jids = []
    for qj in qiita_jobs:
        # to retrieve info about a job we need to use its full name, so
        # append .ucsd.edu
        args = ["qstat", "-f", "%s.ucsd.edu" % qj]
        # the Qiita job id is the last field of the Job_Name line and has
        # .txt appended, so strip the extension
        qji = [line.split()[-1].split(".")[0]
               for line in check_output(args).decode('utf-8').split("\n")
               if 'Job_Name' in line]
        qiita_jids.extend(qji)
return set(qiita_jids)


def _get_jids_to_recover(recover_type):
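    """Return the job ids in recover_type status that are not queued."""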
with TRN:
TRN.add(SQL, [recover_type])
jids = set(TRN.execute_fetchflatten())
jids_to_recover = list(jids - _retrieve_queue_jobs())
print('Total %s: %d' % (recover_type, len(jids_to_recover)))
return jids_to_recover


def _parse_queue_values(d):
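    """Return the max mem, pmem, vmem and walltime from a resource string.

    `d` is a comma-separated Torque/PBS resource list, something like
    'mem=16gb,pmem=4gb,walltime=10:00:00' (the exact fields may vary);
    memory values are returned without their two-character unit suffix.
    """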
max_mem = 0
max_pmem = 0
max_vmem = 0
max_wt = timedelta(hours=0, minutes=0, seconds=0)
d = d.split(',')
for dd in d:
if dd.startswith('mem'):
v = int(dd[4:-2])
if v > max_mem:
max_mem = v
elif dd.startswith('pmem'):
v = int(dd[5:-2])
if v > max_pmem:
max_pmem = v
        elif dd.startswith('vmem'):
            v = int(dd[5:-2])
            if v > max_vmem:
                max_vmem = v
        elif dd.startswith('walltime'):
            # walltime looks like HH:MM:SS
            hours, minutes, seconds = map(int, dd[9:].split(':'))
            v = timedelta(hours=hours, minutes=minutes, seconds=seconds)
            if v > max_wt:
                max_wt = v
return max_mem, max_pmem, max_vmem, max_wt


def _qiita_queue_log_parse(jids_to_recover):
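    """Return (ProcessingJob, resource info) pairs for the given job ids.

    Reads QIITA_QUEUE_LOG, a tab-separated accounting log, and collects
    each job's exit codes plus its parsed requested/used resources.
    """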
df = pd.read_csv(QIITA_QUEUE_LOG, sep='\t',
index_col=None, header=None, dtype=str)
# renaming columns so they are easier to manipulate
df.columns = ['bjid', 'user', 'group', 'jid', 'session', 'resource-list',
'resource-used', 'queue', 'account', 'exit-code']
# remove the register and empty fields to avoid errors
df = df[(df.bjid != '0') &
(~df.bjid.isnull()) &
(~df.user.isnull()) &
(df.jid != 'register.txt')]
# generate the qiita job id
df['qjid'] = df.jid.apply(lambda x: x.split('.')[0])
results = []
for jid, ddf in df.groupby('qjid'):
if jid in jids_to_recover:
vals = []
for _, r in ddf.iterrows():
vals.append({
'exit-code': r['exit-code'],
'resource-list': _parse_queue_values(r['resource-list']),
'resource-used': _parse_queue_values(r['resource-used'])})
results.append((ProcessingJob(jid), vals))
return results


def _flush_queues(recover_type):
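    """Resubmit recover_type jobs in dependency order.

    See the README notes below for why the order matters.
    """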
    # README 1: in theory we should be able to submit all recover_type jobs
    # one after the other but in reality that's not possible: a job is
    # going to stay as running/waiting until it is completed. Thus, we need
    # to run complete_job first, wait for everything to finish, then
    # continue with validate, then release_validators, and finally
    # everything else. Note that it is suggested to wait for the full
    # recovery type to finish before moving to the next one.
    # README 2: we now have a logging file for all submitted jobs, so let's
    # start by checking for those that failed due to system crashes or
    # because the workers were busy, error-codes: 1-2

    # first start with completing jobs that are not running
    jids_to_recover = _get_jids_to_recover(recover_type)
    review_jobs = _qiita_queue_log_parse(jids_to_recover)
    jids_review_jobs = [j.id for j, r in review_jobs
                        if {rr['exit-code'] for rr in r} == {'1'}]
    _submit_jobs(jids_review_jobs, recover_type + '/queue_log/1')

    jids_to_recover = _get_jids_to_recover(recover_type)
    review_jobs = _qiita_queue_log_parse(jids_to_recover)
    jids_review_jobs = [j.id for j, r in review_jobs
                        if {rr['exit-code'] for rr in r} == {'0'}]
    _submit_jobs(jids_review_jobs, recover_type + '/queue_log/0')

    jids_to_recover = _get_jids_to_recover(recover_type)
    complete_job = [j for j in jids_to_recover
                    if ProcessingJob(j).command.name == 'complete_job']
    _submit_jobs(complete_job, recover_type + '/complete_job')

    # then the validators that are not running
    jids_to_recover = _get_jids_to_recover(recover_type)
    validate = [j for j in jids_to_recover
                if ProcessingJob(j).command.name == 'Validate']
    _submit_jobs(validate, recover_type + '/validate')

    # and finally the release validators
    jids_to_recover = _get_jids_to_recover(recover_type)
    release_validators = [
        j for j in jids_to_recover
        if ProcessingJob(j).command.name == 'release_validators']
    _submit_jobs(release_validators, recover_type + '/release_validators')


def qiita_recover_jobs():
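    """Recover 'queued' jobs first, then 'running' jobs."""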
    # General full processing pipeline; as an example, a deblur job, since
    # it yields two artifacts. Each new line represents a new job and each
    # indented block a waiting job:
# -> deblur
# -> complete_job -> release_validator
# -> validate biom 1
# -> release_validator
# -> complete_job -> create artifact
# -> validate biom 2
# -> release_validator
# -> complete_job -> create artifact
# Step 1: recover jobs that are in queue status
recover_type = 'queued'
_flush_queues(recover_type)
# then we recover what's left
jids_to_recover = _get_jids_to_recover(recover_type)
_submit_jobs(jids_to_recover, recover_type)

    # Step 2: recover jobs that are running; note that there are several
    # steps to recover this group: 2.1. check if they have validators,
    # 2.2. if so, recover the validators, 2.3. recover the failed jobs
with TRN:
recover_type = 'running'
_flush_queues(recover_type)
jids_to_recover = _get_jids_to_recover(recover_type)
        # 2.1 and 2.2: check which jobs have validators, and recover them
jobs_with_validators = []
        # take a single snapshot of the queue instead of calling qstat
        # once per validator
        queued_jobs = _retrieve_queue_jobs()
        for j in jids_to_recover:
            job = ProcessingJob(j)
            validators = list(job.validator_jobs)
            if not validators:
                jobs_with_validators.append(j)
                continue
            # add the validators to jobs_with_validators so they are
            # ignored in the next block of code
            for vj in validators:
                jobs_with_validators.append(vj.id)
            status = {v.status for v in validators
                      if v.id not in queued_jobs}
            # if there is no status, the validators were never created and
            # we should rerun the job from scratch (Step 3)
            if not status:
                continue
            # if there are multiple statuses among the validators, it's
            # complex behavior that needs a case-by-case solution
            if len(status) != 1:
                print("Job '%s' has multiple validator statuses (%d), check "
                      "them by hand" % (j, len(status)))
                continue
status = list(status)[0]
if status == 'waiting':
print("releasing job validators: %s" % j)
try:
job.release_validators()
except Exception:
print("ERROR, releasing %s validators" % j)
sleep(SLEEP_TIME)
            elif status == 'running':
                # _submit_jobs expects job ids, not job objects
                _submit_jobs([v.id for v in validators],
                             recover_type + ' validator, running')
            elif status == 'error':
                # the process here is the same as before but we need to
                # split setting in_construction and submitting into 2
                # steps; we can still submit via _submit_jobs
                for v in validators:
                    v._set_status('in_construction')
                _submit_jobs([v.id for v in validators],
                             recover_type + ' validator, error')
            else:
                print("Check the status of this job %s: %s and its "
                      "validators %s." % (j, status, validators))
jids_to_recover = set(jids_to_recover) - set(jobs_with_validators)

        # Step 3: Finally, we recover all the leftover jobs
        for j in jids_to_recover:
job = ProcessingJob(j)
status = job.status
if status == 'waiting':
print("releasing job validators: %s" % j)
job.release_validators()
sleep(SLEEP_TIME)
            elif status == 'running':
_submit_jobs([j], 'main_job, running')


if __name__ == '__main__':
raise ValueError('This script should never be called directly but should '
'be used as a reference if we need to recover jobs, '
'see: qiita_recover_jobs')