Skip to content

Commit

Permalink
support for job submission through middleware
Browse files Browse the repository at this point in the history
  • Loading branch information
runabol committed Sep 7, 2023
1 parent cf9778e commit aaeb8ee
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 8 deletions.
34 changes: 26 additions & 8 deletions internal/coordinator/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type api struct {

type apiContext struct {
ctx echo.Context
api *api
}

func (c *apiContext) Request() *http.Request {
Expand All @@ -53,6 +54,14 @@ func (c *apiContext) JSON(code int, data any) error {
return c.ctx.JSON(code, data)
}

func (c *apiContext) SubmitJob(j *input.Job) (*tork.Job, error) {
job, err := c.api.submitJob(c.ctx.Request().Context(), j)
if err != nil {
return nil, err
}
return job.Clone(), nil
}

func newAPI(cfg Config) *api {
r := echo.New()

Expand All @@ -66,7 +75,7 @@ func newAPI(cfg Config) *api {
}

for _, m := range cfg.Middlewares {
r.Use(middlewareAdapter(m))
r.Use(s.middlewareAdapter(m))
}

r.GET("/health", s.health)
Expand All @@ -82,7 +91,7 @@ func newAPI(cfg Config) *api {
return s
}

func middlewareAdapter(m middleware.MiddlewareFunc) echo.MiddlewareFunc {
func (s *api) middlewareAdapter(m middleware.MiddlewareFunc) echo.MiddlewareFunc {
nextAdapter := func(next echo.HandlerFunc, ec echo.Context) middleware.HandlerFunc {
return func(c middleware.Context) error {
return next(ec)
Expand All @@ -92,6 +101,7 @@ func middlewareAdapter(m middleware.MiddlewareFunc) echo.MiddlewareFunc {
return func(ec echo.Context) error {
return m(nextAdapter(next, ec))(&apiContext{
ctx: ec,
api: s,
})
}
}
Expand Down Expand Up @@ -138,18 +148,26 @@ func (s *api) createJob(c echo.Context) error {
default:
return echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("unknown content type: %s", contentType))
}
if err := ji.Validate(); err != nil {
if j, err := s.submitJob(c.Request().Context(), ji); err != nil {
return echo.NewHTTPError(http.StatusBadRequest, err.Error())
} else {
return c.JSON(http.StatusOK, redactJob(j))
}
}

func (s *api) submitJob(ctx context.Context, ji *input.Job) (*tork.Job, error) {
if err := ji.Validate(); err != nil {
return nil, err
}
j := ji.ToJob()
if err := s.ds.CreateJob(c.Request().Context(), j); err != nil {
return echo.NewHTTPError(http.StatusInternalServerError, err.Error())
if err := s.ds.CreateJob(ctx, j); err != nil {
return nil, err
}
log.Info().Str("job-id", j.ID).Msg("created job")
if err := s.broker.PublishJob(c.Request().Context(), j); err != nil {
return echo.NewHTTPError(http.StatusBadRequest, err.Error())
if err := s.broker.PublishJob(ctx, j); err != nil {
return nil, err
}
return c.JSON(http.StatusOK, redactJob(j))
return j, nil
}

func bindJobInputJSON(r io.ReadCloser) (*input.Job, error) {
Expand Down
39 changes: 39 additions & 0 deletions internal/coordinator/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/runabol/tork"
"github.com/runabol/tork/datastore"
"github.com/runabol/tork/input"
"github.com/runabol/tork/middleware"

"github.com/runabol/tork/mq"
Expand Down Expand Up @@ -561,3 +562,41 @@ func Test_middlewareMultiple(t *testing.T) {
assert.Equal(t, http.StatusOK, w.Code)
assert.Equal(t, "OK2", string(body))
}

func Test_middlewareSubmitJob(t *testing.T) {
mw := func(next middleware.HandlerFunc) middleware.HandlerFunc {
return func(c middleware.Context) error {
if !strings.HasPrefix(c.Request().URL.Path, "/create-special-job") {
return next(c)
}
_, err := c.SubmitJob(&input.Job{
Name: "Test Job",
Tasks: []input.Task{
{
Name: "first task",
Image: "some:image",
},
},
})
assert.NoError(t, err)
return c.String(http.StatusOK, "OK")
}
}
b := mq.NewInMemoryBroker()
api := newAPI(Config{
DataStore: datastore.NewInMemoryDatastore(),
Broker: b,
Middlewares: []middleware.MiddlewareFunc{mw},
})
assert.NotNil(t, api)
req, err := http.NewRequest("GET", "/create-special-job", nil)
assert.NoError(t, err)
w := httptest.NewRecorder()
api.server.Handler.ServeHTTP(w, req)
body, err := io.ReadAll(w.Body)
assert.NoError(t, err)

assert.NoError(t, err)
assert.Equal(t, http.StatusOK, w.Code)
assert.Equal(t, "OK", string(body))
}
6 changes: 6 additions & 0 deletions middleware/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package middleware

import (
"net/http"

"github.com/runabol/tork"
"github.com/runabol/tork/input"
)

type MiddlewareFunc func(next HandlerFunc) HandlerFunc
Expand All @@ -17,4 +20,7 @@ type Context interface {

// JSON sends a JSON response with status code.
JSON(code int, data any) error

// SubmitJob submits a job input for processing
SubmitJob(j *input.Job) (*tork.Job, error)
}

0 comments on commit aaeb8ee

Please sign in to comment.