From dd491dae7faa0539a296cf8699e49158d5768616 Mon Sep 17 00:00:00 2001 From: Dehua Zhu Date: Tue, 5 Mar 2019 15:14:43 +0100 Subject: [PATCH] add the functionality to download the remote MiniAOD temporarily to process it locally --- python/hn3l_cfg.py | 30 ++++- python/utils/EOSEventsWithDownload.py | 162 ++++++++++++++++++++++++++ 2 files changed, 188 insertions(+), 4 deletions(-) create mode 100644 python/utils/EOSEventsWithDownload.py diff --git a/python/hn3l_cfg.py b/python/hn3l_cfg.py index b9b5f0c..8924b38 100644 --- a/python/hn3l_cfg.py +++ b/python/hn3l_cfg.py @@ -381,15 +381,37 @@ def generateKeyConfigs(samples,production, promptLeptonType, L1L2LeptonType, isD comp.fineSplitFactor = 1 comp.files = comp.files[:] - # the following is declared in case this cfg is used in input to the - # heppy.py script + ################################################### + ### PREPROCESSOR ### + ################################################### + preprocessor = None + + #temporary copy remote files using xrd from PhysicsTools.HeppyCore.framework.eventsfwlite import Events + from CMGTools.HNL.utils.EOSEventsWithDownload import EOSEventsWithDownload + event_class = EOSEventsWithDownload if not preprocessor else Events + EOSEventsWithDownload.aggressive = 2 # always fetch if running on Wigner + EOSEventsWithDownload.long_cache = getHeppyOption('long_cache', False) + + if preprocessor: preprocessor.prefetch = prefetch + + # if extrap_muons_to_L1: + # fname = '$CMSSW_BASE/src/CMGTools/WTau3Mu/prod/muon_extrapolator_cfg.py' + # sequence.append(fileCleaner) + # preprocessor = CmsswPreprocessor(fname, addOrigAsSecondary=False) + + # if compute_mvamet: + # fname = '$CMSSW_BASE/src/CMGTools/WTau3Mu/prod/compute_mva_met_data_cfg.py' + # sequence.append(fileCleaner) + # preprocessor = CmsswPreprocessor(fname, addOrigAsSecondary=False) + + # the following is declared in case this cfg is used in input to the heppy.py script config = cfg.Config( components = selectedComponents, sequence = sequence, services = [], - preprocessor = None, - events_class = Events + preprocessor = preprocessor, + events_class = event_class ) printComps(config.components, True) diff --git a/python/utils/EOSEventsWithDownload.py b/python/utils/EOSEventsWithDownload.py new file mode 100644 index 0000000..c954bcd --- /dev/null +++ b/python/utils/EOSEventsWithDownload.py @@ -0,0 +1,162 @@ +from DataFormats.FWLite import Events as FWLiteEvents +from CMGTools.Production.changeComponentAccessMode import convertFile as convertFileAccess +import os, subprocess, json, timeit, hashlib + +class EOSEventsWithDownload(object): + def __init__(self, files, tree_name): + self.aggressive = getattr(self.__class__, 'aggressive', 0) + self.long_cache = getattr(self.__class__, 'long_cache', False) + print "[FileFetcher]: Aggressive prefetching level %d" % self.aggressive + self._files = [] + self._nevents = 0 + try: + query = ["edmFileUtil", "--ls", "-j"]+[("file:"+f if f[0]=="/" else f) for f in files] + retjson = subprocess.check_output(query) + retobj = json.loads(retjson) + for entry in retobj: + self._files.append( (str(entry['file']), self._nevents, self._nevents+entry['events'] ) ) # str() is needed since the output is a unicode string + self._nevents += entry['events'] + except subprocess.CalledProcessError: + print "[FileFetcher]: Failed the big query: ",query + ## OK, now we go for something more fancy + for f in files: + print "[FileFetcher]: Try file: ",f + OK = False + # step 1: try the local query + if f[0] == "/": + urls = [ 'file:'+f ] + else: + urls = [ f ] # one retry + try: + # then try the two main redirectors, and EOS again + if "/store/data" in f and "PromptReco" in f: + urls.append( convertFileAccess(f, "root://eoscms.cern.ch//eos/cms/tier0%s") ) + urls.append( convertFileAccess(f, "root://xrootd-cms.infn.it/%s") ) + urls.append( convertFileAccess(f, "root://cmsxrootd.fnal.gov/%s") ) + urls.append( convertFileAccess(f, "root://cms-xrd-global.cern.ch/%s") ) + urls.append( convertFileAccess(f, "root://eoscms.cern.ch//eos/cms%s") ) + except: + pass + for u in urls: + print "[FileFetcher]: Try url: ",u + try: + query = ["edmFileUtil", "--ls", "-j", u] + retjson = subprocess.check_output(query) + retobj = json.loads(retjson) + for entry in retobj: + self._files.append( (str(entry['file']), self._nevents, self._nevents+entry['events'] ) ) # str() is needed since the output is a unicode string + self._nevents += entry['events'] + OK = True + print "[FileFetcher]: Successful URL ",u + break + except: + print "[FileFetcher]: Failed the individual query: ",query + pass + if not OK: + if self.aggressive == 3 and "/store/mc" in f: + print "[FileFetcher]: Will skip file ",f + continue + raise RuntimeError, "Failed to file %s in any way. aborting job " % f + else: + print self._files + if self.aggressive == 3 and self._nevents == 0: + raise RuntimeError, "Failed to find all files for this component. aborting job " + self._fileindex = -1 + self._localCopy = None + self.events = None + ## Discover where I am + self.inMeyrin = True + if 'LSB_JOBID' in os.environ and 'HOSTNAME' in os.environ and self.aggressive == 1: + hostname = os.environ['HOSTNAME'].replace(".cern.ch","") + try: + wigners = subprocess.check_output(["bmgroup","g_wigner"]).split() + if hostname in wigners: + self.inMeyrin = False + print "[FileFetcher]: Host %s is in bmgroup g_wigner, so I assume I'm in Wigner and not Meyrin" % hostname + except: + pass + ## How aggressive should I be? + # 0 = default; 1 = always fetch from Wigner; 2 = always fetch from anywhere if it's a xrootd url + def __len__(self): + return self._nevents + def __getattr__(self, key): + return getattr(self.events, key) + def isLocal(self,filename): + if self.aggressive == -2: return True + if filename.startswith("root://") and not filename.startswith("root://eoscms"): + return False # always prefetch AAA + if self.aggressive == -1: return True + if self.aggressive >= 2: return False + if self.aggressive >= 1 and not self.inMeyrin: return False + fpath = filename.replace("root://eoscms.cern.ch//","/").replace("root://eoscms//","/") + if "?" in fpath: fpath = fpath.split["?"][0] + try: + finfo = subprocess.check_output(["eos", "fileinfo", fpath]) + replicas = False + nears = False + for line in finfo.split("\n"): + if line.endswith("geotag"): + replicas = True + elif replicas and ".cern.ch" in line: + geotag = int(line.split()[-1]) + print "[FileFetcher]: Found a replica with geotag %d" % geotag + if self.inMeyrin: + if geotag > 9000: return False # far replica: bad (EOS sometimes gives the far even if there's a near!) + else: nears = True # we have found a replica that is far away + else: + if geotag < 1000: return False # far replica: bad (EOS sometimes gives the far even if there's a near!) + else: nears = True # we have found a replica that is far away + # if we have found some near replicas, and no far replicas + if nears: return True + except: + pass + # we don't know, so we don't transfer (better slow than messed up) + return True + def __getitem__(self, iEv): + if self._fileindex == -1 or not(self._files[self._fileindex][1] <= iEv and iEv < self._files[self._fileindex][2]): + self.events = None # so it's closed + if self._localCopy: + print "[FileFetcher]: Removing local cache file %s" % self._localCopy + try: + os.remove(self._localCopy) + except: + pass + self._localCopy = None + for i,(fname,first,last) in enumerate(self._files): + if first <= iEv and iEv < last: + print "[FileFetcher]: For event range [ %d, %d ) will use file %r " % (first,last,fname) + self._fileindex = i + if fname.startswith("root://eoscms") or (self.aggressive >= 2 and fname.startswith("root://")): + if not self.isLocal(fname): + tmpdir = os.environ['TMPDIR'] if 'TMPDIR' in os.environ else "/tmp" + rndchars = "".join([hex(ord(i))[2:] for i in os.urandom(8)]) if not self.long_cache else "long_cache-id%d-%s" % (os.getuid(), hashlib.sha1(fname).hexdigest()); + localfile = "%s/%s-%s.root" % (tmpdir, os.path.basename(fname).replace(".root",""), rndchars) + if self.long_cache and os.path.exists(localfile): + print "[FileFetcher]: Filename %s is already available in local path %s " % (fname,localfile) + fname = localfile + else: + try: + print "[FileFetcher]: Filename %s is remote (geotag >= 9000), will do a copy to local path %s " % (fname,localfile) + start = timeit.default_timer() + subprocess.check_output(["xrdcp","-f","-N",fname,localfile]) + print "[FileFetcher]: Time used for transferring the file locally: %s s" % (timeit.default_timer() - start) + if not self.long_cache: self._localCopy = localfile + fname = localfile + except: + print "[FileFetcher]: Could not save file locally, will run from remote" + if os.path.exists(localfile): os.remove(localfile) # delete in case of incomplete transfer + print "[FileFetcher]: Will run from "+fname + self.events = FWLiteEvents([fname]) + break + self.events.to(iEv - self._files[self._fileindex][1]) + return self + def endLoop(self): + if '_localCopy' not in self.__dict__: + return + todelete = self.__dict__['_localCopy'] + if todelete: + print "[FileFetcher]: Removing local cache file ",todelete + os.remove(todelete) + def __del__(self): + self.endLoop() +