A Golang-based high-performance, scalable, and distributed workflow engine.
- REST API
- Horizontally scalable
- Task isolation - tasks are executed within a container to provide isolation and idempotency, and to enforce resource limits
- Automatic recovery of tasks in the event of a worker crash
- Supports both stand-alone and distributed setup
- Retry failed tasks
- Pre/Post tasks
- No single point of failure
- Task timeout
- Full-text search
- Expression Language
- Conditional Tasks
- Parallel Task
- Each Task
- Subjob Task
- Web UI
- Coordinator: responsible for managing the lifecycle of a task through its various states and for exposing a REST API to the clients.
- Worker: responsible for executing tasks by means of a runtime (typically Docker).
- Broker: the message-queue/pub-sub mechanism used for routing tasks.
- Datastore: holds the state for tasks and jobs.
- Runtime: the platform used by workers to execute tasks. Currently only Docker is supported.
Before running the examples, ensure you have the following software installed:

- Go
- Docker
- jq (used in the examples to parse API responses)
Start in `standalone` mode:

```bash
go run cmd/main.go standalone
```
Submit a job in another terminal:

```yaml
# hello.yaml
---
name: hello job
tasks:
  - name: say hello
    image: ubuntu:mantic # docker image
    run: |
      echo -n hello world
  - name: say goodbye
    image: ubuntu:mantic
    run: |
      echo -n bye world
```
```bash
JOB_ID=$(curl \
  -s \
  -X POST \
  --data-binary @hello.yaml \
  -H "Content-type: text/yaml" \
  http://localhost:8000/jobs | jq -r .id)
```
Query for the status of the job:

```bash
curl -s http://localhost:8000/jobs/$JOB_ID | jq .
```
```json
{
  "id": "ed0dba93d262492b8cf26e6c1c4f1c98",
  "state": "COMPLETED",
  ...
  "execution": [
    {
      ...
      "state": "COMPLETED"
    }
  ]
}
```
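Since jobs execute asynchronously, a client will typically poll this endpoint until the job reaches a terminal state. A minimal sketch (the set of terminal state names here is an assumption based on the examples in this document):

```bash
# poll the job status once a second until it is no longer running
while true; do
  STATE=$(curl -s http://localhost:8000/jobs/$JOB_ID | jq -r .state)
  echo "job state: $STATE"
  case "$STATE" in
    COMPLETED|FAILED|CANCELLED) break ;; # assumed terminal states
  esac
  sleep 1
done
```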
The following job:
- Downloads a remote video file using a `pre` task to a shared `/tmp` volume.
- Converts the first 5 seconds of the downloaded video using `ffmpeg`.
- Uploads the converted video to a destination using a `post` task.
```yaml
# convert.yaml
---
name: convert a video
inputs:
  source: https://upload.wikimedia.org/wikipedia/commons/1/18/Big_Buck_Bunny_Trailer_1080p.ogv
tasks:
  - name: convert the first 5 seconds of a video
    image: jrottenberg/ffmpeg:3.4-alpine
    run: |
      ffmpeg -i /tmp/input.ogv -t 5 /tmp/output.mp4
    volumes:
      - /tmp
    pre:
      - name: download the remote file
        image: alpine:3.18.3
        env:
          SOURCE_URL: "{{ inputs.source }}"
        run: |
          wget \
            $SOURCE_URL \
            -O /tmp/input.ogv
    post:
      - name: upload the converted file
        image: alpine:3.18.3
        run: |
          wget \
            --post-file=/tmp/output.mp4 \
            https://devnull-as-a-service.com/dev/null
```
Check out the examples folder.
By default, all tasks are routed to the `default` queue. All workers subscribe to the `default` queue unless they make use of the `-queue` flag.
It is often desirable to route tasks to different queues in order to create specialized pools of workers.
For example, one pool of workers, specially configured to handle video transcoding, can listen to video-processing-related tasks:

```bash
go run cmd/main.go worker -queue transcoding:3 -queue default:10
```
In this example the worker would handle up to 3 transcoding-related tasks and up to 10 "regular" tasks concurrently.
This could make sense because transcoding tends to be very resource intensive so a single worker might not want to handle more than 3 concurrent tasks.
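A worker can also be dedicated entirely to one queue by subscribing only to it. A sketch using the same `-queue name:concurrency` flag syntax:

```bash
# a worker that only processes transcoding tasks, one at a time
go run cmd/main.go worker -queue transcoding:1
```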
To route a task to a specific queue use the `queue` property:

```yaml
name: transcode a video
queue: transcoding
image: jrottenberg/ffmpeg:3.4-alpine
run: |
  ffmpeg \
    -i https://upload.wikimedia.org/wikipedia/commons/1/18/Big_Buck_Bunny_Trailer_1080p.ogv \
    output.mp4
```
Tork also uses a number of special-purpose queues:

- `jobs` - the queue used for job creation and job-related state changes (e.g. cancellation).
- `pending` - incoming tasks land in this queue prior to being scheduled for processing by the Coordinator.
- `started` - when a worker starts working on a task, it inserts the task to this queue to notify the Coordinator.
- `completed` - when a worker completes the processing of a task successfully, it inserts it - along with its output - to this queue to notify the Coordinator.
- `error` - when a worker encounters an error while processing a task, it inserts the task to this queue to notify the Coordinator.
- `heartbeat` - the queue used by workers to periodically notify the Coordinator about their "aliveness".
- `x-<worker id>` - each worker subscribes to an exclusive queue which can be used by the Coordinator to cancel tasks started by that particular worker.
You can set custom environment variables for a given task by using the `env` property:

```yaml
name: print a message
image: ubuntu:mantic
env:
  INTRO: hello world
  OUTRO: bye world
run: |
  echo $INTRO
  echo $OUTRO
```
By convention, any environment variables which contain the keywords `SECRET`, `PASSWORD` or `ACCESS_KEY` in their names will have their values automatically redacted from logs as well as from API responses.

Warning: although Tork automatically redacts secrets that end up in the log, you should still avoid intentionally printing secrets to the log.
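For instance, in the following sketch the value of `DB_PASSWORD` would be redacted because its name contains the `PASSWORD` keyword (the task itself is illustrative):

```yaml
name: connect to a database
image: ubuntu:mantic
env:
  # redacted in logs and API responses per the naming convention above
  DB_PASSWORD: s3cr3t
run: |
  # use $DB_PASSWORD here without echoing it
  echo "connecting..."
```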
The `Datastore` is responsible for holding job and task metadata. You can specify which type of datastore to use using the `-datastore` flag.

- `inmemory`: the default implementation. Typically isn't suitable for production because all state is lost upon restart.
- `postgres`: uses a Postgres database as the underlying implementation. Example:
Start Postgres DB:

```bash
docker compose up postgres -d
```

Run a migration to create the database schema:

```bash
docker compose up migration
```

Start Tork:

```bash
go run cmd/main.go \
  standalone \
  -datastore postgres \
  -postgres-dsn "host=localhost user=tork password=tork dbname=tork port=5432 sslmode=disable"
```
To run in distributed mode, we need to use an external message broker.

Start RabbitMQ:

```bash
docker compose up rabbitmq -d
```

Start the Coordinator:

```bash
go run cmd/main.go \
  coordinator \
  -broker rabbitmq \
  -rabbitmq-url amqp://guest:guest@localhost:5672
```

Start the worker(s):

```bash
go run cmd/main.go \
  worker \
  -broker rabbitmq \
  -rabbitmq-url amqp://guest:guest@localhost:5672
```
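With the Coordinator and at least one worker running, jobs can be submitted to the Coordinator's REST API exactly as in the standalone example (assuming the default listen address of `localhost:8000`):

```bash
curl -s \
  -X POST \
  --data-binary @hello.yaml \
  -H "Content-type: text/yaml" \
  http://localhost:8000/jobs | jq -r .id
```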
By default, a task has no resource constraints and can use as much of a given resource as the host’s kernel scheduler allows.
Tork provides several ways to control how much CPU and Memory a task can use:
Setting global defaults at the worker level:

- `--default-cpus-limit` - the default maximum number of CPUs that a task is allowed to use when executing on a given worker. For instance, if the host machine has two CPUs and you set `--default-cpus-limit="1.5"`, a task is guaranteed at most one and a half of the CPUs.
- `--default-memory-limit` - the default maximum amount of memory that a task is allowed to use when executing on a given worker. Units are specified as a positive integer followed by a suffix of `b`, `k`, `m`, or `g`, to indicate bytes, kilobytes, megabytes, or gigabytes.
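For example, a worker could be started with both defaults set (the values here are illustrative):

```bash
go run cmd/main.go \
  worker \
  --default-cpus-limit="1" \
  --default-memory-limit=250m
```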
Setting limits on the task itself:

For more fine-grained control, default limits can be overridden at an individual task level:

```yaml
name: some task
image: ubuntu:mantic
run: sleep 10
limits:
  cpus: .5
  memory: 10m
```
Tasks can write arbitrary text output to the standard output file defined in `$TORK_OUTPUT`. Downstream tasks may refer to the outputs of previous tasks. Example:

```yaml
tasks:
  - name: populate a variable
    image: ubuntu:mantic
    # The task must specify the name of the
    # variable under which its results will be
    # stored in the job's context
    var: task1
    run: |
      echo -n "world" >> "$TORK_OUTPUT"
  - name: say hello
    image: ubuntu:mantic
    env:
      # refer to the outputs of the previous task
      NAME: "{{ tasks.task1 }}"
    run: |
      echo -n hello $NAME
```
Tork uses the expr expression language to:
- Evaluate C-style embedded expressions in a task's environment variables.
- Evaluate a task's `if` condition to determine whether the task should run. When an `if` expression evaluates to anything except `false`, the task will run.

Some examples:

Skip certain tasks using the `if` property:
```yaml
inputs:
  run: "false"
tasks:
  - name: say something
    if: "{{ inputs.run == 'true' }}"
    image: ubuntu:mantic
    run: |
      echo "this should not execute"
```
Access the job's context:

```yaml
inputs:
  message: hello world
tasks:
  - name: say something
    image: ubuntu:mantic
    env:
      MESSAGE: "{{ inputs.message }}"
    run: |
      echo $MESSAGE
```
Execute a built-in function:

```yaml
tasks:
  - name: print the length of a string
    image: ubuntu:mantic
    env:
      LENGTH: '{{ len("hello world") }}'
    run: |
      echo "The length of the string is: $LENGTH"
```
Execute one of Tork's built-in functions:

```yaml
tasks:
  - name: generate a random number
    image: ubuntu:mantic
    env:
      RANDOM_NUMBER: "{{ randomInt() }}"
    run: |
      echo "a random number: $RANDOM_NUMBER"
```
To run a group of tasks concurrently, wrap them in a `parallel` task. Example:

```yaml
- name: a parallel task
  parallel:
    tasks:
      - image: ubuntu:mantic
        run: sleep 2
      - image: ubuntu:mantic
        run: sleep 1
      - image: ubuntu:mantic
        run: sleep 3
```
Executes the `task` for each `item` in `list`, in parallel:

```yaml
- name: sample each task
  each:
    list: "{{ range(1,5) }}"
    task:
      image: ubuntu:mantic
      env:
        ITEM: "{{ item.value }}"
        INDEX: "{{ item.index }}"
      run: echo -n HELLO $ITEM at $INDEX
```
A task can start another job. When a sub-job completes or fails, it marks its parent task as `COMPLETED` or `FAILED` respectively.

```yaml
- name: a task that starts a sub-job
  subjob:
    name: my sub job
    tasks:
      - name: hello sub task
        image: ubuntu:mantic
        run: echo start of sub-job
      - name: bye task
        image: ubuntu:mantic
        run: echo end of sub-job
```
It is sometimes desirable to execute a task - potentially using a different `image` - before or after a task executes, and to share the state of that execution with the "main" task. This is where `pre` and `post` tasks come in.

Each task can define a set of tasks to be executed prior to its execution, and after it.

The `pre` and `post` tasks always execute on the same worker node that executes the task itself and are considered an atomic part of the task. That is, a failure in any of the `pre`/`post` tasks is considered a failure of the entire task.

Additionally, any `volumes` and `networks` defined on the primary task are also accessible to its `pre` and `post` tasks.
```yaml
- name: convert the first 5 seconds of a video
  image: jrottenberg/ffmpeg:3.4-alpine
  run: |
    ffmpeg -i /tmp/input.ogv -t 5 /tmp/output.mp4
  volumes:
    - /tmp
  pre:
    - name: download the remote file
      image: alpine:3.18.3
      run: |
        wget \
          https://upload.wikimedia.org/wikipedia/commons/1/18/Big_Buck_Bunny_Trailer_1080p.ogv \
          -O /tmp/input.ogv
  post:
    - name: upload the converted file
      image: alpine:3.18.3
      run: |
        wget \
          --post-file=/tmp/output.mp4 \
          https://devnull-as-a-service.com/dev/null
```
Returns a list of the most recent jobs.

Path:

```
GET /jobs
```

Query Params:

- `page` - page number (default: 1)
- `size` - page size (default: 10, min: 1, max: 20)
- `q` - full-text search query
Response:

```json
{
  "items": [
    {
      "id": "c5873550dad7439e85ac781168e6e124",
      "name": "sample job",
      "state": "COMPLETED",
      "createdAt": "2023-08-21T21:52:07.751041Z",
      "startedAt": "2023-08-22T01:52:07.765393Z",
      "completedAt": "2023-08-22T01:52:12.900569Z"
    }
    ...
  ],
  "number": 1,
  "size": 4,
  "totalPages": 1
}
```
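For example, to fetch the first page of up to 5 jobs matching the full-text query "sample" (parameter values are illustrative):

```bash
curl -s "http://localhost:8000/jobs?page=1&size=5&q=sample" | jq .
```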
Submit a new job to be scheduled for execution.

Path:

```
POST /jobs
```

Headers:

- JSON input: `Content-Type: application/json`
- YAML input: `Content-Type: text/yaml`

Request body:

- `name` - a human-readable name for the job
- `tasks` - the list of tasks to execute
- `inputs` - input parameters allow you to specify data that the job expects to use during its runtime. Input values can be accessed by tasks by using a template expression, e.g. `{{ inputs.someParam }}`
Task properties:

- `name` - a human-readable name for the task
- `image` (required) - the docker image to use to execute the task
- `run` - the script to run on the container
- `entrypoint` - allows overriding the image's `entrypoint`. Default: `[sh, -c]`.
- `cmd` - an alternative to using the `run` property when you want to use the `image`'s default `entrypoint`.
- `env` - a key-value map of environment variables
- `queue` - the name of the queue that the task should be routed to. See queues.
- `pre` - the list of tasks to execute prior to executing the actual task.
- `post` - the list of tasks to execute post execution of the actual task.
- `volumes` - a list of temporary volumes, created for the duration of the execution of the task. Useful for sharing state between the task and its `pre` and `post` tasks.

  ```yaml
  volumes:
    - /data1
    - /data2
  ```

  Note: if you get an `invalid mount config for type "bind": bind source path does not exist` error, it's most likely because the Docker daemon isn't allowed to mount volumes from your default `$TMPDIR`. Try using the `--temp-dir` flag to explicitly set it to another directory.
- `networks` - networks are the layer that allows task containers within the same node to communicate with each other. This can be useful when certain nodes are configured with long-running services which the task needs access to.

  ```yaml
  networks:
    - some-network
  ```

- `retry` - the retry configuration to execute in case of a failure. Example:

  ```yaml
  retry:
    limit: 5 # will retry up to 5 times
  ```

- `timeout` - the amount of time (specified as `300ms`, `1h`, `45m`, etc.) that a task may execute before it is cancelled.
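As an illustrative sketch, a single task spec combining several of these properties might look as follows:

```yaml
name: a task with retries and a timeout
image: ubuntu:mantic
env:
  GREETING: hello
run: |
  echo "$GREETING world"
retry:
  limit: 3   # retry up to 3 times on failure
timeout: 45m # cancel the task if it runs for longer than 45 minutes
```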
Examples:

JSON:

```bash
curl -X POST "http://localhost:8000/jobs" \
  -H "Content-Type: application/json" \
  -d '{"name":"sample job","tasks":[{
    "name": "sample task",
    "image": "ubuntu:mantic",
    "run": "echo hello world"
  }]}'
```

YAML:

```bash
curl -X POST "http://localhost:8000/jobs" \
  -H "Content-Type: text/yaml" \
  -d \
'
name: sample job
tasks:
  - name: sample task
    image: ubuntu:mantic
    run: echo hello world
'
```
Response:

```
HTTP 200

{
  "id": "68c602bed6d34d7f9130bfa13786e422",
  "name": "sample job",
  "state": "PENDING",
  "createdAt": "2023-08-12T15:55:12.143998-04:00",
  "tasks": [{
    "name": "sample task",
    "run": "echo hello world",
    "image": "ubuntu:mantic"
  }]
}
```
An endpoint to cancel a running job.

Path:

```
PUT /jobs/{job id}/cancel
```

Response:

Success:

```
HTTP 200

{
  "status": "OK"
}
```

Failure:

```
400 Bad Request

{
  "message": "job is not running"
}
```
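For example (assuming `$JOB_ID` refers to a currently running job):

```bash
curl -s -X PUT http://localhost:8000/jobs/$JOB_ID/cancel | jq .
```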
An endpoint to restart a failed/cancelled job.

Path:

```
PUT /jobs/{job id}/restart
```

Response:

Success:

```
HTTP 200

{
  "status": "OK"
}
```

Failure:

```
400 Bad Request

{
  "message": "job is COMPLETED and can not be restarted"
}
```
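Similarly, to restart a failed or cancelled job (again assuming `$JOB_ID`):

```bash
curl -s -X PUT http://localhost:8000/jobs/$JOB_ID/restart | jq .
```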
Tork Web is a web-based tool for interacting with Tork.
- Redis broker integration
- Job-level defaults
- Podman runtime support
- Ability to schedule tasks using a `nodeSelector`
- Webhooks
- WASM integration
- Worker to send periodic updates of tasks it's working on
- Ability to assign tags to jobs in order to group them and for FTS.
Copyright (c) 2023-present Arik Cohen. Tork is free and open-source software licensed under the MIT License.