Skip to content

Commit

Permalink
Feature: sub-job secrets and auto-delete
Browse files Browse the repository at this point in the history
  • Loading branch information
runabol committed Oct 7, 2024
1 parent e6d55db commit abde7e7
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 4 deletions.
8 changes: 8 additions & 0 deletions input/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ type SubJob struct {
Description string `json:"description,omitempty" yaml:"description,omitempty"`
Tasks []Task `json:"tasks,omitempty" yaml:"tasks,omitempty" validate:"required"`
Inputs map[string]string `json:"inputs,omitempty" yaml:"inputs,omitempty"`
Secrets map[string]string `json:"secrets,omitempty" yaml:"secrets,omitempty"`
AutoDelete *AutoDelete `json:"autoDelete,omitempty" yaml:"autoDelete,omitempty"`
Output string `json:"output,omitempty" yaml:"output,omitempty"`
Detached bool `json:"detached,omitempty" yaml:"detached,omitempty"`
Webhooks []Webhook `json:"webhooks,omitempty" yaml:"webhooks,omitempty" validate:"dive"`
Expand Down Expand Up @@ -153,10 +155,16 @@ func (i Task) toTask() *tork.Task {
Description: i.SubJob.Description,
Tasks: toTasks(i.SubJob.Tasks),
Inputs: maps.Clone(i.SubJob.Inputs),
Secrets: maps.Clone(i.SubJob.Secrets),
Output: i.SubJob.Output,
Detached: i.SubJob.Detached,
Webhooks: webhooks,
}
if i.SubJob.AutoDelete != nil {
subjob.AutoDelete = &tork.AutoDelete{
After: i.SubJob.AutoDelete.After,
}
}
}
var parallel *tork.ParallelTask
if i.Parallel != nil {
Expand Down
15 changes: 11 additions & 4 deletions internal/coordinator/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,15 @@ func (s *Scheduler) scheduleAttachedSubJob(ctx context.Context, t *tork.Task) er
State: tork.JobStatePending,
Tasks: t.SubJob.Tasks,
Inputs: t.SubJob.Inputs,
Context: tork.JobContext{Inputs: t.SubJob.Inputs},
TaskCount: len(t.SubJob.Tasks),
Output: t.SubJob.Output,
Webhooks: t.SubJob.Webhooks,
Secrets: t.SubJob.Secrets,
Context: tork.JobContext{
Inputs: t.SubJob.Inputs,
Secrets: t.SubJob.Secrets,
},
TaskCount: len(t.SubJob.Tasks),
Output: t.SubJob.Output,
Webhooks: t.SubJob.Webhooks,
AutoDelete: t.SubJob.AutoDelete,
}
if err := s.ds.UpdateTask(ctx, t.ID, func(u *tork.Task) error {
u.State = tork.TaskStateRunning
Expand Down Expand Up @@ -150,10 +155,12 @@ func (s *Scheduler) scheduleDetachedSubJob(ctx context.Context, t *tork.Task) er
State: tork.JobStatePending,
Tasks: t.SubJob.Tasks,
Inputs: t.SubJob.Inputs,
Secrets: t.SubJob.Secrets,
Context: tork.JobContext{Inputs: t.SubJob.Inputs},
TaskCount: len(t.SubJob.Tasks),
Output: t.SubJob.Output,
Webhooks: t.SubJob.Webhooks,
AutoDelete: t.SubJob.AutoDelete,
}
if err := s.ds.CreateJob(ctx, subjob); err != nil {
return errors.Wrapf(err, "error creating subjob")
Expand Down
8 changes: 8 additions & 0 deletions internal/coordinator/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,8 @@ func Test_scheduleSubJobTask(t *testing.T) {
processed := make(chan any)
err := b.SubscribeForJobs(func(j *tork.Job) error {
assert.NotEmpty(t, j.ParentID)
assert.Equal(t, "https://example.com", j.Inputs["some_input"])
assert.Equal(t, "password", j.Secrets["some_secret"])
close(processed)
return nil
})
Expand All @@ -430,6 +432,12 @@ func Test_scheduleSubJobTask(t *testing.T) {
JobID: j.ID,
SubJob: &tork.SubJobTask{
Name: "my sub job",
Inputs: map[string]string{
"some_input": "https://example.com",
},
Secrets: map[string]string{
"some_secret": "password",
},
Tasks: []*tork.Task{
{
Name: "some task",
Expand Down
7 changes: 7 additions & 0 deletions internal/eval/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,13 @@ func EvaluateTask(t *tork.Task, c map[string]any) error {
}
t.SubJob.Inputs[k] = result
}
for k, v := range t.SubJob.Secrets {
result, err := EvaluateTemplate(v, c)
if err != nil {
return err
}
t.SubJob.Secrets[k] = result
}
for _, wh := range t.SubJob.Webhooks {
url, err := EvaluateTemplate(wh.URL, c)
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions internal/eval/eval_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,9 @@ func TestEvalSubjob(t *testing.T) {
"input1": "{{inputs.VAR1}}",
"input2": "{{inputs.VAR2}}",
},
Secrets: map[string]string{
"secret": "{{secrets.SECRET}}",
},
Webhooks: []*tork.Webhook{{
URL: "{{inputs.VAR1}}",
Headers: map[string]string{
Expand All @@ -242,10 +245,14 @@ func TestEvalSubjob(t *testing.T) {
"VAR1": "VAL1",
"VAR2": "VAL2",
},
"secrets": map[string]string{
"SECRET": "PASSWORD",
},
})
assert.NoError(t, err)
assert.Equal(t, "some name VAL1", t1.SubJob.Name)
assert.Equal(t, "VAL1", t1.SubJob.Inputs["input1"])
assert.Equal(t, "PASSWORD", t1.SubJob.Secrets["secret"])
assert.Equal(t, "VAL1", t1.SubJob.Webhooks[0].URL)
assert.Equal(t, "VAL2", t1.SubJob.Webhooks[0].Headers["somekey"])
}
Expand Down
5 changes: 5 additions & 0 deletions internal/redact/redact.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ func (r *Redacter) doRedactTask(t *tork.Task, secrets map[string]string) {
if redacted.Registry != nil {
redacted.Registry.Password = redactedStr
}
if redacted.SubJob != nil {
for k := range redacted.SubJob.Secrets {
redacted.SubJob.Secrets[k] = redactedStr
}
}
}

func (r *Redacter) RedactJob(j *tork.Job) {
Expand Down
6 changes: 6 additions & 0 deletions internal/redact/redact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ func TestRedactTask(t *testing.T) {
},
},
},
SubJob: &tork.SubJobTask{
Secrets: map[string]string{
"hush": "shhhhh",
},
},
Registry: &tork.Registry{
Username: "me",
Password: "secret",
Expand All @@ -82,6 +87,7 @@ func TestRedactTask(t *testing.T) {
assert.Equal(t, "hello world", ta.Parallel.Tasks[0].Env["harmless"])
assert.Equal(t, "[REDACTED]", ta.Registry.Password)
assert.Equal(t, "[REDACTED]", ta.Env["thing"])
assert.Equal(t, "[REDACTED]", ta.SubJob.Secrets["hush"])
}

func TestRedactJob(t *testing.T) {
Expand Down
8 changes: 8 additions & 0 deletions task.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ type SubJobTask struct {
Description string `json:"description,omitempty"`
Tasks []*Task `json:"tasks,omitempty"`
Inputs map[string]string `json:"inputs,omitempty"`
Secrets map[string]string `json:"secrets,omitempty"`
AutoDelete *AutoDelete `json:"autoDelete,omitempty"`
Output string `json:"output,omitempty"`
Detached bool `json:"detached,omitempty"`
Webhooks []*Webhook `json:"webhooks,omitempty"`
Expand Down Expand Up @@ -256,11 +258,17 @@ func (e *EachTask) Clone() *EachTask {
}

func (s *SubJobTask) Clone() *SubJobTask {
var autoDelete *AutoDelete
if s.AutoDelete != nil {
autoDelete = s.AutoDelete.Clone()
}
return &SubJobTask{
ID: s.ID,
Name: s.Name,
Description: s.Description,
Inputs: maps.Clone(s.Inputs),
Secrets: maps.Clone(s.Secrets),
AutoDelete: autoDelete,
Tasks: CloneTasks(s.Tasks),
Output: s.Output,
Detached: s.Detached,
Expand Down

0 comments on commit abde7e7

Please sign in to comment.