A Golang-based, high-performance, scalable, and distributed job execution engine.
- REST API
- Horizontally scalable
- Task isolation - tasks are executed within a container to provide isolation and idempotency, and to enforce resource limits
- Automatic recovery of tasks in the event of a worker crash
- Supports both stand-alone and distributed setup
- Retry failed tasks
- Pre/Post tasks
- No single point of failure
- Task timeout
- Expression Language
- Conditional Tasks
Coordinator: responsible for managing the lifecycle of a task through its various states and for exposing a REST API to the clients.
Worker: responsible for executing tasks by means of a runtime (typically Docker).
Broker: the message-queue, pub/sub mechanism used for routing tasks.
Datastore: holds the state for tasks and jobs.
Runtime: the platform used by workers to execute tasks. Currently only Docker is supported.
Before running the examples, ensure you have the following software installed:
Start in standalone mode:
go run cmd/main.go -mode standalone
Submit a job in another terminal:
# hello.yaml
---
name: hello job
tasks:
  - name: say hello
    image: ubuntu:mantic
    run: |
      echo -n hello world
  - name: say goodbye
    image: ubuntu:mantic
    run: |
      echo -n bye world
JOB_ID=$(curl \
-s \
-X POST \
--data-binary @hello.yaml \
-H "Content-type: text/yaml" \
http://localhost:3000/job | jq -r .id)
Query for the status of the job:
curl -s http://localhost:3000/job/$JOB_ID | jq .
{
"id": "ed0dba93d262492b8cf26e6c1c4f1c98",
"state": "COMPLETED",
...
"execution": [
{
...
"state": "COMPLETED",
}
],
}
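For clients written in Go, the two curl calls above can be sketched roughly as follows. This is a minimal illustration, not part of Tork: the helper names newSubmitRequest and decodeJob and the jobSummary type are hypothetical, and only the id and state fields shown in the response above are decoded. To keep the sketch runnable without a server, main builds the request and decodes a sample response instead of sending anything.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// helloJob mirrors hello.yaml from the example above.
const helloJob = `name: hello job
tasks:
  - name: say hello
    image: ubuntu:mantic
    run: |
      echo -n hello world
`

// jobSummary holds the only two response fields this sketch relies on.
type jobSummary struct {
	ID    string `json:"id"`
	State string `json:"state"`
}

// newSubmitRequest builds the same request as the curl command:
// POST <baseURL>/job with a text/yaml body.
func newSubmitRequest(baseURL string, yamlBody []byte) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodPost, baseURL+"/job", bytes.NewReader(yamlBody))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "text/yaml")
	return req, nil
}

// decodeJob extracts the id and state from a job response body.
// Fields that are not declared on jobSummary are ignored.
func decodeJob(body []byte) (jobSummary, error) {
	var j jobSummary
	err := json.Unmarshal(body, &j)
	return j, err
}

func main() {
	req, err := newSubmitRequest("http://localhost:3000", []byte(helloJob))
	if err != nil {
		fmt.Println("build request:", err)
		return
	}
	fmt.Println(req.Method, req.URL, req.Header.Get("Content-Type"))
	// To actually submit the job, pass req to http.DefaultClient.Do,
	// read the response body, and hand it to decodeJob.

	sample := []byte(`{"id": "ed0dba93d262492b8cf26e6c1c4f1c98", "state": "COMPLETED"}`)
	job, err := decodeJob(sample)
	if err != nil {
		fmt.Println("decode response:", err)
		return
	}
	fmt.Println(job.ID, job.State)
}
```

Polling GET /job/<id> until the state leaves PENDING would follow the same pattern, reusing decodeJob on each response.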
The following job:
- Downloads a remote video file using a pre task to a shared /tmp volume.
- Converts the first 5 seconds of the downloaded video using ffmpeg.
- Uploads the converted video to a destination using a post task.
# convert.yaml
---
name: convert a video
inputs:
  source: https://upload.wikimedia.org/wikipedia/commons/1/18/Big_Buck_Bunny_Trailer_1080p.ogv
tasks:
  - name: convert the first 5 seconds of a video
    image: jrottenberg/ffmpeg:3.4-alpine
    run: |
      ffmpeg -i /tmp/input.ogv -t 5 /tmp/output.mp4
    volumes:
      - /tmp
    pre:
      - name: download the remote file
        image: alpine:3.18.3
        env:
          SOURCE_URL: "{{ inputs.source }}"
        run: |
          wget \
          $SOURCE_URL \
          -O /tmp/input.ogv
    post:
      - name: upload the converted file
        image: alpine:3.18.3
        run: |
          wget \
          --post-file=/tmp/output.mp4 \
          https://devnull-as-a-service.com/dev/null
By default, all tasks are routed to the default queue.
All workers subscribe to the default queue unless they make use of the -queue flag.
It is often desirable to route tasks to different queues in order to create specialized pools of workers.
For example, one pool of workers, specially configured to handle video transcoding, can listen for video-processing tasks:
go run cmd/main.go -mode worker -queue transcoding:3 -queue default:10
In this example the worker would handle up to 3 transcoding-related tasks and up to 10 "regular" tasks concurrently.
This could make sense because transcoding tends to be very resource intensive so a single worker might not want to handle more than 3 concurrent tasks.
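The name:concurrency form of the -queue value can be illustrated with a small parser. This is a hypothetical sketch of how such a value could be split, not Tork's own parsing code: the function name parseQueueSpec is invented here, and treating a bare queue name as concurrency 1 is an assumption of the sketch.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseQueueSpec splits a "name:concurrency" value such as "transcoding:3".
// A bare name with no ":" is given concurrency 1; that default is an
// assumption of this sketch, not documented Tork behavior.
func parseQueueSpec(spec string) (string, int, error) {
	name, limit, found := strings.Cut(spec, ":")
	if name == "" {
		return "", 0, fmt.Errorf("queue spec %q has no queue name", spec)
	}
	if !found {
		return name, 1, nil
	}
	n, err := strconv.Atoi(limit)
	if err != nil || n < 1 {
		return "", 0, fmt.Errorf("queue spec %q has an invalid concurrency", spec)
	}
	return name, n, nil
}

func main() {
	// The two values from the worker command above.
	for _, spec := range []string{"transcoding:3", "default:10"} {
		name, n, err := parseQueueSpec(spec)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("queue=%s concurrency=%d\n", name, n)
	}
}
```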
To route a task to a special queue, use the queue property:
name: transcode a video
queue: transcoding
image: jrottenberg/ffmpeg:3.4-alpine
run: |
  ffmpeg \
    -i https://upload.wikimedia.org/wikipedia/commons/1/18/Big_Buck_Bunny_Trailer_1080p.ogv \
    output.mp4
- jobs - incoming jobs land in this queue prior to being scheduled for processing by the Coordinator.
- pending - incoming tasks land in this queue prior to being scheduled for processing by the Coordinator.
- started - when the worker starts working on a task it inserts the task to this queue to notify the Coordinator.
- completed - when a worker completes the processing of a task successfully it inserts it, along with its output, to this queue to notify the Coordinator.
- error - when a worker encounters an error while processing a task it inserts the task to this queue to notify the Coordinator.
- hearbeat - the queue used by workers to periodically notify the Coordinator about their "aliveness".
- x-<worker id> - each worker subscribes to an exclusive queue which can be used by the Coordinator to cancel tasks started by a particular worker.
You can set custom environment variables for a given task by using the env property:
name: print a message
image: ubuntu:mantic
env:
  INTRO: hello world
  OUTRO: bye world
run: |
  echo $INTRO
  echo $OUTRO
By convention, any environment variables which contain the keywords SECRET, PASSWORD or ACCESS_KEY in their names will have their values automatically redacted from logs as well as from API responses.
Warning: Tork automatically redacts secrets printed to the log, but you should avoid printing secrets to the log intentionally.
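The naming convention can be sketched as a simple substring check on the variable name. The code below is an illustration only and is not taken from Tork: the function names, the "[REDACTED]" placeholder, and the case-insensitive match are all assumptions made for the example.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// sensitiveKeywords lists the keywords named in the convention above.
var sensitiveKeywords = []string{"SECRET", "PASSWORD", "ACCESS_KEY"}

// isSensitive reports whether a variable name contains one of the keywords.
// Matching case-insensitively is an assumption of this sketch.
func isSensitive(name string) bool {
	upper := strings.ToUpper(name)
	for _, kw := range sensitiveKeywords {
		if strings.Contains(upper, kw) {
			return true
		}
	}
	return false
}

// redactEnv returns a copy of env with the values of sensitive variables
// replaced by a placeholder. The placeholder text is hypothetical.
func redactEnv(env map[string]string) map[string]string {
	out := make(map[string]string, len(env))
	for k, v := range env {
		if isSensitive(k) {
			out[k] = "[REDACTED]"
		} else {
			out[k] = v
		}
	}
	return out
}

func main() {
	env := map[string]string{
		"INTRO":          "hello world",
		"DB_PASSWORD":    "hunter2",
		"AWS_ACCESS_KEY": "abc123",
	}
	redacted := redactEnv(env)

	// Print in a stable order, since Go map iteration order is random.
	keys := make([]string, 0, len(redacted))
	for k := range redacted {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		fmt.Printf("%s=%s\n", k, redacted[k])
	}
}
```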
The Datastore is responsible for holding job and task metadata.
You can specify which type of datastore to use with the -datastore flag.
inmemory: the default implementation. Typically isn't suitable for production because all state will be lost upon restart.
postgres: uses a Postgres database as the underlying implementation. Example:
Start Postgres DB:
docker compose up postgres -d
Run a migration to create the database schema:
docker compose up migration
Start Tork:
go run cmd/main.go \
-mode standalone \
-datastore postgres \
-postgres-dsn "host=localhost user=tork password=tork dbname=tork port=5432 sslmode=disable"
To run in distributed mode we need to use an external message broker.
Start RabbitMQ:
docker compose up rabbitmq -d
Start the Coordinator:
go run cmd/main.go \
-mode coordinator \
-broker rabbitmq \
-rabbitmq-url amqp://guest:guest@localhost:5672
Start the worker(s):
go run cmd/main.go \
-mode worker \
-broker rabbitmq \
-rabbitmq-url amqp://guest:guest@localhost:5672
By default, a task has no resource constraints and can use as much of a given resource as the host’s kernel scheduler allows.
Tork provides several ways to control how much CPU and Memory a task can use:
Setting global defaults at the worker level
--default-cpus-limit - the default maximum number of CPUs that a task is allowed to use when executing on a given worker. For instance, if the host machine has two CPUs and you set --default-cpus-limit="1.5", a task is guaranteed at most one and a half of the CPUs.
--default-memory-limit - the default maximum amount of memory that a task is allowed to use when executing on a given worker. Units are specified as a positive integer, followed by a suffix of b, k, m, or g to indicate bytes, kilobytes, megabytes, or gigabytes.
Setting limits on the task itself
For more fine-grained control, default limits can be overridden at an individual task level:
name: some task
image: ubuntu:mantic
run: sleep 10
limits:
  cpus: .5
  memory: 10m
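To make the memory unit suffixes concrete, here is a rough sketch that converts a value such as 10m into bytes. It is not Tork's implementation: the function name parseMemoryLimit is hypothetical, and the sketch assumes binary multiples (1k = 1024 bytes), which is how Docker interprets these suffixes.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseMemoryLimit converts a limit such as "10m" into a number of bytes.
// It accepts a positive integer followed by one of b, k, m, or g.
// Binary multiples (1k = 1024 bytes) are an assumption of this sketch.
func parseMemoryLimit(s string) (int64, error) {
	s = strings.ToLower(strings.TrimSpace(s))
	if len(s) < 2 {
		return 0, fmt.Errorf("memory limit %q is too short", s)
	}
	multipliers := map[byte]int64{
		'b': 1,
		'k': 1 << 10,
		'm': 1 << 20,
		'g': 1 << 30,
	}
	mult, ok := multipliers[s[len(s)-1]]
	if !ok {
		return 0, fmt.Errorf("memory limit %q has an unknown unit suffix", s)
	}
	n, err := strconv.ParseInt(s[:len(s)-1], 10, 64)
	if err != nil || n <= 0 {
		return 0, fmt.Errorf("memory limit %q must start with a positive integer", s)
	}
	return n * mult, nil
}

func main() {
	for _, limit := range []string{"10m", "512k", "1g"} {
		bytes, err := parseMemoryLimit(limit)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("%s = %d bytes\n", limit, bytes)
	}
}
```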
Tasks can write arbitrary text output to the standard output file defined in $TORK_OUTPUT. Downstream tasks may refer to outputs of previous tasks. Example:
tasks:
  - name: populate a variable
    image: ubuntu:mantic
    # The task must specify the name of the
    # variable under which its results will be
    # stored in the job's context
    var: task1
    run: |
      echo -n "world" >> "$TORK_OUTPUT"
  - name: say hello
    image: ubuntu:mantic
    env:
      # refer to the outputs of the previous task
      NAME: "{{ tasks.task1 }}"
    run: |
      echo -n hello $NAME
Tork uses the expr language to:
- Evaluate embedded expressions in a task's environment variables.
- Evaluate a task's if condition to determine whether a task should run. When an if expression evaluates to anything except false, the task will run.
Some examples:
- Skip certain tasks using the if property:
inputs:
  run: "false"
tasks:
  - name: say something
    if: "{{ inputs.run == 'true' }}"
    image: ubuntu:mantic
    run: |
      echo "this should not execute"
- Access the job's context:
inputs:
  message: hello world
tasks:
  - name: say something
    image: ubuntu:mantic
    env:
      MESSAGE: "{{inputs.message}}"
    run: |
      echo $MESSAGE
- Execute a built-in function:
tasks:
  - name: print the length of a string
    image: ubuntu:mantic
    env:
      # single quotes outside so the inner double quotes remain valid YAML
      LENGTH: '{{ len("hello world") }}'
    run: |
      echo "The length of the string is: $LENGTH"
- Execute one of Tork's custom functions:
tasks:
  - name: generate a random number
    image: ubuntu:mantic
    env:
      RANDOM_NUMBER: "{{ randomInt() }}"
    run: |
      echo "a random number: $RANDOM_NUMBER"
It is sometimes desirable to execute a task, potentially using a different image, before or after a task executes and share the state of that execution with the "main" task we want to execute. This is where pre and post tasks come in.
Each task can define a set of tasks that will be executed prior to its execution, and after its execution.
The pre and post tasks always execute on the same worker node which will execute the task itself and are considered to be an atomic part of the task. That is, a failure in any of the pre/post tasks is considered a failure of the entire task.
Additionally, any volumes defined are also accessible to the pre and post tasks.
- name: convert the first 5 seconds of a video
  image: jrottenberg/ffmpeg:3.4-alpine
  run: |
    ffmpeg -i /tmp/input.ogv -t 5 /tmp/output.mp4
  volumes:
    - /tmp
  pre:
    - name: download the remote file
      image: alpine:3.18.3
      run: |
        wget \
        https://upload.wikimedia.org/wikipedia/commons/1/18/Big_Buck_Bunny_Trailer_1080p.ogv \
        -O /tmp/input.ogv
  post:
    - name: upload the converted file
      image: alpine:3.18.3
      run: |
        wget \
        --post-file=/tmp/output.mp4 \
        https://devnull-as-a-service.com/dev/null
Submit a new job to be scheduled for execution
Path:
POST /job
Headers:
JSON Input:
Content-Type: application/json
YAML Input:
Content-Type: text/yaml
Request body:
- name - a human-readable name for the job
- tasks - the list of tasks to execute
- inputs - input parameters allow you to specify data that the job expects to use during its runtime. Input values can be accessed by tasks by using a template expression, e.g. {{ inputs.someParam }}
task properties:
- name - a human-readable name for the task
- image (required) - the Docker image to use to execute the task
- run - the script to run on the container
- env - a key-value map of environment variables
- queue - the name of the queue that the task should be routed to. See queues.
- pre - the list of tasks to execute prior to executing the actual task.
- post - the list of tasks to execute post execution of the actual task.
- volumes - a list of temporary volumes, created for the duration of the execution of the task. Useful for sharing state between the task and its pre and post tasks. Example:
  volumes:
    - /data1
    - /data2
  Note: if you get an invalid mount config for type "bind": bind source path does not exist error, it's most likely because the Docker daemon isn't allowed to mount volumes from your default $TMPDIR. Try using the --temp-dir flag to explicitly set it to another directory.
- retry - the retry configuration to execute in case of a failure. Example:
  retry:
    limit: 5 # will retry up to 5 times
- timeout - the amount of time (specified as 300ms or 1h or 45m etc.) that a task may execute before it is cancelled.
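The timeout examples above (300ms, 1h, 45m) happen to match the duration syntax accepted by Go's standard time.ParseDuration. Assuming that syntax applies, a client could validate a timeout before submitting a job along these lines; the function name timeoutMillis is hypothetical and this is only a sketch, not Tork's validation logic.

```go
package main

import (
	"fmt"
	"time"
)

// timeoutMillis parses a timeout such as "300ms", "1h", or "45m" and
// returns it in milliseconds. It assumes the value follows Go's duration
// syntax and rejects zero or negative durations.
func timeoutMillis(s string) (int64, error) {
	d, err := time.ParseDuration(s)
	if err != nil {
		return 0, fmt.Errorf("invalid timeout %q: %w", s, err)
	}
	if d <= 0 {
		return 0, fmt.Errorf("timeout %q must be positive", s)
	}
	return d.Milliseconds(), nil
}

func main() {
	for _, timeout := range []string{"300ms", "1h", "45m"} {
		ms, err := timeoutMillis(timeout)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("%s = %d ms\n", timeout, ms)
	}
}
```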
Examples:
JSON:
curl -X POST "http://localhost:3000/job" \
-H "Content-Type: application/json" \
-d '{"name":"sample job","tasks":[{
"name": "sample task",
"image": "ubuntu:mantic",
"run": "echo hello world"
}]}'
YAML:
curl -X POST "http://localhost:3000/job" \
  -H "Content-Type: text/yaml" \
  -d \
'
name: sample job
tasks:
  - name: sample task
    image: ubuntu:mantic
    run: echo hello world
'
Response:
HTTP 200
{
  "id": "68c602bed6d34d7f9130bfa13786e422",
  "name": "sample job",
  "state": "PENDING",
  "createdAt": "2023-08-12T15:55:12.143998-04:00",
  "tasks": [{
    "name": "sample task",
    "run": "echo hello world",
    "image": "ubuntu:mantic"
  }]
}
Cancel a running job
Path:
PUT /job/{job id}/cancel
Response:
Success:
HTTP 200
{
"status": "OK"
}
Failure:
400 Bad Request
{
"error": "job in not running"
}
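A Go client for the cancel endpoint could be sketched as below. The helper names newCancelRequest and interpretCancel and the apiResult type are hypothetical and exist only for this example; the sketch relies solely on the path, status codes, and the status and error fields shown above. To stay runnable without a server, main builds the request and interprets the sample failure body instead of sending anything.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/url"
)

// apiResult covers the two response shapes shown above.
type apiResult struct {
	Status string `json:"status"`
	Error  string `json:"error"`
}

// newCancelRequest builds PUT <baseURL>/job/<job id>/cancel.
func newCancelRequest(baseURL, jobID string) (*http.Request, error) {
	target := baseURL + "/job/" + url.PathEscape(jobID) + "/cancel"
	return http.NewRequest(http.MethodPut, target, nil)
}

// interpretCancel turns a status code and response body into an error
// value: nil for HTTP 200, otherwise an error carrying the API's message.
func interpretCancel(statusCode int, body []byte) error {
	var r apiResult
	if err := json.Unmarshal(body, &r); err != nil {
		return fmt.Errorf("decode response: %w", err)
	}
	if statusCode != http.StatusOK {
		return fmt.Errorf("cancel failed (HTTP %d): %s", statusCode, r.Error)
	}
	return nil
}

func main() {
	req, err := newCancelRequest("http://localhost:3000", "68c602bed6d34d7f9130bfa13786e422")
	if err != nil {
		fmt.Println("build request:", err)
		return
	}
	fmt.Println(req.Method, req.URL)
	// To actually cancel the job, pass req to http.DefaultClient.Do and
	// hand the status code and body to interpretCancel.

	// Interpret the sample failure response shown above.
	err = interpretCancel(http.StatusBadRequest, []byte(`{"error": "job in not running"}`))
	fmt.Println(err)
}
```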
Copyright (c) 2023-present Arik Cohen. Tork is free and open-source software licensed under the MIT License.