Skip to content

Commit

Permalink
Fix: task mutation on worker leak
Browse files Browse the repository at this point in the history
  • Loading branch information
runabol committed Aug 1, 2024
1 parent fc44ef3 commit 3b1543c
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 32 deletions.
55 changes: 28 additions & 27 deletions internal/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func (w *Worker) cancelTask(t *tork.Task) error {
}

func (w *Worker) handleTask(t *tork.Task) error {
ctx := context.Background()
started := time.Now().UTC()
t.StartedAt = &started
t.NodeID = w.id
Expand All @@ -127,7 +128,7 @@ func (w *Worker) handleTask(t *tork.Task) error {
t.Error = err.Error()
t.FailedAt = &now
t.State = tork.TaskStateFailed
return w.broker.PublishTask(context.Background(), mq.QUEUE_ERROR, t)
return w.broker.PublishTask(ctx, mq.QUEUE_ERROR, t)
}
log.Debug().Msgf("Port mapping %d->%s", hostPort, p.Port)
defer w.releasePort(hostPort)
Expand All @@ -136,13 +137,35 @@ func (w *Worker) handleTask(t *tork.Task) error {
adapter := func(ctx context.Context, et task.EventType, t *tork.Task) error {
return w.runTask(t)
}
// clone the task so that the downstream
// process can mutate the task without
// affecting the original
rt := t.Clone()
mw := task.ApplyMiddleware(adapter, w.middleware)
if err := mw(context.Background(), task.StateChange, t); err != nil {
if err := mw(ctx, task.StateChange, rt); err != nil {
now := time.Now().UTC()
t.Error = err.Error()
t.FailedAt = &now
t.State = tork.TaskStateFailed
return w.broker.PublishTask(context.Background(), mq.QUEUE_ERROR, t)
return w.broker.PublishTask(ctx, mq.QUEUE_ERROR, t)
}
switch rt.State {
case tork.TaskStateCompleted:
t.Result = rt.Result
t.CompletedAt = rt.CompletedAt
t.State = rt.State
if err := w.broker.PublishTask(ctx, mq.QUEUE_COMPLETED, t); err != nil {
return err
}
case tork.TaskStateFailed:
t.Error = rt.Error
t.FailedAt = rt.FailedAt
t.State = rt.State
if err := w.broker.PublishTask(ctx, mq.QUEUE_ERROR, t); err != nil {
return err
}
default:
return errors.Errorf("unexpected state %s for task %s", rt.State, t.ID)
}
return nil
}
Expand All @@ -152,45 +175,23 @@ func (w *Worker) runTask(t *tork.Task) error {
defer func() {
atomic.AddInt32(&w.taskCount, -1)
}()
// clone the task so that the downstream
// process can mutate the task without
// affecting the original
rt := t.Clone()
// create a cancellation context in case
// the coordinator wants to cancel the
// task later on
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
w.tasks.Set(t.ID, runningTask{
cancel: cancel,
task: rt,
task: t,
})
defer w.tasks.Delete(t.ID)
// let the coordinator know that the task started executing
if err := w.broker.PublishTask(ctx, mq.QUEUE_STARTED, t); err != nil {
return err
}
if err := w.doRunTask(ctx, rt); err != nil {
if err := w.doRunTask(ctx, t); err != nil {
return err
}
switch rt.State {
case tork.TaskStateCompleted:
t.Result = rt.Result
t.CompletedAt = rt.CompletedAt
t.State = rt.State
if err := w.broker.PublishTask(ctx, mq.QUEUE_COMPLETED, t); err != nil {
return err
}
case tork.TaskStateFailed:
t.Error = rt.Error
t.FailedAt = rt.FailedAt
t.State = rt.State
if err := w.broker.PublishTask(ctx, mq.QUEUE_ERROR, t); err != nil {
return err
}
default:
return errors.Errorf("unexpected state %s for task %s", rt.State, t.ID)
}
return nil
}

Expand Down
12 changes: 7 additions & 5 deletions internal/worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,8 @@ func Test_middleware(t *testing.T) {

completions := make(chan any)
err = b.SubscribeForTasks(mq.QUEUE_COMPLETED, func(tk *tork.Task) error {
assert.NotEmpty(t, tk.Result)
assert.Equal(t, "someval", tk.Env["SOMEVAR"])
assert.Equal(t, "someval", tk.Result)
assert.Equal(t, "", tk.Env["SOMEVAR"])
close(completions)
return nil
})
Expand All @@ -349,12 +349,14 @@ func Test_middleware(t *testing.T) {
err = w.Start()
assert.NoError(t, err)

err = b.PublishTask(context.Background(), "someq", &tork.Task{
tk := &tork.Task{
ID: uuid.NewUUID(),
State: tork.TaskStateScheduled,
Image: "alpine:3.18.3",
Run: "echo hello world > $TORK_OUTPUT",
})
Run: "echo -n $SOMEVAR > $TORK_OUTPUT",
}

err = b.PublishTask(context.Background(), "someq", tk)
assert.NoError(t, err)

<-completions
Expand Down

0 comments on commit 3b1543c

Please sign in to comment.