Skip to content

Commit

Permalink
Support for auto-retry of failed tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
runabol committed Aug 10, 2023
1 parent 2dfbec4 commit 71da688
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 3 deletions.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,13 @@ Content-Type:text/yaml
- `pre` - the list of tasks to execute prior to executing the actual task.
- `post` - the list of tasks to execute post execution of the actual task.
- `volumes` - a list of temporary volumes, created for the duration of the execution of the task. Useful for sharing state between the task and its `pre` and `post` tasks.
- `retry` - the retry configuration to execute in case of a failure. Example:
```yaml
retry:
limit: 5 # will retry up to 5 times
initialDelay: 5s # optional: default 1s (max: 5m)
scalingFactor: # optional: default 2 (max: 10)
```

**Examples:**

Expand Down
34 changes: 34 additions & 0 deletions coordinator/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,40 @@ func (s *api) createTask(c *gin.Context) {
c.AbortWithError(http.StatusBadRequest, errors.Errorf("can't route to special queue: %s", t.Queue))
return
}
if t.Retry != nil {
if t.Retry.Attempts != 0 {
c.AbortWithError(http.StatusBadRequest, errors.Errorf("can't specify retry.attempts"))
return
}
if t.Retry.Limit > 10 {
c.AbortWithError(http.StatusBadRequest, errors.Errorf("can't specify retry.limit > 10"))
return
}
if t.Retry.ScalingFactor > 5 {
c.AbortWithError(http.StatusBadRequest, errors.Errorf("can't specify a retry.scalingFactor > 5"))
return
}
if t.Retry.ScalingFactor < 2 {
t.Retry.ScalingFactor = 2
}
if t.Retry.Limit < 0 {
t.Retry.Limit = 0
}
if t.Retry.InitialDelay == "" {
t.Retry.InitialDelay = "1s"
}
delay, err := time.ParseDuration(t.Retry.InitialDelay)
if err != nil {
c.AbortWithError(http.StatusBadRequest, errors.Errorf("invalid initial delay duration: %s", t.Retry.InitialDelay))
return
}
if delay > (time.Minute * 5) {
c.AbortWithError(http.StatusBadRequest, errors.Errorf("can't specify retry.initialDelay greater than 5 minutes"))
return
}

}

n := time.Now()
t.ID = uuid.NewUUID()
t.State = task.Pending
Expand Down
32 changes: 29 additions & 3 deletions coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package coordinator
import (
"context"
"fmt"
"math"
"time"

"github.com/pkg/errors"
"github.com/rs/zerolog/log"

"github.com/tork/datastore"
Expand Down Expand Up @@ -140,9 +142,33 @@ func (c *Coordinator) taskFailedHandler(thread string) func(ctx context.Context,
Str("task-error", t.Error).
Str("thread", thread).
Msg("received task failure")
u.State = task.Failed
u.FailedAt = t.FailedAt
u.Error = t.Error
// check if the task has a retry policy and if it hadn't been exhausted
if u.Retry != nil && u.Retry.Attempts < u.Retry.Limit {
u.Retry.Attempts = u.Retry.Attempts + 1
t.Retry = u.Retry
t.State = task.Scheduled
t.Error = ""
qname := t.Queue
if qname == "" {
qname = mq.QUEUE_DEFAULT
}
dur, err := time.ParseDuration(t.Retry.InitialDelay)
if err != nil {
return errors.Wrapf(err, "invalid retry.initialDelay: %s", t.Retry.InitialDelay)
}
go func() {
delay := dur * time.Duration(math.Pow(float64(t.Retry.ScalingFactor), float64(u.Retry.Attempts-1)))
log.Debug().Msgf("delaying retry %s", delay)
time.Sleep(delay)
if err := c.broker.PublishTask(ctx, qname, t); err != nil {
log.Error().Err(err).Msg("error publishing retry task")
}
}()
} else {
u.State = task.Failed
u.FailedAt = t.FailedAt
u.Error = t.Error
}
return nil
})
}
Expand Down
17 changes: 17 additions & 0 deletions task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,21 @@ type Task struct {
Post []Task `json:"post,omitempty"`
Volumes []string `json:"volumes,omitempty"`
Node string `json:"node,omitempty"`
Retry *Retry `json:"retry,omitempty"`
}

const (
RETRY_DEFAULT_INITIAL_DELAY = "1s"
RETRY_DEFAULT_SCALING_FACTOR = 2
)

// Retry allows to specify a retry policy for a given
// task using the exponential backoff formula:
//
// initalDelay*scalingFactor^attempt
type Retry struct {
Limit int `json:"limit,omitempty"`
InitialDelay string `json:"initialDelay,omitempty"`
ScalingFactor int `json:"scalingFactor,omitempty"`
Attempts int `json:"attempts,omitempty"`
}

0 comments on commit 71da688

Please sign in to comment.