Skip to content

Commit

Permalink
Fix hadoop_jar arg handling.
Browse files Browse the repository at this point in the history
* HadoopJar only coerced Target arguments to their paths when atomic
  output was enabled. This change makes the coercion unconditional.
  • Loading branch information
jcrobak committed Jan 29, 2013
1 parent d276d0e commit 558d614
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 19 deletions.
2 changes: 0 additions & 2 deletions examples/terasort.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ def hadoop_examples_jar():
DEFAULT_TERASORT_OUT = '/tmp/terasort-out'


@luigi.expose
class TeraGen(luigi.hadoop_jar.HadoopJarJobTask):
"""Runs TeraGen, by default with 1TB of data (10B records)"""
records = luigi.Parameter(default="10000000000",
Expand All @@ -46,7 +45,6 @@ def args(self):
return [self.records, self.output()]


@luigi.expose
class TeraSort(luigi.hadoop_jar.HadoopJarJobTask):
"""Runs TeraGent, by default using """

Expand Down
33 changes: 16 additions & 17 deletions luigi/hadoop_jar.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,22 @@ def _fix_paths(job):
Return a list of temporary file pairs (tmpfile, destination path) and
a list of arguments. Converts each HdfsTarget to a string for the
path."""
if job.atomic_output():
tmp_files = []
args = []
for x in job.args():
if isinstance(x, luigi.hdfs.HdfsTarget): # input/output
if x.exists(): # input
args.append(x.path)
else: # output
y = luigi.hdfs.HdfsTarget(x.path + \
'-luigi-tmp-%09d' % random.randrange(0, 1e10))
tmp_files.append((y, x))
logger.info("Using temp path: {0} for path {1}".format(y.path, x.path))
args.append(y.path)
else:
args.append(str(x))
return (tmp_files, args)
return ([], job.args())
tmp_files = []
args = []
for x in job.args():
if isinstance(x, luigi.hdfs.HdfsTarget): # input/output
if x.exists() or not job.atomic_output(): # input
args.append(x.path)
else: # output
y = luigi.hdfs.HdfsTarget(x.path + \
'-luigi-tmp-%09d' % random.randrange(0, 1e10))
tmp_files.append((y, x))
logger.info("Using temp path: {0} for path {1}".format(y.path, x.path))
args.append(y.path)
else:
args.append(str(x))

return (tmp_files, args)

def run_job(self, job):
# TODO(jcrobak): libjars, files, etc. Can refactor out of
Expand Down

0 comments on commit 558d614

Please sign in to comment.