Support for job-level defaults
runabol committed Sep 15, 2023
1 parent f2894a4 commit d6fc951
Showing 12 changed files with 260 additions and 16 deletions.
52 changes: 44 additions & 8 deletions datastore/postgres.go
@@ -77,6 +77,7 @@ type jobRecord struct {
Result string `db:"result"`
Error string `db:"error_"`
TS string `db:"ts"`
Defaults []byte `db:"defaults"`
}

type nodeRecord struct {
@@ -218,11 +219,15 @@ func (r nodeRecord) toNode() *tork.Node {
func (r jobRecord) toJob(tasks, execution []*tork.Task) (*tork.Job, error) {
var c tork.JobContext
if err := json.Unmarshal(r.Context, &c); err != nil {
return nil, errors.Wrapf(err, "error deserializing task.context")
return nil, errors.Wrapf(err, "error deserializing job.context")
}
var inputs map[string]string
if err := json.Unmarshal(r.Inputs, &inputs); err != nil {
return nil, errors.Wrapf(err, "error deserializing task.inputs")
return nil, errors.Wrapf(err, "error deserializing job.inputs")
}
var defaults tork.JobDefaults
if err := json.Unmarshal(r.Defaults, &defaults); err != nil {
return nil, errors.Wrapf(err, "error deserializing job.defaults")
}
return &tork.Job{
ID: r.ID,
@@ -243,6 +248,7 @@ func (r jobRecord) toJob(tasks, execution []*tork.Task) (*tork.Job, error) {
Output: r.Output,
Result: r.Result,
Error: r.Error,
Defaults: defaults,
}, nil
}

@@ -478,6 +484,24 @@ func (ds *PostgresDatastore) UpdateTask(ctx context.Context, id string, modify f
s := string(b)
subjob = &s
}
var limits *string
if t.Limits != nil {
b, err := json.Marshal(t.Limits)
if err != nil {
return errors.Wrapf(err, "failed to serialize task.limits")
}
s := string(b)
limits = &s
}
var retry *string
if t.Retry != nil {
b, err := json.Marshal(t.Retry)
if err != nil {
return errors.Wrapf(err, "failed to serialize task.retry")
}
s := string(b)
retry = &s
}
q := `update tasks set
position = $1,
state = $2,
@@ -490,8 +514,11 @@ func (ds *PostgresDatastore) UpdateTask(ctx context.Context, id string, modify f
result = $9,
each_ = $10,
subjob = $11,
parallel = $12
where id = $13`
parallel = $12,
limits = $13,
timeout = $14,
retry = $15
where id = $16`
_, err = ptx.exec(q,
t.Position, // $1
t.State, // $2
@@ -505,7 +532,10 @@ func (ds *PostgresDatastore) UpdateTask(ctx context.Context, id string, modify f
each, // $10
subjob, // $11
parallel, // $12
t.ID, // $13
limits, // $13
t.Timeout, // $14
retry, // $15
t.ID, // $16
)
if err != nil {
return errors.Wrapf(err, "error updating task %s", t.ID)
@@ -599,11 +629,17 @@ func (ds *PostgresDatastore) CreateJob(ctx context.Context, j *tork.Job) error {
if err != nil {
return errors.Wrapf(err, "failed to serialize job.inputs")
}
defaults, err := json.Marshal(j.Defaults)
if err != nil {
return errors.Wrapf(err, "failed to serialize job.defaults")
}
q := `insert into jobs
(id,name,description,state,created_at,started_at,tasks,position,inputs,context,parent_id,task_count,output_,result,error_)
(id,name,description,state,created_at,started_at,tasks,position,
inputs,context,parent_id,task_count,output_,result,error_,defaults)
values
($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15)`
_, err = ds.exec(q, j.ID, j.Name, j.Description, j.State, j.CreatedAt, j.StartedAt, tasks, j.Position, inputs, c, j.ParentID, j.TaskCount, j.Output, j.Result, j.Error)
($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16)`
_, err = ds.exec(q, j.ID, j.Name, j.Description, j.State, j.CreatedAt, j.StartedAt, tasks, j.Position,
inputs, c, j.ParentID, j.TaskCount, j.Output, j.Result, j.Error, defaults)
if err != nil {
return errors.Wrapf(err, "error inserting job to the db")
}
14 changes: 14 additions & 0 deletions datastore/postgres_test.go
@@ -383,13 +383,27 @@ func TestPostgresCreateAndGetJob(t *testing.T) {
Inputs: map[string]string{
"var1": "val1",
},
Defaults: tork.JobDefaults{
Timeout: "5s",
Retry: &tork.TaskRetry{
Limit: 2,
},
Limits: &tork.TaskLimits{
CPUs: ".5",
Memory: "10MB",
},
},
}
err = ds.CreateJob(ctx, &j1)
assert.NoError(t, err)
j2, err := ds.GetJobByID(ctx, j1.ID)
assert.NoError(t, err)
assert.Equal(t, j1.ID, j2.ID)
assert.Equal(t, "val1", j2.Inputs["var1"])
assert.Equal(t, "5s", j2.Defaults.Timeout)
assert.Equal(t, 2, j2.Defaults.Retry.Limit)
assert.Equal(t, ".5", j2.Defaults.Limits.CPUs)
assert.Equal(t, "10MB", j2.Defaults.Limits.Memory)
}

func TestPostgresUpdateJob(t *testing.T) {
3 changes: 2 additions & 1 deletion db/postgres/schema.go
@@ -32,7 +32,8 @@ CREATE TABLE jobs (
task_count int not null,
output_ text,
result text,
error_ text
error_ text,
defaults jsonb
);
CREATE INDEX idx_jobs_state ON jobs (state);
16 changes: 16 additions & 0 deletions examples/job_defaults.yaml
@@ -0,0 +1,16 @@
name: job defaults example

defaults:
timeout: 5s
retry:
limit: 2

tasks:

- name: sleep a little
image: alpine:3.18.3
run: sleep 1

- name: sleep too much
image: alpine:3.18.3
run: sleep 10
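
The second task in this example deliberately sleeps longer than the 5-second default timeout, presumably to demonstrate the default timeout and retry limit kicking in. For illustration, the defaults block can be exercised through the input package touched in this commit. The following is a minimal sketch, not part of the commit; it assumes gopkg.in/yaml.v3 for parsing, which may differ from the YAML library Tork uses internally:

package main

import (
	"fmt"

	"github.com/runabol/tork/input"
	"gopkg.in/yaml.v3"
)

func main() {
	doc := []byte(`
name: job defaults example
defaults:
  timeout: 5s
  retry:
    limit: 2
tasks:
  - name: sleep a little
    image: alpine:3.18.3
    run: sleep 1
`)
	ji := input.Job{}
	if err := yaml.Unmarshal(doc, &ji); err != nil {
		panic(err)
	}
	// validate the input, including the new "duration" check on defaults.timeout
	if err := ji.Validate(); err != nil {
		panic(err)
	}
	// convert to the internal job model; defaults carry over via ToJobDefaults
	j := ji.ToJob()
	fmt.Println(j.Defaults.Timeout)     // 5s
	fmt.Println(j.Defaults.Retry.Limit) // 2
}
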
22 changes: 22 additions & 0 deletions input/job.go
@@ -14,6 +14,14 @@ type Job struct {
Tasks []Task `json:"tasks,omitempty" yaml:"tasks,omitempty" validate:"required,min=1,dive"`
Inputs map[string]string `json:"inputs,omitempty" yaml:"inputs,omitempty"`
Output string `json:"output,omitempty" yaml:"output,omitempty" validate:"expr"`
Defaults Defaults `json:"defaults,omitempty" yaml:"defaults,omitempty"`
}

type Defaults struct {
Retry *Retry `json:"retry,omitempty" yaml:"retry,omitempty"`
Limits *Limits `json:"limits,omitempty" yaml:"limits,omitempty"`
Timeout string `json:"timeout,omitempty" yaml:"timeout,omitempty" validate:"duration"`
Queue string `json:"queue,omitempty" yaml:"queue,omitempty" validate:"queue"`
}

func (ji *Job) ID() string {
@@ -41,5 +49,19 @@ func (ji *Job) ToJob() *tork.Job {
j.Context.Inputs = ji.Inputs
j.TaskCount = len(tasks)
j.Output = ji.Output
j.Defaults = ji.Defaults.ToJobDefaults()
return j
}

func (d Defaults) ToJobDefaults() tork.JobDefaults {
jd := tork.JobDefaults{}
if d.Retry != nil {
jd.Retry = d.Retry.toTaskRetry()
}
if d.Limits != nil {
jd.Limits = d.Limits.toTaskLimits()
}
jd.Timeout = d.Timeout
jd.Queue = d.Queue
return jd
}
22 changes: 15 additions & 7 deletions input/task.go
@@ -68,16 +68,11 @@ func (i Task) toTask() *tork.Task {
post := toAuxTasks(i.Post)
var retry *tork.TaskRetry
if i.Retry != nil {
retry = &tork.TaskRetry{
Limit: i.Retry.Limit,
}
retry = i.Retry.toTaskRetry()
}
var limits *tork.TaskLimits
if i.Limits != nil {
limits = &tork.TaskLimits{
CPUs: i.Limits.CPUs,
Memory: i.Limits.Memory,
}
limits = i.Limits.toTaskLimits()
}
var each *tork.EachTask
if i.Each != nil {
@@ -151,6 +146,19 @@ func toTasks(tis []Task) []*tork.Task {
return result
}

func (l *Limits) toTaskLimits() *tork.TaskLimits {
return &tork.TaskLimits{
CPUs: l.CPUs,
Memory: l.Memory,
}
}

func (r *Retry) toTaskRetry() *tork.TaskRetry {
return &tork.TaskRetry{
Limit: r.Limit,
}
}

type SubJob struct {
ID string `json:"id,omitempty"`
Name string `json:"name,omitempty" yaml:"name,omitempty" validate:"required"`
20 changes: 20 additions & 0 deletions input/validate_test.go
@@ -3,6 +3,7 @@ package input
import (
"testing"

"github.com/go-playground/validator/v10"
"github.com/runabol/tork/mq"
"github.com/stretchr/testify/assert"
)
@@ -84,6 +85,25 @@ func TestValidateJobNoName(t *testing.T) {
assert.Error(t, err)
}

func TestValidateJobDefaults(t *testing.T) {
j := Job{
Name: "test job",
Tasks: []Task{
{
Name: "some task",
Image: "some:image",
},
},
Defaults: Defaults{
Timeout: "1234",
},
}
err := j.Validate()
assert.Error(t, err)
errs := err.(validator.ValidationErrors)
assert.Equal(t, "Timeout", errs[0].Field())
}
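
// Not part of this commit: a complementary sketch of a Defaults block that
// should pass validation, assuming the "duration" validator accepts Go-style
// duration strings such as "5s". The test name is hypothetical.
func TestValidateJobDefaultsOK(t *testing.T) {
	j := Job{
		Name: "test job",
		Tasks: []Task{
			{
				Name:  "some task",
				Image: "some:image",
			},
		},
		Defaults: Defaults{
			Timeout: "5s",
			Retry:   &Retry{Limit: 2},
		},
	}
	err := j.Validate()
	assert.NoError(t, err)
}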

func TestValidateJobTaskNoName(t *testing.T) {
j := Job{
Name: "test job",
8 changes: 8 additions & 0 deletions internal/coordinator/coordinator_test.go
@@ -99,10 +99,18 @@ func TestTaskMiddlewareNoOp(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, c)

j1 := &tork.Job{
ID: uuid.NewUUID(),
Name: "test job",
}
err = ds.CreateJob(context.Background(), j1)
assert.NoError(t, err)

tk := &tork.Task{
ID: uuid.NewUUID(),
Name: "my task",
State: tork.TaskStatePending,
JobID: j1.ID,
}

err = ds.CreateTask(context.Background(), tk)
8 changes: 8 additions & 0 deletions internal/coordinator/handlers/pending_test.go
@@ -27,9 +27,17 @@ func Test_handlePendingTask(t *testing.T) {
handler := NewPendingHandler(ds, b)
assert.NotNil(t, handler)

j1 := &tork.Job{
ID: uuid.NewUUID(),
Name: "test job",
}
err = ds.CreateJob(ctx, j1)
assert.NoError(t, err)

tk := &tork.Task{
ID: uuid.NewUUID(),
Queue: "test-queue",
JobID: j1.ID,
}

err = ds.CreateTask(ctx, tk)
34 changes: 34 additions & 0 deletions internal/coordinator/scheduler/scheduler.go
@@ -36,15 +36,49 @@ func (s *Scheduler) ScheduleTask(ctx context.Context, t *tork.Task) error {

func (s *Scheduler) scheduleRegularTask(ctx context.Context, t *tork.Task) error {
now := time.Now().UTC()
// apply job-level defaults
job, err := s.ds.GetJobByID(ctx, t.JobID)
if err != nil {
return err
}
if job.Defaults.Queue != "" {
t.Queue = job.Defaults.Queue
}
if t.Queue == "" {
t.Queue = mq.QUEUE_DEFAULT
}
if job.Defaults.Limits != nil {
if t.Limits == nil {
t.Limits = &tork.TaskLimits{}
}
if t.Limits.CPUs == "" {
t.Limits.CPUs = job.Defaults.Limits.CPUs
}
if t.Limits.Memory == "" {
t.Limits.Memory = job.Defaults.Limits.Memory
}
}
if t.Timeout == "" {
t.Timeout = job.Defaults.Timeout
}
if job.Defaults.Retry != nil {
if t.Retry == nil {
t.Retry = &tork.TaskRetry{}
}
if t.Retry.Limit == 0 {
t.Retry.Limit = job.Defaults.Retry.Limit
}
}
// mark job state as scheduled
t.State = tork.TaskStateScheduled
t.ScheduledAt = &now
if err := s.ds.UpdateTask(ctx, t.ID, func(u *tork.Task) error {
u.State = t.State
u.ScheduledAt = t.ScheduledAt
u.Queue = t.Queue
u.Limits = t.Limits
u.Timeout = t.Timeout
u.Retry = t.Retry
return nil
}); err != nil {
return errors.Wrapf(err, "error updating task in datastore")