Skip to content

Commit

Permalink
Cancel JOB API
Browse files Browse the repository at this point in the history
  • Loading branch information
runabol committed Aug 15, 2023
1 parent 552bc97 commit 0c5c83a
Show file tree
Hide file tree
Showing 11 changed files with 338 additions and 65 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -442,12 +442,12 @@ HTTP 200
}
```
## Cancel a running task
## Cancel a running job
**Path:**
```
PUT /task/{task id}/cancel
PUT /job/{job id}/cancel
```
**Response:**
Expand All @@ -468,7 +468,7 @@ Failure:
400 Bad Request

{
"error": "task in not running"
"error": "job in not running"
}
```
Expand Down
57 changes: 36 additions & 21 deletions coordinator/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@ func newAPI(cfg Config) *api {
ds: cfg.DataStore,
}
r.GET("/status", s.status)
r.PUT("/task/:id/cancel", s.cancelTask) // TODO: replace with /job/:id/cancel
r.GET("/task/:id", s.getTask)
r.GET("/queue", s.listQueues)
r.GET("/node", s.listActiveNodes)
r.POST("/job", s.createJob)
r.GET("/job/:id", s.getJob)
r.PUT("/job/:id/cancel", s.cancelJob)
return s
}

Expand Down Expand Up @@ -78,7 +78,7 @@ func (s *api) listActiveNodes(c *gin.Context) {
c.JSON(http.StatusOK, nodes)
}

