From abde7e71e4de2e5833cf596079588f0e74110e23 Mon Sep 17 00:00:00 2001 From: Arik Cohen Date: Mon, 7 Oct 2024 17:09:42 -0400 Subject: [PATCH] Feature: sub-job secrets and auto-delete --- input/task.go | 8 ++++++++ internal/coordinator/scheduler/scheduler.go | 15 +++++++++++---- internal/coordinator/scheduler/scheduler_test.go | 8 ++++++++ internal/eval/eval.go | 7 +++++++ internal/eval/eval_test.go | 7 +++++++ internal/redact/redact.go | 5 +++++ internal/redact/redact_test.go | 6 ++++++ task.go | 8 ++++++++ 8 files changed, 60 insertions(+), 4 deletions(-) diff --git a/input/task.go b/input/task.go index 2ed40655..4fa28fb9 100644 --- a/input/task.go +++ b/input/task.go @@ -41,6 +41,8 @@ type SubJob struct { Description string `json:"description,omitempty" yaml:"description,omitempty"` Tasks []Task `json:"tasks,omitempty" yaml:"tasks,omitempty" validate:"required"` Inputs map[string]string `json:"inputs,omitempty" yaml:"inputs,omitempty"` + Secrets map[string]string `json:"secrets,omitempty" yaml:"secrets,omitempty"` + AutoDelete *AutoDelete `json:"autoDelete,omitempty" yaml:"autoDelete,omitempty"` Output string `json:"output,omitempty" yaml:"output,omitempty"` Detached bool `json:"detached,omitempty" yaml:"detached,omitempty"` Webhooks []Webhook `json:"webhooks,omitempty" yaml:"webhooks,omitempty" validate:"dive"` @@ -153,10 +155,16 @@ func (i Task) toTask() *tork.Task { Description: i.SubJob.Description, Tasks: toTasks(i.SubJob.Tasks), Inputs: maps.Clone(i.SubJob.Inputs), + Secrets: maps.Clone(i.SubJob.Secrets), Output: i.SubJob.Output, Detached: i.SubJob.Detached, Webhooks: webhooks, } + if i.SubJob.AutoDelete != nil { + subjob.AutoDelete = &tork.AutoDelete{ + After: i.SubJob.AutoDelete.After, + } + } } var parallel *tork.ParallelTask if i.Parallel != nil { diff --git a/internal/coordinator/scheduler/scheduler.go b/internal/coordinator/scheduler/scheduler.go index dc4306f1..2f6c2750 100644 --- a/internal/coordinator/scheduler/scheduler.go +++ b/internal/coordinator/scheduler/scheduler.go @@ -115,10 +115,15 @@ func (s *Scheduler) scheduleAttachedSubJob(ctx context.Context, t *tork.Task) er State: tork.JobStatePending, Tasks: t.SubJob.Tasks, Inputs: t.SubJob.Inputs, - Context: tork.JobContext{Inputs: t.SubJob.Inputs}, - TaskCount: len(t.SubJob.Tasks), - Output: t.SubJob.Output, - Webhooks: t.SubJob.Webhooks, + Secrets: t.SubJob.Secrets, + Context: tork.JobContext{ + Inputs: t.SubJob.Inputs, + Secrets: t.SubJob.Secrets, + }, + TaskCount: len(t.SubJob.Tasks), + Output: t.SubJob.Output, + Webhooks: t.SubJob.Webhooks, + AutoDelete: t.SubJob.AutoDelete, } if err := s.ds.UpdateTask(ctx, t.ID, func(u *tork.Task) error { u.State = tork.TaskStateRunning @@ -150,10 +155,12 @@ func (s *Scheduler) scheduleDetachedSubJob(ctx context.Context, t *tork.Task) er State: tork.JobStatePending, Tasks: t.SubJob.Tasks, Inputs: t.SubJob.Inputs, + Secrets: t.SubJob.Secrets, Context: tork.JobContext{Inputs: t.SubJob.Inputs}, TaskCount: len(t.SubJob.Tasks), Output: t.SubJob.Output, Webhooks: t.SubJob.Webhooks, + AutoDelete: t.SubJob.AutoDelete, } if err := s.ds.CreateJob(ctx, subjob); err != nil { return errors.Wrapf(err, "error creating subjob") diff --git a/internal/coordinator/scheduler/scheduler_test.go b/internal/coordinator/scheduler/scheduler_test.go index 44d62cef..957f45c5 100644 --- a/internal/coordinator/scheduler/scheduler_test.go +++ b/internal/coordinator/scheduler/scheduler_test.go @@ -407,6 +407,8 @@ func Test_scheduleSubJobTask(t *testing.T) { processed := make(chan any) err := b.SubscribeForJobs(func(j *tork.Job) error { assert.NotEmpty(t, j.ParentID) + assert.Equal(t, "https://example.com", j.Inputs["some_input"]) + assert.Equal(t, "password", j.Secrets["some_secret"]) close(processed) return nil }) @@ -430,6 +432,12 @@ func Test_scheduleSubJobTask(t *testing.T) { JobID: j.ID, SubJob: &tork.SubJobTask{ Name: "my sub job", + Inputs: map[string]string{ + "some_input": "https://example.com", + }, + Secrets: map[string]string{ + "some_secret": "password", + }, Tasks: []*tork.Task{ { Name: "some task", diff --git a/internal/eval/eval.go b/internal/eval/eval.go index f9a9ff0f..e4116b2a 100644 --- a/internal/eval/eval.go +++ b/internal/eval/eval.go @@ -108,6 +108,13 @@ func EvaluateTask(t *tork.Task, c map[string]any) error { } t.SubJob.Inputs[k] = result } + for k, v := range t.SubJob.Secrets { + result, err := EvaluateTemplate(v, c) + if err != nil { + return err + } + t.SubJob.Secrets[k] = result + } for _, wh := range t.SubJob.Webhooks { url, err := EvaluateTemplate(wh.URL, c) if err != nil { diff --git a/internal/eval/eval_test.go b/internal/eval/eval_test.go index b07efe14..d46841ca 100644 --- a/internal/eval/eval_test.go +++ b/internal/eval/eval_test.go @@ -229,6 +229,9 @@ func TestEvalSubjob(t *testing.T) { "input1": "{{inputs.VAR1}}", "input2": "{{inputs.VAR2}}", }, + Secrets: map[string]string{ + "secret": "{{secrets.SECRET}}", + }, Webhooks: []*tork.Webhook{{ URL: "{{inputs.VAR1}}", Headers: map[string]string{ @@ -242,10 +245,14 @@ func TestEvalSubjob(t *testing.T) { "VAR1": "VAL1", "VAR2": "VAL2", }, + "secrets": map[string]string{ + "SECRET": "PASSWORD", + }, }) assert.NoError(t, err) assert.Equal(t, "some name VAL1", t1.SubJob.Name) assert.Equal(t, "VAL1", t1.SubJob.Inputs["input1"]) + assert.Equal(t, "PASSWORD", t1.SubJob.Secrets["secret"]) assert.Equal(t, "VAL1", t1.SubJob.Webhooks[0].URL) assert.Equal(t, "VAL2", t1.SubJob.Webhooks[0].Headers["somekey"]) } diff --git a/internal/redact/redact.go b/internal/redact/redact.go index a8cde9dd..e61a06c7 100644 --- a/internal/redact/redact.go +++ b/internal/redact/redact.go @@ -80,6 +80,11 @@ func (r *Redacter) doRedactTask(t *tork.Task, secrets map[string]string) { if redacted.Registry != nil { redacted.Registry.Password = redactedStr } + if redacted.SubJob != nil { + for k := range redacted.SubJob.Secrets { + redacted.SubJob.Secrets[k] = redactedStr + } + } } func (r *Redacter) RedactJob(j *tork.Job) { diff --git a/internal/redact/redact_test.go b/internal/redact/redact_test.go index 4f480b70..7433ef86 100644 --- a/internal/redact/redact_test.go +++ b/internal/redact/redact_test.go @@ -58,6 +58,11 @@ func TestRedactTask(t *testing.T) { }, }, }, + SubJob: &tork.SubJobTask{ + Secrets: map[string]string{ + "hush": "shhhhh", + }, + }, Registry: &tork.Registry{ Username: "me", Password: "secret", @@ -82,6 +87,7 @@ func TestRedactTask(t *testing.T) { assert.Equal(t, "hello world", ta.Parallel.Tasks[0].Env["harmless"]) assert.Equal(t, "[REDACTED]", ta.Registry.Password) assert.Equal(t, "[REDACTED]", ta.Env["thing"]) + assert.Equal(t, "[REDACTED]", ta.SubJob.Secrets["hush"]) } func TestRedactJob(t *testing.T) { diff --git a/task.go b/task.go index b8859fb6..d3086017 100644 --- a/task.go +++ b/task.go @@ -107,6 +107,8 @@ type SubJobTask struct { Description string `json:"description,omitempty"` Tasks []*Task `json:"tasks,omitempty"` Inputs map[string]string `json:"inputs,omitempty"` + Secrets map[string]string `json:"secrets,omitempty"` + AutoDelete *AutoDelete `json:"autoDelete,omitempty"` Output string `json:"output,omitempty"` Detached bool `json:"detached,omitempty"` Webhooks []*Webhook `json:"webhooks,omitempty"` @@ -256,11 +258,17 @@ func (e *EachTask) Clone() *EachTask { } func (s *SubJobTask) Clone() *SubJobTask { + var autoDelete *AutoDelete + if s.AutoDelete != nil { + autoDelete = s.AutoDelete.Clone() + } return &SubJobTask{ ID: s.ID, Name: s.Name, Description: s.Description, Inputs: maps.Clone(s.Inputs), + Secrets: maps.Clone(s.Secrets), + AutoDelete: autoDelete, Tasks: CloneTasks(s.Tasks), Output: s.Output, Detached: s.Detached,