Skip to content

Commit

Permalink
Task middleware support
Browse files Browse the repository at this point in the history
  • Loading branch information
runabol committed Sep 9, 2023
1 parent 2c1eae8 commit a3f5d73
Show file tree
Hide file tree
Showing 12 changed files with 282 additions and 72 deletions.
22 changes: 15 additions & 7 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/runabol/tork/internal/coordinator"
"github.com/runabol/tork/internal/worker"
"github.com/runabol/tork/middleware"
"github.com/runabol/tork/middleware/task"
"github.com/runabol/tork/mq"
"github.com/runabol/tork/runtime"
)
Expand All @@ -30,6 +31,7 @@ type Engine struct {
terminate chan any
onStarted func() error
middlewares []middleware.MiddlewareFunc
taskmw []task.MiddlewareFunc
endpoints map[string]middleware.HandlerFunc
mode Mode
}
Expand All @@ -40,6 +42,7 @@ func New(mode Mode) *Engine {
terminate: make(chan any, 1),
onStarted: func() error { return nil },
middlewares: make([]middleware.MiddlewareFunc, 0),
taskmw: make([]task.MiddlewareFunc, 0),
endpoints: make(map[string]middleware.HandlerFunc, 0),
mode: mode,
}
Expand Down Expand Up @@ -227,13 +230,14 @@ func createBroker() (mq.Broker, error) {
func (e *Engine) createCoordinator(broker mq.Broker, ds datastore.Datastore) (*coordinator.Coordinator, error) {
queues := conf.IntMap("coordinator.queues")
c, err := coordinator.NewCoordinator(coordinator.Config{
Broker: broker,
DataStore: ds,
Queues: queues,
Address: conf.String("coordinator.address"),
Middlewares: e.middlewares,
Endpoints: e.endpoints,
Enabled: conf.BoolMap("coordinator.api.endpoints"),
Broker: broker,
DataStore: ds,
Queues: queues,
Address: conf.String("coordinator.address"),
Middlewares: e.middlewares,
Endpoints: e.endpoints,
Enabled: conf.BoolMap("coordinator.api.endpoints"),
TaskMiddlewares: e.taskmw,
})
if err != nil {
return nil, errors.Wrap(err, "error creating the coordinator")
Expand Down Expand Up @@ -282,6 +286,10 @@ func (e *Engine) RegisterMiddleware(mw middleware.MiddlewareFunc) {
e.middlewares = append(e.middlewares, mw)
}

func (e *Engine) RegisterTaskMiddleware(mw task.MiddlewareFunc) {
e.taskmw = append(e.taskmw, mw)
}

func (e *Engine) RegisterEndpoint(method, path string, handler middleware.HandlerFunc) {
e.endpoints[fmt.Sprintf("%s %s", method, path)] = handler
}
75 changes: 43 additions & 32 deletions internal/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/runabol/tork/internal/coordinator/api"
"github.com/runabol/tork/internal/coordinator/handlers"
"github.com/runabol/tork/middleware"
"github.com/runabol/tork/middleware/task"

"github.com/runabol/tork/mq"

Expand All @@ -28,22 +29,23 @@ type Coordinator struct {
api *api.API
ds datastore.Datastore
queues map[string]int
onPending tork.TaskHandler
onStarted tork.TaskHandler
onError tork.TaskHandler
onPending task.HandlerFunc
onStarted task.HandlerFunc
onError task.HandlerFunc
onJob tork.JobHandler
onHeartbeat tork.NodeHandler
onCompleted tork.TaskHandler
onCompleted task.HandlerFunc
}

type Config struct {
Broker mq.Broker
DataStore datastore.Datastore
Address string
Queues map[string]int
Middlewares []middleware.MiddlewareFunc
Endpoints map[string]middleware.HandlerFunc
Enabled map[string]bool
Broker mq.Broker
DataStore datastore.Datastore
Address string
Queues map[string]int
Middlewares []middleware.MiddlewareFunc
Endpoints map[string]middleware.HandlerFunc
Enabled map[string]bool
TaskMiddlewares []task.MiddlewareFunc
}

func NewCoordinator(cfg Config) (*Coordinator, error) {
Expand Down Expand Up @@ -93,35 +95,44 @@ func NewCoordinator(cfg Config) (*Coordinator, error) {
return nil, err
}

onPending := task.ApplyMiddleware(
handlers.NewPendingHandler(cfg.DataStore, cfg.Broker),
cfg.TaskMiddlewares,
)

onStarted := task.ApplyMiddleware(
handlers.NewStartedHandler(cfg.DataStore, cfg.Broker),
cfg.TaskMiddlewares,
)

onError := task.ApplyMiddleware(
handlers.NewErrorHandler(cfg.DataStore, cfg.Broker),
cfg.TaskMiddlewares,
)

onCompleted := task.ApplyMiddleware(
handlers.NewCompletedHandler(cfg.DataStore, cfg.Broker),
cfg.TaskMiddlewares,
)

return &Coordinator{
Name: name,
api: api,
broker: cfg.Broker,
ds: cfg.DataStore,
queues: cfg.Queues,
onPending: handlers.NewPendingHandler(
cfg.DataStore,
cfg.Broker,
),
onStarted: handlers.NewStartedHandler(
cfg.DataStore,
cfg.Broker,
),
onError: handlers.NewErrorHandler(
cfg.DataStore,
cfg.Broker,
),
Name: name,
api: api,
broker: cfg.Broker,
ds: cfg.DataStore,
queues: cfg.Queues,
onPending: onPending,
onStarted: onStarted,
onError: onError,
onJob: handlers.NewJobHandler(
cfg.DataStore,
cfg.Broker,
cfg.TaskMiddlewares...,
),
onHeartbeat: handlers.NewHeartbeatHandler(
cfg.DataStore,
),
onCompleted: handlers.NewCompletedHandler(
cfg.DataStore,
cfg.Broker,
),
onCompleted: onCompleted,
}, nil
}

Expand Down
82 changes: 59 additions & 23 deletions internal/coordinator/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package coordinator

import (
"context"
"errors"
"os"
"testing"
"time"

"github.com/runabol/tork"
"github.com/runabol/tork/datastore"
"github.com/runabol/tork/middleware/task"
"github.com/runabol/tork/mq"

"github.com/runabol/tork/runtime"
Expand All @@ -33,57 +35,91 @@ func TestNewCoordinatorOK(t *testing.T) {
assert.NotNil(t, c)
}

func TestStartCoordinator(t *testing.T) {
func TestTaskMiddlewareWithResult(t *testing.T) {
c, err := NewCoordinator(Config{
Broker: mq.NewInMemoryBroker(),
DataStore: datastore.NewInMemoryDatastore(),
Address: ":4444",
TaskMiddlewares: []task.MiddlewareFunc{
func(next task.HandlerFunc) task.HandlerFunc {
return func(ctx context.Context, t *tork.Task) error {
t.Result = "some result"
return nil
}
},
},
})
assert.NoError(t, err)
assert.NotNil(t, c)
err = c.Start()
assert.NoError(t, err)
}

func Test_handleConditionalTask(t *testing.T) {
ctx := context.Background()
b := mq.NewInMemoryBroker()
tk := &tork.Task{}
assert.NoError(t, c.onPending(context.Background(), tk))
assert.Equal(t, "some result", tk.Result)
}

completed := 0
err := b.SubscribeForTasks(mq.QUEUE_COMPLETED, func(t *tork.Task) error {
completed = completed + 1
return nil
func TestTaskMiddlewareWithError(t *testing.T) {
Err := errors.New("some error")
c, err := NewCoordinator(Config{
Broker: mq.NewInMemoryBroker(),
DataStore: datastore.NewInMemoryDatastore(),
TaskMiddlewares: []task.MiddlewareFunc{
func(next task.HandlerFunc) task.HandlerFunc {
return func(ctx context.Context, t *tork.Task) error {
return Err
}
},
},
})
assert.NoError(t, err)
assert.NotNil(t, c)

tk := &tork.Task{}
assert.ErrorIs(t, c.onPending(context.Background(), tk), Err)
}

func TestTaskMiddlewareNoOp(t *testing.T) {
ds := datastore.NewInMemoryDatastore()
c, err := NewCoordinator(Config{
Broker: b,
Broker: mq.NewInMemoryBroker(),
DataStore: ds,
TaskMiddlewares: []task.MiddlewareFunc{
func(next task.HandlerFunc) task.HandlerFunc {
return func(ctx context.Context, t *tork.Task) error {
return next(ctx, t)
}
},
},
})
assert.NoError(t, err)
assert.NotNil(t, c)

tk := &tork.Task{
ID: uuid.NewUUID(),
Queue: "test-queue",
If: "false",
Name: "my task",
State: tork.TaskStatePending,
}

err = ds.CreateTask(ctx, tk)
err = ds.CreateTask(context.Background(), tk)
assert.NoError(t, err)

err = c.onPending(context.Background(), tk)
assert.NoError(t, err)

err = c.onPending(ctx, tk)
t2, err := ds.GetTaskByID(context.Background(), tk.ID)
assert.NoError(t, err)

// wait for the task to get processed
time.Sleep(time.Millisecond * 100)
assert.Equal(t, tork.TaskStateScheduled, t2.State)
}

tk, err = ds.GetTaskByID(ctx, tk.ID)
func TestStartCoordinator(t *testing.T) {
c, err := NewCoordinator(Config{
Broker: mq.NewInMemoryBroker(),
DataStore: datastore.NewInMemoryDatastore(),
Address: ":4444",
})
assert.NoError(t, err)
assert.NotNil(t, c)
err = c.Start()
assert.NoError(t, err)
assert.Equal(t, tork.TaskStateScheduled, tk.State)
// task should only be processed once
assert.Equal(t, 1, completed)
}

func TestRunHelloWorldJob(t *testing.T) {
Expand Down
3 changes: 2 additions & 1 deletion internal/coordinator/handlers/completed.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/runabol/tork/datastore"
"github.com/runabol/tork/internal/eval"
"github.com/runabol/tork/internal/uuid"
"github.com/runabol/tork/middleware/task"
"github.com/runabol/tork/mq"
)

Expand All @@ -20,7 +21,7 @@ type completedHandler struct {
onJob func(context.Context, *tork.Job) error
}

func NewCompletedHandler(ds datastore.Datastore, b mq.Broker) tork.TaskHandler {
func NewCompletedHandler(ds datastore.Datastore, b mq.Broker) task.HandlerFunc {
h := &completedHandler{
ds: ds,
broker: b,
Expand Down
3 changes: 2 additions & 1 deletion internal/coordinator/handlers/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/runabol/tork/datastore"
"github.com/runabol/tork/internal/eval"
"github.com/runabol/tork/internal/uuid"
"github.com/runabol/tork/middleware/task"
"github.com/runabol/tork/mq"
)

Expand All @@ -18,7 +19,7 @@ type errorHandler struct {
broker mq.Broker
}

func NewErrorHandler(ds datastore.Datastore, b mq.Broker) tork.TaskHandler {
func NewErrorHandler(ds datastore.Datastore, b mq.Broker) task.HandlerFunc {
h := &errorHandler{
ds: ds,
broker: b,
Expand Down
7 changes: 4 additions & 3 deletions internal/coordinator/handlers/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/runabol/tork/datastore"
"github.com/runabol/tork/internal/eval"
"github.com/runabol/tork/internal/uuid"
"github.com/runabol/tork/middleware/task"
"github.com/runabol/tork/mq"
)

Expand All @@ -21,12 +22,12 @@ type jobHandler struct {
onCancel func(context.Context, *tork.Job) error
}

func NewJobHandler(ds datastore.Datastore, b mq.Broker) tork.JobHandler {
func NewJobHandler(ds datastore.Datastore, b mq.Broker, mw ...task.MiddlewareFunc) tork.JobHandler {
h := &jobHandler{
ds: ds,
broker: b,
onError: NewErrorHandler(ds, b),
onPending: NewPendingHandler(ds, b),
onError: task.ApplyMiddleware(NewErrorHandler(ds, b), mw),
onPending: task.ApplyMiddleware(NewPendingHandler(ds, b), mw),
onCancel: NewCancelHandler(ds, b),
}
return h.handle
Expand Down
3 changes: 2 additions & 1 deletion internal/coordinator/handlers/pending.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/runabol/tork"
"github.com/runabol/tork/datastore"
"github.com/runabol/tork/internal/coordinator/scheduler"
"github.com/runabol/tork/middleware/task"
"github.com/runabol/tork/mq"
)

Expand All @@ -19,7 +20,7 @@ type pendingHandler struct {
broker mq.Broker
}

func NewPendingHandler(ds datastore.Datastore, b mq.Broker) tork.TaskHandler {
func NewPendingHandler(ds datastore.Datastore, b mq.Broker) task.HandlerFunc {
h := &pendingHandler{
ds: ds,
broker: b,
Expand Down
Loading

0 comments on commit a3f5d73

Please sign in to comment.