Skip to content

Commit

Permalink
Update documentation for luigi.task.
Browse files Browse the repository at this point in the history
  • Loading branch information
jcrobak authored and Joe Crobak committed Feb 10, 2014
1 parent 5df0a2d commit d12023f
Showing 1 changed file with 148 additions and 23 deletions.
171 changes: 148 additions & 23 deletions luigi/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,18 @@
def namespace(namespace=None):
""" Call to set namespace of tasks declared after the call.
If called without arguments or with None as the namespace, the namespace is reset, which is recommended to do at the end of any file where the namespace is set to avoid unintentionally setting namespace on tasks outside of the scope of the current file."""
If called without arguments or with ``None`` as the namespace, the namespace
is reset, which is recommended to do at the end of any file where the
namespace is set to avoid unintentionally setting namespace on tasks outside
of the scope of the current file.
"""
Register._default_namespace = namespace


def id_to_name_and_params(task_id):
''' Turn a task_id into a (task_family, {params}) tuple.
E.g. calling with 'Foo(bar=bar, baz=baz)' returns ('Foo', {'bar': 'bar', 'baz': 'baz'})
E.g. calling with ``Foo(bar=bar, baz=baz)`` returns
``('Foo', {'bar': 'bar', 'baz': 'baz'})``
'''
lparen = task_id.index('(')
task_family = task_id[:lparen]
Expand All @@ -49,12 +54,21 @@ def split_equals(x):


class Register(abc.ABCMeta):
# 1. Cache instances of objects so that eg. X(1, 2, 3) always returns the same object
# 2. Keep track of all subclasses of Task and expose them
"""
The Metaclass of :py:class:`Task`. Acts as a global registry of Tasks with
the following properties:
1. Cache instances of objects so that eg. ``X(1, 2, 3)`` always returns the
same object.
2. Keep track of all subclasses of :py:class:`Task` and expose them.
"""
__instance_cache = {}
_default_namespace = None
_reg = []
AMBIGUOUS_CLASS = object() # Placeholder denoting an error
"""If this value is returned by :py:meth:`get_reg` then there is an
ambiguous task name (two :py:class:`Task` have the same name). This denotes
an error."""

def __new__(metacls, classname, bases, classdict):
""" Custom class creation for namespacing. Also register all subclasses
Expand Down Expand Up @@ -100,21 +114,32 @@ def instantiate():

@classmethod
def clear_instance_cache(self):
"""Clear/Reset the instance cache."""
Register.__instance_cache = {}

@classmethod
def disable_instance_cache(self):
"""Disables the instance cache."""
Register.__instance_cache = None

@property
def task_family(cls):
"""The task family for the given class.
If ``cls.task_namespace is None`` then it's the name of the class.
Otherwise, ``<task_namespace>.`` is prefixed to the class name.
"""
if cls.task_namespace is None:
return cls.__name__
else:
return "%s.%s" % (cls.task_namespace, cls.__name__)

@classmethod
def get_reg(cls):
"""Return all of the registery classes.
:return: a ``dict`` of task_family -> class
"""
reg = {}
for cls in cls._reg:
name = cls.task_family
Expand All @@ -128,6 +153,10 @@ def get_reg(cls):

@classmethod
def get_global_params(cls):
"""Compiles and returns the global parameters for all :py:class:`Task`.
:return: a ``dict`` of parameter name -> parameter.
"""
global_params = {}
for cls in cls._reg:
for param_name, param_obj in cls.get_global_params():
Expand All @@ -140,16 +169,39 @@ def get_global_params(cls):


class Task(object):
__metaclass__ = Register

"""
non-declared properties: (created in metaclass):
This is the base class of all Luigi Tasks, the base unit of work in Luigi.
`Task.task_namespace` - optional string which is prepended to the task name for the sake of scheduling.
If it isn't overridden in a Task, whatever was last declared using `luigi.namespace` will be used.
A Luigi Task describes a unit or work. The key methods of a Task, which must
be implemented in a subclass are:
`Task._parameters` - list of (parameter_name, parameter) tuples for this task class
* :py:meth:`run` - the computation done by this task.
* :py:meth:`requires` - the list of Tasks that this Task depends on.
* :py:meth:`output` - the output :py:class:`Target` that this Task creates.
Parameters to the Task should be declared as members of the class, e.g.::
class MyTask(luigi.Task):
count = luigi.IntParameter()
Each Task exposes a constructor accepting all :py:class:`Parameter` (and
values) as kwargs. e.g. ``MyTask(count=10)`` would instantiate `MyTask`.
In addition to any declared properties and methods, there are a few
non-declared properties, which are created by the :py:class:`Register`
metaclass:
``Task.task_namespace``
optional string which is prepended to the task name for the sake of
scheduling. If it isn't overridden in a Task, whatever was last declared
using `luigi.namespace` will be used.
``Task._parameters``
list of ``(parameter_name, parameter)`` tuples for this task class
"""
__metaclass__ = Register


_event_callbacks = {}

@classmethod
Expand All @@ -161,7 +213,8 @@ def wrapped(callback):
return wrapped

def trigger_event(self, event, *args, **kwargs):
""" Trigger that calls all of the specified events associated with this class.
"""Trigger that calls all of the specified events associated with this
class.
"""
for event_class, event_callbacks in self._event_callbacks.iteritems():
if not isinstance(self, event_class):
Expand All @@ -178,11 +231,14 @@ def trigger_event(self, event, *args, **kwargs):

@property
def task_family(self):
""" Convenience method since a property on the metaclass isn't directly accessible through the class instances"""
"""Convenience method since a property on the metaclass isn't directly
accessible through the class instances.
"""
return self.__class__.task_family