func validateTask(t task.Task) error {
func sanitizeTask(t *task.Task) error {
if strings.TrimSpace(t.Image) == "" {
return errors.New("missing required field: image")
}
Expand All @@ -96,13 +96,13 @@ func validateTask(t task.Task) error {
return errors.Errorf("can't specify a retry.scalingFactor > 5")
}
if t.Retry.ScalingFactor < 2 {
t.Retry.ScalingFactor = 2
t.Retry.ScalingFactor = task.RETRY_DEFAULT_SCALING_FACTOR
}
if t.Retry.Limit < 0 {
t.Retry.Limit = 0
}
if t.Retry.InitialDelay == "" {
t.Retry.InitialDelay = "1s"
t.Retry.InitialDelay = task.RETRY_DEFAULT_INITIAL_DELAY
}
delay, err := time.ParseDuration(t.Retry.InitialDelay)
if err != nil {
Expand Down Expand Up @@ -138,16 +138,15 @@ func (s *api) createJob(c *gin.Context) {
return
}
default:

_ = c.AbortWithError(http.StatusBadRequest, errors.Errorf("unknown content type: %s", c.ContentType()))
return
}
if len(j.Tasks) == 0 {
_ = c.AbortWithError(http.StatusBadRequest, errors.New("job has not tasks"))
_ = c.AbortWithError(http.StatusBadRequest, errors.New("job has no tasks"))
return
}
for ix, t := range j.Tasks {
if err := validateTask(t); err != nil {
if err := sanitizeTask(&t); err != nil {
_ = c.AbortWithError(http.StatusBadRequest, errors.Wrapf(err, "tasks[%d]", ix))
return
}
Expand Down Expand Up @@ -190,28 +189,44 @@ func (s *api) getTask(c *gin.Context) {
c.JSON(http.StatusOK, redactTask(t))
}

func (s *api) cancelTask(c *gin.Context) {
func (s *api) cancelJob(c *gin.Context) {
id := c.Param("id")
err := s.ds.UpdateTask(c, id, func(u *task.Task) error {
if u.State != task.Running {
return errors.New("task in not running")
}
u.State = task.Cancelled
if u.Node != "" {
node, err := s.ds.GetNodeByID(c, u.Node)
if err != nil {
return err
}
if err := s.broker.PublishTask(c, node.Queue, *u); err != nil {
return err
}
err := s.ds.UpdateJob(c, id, func(u *job.Job) error {
if u.State != job.Running {
return errors.New("job in not running")
}
u.State = job.Cancelled
return nil
})
if err != nil {
_ = c.AbortWithError(http.StatusBadRequest, err)
return
}
tasks, err := s.ds.GetActiveTasks(c, id)
if err != nil {
_ = c.AbortWithError(http.StatusInternalServerError, err)
return
}
for _, t := range tasks {
err := s.ds.UpdateTask(c, t.ID, func(u *task.Task) error {
u.State = task.Cancelled
// notify the node to cancel the task
if u.Node != "" {
node, err := s.ds.GetNodeByID(c, u.Node)
if err != nil {
return err
}
if err := s.broker.PublishTask(c, node.Queue, *u); err != nil {
return err
}
}
return nil
})
if err != nil {
_ = c.AbortWithError(http.StatusBadRequest, err)
return
}
}
c.JSON(http.StatusOK, gin.H{"status": "OK"})
}

Expand Down
111 changes: 102 additions & 9 deletions coordinator/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package coordinator
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/http/httptest"
Expand All @@ -16,6 +17,7 @@ import (
"github.com/tork/mq"
"github.com/tork/node"
"github.com/tork/task"
"github.com/tork/uuid"
)

func Test_getQueues(t *testing.T) {
Expand Down Expand Up @@ -139,7 +141,7 @@ func Test_getTask(t *testing.T) {
assert.Equal(t, http.StatusOK, w.Code)
}

func Test_creteJob(t *testing.T) {
func Test_createJob(t *testing.T) {
api := newAPI(Config{
DataStore: datastore.NewInMemoryDatastore(),
Broker: mq.NewInMemoryBroker(),
Expand Down Expand Up @@ -190,8 +192,8 @@ func Test_getJob(t *testing.T) {
assert.Equal(t, http.StatusOK, w.Code)
}

func Test_validateTaskRetry(t *testing.T) {
err := validateTask(task.Task{
func Test_sanitizeTaskRetry(t *testing.T) {
err := sanitizeTask(&task.Task{
Image: "some:image",
Retry: &task.Retry{
Limit: 5,
Expand All @@ -200,36 +202,127 @@ func Test_validateTaskRetry(t *testing.T) {
},
})
assert.NoError(t, err)
err = validateTask(task.Task{
err = sanitizeTask(&task.Task{
Image: "some:image",
Retry: &task.Retry{
Limit: 3,
InitialDelay: "1h",
},
})
assert.Error(t, err)
err = validateTask(task.Task{
err = sanitizeTask(&task.Task{
Image: "some:image",
Retry: &task.Retry{
Limit: 100,
},
})
assert.Error(t, err)
err = validateTask(task.Task{
err = sanitizeTask(&task.Task{
Image: "some:image",
Timeout: "-10s",
})
assert.Error(t, err)
err = validateTask(task.Task{
err = sanitizeTask(&task.Task{
Image: "some:image",
Timeout: "10s",
})
assert.NoError(t, err)
rt1 := &task.Task{
Image: "some:image",
Timeout: "10s",
Retry: &task.Retry{},
}
err = sanitizeTask(rt1)
assert.NoError(t, err)
assert.Equal(t, task.RETRY_DEFAULT_INITIAL_DELAY, rt1.Retry.InitialDelay)
assert.Equal(t, task.RETRY_DEFAULT_SCALING_FACTOR, rt1.Retry.ScalingFactor)
rt2 := &task.Task{
Image: "some:image",
Timeout: "10s",
Retry: &task.Retry{
InitialDelay: "10s",
},
}
err = sanitizeTask(rt1)
assert.NoError(t, err)
assert.Equal(t, "10s", rt2.Retry.InitialDelay)
}

func Test_validateTaskBasic(t *testing.T) {
err := validateTask(task.Task{})
err := sanitizeTask(&task.Task{})
assert.Error(t, err)
err = validateTask(task.Task{Image: "some:image"})
err = sanitizeTask(&task.Task{Image: "some:image"})
assert.NoError(t, err)
}

// Test_cancelRunningJob verifies that PUT /job/{id}/cancel on a RUNNING job
// responds with HTTP 200 and {"status":"OK"}, moves the job itself to the
// CANCELLED state, and leaves the job without any active tasks.
func Test_cancelRunningJob(t *testing.T) {
	ctx := context.Background()
	ds := datastore.NewInMemoryDatastore()

	// Seed a running job.
	j1 := job.Job{
		ID:        uuid.NewUUID(),
		State:     job.Running,
		CreatedAt: time.Now().UTC(),
	}
	err := ds.CreateJob(ctx, j1)
	assert.NoError(t, err)

	now := time.Now().UTC()

	// Seed one task per state so the cancel path is exercised against a
	// mix of task states belonging to the same job.
	tasks := []task.Task{{
		ID:        uuid.NewUUID(),
		State:     task.Pending,
		CreatedAt: &now,
		JobID:     j1.ID,
	}, {
		ID:        uuid.NewUUID(),
		State:     task.Scheduled,
		CreatedAt: &now,
		JobID:     j1.ID,
	}, {
		ID:        uuid.NewUUID(),
		State:     task.Running,
		CreatedAt: &now,
		JobID:     j1.ID,
	}, {
		ID:        uuid.NewUUID(),
		State:     task.Cancelled,
		CreatedAt: &now,
		JobID:     j1.ID,
	}, {
		ID:        uuid.NewUUID(),
		State:     task.Completed,
		CreatedAt: &now,
		JobID:     j1.ID,
	}, {
		ID:        uuid.NewUUID(),
		State:     task.Failed,
		CreatedAt: &now,
		JobID:     j1.ID,
	}}

	for _, ta := range tasks {
		err := ds.CreateTask(ctx, ta)
		assert.NoError(t, err)
	}

	api := newAPI(Config{
		DataStore: ds,
		Broker:    mq.NewInMemoryBroker(),
	})
	assert.NotNil(t, api)

	req, err := http.NewRequest(http.MethodPut, fmt.Sprintf("/job/%s/cancel", j1.ID), nil)
	assert.NoError(t, err)
	w := httptest.NewRecorder()
	api.server.Handler.ServeHTTP(w, req)

	// Check the status first so a failure points at the HTTP outcome
	// before the body comparison.
	assert.Equal(t, http.StatusOK, w.Code)
	body, err := io.ReadAll(w.Body)
	assert.NoError(t, err)
	assert.Equal(t, `{"status":"OK"}`, string(body))

	// The job itself must have been moved to CANCELLED by the handler.
	j2, err := ds.GetJobByID(ctx, j1.ID)
	assert.NoError(t, err)
	assert.Equal(t, job.Cancelled, j2.State)

	// No task of the job may remain active after cancellation.
	actives, err := ds.GetActiveTasks(ctx, j1.ID)
	assert.NoError(t, err)
	assert.Empty(t, actives)
}
28 changes: 21 additions & 7 deletions coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (c *Coordinator) handleCompletedTask(t task.Task) error {
}); err != nil {
return errors.Wrapf(err, "error updating job in datastore")
}
// fire the next task (if the job isn't completed)
// fire the next task
j, err := c.ds.GetJobByID(ctx, t.JobID)
if err != nil {
return errors.Wrapf(err, "error getting job from datatstore")
Expand All @@ -171,11 +171,18 @@ func (c *Coordinator) handleCompletedTask(t task.Task) error {

func (c *Coordinator) handleFailedTask(t task.Task) error {
ctx := context.Background()
j, err := c.ds.GetJobByID(ctx, t.JobID)
if err != nil {
return errors.Wrapf(err, "unknown task: %s", t.ID)
}
log.Error().
Str("task-id", t.ID).
Str("task-error", t.Error).
Str("task-state", string(t.State)).
Msg("received task failure")
if t.Retry != nil && t.Retry.Attempts < t.Retry.Limit {
if j.State == job.Running &&
t.Retry != nil &&
t.Retry.Attempts < t.Retry.Limit {
// create a new retry task
rt := t
rt.ID = uuid.NewUUID()
Expand All @@ -200,18 +207,25 @@ func (c *Coordinator) handleFailedTask(t task.Task) error {
} else {
// mark the job as FAILED
if err := c.ds.UpdateJob(ctx, t.JobID, func(u *job.Job) error {
u.State = job.Failed
u.FailedAt = t.FailedAt
// we only want to mark the job as FAILED
// if it's actually running as opposed to
// possibly being CANCELLED
if u.State == job.Running {
u.State = job.Failed
u.FailedAt = t.FailedAt
}
return nil
}); err != nil {
return errors.Wrapf(err, "error marking the job as failed in the datastore")
}
}
// mark the task as FAILED
return c.ds.UpdateTask(ctx, t.ID, func(u *task.Task) error {
u.State = task.Failed
u.FailedAt = t.FailedAt
u.Error = t.Error
if u.State.IsActive() {
u.State = task.Failed
u.FailedAt = t.FailedAt
u.Error = t.Error
}
return nil
})
}
Expand Down
1 change: 1 addition & 0 deletions datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type Datastore interface {
CreateTask(ctx context.Context, t task.Task) error
UpdateTask(ctx context.Context, id string, modify func(u *task.Task) error) error
GetTaskByID(ctx context.Context, id string) (task.Task, error)
GetActiveTasks(ctx context.Context, jobID string) ([]task.Task, error)

CreateNode(ctx context.Context, n node.Node) error
UpdateNode(ctx context.Context, id string, modify func(u *node.Node) error) error
Expand Down
Loading

0 comments on commit 0c5c83a

Please sign in to comment.