new hadoop_jvm module for java/scala hadoop jobs.
* new JvmJobTask and extracted some common functionality from
  HadoopJobTask into BaseHadoopJobTask
* moved jobconf logic into the BaseHadoopJobTask
* bare-bones JvmHadoopJobRunner and extracted common functionality
  from HadoopJobRunner into static methods (submitting/tracking a
  job).
jcrobak committed Jan 4, 2013
1 parent 107b879 commit e5f6e85
Showing 3 changed files with 104 additions and 28 deletions.
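
For orientation, here is a minimal sketch of how a Java/Scala job could be declared on top of the new module. The hooks (jar, main, args, job_runner) are the ones introduced in this commit; the task name, jar path, main class, and paths are hypothetical:

    import luigi
    import luigi.hdfs
    from luigi.hadoop_jvm import JvmHadoopJobTask


    class WordCountJvmTask(JvmHadoopJobTask):
        """Hypothetical JVM job: runs a pre-built jar via the new runner."""

        input_path = luigi.Parameter(default='/tmp/wordcount/input')
        output_path = luigi.Parameter(default='/tmp/wordcount/output')

        def jar(self):
            # Path to the job jar on the local filesystem (checked by the runner).
            return 'build/wordcount.jar'

        def main(self):
            # Fully qualified main class inside the jar.
            return 'com.example.hadoop.WordCount'

        def args(self):
            # Appended after `hadoop jar <jar> <main>` and the -D jobconf flags.
            return [self.input_path, self.output_path]

        def output(self):
            return luigi.hdfs.HdfsTarget(self.output_path)

Note that job_runner() already defaults to JvmHadoopJobRunner, so no extra runner wiring is needed for the plain `hadoop jar` submission path.
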
66 changes: 38 additions & 28 deletions luigi/hadoop.py
@@ -221,13 +221,7 @@ def run_job(self, job):
        if files:
            arglist += ['-files', ','.join(files)]

        jobconfs = []

        jobconfs.append('mapred.job.name=%s' % job.task_id)
        jobconfs.append('mapred.reduce.tasks=%s' % job.n_reduce_tasks)
        pool = job.pool
        if pool is not None:
            jobconfs.append('mapred.fairscheduler.pool=%s' % pool)
        jobconfs = job.jobconfs()

        for k, v in self.jobconfs.iteritems():
            jobconfs.append('%s=%s' % (k, v))
@@ -258,10 +252,17 @@ def run_job(self, job):
        # submit job
        create_packages_archive(packages, self.tmp_dir + '/packages.tar')

        logger.info(' '.join(arglist))

        job._dump(self.tmp_dir)

        self.run_and_track_hadoop_job(arglist)

        # rename temporary work directory to given output
        tmp_target.move(output_final, fail_if_exists=True)
        self.finish()

    @staticmethod
    def run_and_track_hadoop_job(arglist):
        logger.info(' '.join(arglist))
        proc = subprocess.Popen(arglist, stderr=subprocess.PIPE)

        # We parse the output to try to find the tracking URL.
@@ -280,16 +281,12 @@ def run_job(self, job):
            # Try to fetch error logs if possible
            if tracking_url:
                try:
                    self.fetch_raise_failures(tracking_url)
                    HadoopJobRunner.fetch_raise_failures(tracking_url)
                except Exception, e:
                    raise RuntimeError('Streaming job failed with exit code %d. Additionally, an error occurred when fetching data from %s: %s' % (proc.returncode, tracking_url, e))

            raise RuntimeError('Streaming job failed with exit code %d' % proc.returncode)

        # rename temporary work directory to given output
        tmp_target.move(output_final, fail_if_exists=True)
        self.finish()

    @staticmethod
    def fetch_raise_failures(tracking_url):
        ''' Uses mechanize to fetch the actual task logs from the task tracker.
@@ -315,7 +312,7 @@ def fetch_raise_failures(tracking_url):
            for exc in re.findall(r'luigi-exc-hex=[0-9a-f]+', data):
                print '---------- %s:' % task_url
                print exc.split('=')[-1].decode('hex')

    def finish(self):
        if self.tmp_dir and os.path.exists(self.tmp_dir):
            logger.debug('Removing directory %s' % self.tmp_dir)
@@ -388,9 +385,20 @@ def run_job(self, job):
        reduce_output.close()


class JobTask(luigi.Task):
class BaseHadoopJobTask(luigi.Task):
    n_reduce_tasks = 25
    pool = luigi.Parameter(is_global=True, default=None, significant=False)
    task_id = None

    def jobconfs(self):
        jcs = []
        jcs.append('mapred.job.name=%s' % self.task_id)
        jcs.append('mapred.reduce.tasks=%s' % self.n_reduce_tasks)
        pool = self.pool
        if pool is not None:
            jcs.append('mapred.fairscheduler.pool=%s' % pool)
        return jcs


    def init_local(self):
        ''' Implement any work to setup any internal datastructure etc here.
@@ -404,22 +412,10 @@ def init_local(self):
    def init_hadoop(self):
        pass

    def init_mapper(self):
        pass

    def init_combiner(self):
        pass

    def init_reducer(self):
        pass

    def run(self):
        self.init_local()
        self.job_runner().run_job(self)

    def _setup_remote(self):
        self._setup_links()

    def requires_local(self):
        ''' Default impl - override this method if you need any local input to be accessible in init() '''
        return []
@@ -437,6 +433,20 @@ def deps(self):
        # Overrides the default implementation
        return luigi.task.flatten(self.requires_hadoop()) + luigi.task.flatten(self.requires_local())

class JobTask(BaseHadoopJobTask):

    def init_mapper(self):
        pass

    def init_combiner(self):
        pass

    def init_reducer(self):
        pass

    def _setup_remote(self):
        self._setup_links()

    def job_runner(self):
        # We recommend that you define a subclass, override this method and set up your own config
        return DefaultHadoopJobRunner()
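
With the jobconf logic now living on BaseHadoopJobTask.jobconfs(), a task can extend the default list instead of relying on the runner to hard-code it. A small sketch; the subclass and the extra setting are hypothetical:

    from luigi.hadoop import JobTask


    class CompressedOutputJobTask(JobTask):
        """Hypothetical streaming task that appends its own jobconf entries."""
        # mapper()/reducer() omitted for brevity.

        def jobconfs(self):
            # Defaults from BaseHadoopJobTask: mapred.job.name, mapred.reduce.tasks,
            # and mapred.fairscheduler.pool when a pool is configured.
            jcs = super(CompressedOutputJobTask, self).jobconfs()
            jcs.append('mapred.output.compress=true')
            return jcs
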
59 changes: 59 additions & 0 deletions luigi/hadoop_jvm.py
@@ -0,0 +1,59 @@

import logging
import os

import luigi
import luigi.hdfs
from luigi.hadoop import BaseHadoopJobTask, HadoopJobRunner, JobRunner

logger = logging.getLogger('luigi-interface')


class JvmHadoopJobRunner(JobRunner):

    def __init__(self):
        pass

    def run_job(self, job):
        # TODO(jcrobak): libjars, files, etc. Can refactor out of
        # hadoop.HadoopJobRunner
        if not os.path.exists(job.jar()):
            logger.error("Can't find jar: {0}, full path {1}".format(job.jar(),
                os.path.abspath(job.jar())))
            raise Exception("job jar does not exist")
        arglist = ['hadoop', 'jar', job.jar(), job.main()]

        jobconfs = job.jobconfs()

        for jc in jobconfs:
            arglist += ['-D' + jc]

        arglist += job.args()

        HadoopJobRunner.run_and_track_hadoop_job(arglist)

        # TODO support temp output locations?
        self.finish()

    def finish(self):
        pass

    def __del__(self):
        self.finish()


class JvmHadoopJobTask(BaseHadoopJobTask):

    def jar(self):
        return None

    def main(self):
        return None

    def job_runner(self):
        # We recommend that you define a subclass, override this method and set up your own config
        return JvmHadoopJobRunner()

    def args(self):
        """returns an array of args to pass to the job (after hadoop jar <jar> <main>)."""
        return []
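
The runner above simply shells out to hadoop jar: jar and main class first, then one -D per jobconf entry, then the task's args(). Since the -D pairs land after the main class, picking them up is left to the job's main class (typically via Hadoop's ToolRunner/GenericOptionsParser). A quick, submission-free way to sanity-check the composed command, mirroring run_job:

    def preview_hadoop_jar_command(job):
        # Mirrors JvmHadoopJobRunner.run_job: jar and main class first, then one
        # -Dkey=value per jobconf entry, then whatever the task returns from args().
        arglist = ['hadoop', 'jar', job.jar(), job.main()]
        for jc in job.jobconfs():
            arglist += ['-D' + jc]
        arglist += job.args()
        return ' '.join(arglist)

    # e.g. print preview_hadoop_jar_command(WordCountJvmTask()) with the sketch
    # near the top of this page.
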
7 changes: 7 additions & 0 deletions luigi/hdfs.py
@@ -270,3 +270,10 @@ def has_write_access(self):
        else:
            remove(test_path, recursive=False)
        return True


class NamedHdfsTarget(HdfsTarget):

    def __init__(self, name, path=None, format=Plain, is_tmp=False):
        super(NamedHdfsTarget, self).__init__(path, format, is_tmp)
        self.name = name
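
NamedHdfsTarget simply attaches a symbolic name to an HdfsTarget, presumably so JVM job tasks can refer to specific inputs or outputs by name when building args(). A minimal usage sketch with a hypothetical path:

    import luigi.hdfs

    # Tag an HDFS location with a name a JVM job can key off.
    words_out = luigi.hdfs.NamedHdfsTarget('wordcount-output', path='/tmp/wordcount/output')
    print words_out.name, words_out.path
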
