Skip to content

Commit

Permalink
Submit Job API
Browse files Browse the repository at this point in the history
  • Loading branch information
runabol committed Sep 12, 2023
1 parent b8553f5 commit 63f5c56
Show file tree
Hide file tree
Showing 13 changed files with 207 additions and 216 deletions.
4 changes: 2 additions & 2 deletions internal/coordinator/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,14 +238,14 @@ func (s *API) createJob(c echo.Context) error {
default:
return echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("unknown content type: %s", contentType))
}
if j, err := s.submitJob(c.Request().Context(), ji); err != nil {
if j, err := s.SubmitJob(c.Request().Context(), ji); err != nil {
return echo.NewHTTPError(http.StatusBadRequest, err.Error())
} else {
return c.JSON(http.StatusOK, tork.NewJobSummary(j))
}
}

func (s *API) submitJob(ctx context.Context, ji *input.Job) (*tork.Job, error) {
func (s *API) SubmitJob(ctx context.Context, ji *input.Job) (*tork.Job, error) {
if err := ji.Validate(); err != nil {
return nil, err
}
Expand Down
42 changes: 0 additions & 42 deletions internal/coordinator/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/pkg/errors"
"github.com/runabol/tork"
"github.com/runabol/tork/datastore"
"github.com/runabol/tork/pkg/input"
"github.com/runabol/tork/pkg/middleware/web"

"github.com/runabol/tork/mq"
Expand Down Expand Up @@ -612,47 +611,6 @@ func Test_middlewareMultiple(t *testing.T) {
assert.Equal(t, "OK2", string(body))
}

func Test_middlewareSubmitJob(t *testing.T) {
mw := func(next web.HandlerFunc) web.HandlerFunc {
return func(c web.Context) error {
if !strings.HasPrefix(c.Request().URL.Path, "/create-special-job") {
return next(c)
}
_, err := c.SubmitJob(&input.Job{
Name: "Test Job",
Tasks: []input.Task{
{
Name: "first task",
Image: "some:image",
},
},
})
assert.NoError(t, err)
return c.String(http.StatusOK, "OK")
}
}
b := mq.NewInMemoryBroker()
api, err := NewAPI(Config{
DataStore: datastore.NewInMemoryDatastore(),
Broker: b,
Middleware: Middleware{
Web: []web.MiddlewareFunc{mw},
},
})
assert.NoError(t, err)
assert.NotNil(t, api)
req, err := http.NewRequest("GET", "/create-special-job", nil)
assert.NoError(t, err)
w := httptest.NewRecorder()
api.server.Handler.ServeHTTP(w, req)
body, err := io.ReadAll(w.Body)
assert.NoError(t, err)

assert.NoError(t, err)
assert.Equal(t, http.StatusOK, w.Code)
assert.Equal(t, "OK", string(body))
}

func Test_customEndpoint(t *testing.T) {
h := func(c web.Context) error {
return c.String(http.StatusOK, "OK")
Expand Down
28 changes: 0 additions & 28 deletions internal/coordinator/api/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,6 @@ import (
"net/http"

"github.com/labstack/echo/v4"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
"github.com/runabol/tork"
"github.com/runabol/tork/pkg/input"
"github.com/runabol/tork/pkg/middleware/web"

"github.com/runabol/tork/mq"
)

type Context struct {
Expand Down Expand Up @@ -48,27 +41,6 @@ func (c *Context) JSON(code int, data any) error {
return c.ctx.JSON(code, data)
}

func (c *Context) SubmitJob(ij *input.Job, listeners ...web.JobListener) (*tork.Job, error) {
if err := c.api.broker.SubscribeForEvents(c.ctx.Request().Context(), mq.TOPIC_JOB, func(ev any) {
j, ok := ev.(*tork.Job)
if !ok {
log.Error().Msg("unable to cast event to *tork.Job")
}
if ij.ID() == j.ID {
for _, listener := range listeners {
listener(j)
}
}
}); err != nil {
return nil, errors.New("error subscribing for job events")
}
job, err := c.api.submitJob(c.ctx.Request().Context(), ij)
if err != nil {
return nil, err
}
return job.Clone(), nil
}

func (c *Context) Error(code int, err error) {
c.err = err
c.code = code
Expand Down
57 changes: 0 additions & 57 deletions internal/coordinator/api/context_test.go

This file was deleted.

5 changes: 5 additions & 0 deletions internal/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/runabol/tork/internal/coordinator/api"
"github.com/runabol/tork/internal/coordinator/handlers"

"github.com/runabol/tork/pkg/input"
"github.com/runabol/tork/pkg/middleware/job"
"github.com/runabol/tork/pkg/middleware/node"
"github.com/runabol/tork/pkg/middleware/task"
Expand Down Expand Up @@ -163,6 +164,10 @@ func NewCoordinator(cfg Config) (*Coordinator, error) {
}, nil
}

func (c *Coordinator) SubmitJob(ctx context.Context, ij *input.Job) (*tork.Job, error) {
return c.api.SubmitJob(ctx, ij)
}

func (c *Coordinator) Start() error {
log.Info().Msgf("starting Coordinator")
// start the coordinator API
Expand Down
44 changes: 8 additions & 36 deletions pkg/engine/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,62 +2,34 @@ package engine

import (
"github.com/pkg/errors"
"github.com/runabol/tork/datastore"
"github.com/runabol/tork/mq"
"github.com/runabol/tork/pkg/conf"
)

func createDatastore() (datastore.Datastore, error) {
dstype := conf.StringDefault("datastore.type", datastore.DATASTORE_INMEMORY)
var ds datastore.Datastore
ds, err := datastore.NewFromProvider(dstype)
if err != nil && !errors.Is(err, datastore.ErrProviderNotFound) {
return nil, err
}
if ds != nil {
return ds, nil
}
switch dstype {
case datastore.DATASTORE_INMEMORY:
ds = datastore.NewInMemoryDatastore()
case datastore.DATASTORE_POSTGRES:
dsn := conf.StringDefault(
"datastore.postgres.dsn",
"host=localhost user=tork password=tork dbname=tork port=5432 sslmode=disable",
)
pg, err := datastore.NewPostgresDataStore(dsn)
if err != nil {
return nil, err
}
ds = pg
default:
return nil, errors.Errorf("unknown datastore type: %s", dstype)
}
return ds, nil
}

func createBroker() (mq.Broker, error) {
func (e *Engine) initBroker() error {
var b mq.Broker
bt := conf.StringDefault("broker.type", mq.BROKER_INMEMORY)

b, err := mq.NewFromProvider(bt)
if err != nil && !errors.Is(err, mq.ErrProviderNotFound) {
return nil, err
return err
}
if b != nil {
return b, nil
e.broker = b
return nil
}
switch bt {
case "inmemory":
b = mq.NewInMemoryBroker()
case "rabbitmq":
rb, err := mq.NewRabbitMQBroker(conf.StringDefault("broker.rabbitmq.url", "amqp://guest:guest@localhost:5672/"))
if err != nil {
return nil, errors.Wrapf(err, "unable to connect to RabbitMQ")
return errors.Wrapf(err, "unable to connect to RabbitMQ")
}
b = rb
default:
return nil, errors.Errorf("invalid broker type: %s", bt)
return errors.Errorf("invalid broker type: %s", bt)
}
return b, nil
e.broker = b
return nil
}
17 changes: 9 additions & 8 deletions pkg/engine/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,19 @@ import (
"github.com/labstack/echo/v4/middleware"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
"github.com/runabol/tork/datastore"
"github.com/runabol/tork/internal/coordinator"
"github.com/runabol/tork/internal/uuid"
"github.com/runabol/tork/mq"
"github.com/runabol/tork/pkg/conf"
"github.com/runabol/tork/pkg/middleware/job"
"golang.org/x/time/rate"
)

func (e *Engine) createCoordinator(broker mq.Broker, ds datastore.Datastore) (*coordinator.Coordinator, error) {
func (e *Engine) initCoordinator() error {
queues := conf.IntMap("coordinator.queues")

cfg := coordinator.Config{
Broker: broker,
DataStore: ds,
Broker: e.broker,
DataStore: e.ds,
Queues: queues,
Address: conf.String("coordinator.address"),
Middleware: coordinator.Middleware{
Expand All @@ -43,13 +41,16 @@ func (e *Engine) createCoordinator(broker mq.Broker, ds datastore.Datastore) (*c

c, err := coordinator.NewCoordinator(cfg)
if err != nil {
return nil, errors.Wrap(err, "error creating the coordinator")
return errors.Wrap(err, "error creating the coordinator")
}

if err := c.Start(); err != nil {
return nil, err
return err
}
return c, nil

e.coordinator = c

return nil
}

func echoMiddleware() []echo.MiddlewareFunc {
Expand Down
38 changes: 38 additions & 0 deletions pkg/engine/datastore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package engine

import (
"github.com/pkg/errors"
"github.com/runabol/tork/datastore"
"github.com/runabol/tork/pkg/conf"
)

func (e *Engine) initDatastore() error {
dstype := conf.StringDefault("datastore.type", datastore.DATASTORE_INMEMORY)
var ds datastore.Datastore
ds, err := datastore.NewFromProvider(dstype)
if err != nil && !errors.Is(err, datastore.ErrProviderNotFound) {
return err
}
if ds != nil {
e.ds = ds
return nil
}
switch dstype {
case datastore.DATASTORE_INMEMORY:
ds = datastore.NewInMemoryDatastore()
case datastore.DATASTORE_POSTGRES:
dsn := conf.StringDefault(
"datastore.postgres.dsn",
"host=localhost user=tork password=tork dbname=tork port=5432 sslmode=disable",
)
pg, err := datastore.NewPostgresDataStore(dsn)
if err != nil {
return err
}
ds = pg
default:
return errors.Errorf("unknown datastore type: %s", dstype)
}
e.ds = ds
return nil
}
8 changes: 8 additions & 0 deletions pkg/engine/default.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package engine

import (
"context"

"github.com/runabol/tork"
"github.com/runabol/tork/pkg/input"
"github.com/runabol/tork/pkg/middleware/job"
"github.com/runabol/tork/pkg/middleware/node"
"github.com/runabol/tork/pkg/middleware/task"
Expand Down Expand Up @@ -29,6 +33,10 @@ func RegisterEndpoint(method, path string, handler web.HandlerFunc) {
defaultEngine.RegisterEndpoint(method, path, handler)
}

func SubmitJob(ctx context.Context, ij *input.Job, listeners ...web.JobListener) (*tork.Job, error) {
return defaultEngine.SubmitJob(ctx, ij, listeners...)
}

func Start() error {
return defaultEngine.Start()
}
Expand Down
Loading

0 comments on commit 63f5c56

Please sign in to comment.