From 07551b147ae75817718b485d38c45dae65b4633b Mon Sep 17 00:00:00 2001 From: Marcel R Date: Tue, 13 Sep 2022 01:14:07 +0200 Subject: [PATCH] Improve control over dynamic requirements (#3179) * Add DynamicRequirements wrapper. * Fix name. * Typo. * Add tests and docs. * Polish docs. * Fix linter errors. * Fix docs. * Fix typo. * Implement review comments. --- doc/tasks.rst | 8 +++- examples/dynamic_requirements.py | 15 ++++++ luigi/__init__.py | 5 +- luigi/task.py | 81 ++++++++++++++++++++++++++++++++ luigi/worker.py | 17 ++++--- test/worker_test.py | 41 +++++++++++++++- 6 files changed, 157 insertions(+), 10 deletions(-) diff --git a/doc/tasks.rst b/doc/tasks.rst index b10a80c3a8..0d8c7bf060 100644 --- a/doc/tasks.rst +++ b/doc/tasks.rst @@ -198,8 +198,8 @@ You can also yield a list of tasks. def run(self): other_target = yield OtherTask() - # dynamic dependencies resolve into targets - f = other_target.open('r') + # dynamic dependencies resolve into targets + f = other_target.open('r') This mechanism is an alternative to Task.requires_ in case @@ -208,6 +208,10 @@ It does come with some constraints: the Task.run_ method will resume from scratch each time a new task is yielded. In other words, you should make sure your Task.run_ method is idempotent. (This is good practice for all Tasks in Luigi, but especially so for tasks with dynamic dependencies). +As this might entail redundant calls to tasks' :func:`~luigi.task.Task.complete` methods, +you should consider setting the "cache_task_completion" option in the :ref:`worker-config`. +To further control how dynamic task requirements are handled internally by worker nodes, +there is also the option to wrap dependent tasks in :class:`~luigi.task.DynamicRequirements`. For an example of a workflow using dynamic dependencies, see `examples/dynamic_requirements.py <https://github.com/spotify/luigi/blob/master/examples/dynamic_requirements.py>`_. 
diff --git a/examples/dynamic_requirements.py b/examples/dynamic_requirements.py index ed2feba81f..13389c45e7 100644 --- a/examples/dynamic_requirements.py +++ b/examples/dynamic_requirements.py @@ -15,6 +15,7 @@ # limitations under the License. # +import os import random as rnd import time @@ -91,6 +92,20 @@ def run(self): with self.output().open('w') as f: f.write('Tada!') + # and in case data is rather long, consider wrapping the requirements + # in DynamicRequirements and optionally define a custom complete method + def custom_complete(complete_fn): + # example: Data() stores all outputs in the same directory, so avoid doing len(data) fs + # calls but rather check only the first, and compare basenames for the rest + # (complete_fn defaults to "lambda task: task.complete()" but can also include caching) + if not complete_fn(data_dependent_deps[0]): + return False + paths = [task.output().path for task in data_dependent_deps] + basenames = os.listdir(os.path.dirname(paths[0])) # a single fs call + return all(os.path.basename(path) in basenames for path in paths) + + yield luigi.DynamicRequirements(data_dependent_deps, custom_complete) + if __name__ == '__main__': luigi.run() diff --git a/luigi/__init__.py b/luigi/__init__.py index aa5c7c17f8..874014211a 100644 --- a/luigi/__init__.py +++ b/luigi/__init__.py @@ -21,7 +21,9 @@ from luigi.__meta__ import __version__ from luigi import task -from luigi.task import Task, Config, ExternalTask, WrapperTask, namespace, auto_namespace +from luigi.task import ( + Task, Config, ExternalTask, WrapperTask, namespace, auto_namespace, DynamicRequirements, +) from luigi import target from luigi.target import Target @@ -56,6 +58,7 @@ __all__ = [ 'task', 'Task', 'Config', 'ExternalTask', 'WrapperTask', 'namespace', 'auto_namespace', + 'DynamicRequirements', 'target', 'Target', 'LocalTarget', 'rpc', 'RemoteScheduler', 'RPCError', 'parameter', 'Parameter', 'DateParameter', 'MonthParameter', 'YearParameter', 'DateHourParameter', 
'DateMinuteParameter', 'DateSecondParameter', diff --git a/luigi/task.py b/luigi/task.py index 7ae61d246a..027f080e75 100644 --- a/luigi/task.py +++ b/luigi/task.py @@ -765,6 +765,87 @@ def bulk_complete(cls, parameter_tuples): return generated_tuples +class DynamicRequirements(object): + """ + Wraps dynamic requirements yielded in tasks' run methods to control how completeness checks of + (e.g.) large chunks of tasks are performed. Besides the wrapped *requirements*, instances of + this class can be passed an optional function *custom_complete* that might implement an + optimized check for completeness. If set, the function will be called with a single argument, + *complete_fn*, which should be used to perform the per-task check. Example: + + .. code-block:: python + + class SomeTaskWithDynamicRequirements(luigi.Task): + ... + + def run(self): + large_chunk_of_tasks = [OtherTask(i=i) for i in range(10000)] + + def custom_complete(complete_fn): + # example: assume OtherTask always writes into the same directory, so just check + # if the first task is complete, and compare basenames for the rest + if not complete_fn(large_chunk_of_tasks[0]): + return False + paths = [task.output().path for task in large_chunk_of_tasks] + basenames = os.listdir(os.path.dirname(paths[0])) # a single fs call + return all(os.path.basename(path) in basenames for path in paths) + + yield DynamicRequirements(large_chunk_of_tasks, custom_complete) + + .. py:attribute:: requirements + + The original, wrapped requirements. + + .. py:attribute:: flat_requirements + + Flattened view of the wrapped requirements (via :py:func:`flatten`). Read only. + + .. py:attribute:: paths + + Outputs of the requirements in the identical structure (via :py:func:`getpaths`). Read only. + + .. py:attribute:: custom_complete + + The optional, custom function performing the completeness check of the wrapped requirements. 
+ """ + + def __init__(self, requirements, custom_complete=None): + super().__init__() + + # store attributes + self.requirements = requirements + self.custom_complete = custom_complete + + # cached flat requirements and paths + self._flat_requirements = None + self._paths = None + + @property + def flat_requirements(self): + if self._flat_requirements is None: + self._flat_requirements = flatten(self.requirements) + return self._flat_requirements + + @property + def paths(self): + if self._paths is None: + self._paths = getpaths(self.requirements) + return self._paths + + def complete(self, complete_fn=None): + # default completeness check + if complete_fn is None: + def complete_fn(task): + return task.complete() + + # use the custom complete function when set + if self.custom_complete: + return self.custom_complete(complete_fn) + + # default implementation + return all(complete_fn(t) for t in self.flat_requirements) + + class ExternalTask(Task): """ Subclass for references to external dependencies. 
diff --git a/luigi/worker.py b/luigi/worker.py index 38b21b76ab..8a70e60062 100644 --- a/luigi/worker.py +++ b/luigi/worker.py @@ -55,7 +55,7 @@ from luigi.scheduler import DISABLED, DONE, FAILED, PENDING, UNKNOWN, Scheduler, RetryPolicy from luigi.scheduler import WORKER_STATE_ACTIVE, WORKER_STATE_DISABLED from luigi.target import Target -from luigi.task import Task, flatten, getpaths, Config +from luigi.task import Task, Config, DynamicRequirements from luigi.task_register import TaskClassException from luigi.task_status import RUNNING from luigi.parameter import BoolParameter, FloatParameter, IntParameter, OptionalParameter, Parameter, TimeDeltaParameter @@ -150,14 +150,19 @@ def _run_get_new_deps(self): except StopIteration: return None - new_req = flatten(requires) - if all(self.check_complete(t) for t in new_req): - next_send = getpaths(requires) - else: + # if requires is not a DynamicRequirements, create one to use its default behavior + if not isinstance(requires, DynamicRequirements): + requires = DynamicRequirements(requires) + + if not requires.complete(self.check_complete): + # not all requirements are complete, return them which adds them to the tree new_deps = [(t.task_module, t.task_family, t.to_str_params()) - for t in new_req] + for t in requires.flat_requirements] return new_deps + # get the next generator result + next_send = requires.paths + def run(self): logger.info('[pid %s] Worker %s running %s', os.getpid(), self.worker_id, self.task) diff --git a/test/worker_test.py b/test/worker_test.py index ea443b668c..39d3e466f8 100644 --- a/test/worker_test.py +++ b/test/worker_test.py @@ -59,6 +59,7 @@ def run(self): class DynamicDummyTask(Task): p = luigi.Parameter() + sleep = luigi.FloatParameter(default=0.5, significant=False) def output(self): return luigi.LocalTarget(self.p) @@ -66,7 +67,7 @@ def output(self): def run(self): with self.output().open('w') as f: f.write('Done!') - time.sleep(0.5) # so we can benchmark & see if parallelization 
works + time.sleep(self.sleep) # so we can benchmark & see if parallelization works class DynamicDummyTaskWithNamespace(DynamicDummyTask): @@ -95,6 +96,37 @@ def run(self): print('%d: %s' % (i, line.strip()), file=f) +class DynamicRequiresWrapped(Task): + p = luigi.Parameter() + + def output(self): + return luigi.LocalTarget(os.path.join(self.p, 'parent')) + + def run(self): + reqs = [ + DynamicDummyTask(p=os.path.join(self.p, '%s.txt' % i), sleep=0.0) + for i in range(10) + ] + + # yield again as DynamicRequires + yield luigi.DynamicRequirements(reqs) + + # and again with a custom complete function that does base name comparisons + def custom_complete(complete_fn): + if not complete_fn(reqs[0]): + return False + paths = [task.output().path for task in reqs] + basenames = os.listdir(os.path.dirname(paths[0])) + self._custom_complete_called = True + self._custom_complete_result = all(os.path.basename(path) in basenames for path in paths) + return self._custom_complete_result + + yield luigi.DynamicRequirements(reqs, custom_complete) + + with self.output().open('w') as f: + f.write('Done!') + + class DynamicRequiresOtherModule(Task): p = luigi.Parameter() @@ -1153,6 +1185,13 @@ def test_dynamic_dependencies_other_module(self): luigi.build([t], local_scheduler=True, workers=self.n_workers) self.assertTrue(t.complete()) + def test_wrapped_dynamic_requirements(self): + t = DynamicRequiresWrapped(p=self.p) + luigi.build([t], local_scheduler=True, workers=1) + self.assertTrue(t.complete()) + self.assertTrue(getattr(t, '_custom_complete_called', False)) + self.assertTrue(getattr(t, '_custom_complete_result', False)) + class DynamicDependenciesWithMultipleWorkersTest(DynamicDependenciesTest): n_workers = 100