Skip to content

Commit

Permalink
Better API input validation
Browse files Browse the repository at this point in the history
  • Loading branch information
runabol committed Aug 21, 2023
1 parent 08f36b9 commit cdd9a0a
Show file tree
Hide file tree
Showing 13 changed files with 738 additions and 243 deletions.
139 changes: 41 additions & 98 deletions coordinator/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package coordinator

import (
"context"
"encoding/json"
"io"

"net/http"
"strings"
Expand All @@ -15,7 +17,7 @@ import (
"github.com/runabol/tork/mq"
"github.com/runabol/tork/node"
"github.com/runabol/tork/task"
"github.com/runabol/tork/uuid"
"gopkg.in/yaml.v3"
)

type api struct {
Expand Down Expand Up @@ -78,130 +80,71 @@ func (s *api) listActiveNodes(c *gin.Context) {
c.JSON(http.StatusOK, nodes)
}

func sanitizeTask(t *task.Task) error {
if len(t.Parallel) > 0 && t.Each != nil {
return errors.New("parallel and each tasks are mutually exclusive")
}
if len(t.Parallel) > 0 && t.SubJob != nil {
return errors.New("parallel and subjob tasks are mutually exclusive")
}
if t.Each != nil && t.SubJob != nil {
return errors.New("each and subjob tasks are mutually exclusive")
}
if (len(t.Parallel) > 0 || t.Each != nil) && (len(t.Pre) > 0 || len(t.Post) > 0) {
return errors.New("composite tasks do not support pre/post tasks")
}
if (len(t.Parallel) > 0 || t.Each != nil) && t.Image != "" {
return errors.New("composite tasks do not support image")
}
if (len(t.Parallel) > 0 || t.Each != nil) && t.Queue != "" {
return errors.New("composite tasks do not support queue assignment")
}
if t.SubJob != nil && (len(t.Pre) > 0 || len(t.Post) > 0) {
return errors.New("subjob tasks do not support pre/post tasks")
}
if t.SubJob != nil && t.Image != "" {
return errors.New("subjob tasks do not support image")
}
if t.SubJob != nil && t.Queue != "" {
return errors.New("subjob tasks do not support queue assignment")
}
if t.Each != nil {
if t.Each.List == "" {
return errors.New("each must provide a list expression")
}
if t.Each.Task == nil {
return errors.New("each must provide a task")
}
if err := sanitizeTask(t.Each.Task); err != nil {
return err
}
}
for _, pt := range t.Parallel {
if err := sanitizeTask(pt); err != nil {
return err
}
}
if len(t.Parallel) > 0 || t.Each != nil || t.SubJob != nil {
return nil
}
if strings.TrimSpace(t.Image) == "" {
return errors.New("missing required field: image")
}
if mq.IsCoordinatorQueue(t.Queue) || strings.HasPrefix(t.Queue, mq.QUEUE_EXCLUSIVE_PREFIX) {
return errors.Errorf("can't route to special queue: %s", t.Queue)
}
if t.Retry != nil {
if t.Retry.Attempts != 0 {
return errors.Errorf("can't specify retry.attempts")
}
if t.Retry.Limit > 10 {
return errors.Errorf("can't specify retry.limit > 10")
}
if t.Retry.Limit < 0 {
t.Retry.Limit = 0
}
if t.Retry.Attempts != 0 {
t.Retry.Attempts = 0
}
}
if t.Timeout != "" {
timeout, err := time.ParseDuration(t.Timeout)
if err != nil {
return errors.Errorf("invalid timeout duration: %s", t.Timeout)
}
if timeout < 0 {
return errors.Errorf("invalid timeout duration: %s", t.Timeout)
}
}
return nil
}

func (s *api) createJob(c *gin.Context) {
j := &job.Job{}
var ji *jobInput
var err error
switch c.ContentType() {
case "application/json":
if err := c.BindJSON(&j); err != nil {
ji, err = bindJobInputJSON(c.Request.Body)
if err != nil {
_ = c.AbortWithError(http.StatusBadRequest, err)
return
}
case "text/yaml":
if err := c.BindYAML(&j); err != nil {
ji, err = bindJobInputYAML(c.Request.Body)
if err != nil {
_ = c.AbortWithError(http.StatusBadRequest, err)
return
}
default:
_ = c.AbortWithError(http.StatusBadRequest, errors.Errorf("unknown content type: %s", c.ContentType()))
return
}
if len(j.Tasks) == 0 {
_ = c.AbortWithError(http.StatusBadRequest, errors.New("job has no tasks"))
if err := ji.validate(); err != nil {
_ = c.AbortWithError(http.StatusBadRequest, err)
return
}
for ix, t := range j.Tasks {
if err := sanitizeTask(t); err != nil {
_ = c.AbortWithError(http.StatusBadRequest, errors.Wrapf(err, "tasks[%d]", ix))
return
}
}
n := time.Now()
j.ID = uuid.NewUUID()
j.State = job.Pending
j.CreatedAt = n
j.Context = job.Context{}
j.Context.Inputs = j.Inputs
j := ji.toJob()
if err := s.ds.CreateJob(c, j); err != nil {
_ = c.AbortWithError(http.StatusInternalServerError, err)
return
}
log.Info().Str("task-id", j.ID).Msg("created job")
log.Info().Str("job-id", j.ID).Msg("created job")
if err := s.broker.PublishJob(c, j); err != nil {
_ = c.AbortWithError(http.StatusBadRequest, err)
return
}
c.JSON(http.StatusOK, redactJob(j))
}

func bindJobInputJSON(r io.ReadCloser) (*jobInput, error) {
ji := jobInput{}
body, err := io.ReadAll(r)
if err != nil {
return nil, err
}
dec := json.NewDecoder(strings.NewReader(string(body)))
dec.DisallowUnknownFields()
if err := dec.Decode(&ji); err != nil {
return nil, err
}
return &ji, nil
}

func bindJobInputYAML(r io.ReadCloser) (*jobInput, error) {
ji := jobInput{}
body, err := io.ReadAll(r)
if err != nil {
return nil, err
}
dec := yaml.NewDecoder(strings.NewReader(string(body)))
dec.KnownFields(true)
if err := dec.Decode(&ji); err != nil {
return nil, err
}
return &ji, nil
}

func (s *api) getJob(c *gin.Context) {
id := c.Param("id")
j, err := s.ds.GetJobByID(c, id)
Expand Down
123 changes: 21 additions & 102 deletions coordinator/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,9 @@ func Test_createJob(t *testing.T) {
})
assert.NotNil(t, api)
req, err := http.NewRequest("POST", "/job", strings.NewReader(`{
"name":"test job",
"tasks":[{
"name":"test task",
"image":"some:image"
}]
}`))
Expand All @@ -165,6 +167,25 @@ func Test_createJob(t *testing.T) {
assert.Equal(t, http.StatusOK, w.Code)
}

func Test_createJobInvalidProperty(t *testing.T) {
api := newAPI(Config{
DataStore: datastore.NewInMemoryDatastore(),
Broker: mq.NewInMemoryBroker(),
})
assert.NotNil(t, api)
req, err := http.NewRequest("POST", "/job", strings.NewReader(`{
"tasks":[{
"nosuch":"thing",
"image":"some:image"
}]
}`))
req.Header.Add("Content-Type", "application/json")
assert.NoError(t, err)
w := httptest.NewRecorder()
api.server.Handler.ServeHTTP(w, req)
assert.Equal(t, http.StatusBadRequest, w.Code)
}

func Test_getJob(t *testing.T) {
ds := datastore.NewInMemoryDatastore()
err := ds.CreateJob(context.Background(), &job.Job{
Expand Down Expand Up @@ -192,108 +213,6 @@ func Test_getJob(t *testing.T) {
assert.Equal(t, http.StatusOK, w.Code)
}

func Test_sanitizeTaskRetry(t *testing.T) {
err := sanitizeTask(&task.Task{
Image: "some:image",
Retry: &task.Retry{
Limit: 5,
},
})
assert.NoError(t, err)
err = sanitizeTask(&task.Task{
Image: "some:image",
Retry: &task.Retry{
Limit: 100,
},
})
assert.Error(t, err)
err = sanitizeTask(&task.Task{
Image: "some:image",
Timeout: "-10s",
})
assert.Error(t, err)
err = sanitizeTask(&task.Task{
Image: "some:image",
Timeout: "10s",
})
assert.NoError(t, err)
rt1 := &task.Task{
Image: "some:image",
Timeout: "10s",
Retry: &task.Retry{},
}
err = sanitizeTask(rt1)
assert.NoError(t, err)
err = sanitizeTask(&task.Task{
Parallel: []*task.Task{
{
Image: "some:image",
},
},
})
assert.NoError(t, err)
err = sanitizeTask(&task.Task{
Image: "some:image",
Parallel: []*task.Task{
{
Image: "some:image",
},
},
})
assert.Error(t, err)
err = sanitizeTask(&task.Task{
Parallel: []*task.Task{
{
Image: "some:image",
},
},
Each: &task.Each{
List: "${ some expression }",
Task: &task.Task{
Image: "some:image",
},
},
})
assert.Error(t, err)
err = sanitizeTask(&task.Task{
Each: &task.Each{
List: "${ some expression }",
Task: &task.Task{
Image: "some:image",
},
},
})
assert.NoError(t, err)
err = sanitizeTask(&task.Task{
Each: &task.Each{
Task: &task.Task{
Image: "some:image",
},
},
})
assert.Error(t, err)
err = sanitizeTask(&task.Task{
SubJob: &task.SubJob{},
Each: &task.Each{},
})
assert.Error(t, err)
}

func Test_sanitizeTaskBasic(t *testing.T) {
err := sanitizeTask(&task.Task{})
assert.Error(t, err)
err = sanitizeTask(&task.Task{Image: "some:image"})
assert.NoError(t, err)
err = sanitizeTask(&task.Task{
Parallel: []*task.Task{{Image: "some:image"}},
})
assert.NoError(t, err)
err = sanitizeTask(&task.Task{
Parallel: []*task.Task{{Name: "bad task"}},
})
assert.Error(t, err)
}

func Test_cancelRunningJob(t *testing.T) {
ctx := context.Background()
ds := datastore.NewInMemoryDatastore()
Expand Down
Loading

0 comments on commit cdd9a0a

Please sign in to comment.