Skip to content

Commit

Permalink
Bug fix on nested sub job in parallel task
Browse files Browse the repository at this point in the history
  • Loading branch information
runabol committed Aug 23, 2023
1 parent cc98094 commit c0c17f8
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 14 deletions.
23 changes: 14 additions & 9 deletions coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func (c *Coordinator) scheduleParallelTask(ctx context.Context, t *task.Task) er
pt.ID = uuid.NewUUID()
pt.JobID = j.ID
pt.State = task.Pending
pt.Position = t.Position
pt.Position = 0
pt.CreatedAt = &now
pt.ParentID = t.ID
if err := eval.EvaluateTask(pt, j.Context.AsMap()); err != nil {
Expand Down Expand Up @@ -307,13 +307,17 @@ func (c *Coordinator) handleStartedTask(t *task.Task) error {

func (c *Coordinator) handleCompletedTask(t *task.Task) error {
ctx := context.Background()
if t.ParentID != "" && t.SubJob == nil {
return c.completeCompositeTask(ctx, t)
return c.completeTask(ctx, t)
}

func (c *Coordinator) completeTask(ctx context.Context, t *task.Task) error {
if t.ParentID != "" {
return c.completeSubTask(ctx, t)
}
return c.completeRegularTask(ctx, t)
return c.completeTopLevelTask(ctx, t)
}

func (c *Coordinator) completeCompositeTask(ctx context.Context, t *task.Task) error {
func (c *Coordinator) completeSubTask(ctx context.Context, t *task.Task) error {
parent, err := c.ds.GetTaskByID(ctx, t.ParentID)
if err != nil {
return errors.Wrapf(err, "error getting parent composite task: %s", t.ParentID)
Expand Down Expand Up @@ -364,7 +368,7 @@ func (c *Coordinator) completeEachTask(ctx context.Context, t *task.Task) error
return errors.New("error fetching the parent task")
}
if parent.State == task.Completed {
return c.completeRegularTask(ctx, parent)
return c.completeTask(ctx, parent)
}
return nil
}
Expand Down Expand Up @@ -409,12 +413,12 @@ func (c *Coordinator) completeParallelTask(ctx context.Context, t *task.Task) er
}
// complete the parent task
if parent.State == task.Completed {
return c.completeRegularTask(ctx, parent)
return c.completeTask(ctx, parent)
}
return nil
}

func (c *Coordinator) completeRegularTask(ctx context.Context, t *task.Task) error {
func (c *Coordinator) completeTopLevelTask(ctx context.Context, t *task.Task) error {
log.Debug().Str("task-id", t.ID).Msg("received task completion")
// update task in DB
if err := c.ds.UpdateTask(ctx, t.ID, func(u *task.Task) error {
Expand All @@ -427,7 +431,7 @@ func (c *Coordinator) completeRegularTask(ctx context.Context, t *task.Task) err
}
// update job in DB
if err := c.ds.UpdateJob(ctx, t.JobID, func(u *job.Job) error {
u.Position = u.Position + 1 // FIXME: make idempotent
u.Position = u.Position + 1
if u.Position > len(u.Tasks) {
now := time.Now().UTC()
u.State = job.Completed
Expand Down Expand Up @@ -471,6 +475,7 @@ func (c *Coordinator) completeRegularTask(ctx context.Context, t *task.Task) err
if err != nil {
return errors.Wrapf(err, "could not find parent task for subtask: %s", j.ParentID)
}
fmt.Println("parent task", parent.ID)
now := time.Now().UTC()
parent.State = task.Completed
parent.CompletedAt = &now
Expand Down
3 changes: 3 additions & 0 deletions coordinator/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -882,6 +882,9 @@ func doRunJob(t *testing.T, filename string) *job.Job {
w, err := worker.NewWorker(worker.Config{
Broker: b,
Runtime: rt,
Queues: map[string]int{
"default": 2,
},
})
assert.NoError(t, err)

Expand Down
10 changes: 5 additions & 5 deletions examples/subjob.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,23 @@ tasks:
subjob:
name: my sub job 1
tasks:
- name: hello sub task
- name: hello sub task job 1
image: ubuntu:mantic
run: echo start of sub-job
- name: bye task
- name: bye sub task job 1
image: ubuntu:mantic
run: echo end of sub-job
- name: sample job 2
subjob:
name: my sub job 2
tasks:
- name: hello sub task
- name: hello sub task job 2
image: ubuntu:mantic
run: echo start of sub-job
- name: bye task
- name: bye sub task job 2
image: ubuntu:mantic
run: echo end of sub-job

- name: bye task
image: ubuntu:mantic
run: echo end of job
run: echo end of job

0 comments on commit c0c17f8

Please sign in to comment.