@classmethod
def get_params(cls):
"""Returns all of the Parameters for this Task."""
# We want to do this here and not at class instantiation, or else there is no room to extend classes dynamically
params = []
for param_name in dir(cls):
Expand All @@ -198,14 +254,23 @@ def get_params(cls):

@classmethod
def get_global_params(cls):
"""Return the global parameters for this Task."""
return [(param_name, param_obj) for param_name, param_obj in cls.get_params() if param_obj.is_global]

@classmethod
def get_nonglobal_params(cls):
"""Return the non-global parameters for this Task."""
return [(param_name, param_obj) for param_name, param_obj in cls.get_params() if not param_obj.is_global]

@classmethod
def get_param_values(cls, params, args, kwargs):
"""Get the values of the parameters from the args and kwargs.
:param params: list of (param_name, Parameter).
:param args: positional arguments
:param kwargs: keyword arguments.
:returns: list of `(name, value)` tuples, one for each parameter.
"""
result = {}

params_dict = dict(params)
Expand Down Expand Up @@ -249,6 +314,15 @@ def list_to_tuple(x):
return [(param_name, list_to_tuple(result[param_name])) for param_name, param_obj in params]

def __init__(self, *args, **kwargs):
"""Constructor to resolve values for all Parameters.
For example, the Task::
class MyTask(luigi.Task):
count = luigi.IntParameter()
can be instantiated as ``MyTask(count=10)``.
"""
params = self.get_params()
param_values = self.get_param_values(params, args, kwargs)

Expand All @@ -271,11 +345,19 @@ def __init__(self, *args, **kwargs):
self.__hash = hash(self.task_id)

def initialized(self):
"""Returns ``True`` if the Task is initialized and ``False`` otherwise."""
return hasattr(self, 'task_id')

@classmethod
def from_input(cls, params, global_params):
''' Creates an instance from a str->str hash (to be used for cmd line interaction etc) '''
"""Creates an instance from a str->str hash
This method is for parsing of command line arguments or other
non-programmatic invocations.
:param params: dict of param name -> value.
:param global_params: dict of param name -> value, the global params.
"""
for param_name, param in global_params:
value = param.parse_from_input(param_name, params[param_name])
param.set_default(value)
Expand Down Expand Up @@ -315,7 +397,7 @@ def __repr__(self):

def complete(self):
"""
If the task has any outputs, return true if all outputs exists.
If the task has any outputs, return ``True`` if all outputs exists.
Otherwise, return whether or not the task has run or not
"""
outputs = flatten(self.output())
Expand All @@ -331,9 +413,29 @@ def complete(self):
return True

def output(self):
"""The output that this Task produces.
The output of the Task determines if the Task needs to be run--the task
is considered finished iff the outputs all exist. Subclasses should
override this method to return a single :py:class:`Target` or a list of
:py:class:`Target` instances.
Implementation note
If running multiple workers, the output must be a resource that is accessible
by all workers, such as a DFS or database. Otherwise, workers might compute
the same output since they don't see the work done by other workers.
"""
return [] # default impl

def requires(self):
"""The Tasks that this Task depends on.
A Task will only run if all of the Tasks that it requires are completed.
If your Task does not require any other Tasks, then you don't need to
override this method. Otherwise, a Subclasses can override this method
to return a single Task, a list of Task instances, or a dict whose
values are Task instances.
"""
return [] # default impl

def _requires(self):
Expand All @@ -349,13 +451,23 @@ def _requires(self):
return flatten(self.requires()) # base impl

def input(self):
"""Returns the outputs of the Tasks returned by :py:meth:`requires`
:return: a list of :py:class:`Target` objects which are specified as
outputs of all required Tasks.
"""
return getpaths(self.requires())

def deps(self):
"""Internal method used by the scheduler
Returns the flattened list of requires.
"""
# used by scheduler
return flatten(self._requires())

def run(self):
"""The task run method, to be overridden in a subclass."""
pass # default impl

def on_failure(self, exception):
Expand All @@ -379,17 +491,28 @@ def on_success(self):


def externalize(task):
"""Returns an externalized version of the Task.
See py:class:`ExternalTask`.
"""
task.run = NotImplemented
return task


class ExternalTask(Task):
"""Subclass for references to external dependencies"""
"""Subclass for references to external dependencies.
An ExternalTask's does not have a `run` implementation, which signifies to
the framework that this Task's :py:meth:`output` is generated outside of
Luigi.
"""
run = NotImplemented


class WrapperTask(Task):
"""Use for tasks that only wrap other tasks and that by definition are done if all their requirements exist. """
"""Use for tasks that only wrap other tasks and that by definition are done
if all their requirements exist.
"""
def complete(self):
return all(r.complete() for r in flatten(self.requires()))

Expand All @@ -415,13 +538,15 @@ def getpaths(struct):

def flatten(struct):
"""Cleates a flat list of all all items in structured output (dicts, lists, items)
Examples:
> _flatten({'a': foo, b: bar})
[foo, bar]
> _flatten([foo, [bar, troll]])
[foo, bar, troll]
> _flatten(foo)
[foo]
Examples::
> _flatten({'a': foo, b: bar})
[foo, bar]
> _flatten([foo, [bar, troll]])
[foo, bar, troll]
> _flatten(foo)
[foo]
"""
if struct is None:
return []
Expand Down

0 comments on commit d12023f

Please sign in to comment.