woflo is a local-first, no-bloat, extensible task orchestration framework for Python.
Okay, that's a lot of buzz. So what is actually the point?
The main goal is to abstract away much of the functionality related to task orchestration and execution while keeping the "API" clear and dead simple.
Currently this includes:
- retries
- retry timeout
- parallelism
- logging
To download from PyPI, use:

```shell
pip install woflo
```

To install from source:

```shell
git clone https://github.com/petereon/woflo.git
cd woflo
poetry build
cd dist
pip install ./woflo-<version>-py3-none-any.whl
```
The intended usage is through the `@task` decorator. Consider a very simple example that runs 10 sleepy workers in parallel without blocking the main thread:
```python
import time

from woflo import task


@task
def sleepy_worker():
    time.sleep(5)
    print('I am done')


# Each call starts the task in the background and immediately returns a TaskRun
for _ in range(10):
    sleepy_task_run = sleepy_worker()
```
You can also add retries for tasks that might occasionally fail. The following attempts to run the decorated function 3 times in total, with a 5-second delay between attempts:
```python
from woflo import task


# retries=2 means up to 3 attempts in total, 5 seconds apart
@task(retries=2, retry_sleep_time=5)
def fetch_data_from_api_on_unstable_connection():
    ...
```
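To make the `retries`/`retry_sleep_time` semantics concrete (2 retries means up to 3 attempts in total), here is a minimal stdlib-only sketch of such a retry loop. It is not woflo's implementation; `run_with_retries` is a hypothetical helper written for illustration.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_retries(func: Callable[[], T], retries: int, retry_sleep_time: float) -> T:
    """Hypothetical helper: call func up to retries + 1 times, sleeping between failures."""
    if retries < 0:
        raise ValueError("retries must be non-negative")
    for attempt in range(retries + 1):
        try:
            return func()
        except Exception:
            # Give up and propagate the error once the last allowed attempt has failed
            if attempt == retries:
                raise
            time.sleep(retry_sleep_time)
    raise RuntimeError("unreachable: the loop always returns or raises")


# Usage: a flaky function that fails twice and succeeds on the third attempt
calls = []


def flaky() -> str:
    calls.append(1)
    if len(calls) < 3:
        raise ConnectionError("unstable connection")
    return "data"


result = run_with_retries(flaky, retries=2, retry_sleep_time=0)
```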
You can also pass a runner to the `@task` decorator, for example `SequentialTaskRun` if you prefer your tasks to run sequentially and enjoy waiting around for your computer a lot:
```python
import time

from woflo import task
from woflo.runners import SequentialTaskRun


@task(runner=SequentialTaskRun)
def sequential_sleepy_worker():
    time.sleep(5)
    print('I am done')


# Each call blocks until the task finishes, so the loop takes roughly 50 seconds
for _ in range(10):
    sleepy_task_run = sequential_sleepy_worker()
```
Each `TaskRun` should also expose a few methods that let you handle it:
- `.get_result()` to fetch the return value of the finished task
- `.wait()` to block the main thread until the task finishes (irrelevant for `SequentialTaskRun`, which blocks until it finishes anyway)
- `.stop()` to stop the task while it is running (irrelevant for `SequentialTaskRun`, which blocks until it finishes anyway)
- `.is_running()` to check whether the task is still running (irrelevant for `SequentialTaskRun`, which blocks until it finishes anyway)
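Why those three methods are irrelevant for a sequential runner becomes obvious in a minimal, hypothetical handle that runs its function eagerly in the constructor. The class name `SequentialRunSketch` and its internals are illustrative assumptions, not woflo's actual `SequentialTaskRun`.

```python
from typing import Any, Callable


class SequentialRunSketch:
    """Hypothetical handle that runs the function to completion on construction."""

    def __init__(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
        # Runs in the calling thread, so the caller is blocked until func returns
        self._result = func(*args, **kwargs)

    def get_result(self) -> Any:
        return self._result

    def wait(self) -> None:
        # Nothing to wait for: the work already finished in __init__
        return None

    def stop(self) -> None:
        # Nothing to stop: the work is never running once the handle exists
        return None

    def is_running(self) -> bool:
        return False


run = SequentialRunSketch(lambda x: x * 2, 21)
```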
Let us define an example task:

```python
import time

from woflo import task


@task
def quick_nap(duration):
    time.sleep(duration)
    # Naps shorter than 10 seconds fail
    if duration < 10:
        raise Exception("Ouch oof")
    else:
        return 'Well rested'
```
After you run it,

```python
napping = quick_nap(10)
```

you can check on it to monitor its state and receive results:

```python
assert napping.is_running()  # still napping in the background
napping.wait()  # block until the task finishes
assert napping.get_result() == "Well rested"
```
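If it helps to map these calls onto something familiar, the standard library's `concurrent.futures` offers a comparable monitor-and-collect pattern: `Future.done()`, `concurrent.futures.wait()` and `Future.result()` play roughly the roles of `.is_running()`, `.wait()` and `.get_result()`. This is an analogy using a thread pool, not woflo's API, and the durations and failure threshold are scaled down so it runs quickly.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait


def quick_nap(duration: float) -> str:
    time.sleep(duration)
    # Scaled-down threshold: naps shorter than 0.5 seconds fail
    if duration < 0.5:
        raise Exception("Ouch oof")
    return "Well rested"


with ThreadPoolExecutor(max_workers=2) as pool:
    napping = pool.submit(quick_nap, 0.5)
    # submit() returns immediately; the nap is still pending or running
    finished_early = napping.done()
    wait([napping])  # block until the nap is over
    rested = napping.result()

    grumpy = pool.submit(quick_nap, 0.1)
    try:
        grumpy.result()  # result() re-raises the exception raised in the worker
        error = None
    except Exception as exc:
        error = str(exc)
```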
woflo is designed to be easily extended by developing custom task runners. The library itself currently exposes two such runners, `MultiprocessTaskRun` and `SequentialTaskRun`.

Additionally, `woflo` makes available a `BaseTaskRun`, an interface against which custom runners can be developed.
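As a rough illustration of this extension point, the sketch below defines a stand-in abstract base with the four methods listed earlier for `TaskRun`, implements a thread-backed runner against it (a Thread runner is on the roadmap), and wires it into a tiny `task(runner=...)`-style decorator. `TaskRunInterface`, `ThreadTaskRun` and `task_sketch` are hypothetical names; woflo's real `BaseTaskRun` may declare different methods or constructor arguments.

```python
import functools
import threading
from abc import ABC, abstractmethod
from typing import Any, Callable, Optional, Type


class TaskRunInterface(ABC):
    """Hypothetical stand-in for a runner base class such as woflo's BaseTaskRun."""

    def __init__(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
        self.func, self.args, self.kwargs = func, args, kwargs

    @abstractmethod
    def get_result(self) -> Any: ...

    @abstractmethod
    def wait(self) -> None: ...

    @abstractmethod
    def stop(self) -> None: ...

    @abstractmethod
    def is_running(self) -> bool: ...


class ThreadTaskRun(TaskRunInterface):
    """Runs the task in a background daemon thread as soon as it is created."""

    def __init__(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
        super().__init__(func, *args, **kwargs)
        self._result: Any = None
        self._error: Optional[BaseException] = None
        self._thread = threading.Thread(target=self._target, daemon=True)
        self._thread.start()

    def _target(self) -> None:
        try:
            self._result = self.func(*self.args, **self.kwargs)
        except Exception as exc:  # keep the error so get_result() can re-raise it
            self._error = exc

    def get_result(self) -> Any:
        self.wait()
        if self._error is not None:
            raise self._error
        return self._result

    def wait(self) -> None:
        self._thread.join()

    def stop(self) -> None:
        # Python offers no safe way to kill a thread, so this sketch does not support it
        raise NotImplementedError("threads cannot be forcibly stopped")

    def is_running(self) -> bool:
        return self._thread.is_alive()


def task_sketch(runner: Type[TaskRunInterface]) -> Callable[[Callable[..., Any]], Callable[..., TaskRunInterface]]:
    """Hypothetical decorator: every call of the wrapped function starts a new run."""

    def decorator(func: Callable[..., Any]) -> Callable[..., TaskRunInterface]:
        @functools.wraps(func)
        def start(*args: Any, **kwargs: Any) -> TaskRunInterface:
            return runner(func, *args, **kwargs)

        return start

    return decorator


@task_sketch(runner=ThreadTaskRun)
def add(a: int, b: int) -> int:
    return a + b


run = add(2, 3)
```

Keeping the runner behind a small interface like this is what lets the decorator stay unaware of whether work happens in a thread, a process or the calling thread.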
The default task runner is `MultiprocessTaskRun`, which can run multiple tasks, or even multiple instances of the same task, at the same time in parallel, each in a separate Python process.
It offers two modes of operation with different memory overhead¹:
- `ForkProcess`, which forks the main process and inherits all of its state. `ForkProcess` is the default on Darwin and Linux (it is not available on Windows).
- `SpawnProcess`, which spawns a new process with some global state. `SpawnProcess` is the default on Windows.
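The fork/spawn distinction comes from Python's `multiprocessing` start methods. A small stdlib-only sketch of the platform rule described above, preferring fork where it exists and falling back to spawn (as on Windows), might look like this. `default_process_type` is a hypothetical helper, and it returns start-method names rather than woflo's `ForkProcess`/`SpawnProcess` classes.

```python
import multiprocessing as mp


def default_process_type() -> str:
    """Hypothetical helper mirroring the documented defaults: fork if available, else spawn."""
    available = mp.get_all_start_methods()
    # 'fork' is absent from this list on Windows, where only 'spawn' exists
    return "fork" if "fork" in available else "spawn"


method = default_process_type()
# A context bound to that start method; ctx.Process(...) would create processes with it
ctx = mp.get_context(method)
```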
This behavior can be configured by setting `process_type`:

```python
import time

from woflo import task
from woflo.runners.multiprocess import SpawnProcess, MultiprocessTaskRun

# Use spawned instead of forked processes for every MultiprocessTaskRun
MultiprocessTaskRun.process_type = SpawnProcess

@task(runner=MultiprocessTaskRun)
def sleepy_worker():
    time.sleep(5)
    print('I am done')
```
Roadmap:
- Set up GitHub Actions, SonarCloud monitoring and Codecov
- Make a PyPI package
- Implement a Thread runner
- Implement a Dask runner
- Implement an Async runner
- Decide on the final API and create a version 1.x.x
Known issues:
- Processes potentially inherit a large in-memory state in `MultiprocessTaskRun`
- Imports need some refactoring
- `SpawnProcess` runs into `OSError: [Errno 9] Bad file descriptor` on macOS 12.6 when using `multiprocess.sharedctypes.Value`, as reflected in this issue
- `.get_result()` of `MultiprocessTaskRun` sometimes blocks forever instead of returning the result if the result is too large (~60 kB). This seems to be the case on at least macOS and Linux.
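The ~60 kB threshold is close to the default pipe capacity on Linux (64 KiB), which hints at a well-known `multiprocessing` pitfall, although this is an unverified guess about woflo's internals: a child that sends a result larger than the pipe buffer blocks until someone reads it, so joining the child before receiving the result deadlocks. The sketch below reproduces the safe ordering with a unidirectional `multiprocessing.Pipe` and a thread standing in for the child process; `send_result` is a hypothetical helper.

```python
import threading
from multiprocessing import Pipe


def send_result(conn, payload: bytes) -> None:
    # send() blocks once the OS pipe buffer is full, until the reader drains it
    conn.send(payload)
    conn.close()


receiver, sender = Pipe(duplex=False)  # receiver can only recv(), sender can only send()
payload = b"x" * 200_000  # well above a 64 KiB pipe buffer

worker = threading.Thread(target=send_result, args=(sender, payload))
worker.start()

# Safe ordering: drain the pipe first, then join.
# Calling worker.join() before receiver.recv() here would hang forever.
result = receiver.recv()
worker.join()
receiver.close()
```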
Footnotes

¹ The general recommendation is to leave this at the default unless you run into issues with memory usage. This is usually the case in memory-intensive applications such as data pipelines or ML. To appreciate the difference, please refer to this article by Dr John A Stevenson. `SpawnProcess` tends to be problematic, so if you are not dead-set on using Multiprocess as your processing backend, you'd probably be better off with `DaskTaskRun` (currently on the roadmap).