#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (C) 2012-2017 Stephen Privitera
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from __future__ import print_function
##############################################################################
# import standard modules
import getpass
import os
import socket
import sys
import tempfile
import shutil
##############################################################################
# import the modules we need to build the pipeline
from optparse import OptionParser
from glue.pipeline import DeepCopyableConfigParser as dcConfigParser
from glue import pipeline
from lalapps import inspiral
def which(prog):
    out = shutil.which(prog)
    if out is None:
        raise ValueError("Could not find {} in your path; have you built the proper software and sourced the proper environment scripts?".format(prog))
    return out
def log_path():
    host = socket.getfqdn()
    username = getpass.getuser()
    # FIXME: add more hosts as you need them
    if 'caltech.edu' in host: return '/usr1/' + username
    if 'phys.uwm.edu' in host: return '/localscratch/' + username
    if 'aei.uni-hannover.de' in host: return '/local/user/' + username
    if 'phy.syr.edu' in host: return '/usr1/' + username
    return tempfile.gettempdir()
class bank_DAG(pipeline.CondorDAG):
    def __init__(self, name, logpath=log_path()):
        self.basename = name
        # put the DAG log on local scratch space, not a shared filesystem
        tempfile.tempdir = logpath
        tempfile.template = self.basename + '.dag.log.'
        logfile = tempfile.mktemp()
        # touch the log file so Condor can append to it
        fh = open(logfile, "w")
        fh.close()
        pipeline.CondorDAG.__init__(self, logfile)
        self.set_dag_file(self.basename)
        self.jobsDict = {}
        self.node_id = 0
        self.output_cache = []

    def add_node(self, node):
        node.set_retry(3)
        self.node_id += 1
        node.add_macro("macroid", self.node_id)
        pipeline.CondorDAG.add_node(self, node)

    def write_cache(self):
        out = self.basename + ".cache"
        f = open(out, "w")
        for c in self.output_cache:
            f.write(str(c) + "\n")
        f.close()
class SBankJob(inspiral.InspiralAnalysisJob):
    def __init__(self, cp, tag_base="sbank"):
        exec_name = 'sbank'
        extension = 'xml'
        sections = ['sbank']
        inspiral.InspiralAnalysisJob.__init__(self, cp, sections, exec_name, extension)
        self.tag_base = tag_base
        self.set_sub_file(tag_base + '.sub')
        self.set_stdout_file('logs/' + tag_base + '-$(macroid)-$(process).out')
        self.set_stderr_file('logs/' + tag_base + '-$(macroid)-$(process).err')
        if cp.has_section("accounting"):
            self.add_condor_cmd('accounting_group', cp.get("accounting", "accounting-group"))
            if cp.has_option("accounting", "accounting-group-user"):
                self.add_condor_cmd('accounting_group_user', cp.get("accounting", "accounting-group-user"))
        self.add_condor_cmd('getenv', 'True')
        self.add_condor_cmd('request_memory', '3999')
        self.add_condor_cmd('request_disk', '1MB')
        # match the Condor CPU request to sbank's OpenMP thread count
        if "OMP_NUM_THREADS" in os.environ:
            self.add_condor_cmd('request_cpus', os.environ["OMP_NUM_THREADS"])
class SBankNode(pipeline.CondorDAGNode):
    def __init__(self, job, dag, tag="SBANK", seed=0, bank_seed=[], mchirp_boundaries_file=None, mchirp_boundaries_index=None, p_node=[]):
        pipeline.CondorDAGNode.__init__(self, job)
        if (mchirp_boundaries_file is None) ^ (mchirp_boundaries_index is None):
            raise ValueError("must supply both mchirp_boundaries_file and mchirp_boundaries_index or neither")
        if mchirp_boundaries_file is not None:
            self.add_var_arg("--mchirp-boundaries-file %s --mchirp-boundaries-index %s" % (mchirp_boundaries_file, mchirp_boundaries_index))
        for bseed in bank_seed:
            self.add_var_arg("--bank-seed %s" % bseed)
        self.add_var_opt("seed", seed)
        fout = "%s.xml.gz" % tag
        self.add_var_opt("output-filename", fout)
        self.add_output_file(fout)
        for p in p_node:
            self.add_parent(p)
        dag.add_node(self)
class SBankChooseMchirpBoundariesJob(inspiral.InspiralAnalysisJob):
    def __init__(self, cp, tag_base="sbank_choose_mchirp_boundaries"):
        exec_name = 'sbank_choose_mchirp_boundaries'
        extension = 'txt'
        sections = ['split']
        inspiral.InspiralAnalysisJob.__init__(self, cp, sections, exec_name, extension)
        self.set_universe("local")
        self.set_sub_file(tag_base + '.sub')
        self.tag_base = tag_base
        self.set_stdout_file('logs/' + tag_base + '-$(macroid)-$(process).out')
        self.set_stderr_file('logs/' + tag_base + '-$(macroid)-$(process).err')
        self.add_condor_cmd('request_memory', '1000')
        self.add_condor_cmd('request_disk', '1MB')
        if cp.has_section("accounting"):
            self.add_condor_cmd('accounting_group', cp.get("accounting", "accounting-group"))
            if cp.has_option("accounting", "accounting-group-user"):
                self.add_condor_cmd('accounting_group_user', cp.get("accounting", "accounting-group-user"))
        self.add_condor_cmd('getenv', 'True')
class SBankChooseMchirpBoundariesNode(pipeline.CondorDAGNode):
    def __init__(self, job, dag, bank, tag=None, p_node=[]):
        pipeline.CondorDAGNode.__init__(self, job)
        self.add_file_arg(bank)
        nbanks = int(self.job().get_opts()["nbanks"])
        self.add_var_opt("nbanks", nbanks)
        if tag:
            output_fname = "%s_mchirp_boundaries.txt" % tag
        else:
            output_fname = "mchirp_boundaries.txt"
        self.add_var_opt("output-file", output_fname)
        self.add_output_file(output_fname)
        for p in p_node:
            self.add_parent(p)
        dag.add_node(self)
class LWAddJob(pipeline.CondorDAGJob):
    """
    A ligolw_add job. These jobs combine the split template banks
    into one aggregate bank.
    """
    def __init__(self, cp, executable=which('ligolw_add'), tag_base='ligolw_add'):
        self.__prog__ = 'ligolw_add'
        self.__executable = executable
        self.__universe = 'vanilla'
        pipeline.CondorDAGJob.__init__(self, self.__universe, self.__executable)
        self.add_condor_cmd('request_memory', '3999')
        self.add_condor_cmd('request_disk', '1MB')
        self.add_condor_cmd('getenv', 'True')
        if cp.has_section("accounting"):
            self.add_condor_cmd('accounting_group', cp.get("accounting", "accounting-group"))
            if cp.has_option("accounting", "accounting-group-user"):
                self.add_condor_cmd('accounting_group_user', cp.get("accounting", "accounting-group-user"))
        self.tag_base = tag_base
        # force serial threading in the Intel math libraries
        self.add_condor_cmd('environment', "KMP_LIBRARY=serial;MKL_SERIAL=yes")
        self.set_sub_file(tag_base + '.sub')
        self.set_stdout_file('logs/' + tag_base + '-$(macroid)-$(process).out')
        self.set_stderr_file('logs/' + tag_base + '-$(macroid)-$(process).err')
class LWAddNode(pipeline.CondorDAGNode):
    """
    A ligolw_add node: concatenates the given XML documents into a single output file.
    """
    def __init__(self, job, dag, xmls, output, p_node=[]):
        pipeline.CondorDAGNode.__init__(self, job)
        for x in xmls:
            self.add_file_arg(x)
        self.add_var_opt("output", output)
        self.add_output_file(output)
        for p in p_node:
            if p is not None:
                self.add_parent(p)
        dag.add_node(self)
usage = """
sbank_pipe generates a DAG for parallelizing the
construction of template banks via sbank. Its primary
input is an ini file, which takes form given below. Once you have an
ini file, create the DAG by
sbank_pipe --config-file INIFILE.ini --user-tag SBANK_PIPE
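and submit the resulting DAG (named after your --user-tag) with
HTCondor, e.g.
condor_submit_dag SBANK_PIPE.dag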
;;; BEGIN EXAMPLE INI ;;;
[condor] ; This section points to the executables, and provides condor options
universe = vanilla
sbank = /usr/bin/sbank
sbank_choose_mchirp_boundaries = /usr/bin/sbank_choose_mchirp_boundaries
;[accounting]
;accounting-group = ???
;specifying the user is only necessary for shared accounts
;accounting-group-user = ???
[sbank]
; This section contains the parameters of the entire bank parameter
; space you wish to cover. sbank_pipe will divide the space for you.
approximant = SEOBNRv2_ROM_DoubleSpin
match-min = 0.97
flow = 30.0
noise-model = aLIGOZeroDetHighPower
instrument = H1
mass1-min = 2.0
mass1-max = 296.0
mass2-min = 2.0
mass2-max = 296.0
mtotal-min = 4.0
mtotal-max = 350.0
mratio-min = 1.0
mratio-max = 14.0
spin1-max = 0.98
spin1-min = -0.98
spin2-max = 0.98
spin2-min = -0.98
aligned-spin =
neighborhood-param = dur
neighborhood-size = 0.5
checkpoint = 500
;cache-waveforms =
; please check your memory requirements before setting this option
; if jobs use too much memory, they will swap and run forever
coarse-match-df = 4.0
iterative-match-df-max = 1.0
; If you want to add precomputed banks to the workflow
;bank-seed = FILENAME[:APPROX]
[coarse-sbank]
; This section is for planning the splitting of the parameter
; space. To do so, we generate a "coarse" bank, i.e., a bank on the
; same parameter space but with a weaker convergence criterion. This
; process gives a rough measure of the density of templates the final
; bank will require. Use the max-new-templates option to prevent the job
; from running forever, but the more templates you have in the coarse
; bank, the less "over-coverage" you will incur from the bank
; splitting process. A good rule of thumb is that you want ~1000
; templates per split bank.
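; For example, the values below pair nbanks = 15 (in the [split]
; section) with max-new-templates = 15000, i.e., roughly 1000
; templates per split bank.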
max-new-templates = 15000
match-min = 0.93
convergence-threshold = 1000
[split]
; This section configures the parallelization. nbanks is the number of
; splits (in chirp mass) you want. You can crank it to infinity at the
; cost of over-coverage.
nbanks = 15
;;; END EXAMPLE INI ;;;
"""
def parse_command_line():
    parser = OptionParser(usage=usage)
    parser.add_option("--config-file", default=None, help="Read options for generating the template bank placement pipeline from this configuration ini file.")
    parser.add_option("--user-tag", default="SBANK", help="Make your results feel special and give them a unique name.")
    options, filenames = parser.parse_args()
    return options, filenames
options, filenames = parse_command_line()
try:
    os.mkdir("logs")
except OSError:
    # directory already exists
    pass
# create top level dag object
dag = bank_DAG(options.user_tag)
# read config file
cp = dcConfigParser()
cp.read(options.config_file)
# initialize sbank job objects
sbankJob = SBankJob(cp, tag_base=options.user_tag + "_sbank")
mm = cp.get("sbank", "match-min")
cp.remove_option("sbank", "match-min") # don't want it entering via add_ini_opts
# set up bank generation
# Two modes:
# 1. generate coarse, partition mchirp space, generate sub-banks, ligolw_add
# 2. take given bank, split it, and sim it
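# (only mode 1 is implemented below: the coarse bank seeds the mchirp
# splitting, and ligolw_add recombines the split banks at the end)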
nbanks = int(cp.get("split","nbanks"))
# for layering, we prefer nbanks to be odd so that no second-stage job
# works on the bank boundary
if not (nbanks % 2):
    nbanks += 1
    cp.set("split", "nbanks", str(nbanks))
bank_names = []
bank_nodes = []
# set up sole coarse node to plan out the mini-sbank nodes
coarse_sbank_node = SBankNode(sbankJob, dag, "%s_SBANK_COARSE" % options.user_tag)
coarse_mm = cp.get("coarse-sbank", "match-min")
coarse_sbank_node.add_var_opt("match-min", coarse_mm)
coarse_thresh = cp.get("coarse-sbank", "convergence-threshold")
coarse_sbank_node.add_var_arg("--convergence-threshold %s" % coarse_thresh)
if cp.has_option("coarse-sbank", "max-new-templates"):
    templates_max = int(cp.get("coarse-sbank", "max-new-templates"))
    assert templates_max >= 3 * nbanks  # you need at least a few templates per bank
else:
    templates_max = 15 * nbanks  # to prevent the coarse bank from running forever
coarse_sbank_node.add_var_arg("--max-new-templates %s" % templates_max)
xmlCoarse, = coarse_sbank_node.get_output_files()
pnode = [coarse_sbank_node]
bank_names.append(xmlCoarse)
bank_nodes.append(coarse_sbank_node)
# use coarse bank to choose mchirp regions of roughly equal template number
sbankChooseMchirpBoundariesJob = SBankChooseMchirpBoundariesJob(cp, tag_base=options.user_tag + "_mchirp_boundaries")
sbankChooseMchirpBoundariesNode = SBankChooseMchirpBoundariesNode(sbankChooseMchirpBoundariesJob, dag, xmlCoarse, options.user_tag, pnode)
mchirp_boundaries_fname, = sbankChooseMchirpBoundariesNode.get_output_files()
# generate a bank for each mchirp region
# first compute even numbered split banks
for j in range(0, nbanks, 2):
    bank_node = SBankNode(sbankJob, dag, "%s_SBANK_SPLIT_BANK_%04d" % (options.user_tag, j), seed="%d" % (j * nbanks + 1), mchirp_boundaries_file=mchirp_boundaries_fname, mchirp_boundaries_index=str(j), p_node=[sbankChooseMchirpBoundariesNode], bank_seed=[xmlCoarse])
    bank_node.add_var_opt("match-min", mm)
    bank_node.set_priority(1)  # want complete bank before sims
    bank_nodes.append(bank_node)
    bank_name, = bank_node.get_output_files()
    bank_names.append(bank_name)
# then compute odd numbered split banks using even banks as seeds
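# (bank_nodes[0]/bank_names[0] hold the coarse bank, so split bank j lives
# at index j//2 + 1; hence the even neighbors j-1 and j+1 of an odd j sit
# at indices (j+1)//2 and (j+3)//2)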
for j in range(1, nbanks, 2):
    if j < nbanks - 1:
        p_node = [bank_nodes[(j + 1) // 2], bank_nodes[(j + 3) // 2]]
        bank_seed = [xmlCoarse, bank_names[(j + 1) // 2], bank_names[(j + 3) // 2]]
    else:
        p_node = [bank_nodes[(j + 1) // 2]]
        bank_seed = [xmlCoarse, bank_names[(j + 1) // 2]]
    bank_node = SBankNode(sbankJob, dag, "%s_SBANK_SPLIT_BANK_%04d" % (options.user_tag, j), seed="%d" % (j * nbanks + 1), mchirp_boundaries_file=mchirp_boundaries_fname, mchirp_boundaries_index=str(j), p_node=p_node, bank_seed=bank_seed)
    bank_node.add_var_opt("match-min", mm)
    bank_node.set_priority(1)  # want complete bank before sims
    bank_nodes.append(bank_node)
    bank_name, = bank_node.get_output_files()
    bank_names.append(bank_name)
# recombine bank fragments under a common name
lwaddJob = LWAddJob(cp, tag_base=options.user_tag + "_lwadd")
lwaddNode = LWAddNode(lwaddJob, dag, bank_names, "SBANK_COMBINED-%s.xml.gz" % options.user_tag, bank_nodes)
# write the dag
dag.write_sub_files()
dag.write_script()
dag.write_cache()
dag.write_dag()