Skip to content

Commit

Permalink
Improve control over dynamic requirements (spotify#3179)
Browse files Browse the repository at this point in the history
* Add DynamicRequirements wrapper.

* Fix name.

* Typo.

* Add tests and docs.

* Polish docs.

* Fix linter errors.

* Fix docs.

* Fix typo.

* Implement review comments.
  • Loading branch information
riga authored Sep 12, 2022
1 parent 5c44d2c commit 07551b1
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 10 deletions.
8 changes: 6 additions & 2 deletions doc/tasks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,8 @@ You can also yield a list of tasks.
def run(self):
other_target = yield OtherTask()
# dynamic dependencies resolve into targets
f = other_target.open('r')
# dynamic dependencies resolve into targets
f = other_target.open('r')
This mechanism is an alternative to Task.requires_ in case
Expand All @@ -208,6 +208,10 @@ It does come with some constraints:
the Task.run_ method will resume from scratch each time a new task is yielded.
In other words, you should make sure your Task.run_ method is idempotent.
(This is good practice for all Tasks in Luigi, but especially so for tasks with dynamic dependencies).
As this might entail redundant calls to tasks' :func:`~luigi.task.Task.complete` methods,
you should consider setting the "cache_task_completion" option in the :ref:`worker-config`.
To further control how dynamic task requirements are handled internally by worker nodes,
there is also the option to wrap dependent tasks by :class:`~luigi.task.DynamicRequirements`.

For an example of a workflow using dynamic dependencies, see
`examples/dynamic_requirements.py <https://github.com/spotify/luigi/blob/master/examples/dynamic_requirements.py>`_.
Expand Down
15 changes: 15 additions & 0 deletions examples/dynamic_requirements.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# limitations under the License.
#

import os
import random as rnd
import time

Expand Down Expand Up @@ -91,6 +92,20 @@ def run(self):
with self.output().open('w') as f:
f.write('Tada!')

# and in case data is rather long, consider wrapping the requirements
# in DynamicRequirements and optionally define a custom complete method
def custom_complete(complete_fn):
# example: Data() stores all outputs in the same directory, so avoid doing len(data) fs
# calls but rather check only the first, and compare basenames for the rest
# (complete_fn defaults to "lambda task: task.complete()" but can also include caching)
if not complete_fn(data_dependent_deps[0]):
return False
paths = [task.output().path for task in data_dependent_deps]
basenames = os.listdir(os.path.dirname(paths[0])) # a single fs call
return all(os.path.basename(path) in basenames for path in paths)

yield luigi.DynamicRequirements(data_dependent_deps, custom_complete)


if __name__ == '__main__':
luigi.run()
5 changes: 4 additions & 1 deletion luigi/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
from luigi.__meta__ import __version__

from luigi import task
from luigi.task import Task, Config, ExternalTask, WrapperTask, namespace, auto_namespace
from luigi.task import (
Task, Config, ExternalTask, WrapperTask, namespace, auto_namespace, DynamicRequirements,
)

from luigi import target
from luigi.target import Target
Expand Down Expand Up @@ -56,6 +58,7 @@

__all__ = [
'task', 'Task', 'Config', 'ExternalTask', 'WrapperTask', 'namespace', 'auto_namespace',
'DynamicRequirements',
'target', 'Target', 'LocalTarget', 'rpc', 'RemoteScheduler',
'RPCError', 'parameter', 'Parameter', 'DateParameter', 'MonthParameter',
'YearParameter', 'DateHourParameter', 'DateMinuteParameter', 'DateSecondParameter',
Expand Down
81 changes: 81 additions & 0 deletions luigi/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,87 @@ def bulk_complete(cls, parameter_tuples):
return generated_tuples


class DynamicRequirements(object):
"""
Wraps dynamic requirements yielded in tasks's run methods to control how completeness checks of
(e.g.) large chunks of tasks are performed. Besides the wrapped *requirements*, instances of
this class can be passed an optional function *custom_complete* that might implement an
optimized check for completeness. If set, the function will be called with a single argument,
*complete_fn*, which should be used to perform the per-task check. Example:
.. code-block:: python
class SomeTaskWithDynamicRequirements(luigi.Task):
...
def run(self):
large_chunk_of_tasks = [OtherTask(i=i) for i in range(10000)]
def custom_complete(complete_fn):
# example: assume OtherTask always write into the same directory, so just check
# if the first task is complete, and compare basenames for the rest
if not complete_fn(large_chunk_of_tasks[0]):
return False
paths = [task.output().path for task in large_chunk_of_tasks]
basenames = os.listdir(os.path.dirname(paths[0])) # a single fs call
return all(os.path.basename(path) in basenames for path in paths)
yield DynamicRequirements(large_chunk_of_tasks, custom_complete)
.. py:attribute:: requirements
The original, wrapped requirements.
.. py:attribute:: flat_requirements
Flattened view of the wrapped requirements (via :py:func:`flatten`). Read only.
.. py:attribute:: paths
Outputs of the requirements in the identical structure (via :py:func:`getpaths`). Read only.
.. py:attribute:: custom_complete
The optional, custom function performing the completeness check of the wrapped requirements.
"""

def __init__(self, requirements, custom_complete=None):
super().__init__()

# store attributes
self.requirements = requirements
self.custom_complete = custom_complete

# cached flat requirements and paths
self._flat_requirements = None
self._paths = None

@property
def flat_requirements(self):
if self._flat_requirements is None:
self._flat_requirements = flatten(self.requirements)
return self._flat_requirements

@property
def paths(self):
if self._paths is None:
self._paths = getpaths(self.requirements)
return self._paths

def complete(self, complete_fn=None):
# default completeness check
if complete_fn is None:
def complete_fn(task):
return task.complete()

# use the custom complete function when set
if self.custom_complete:
return self.custom_complete(complete_fn)

# default implementation
return all(complete_fn(t) for t in self.flat_requirements)


class ExternalTask(Task):
"""
Subclass for references to external dependencies.
Expand Down
17 changes: 11 additions & 6 deletions luigi/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
from luigi.scheduler import DISABLED, DONE, FAILED, PENDING, UNKNOWN, Scheduler, RetryPolicy
from luigi.scheduler import WORKER_STATE_ACTIVE, WORKER_STATE_DISABLED
from luigi.target import Target
from luigi.task import Task, flatten, getpaths, Config
from luigi.task import Task, Config, DynamicRequirements
from luigi.task_register import TaskClassException
from luigi.task_status import RUNNING
from luigi.parameter import BoolParameter, FloatParameter, IntParameter, OptionalParameter, Parameter, TimeDeltaParameter
Expand Down Expand Up @@ -150,14 +150,19 @@ def _run_get_new_deps(self):
except StopIteration:
return None

new_req = flatten(requires)
if all(self.check_complete(t) for t in new_req):
next_send = getpaths(requires)
else:
# if requires is not a DynamicRequirements, create one to use its default behavior
if not isinstance(requires, DynamicRequirements):
requires = DynamicRequirements(requires)

if not requires.complete(self.check_complete):
# not all requirements are complete, return them which adds them to the tree
new_deps = [(t.task_module, t.task_family, t.to_str_params())
for t in new_req]
for t in requires.flat_requirements]
return new_deps

# get the next generator result
next_send = requires.paths

def run(self):
logger.info('[pid %s] Worker %s running %s', os.getpid(), self.worker_id, self.task)

Expand Down
41 changes: 40 additions & 1 deletion test/worker_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,15 @@ def run(self):

class DynamicDummyTask(Task):
p = luigi.Parameter()
sleep = luigi.FloatParameter(default=0.5, significant=False)

def output(self):
return luigi.LocalTarget(self.p)

def run(self):
with self.output().open('w') as f:
f.write('Done!')
time.sleep(0.5) # so we can benchmark & see if parallelization works
time.sleep(self.sleep) # so we can benchmark & see if parallelization works


class DynamicDummyTaskWithNamespace(DynamicDummyTask):
Expand Down Expand Up @@ -95,6 +96,37 @@ def run(self):
print('%d: %s' % (i, line.strip()), file=f)


class DynamicRequiresWrapped(Task):
p = luigi.Parameter()

def output(self):
return luigi.LocalTarget(os.path.join(self.p, 'parent'))

def run(self):
reqs = [
DynamicDummyTask(p=os.path.join(self.p, '%s.txt' % i), sleep=0.0)
for i in range(10)
]

# yield again as DynamicRequires
yield luigi.DynamicRequirements(reqs)

# and again with a custom complete function that does base name comparisons
def custom_complete(complete_fn):
if not complete_fn(reqs[0]):
return False
paths = [task.output().path for task in reqs]
basenames = os.listdir(os.path.dirname(paths[0]))
self._custom_complete_called = True
self._custom_complete_result = all(os.path.basename(path) in basenames for path in paths)
return self._custom_complete_result

yield luigi.DynamicRequirements(reqs, custom_complete)

with self.output().open('w') as f:
f.write('Done!')


class DynamicRequiresOtherModule(Task):
p = luigi.Parameter()

Expand Down Expand Up @@ -1153,6 +1185,13 @@ def test_dynamic_dependencies_other_module(self):
luigi.build([t], local_scheduler=True, workers=self.n_workers)
self.assertTrue(t.complete())

def test_wrapped_dynamic_requirements(self):
t = DynamicRequiresWrapped(p=self.p)
luigi.build([t], local_scheduler=True, workers=1)
self.assertTrue(t.complete())
self.assertTrue(getattr(t, '_custom_complete_called', False))
self.assertTrue(getattr(t, '_custom_complete_result', False))


class DynamicDependenciesWithMultipleWorkersTest(DynamicDependenciesTest):
n_workers = 100
Expand Down

0 comments on commit 07551b1

Please sign in to comment.