Skip to content

Commit

Permalink
Adding support for registering middleware
Browse files Browse the repository at this point in the history
  • Loading branch information
runabol committed Sep 7, 2023
1 parent 435a0e5 commit 265d41d
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 8 deletions.
15 changes: 11 additions & 4 deletions bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/runabol/tork/db/postgres"
"github.com/runabol/tork/internal/coordinator"
"github.com/runabol/tork/internal/worker"
"github.com/runabol/tork/middleware"
"github.com/runabol/tork/mq"
"github.com/runabol/tork/runtime"
)
Expand All @@ -33,6 +34,7 @@ var (
onStarted = defaultOnStartedHander
dsProviders = map[string]datastore.Provider{}
mqProviders = map[string]mq.Provider{}
middlewares = make([]middleware.MiddlewareFunc, 0)
)

func Start(mode Mode) error {
Expand Down Expand Up @@ -213,10 +215,11 @@ func createBroker() (mq.Broker, error) {
func createCoordinator(broker mq.Broker, ds datastore.Datastore) (*coordinator.Coordinator, error) {
queues := conf.IntMap("coordinator.queues")
c, err := coordinator.NewCoordinator(coordinator.Config{
Broker: broker,
DataStore: ds,
Queues: queues,
Address: conf.String("coordinator.address"),
Broker: broker,
DataStore: ds,
Queues: queues,
Address: conf.String("coordinator.address"),
Middlewares: middlewares,
})
if err != nil {
return nil, errors.Wrap(err, "error creating the coordinator")
Expand Down Expand Up @@ -315,3 +318,7 @@ func RegisterDatastoreProvider(dsType string, provider datastore.Provider) {
func RegisterBrokerProvider(mqType string, provider mq.Provider) {
mqProviders[mqType] = provider
}

func RegisterMiddleware(mw middleware.MiddlewareFunc) {
middlewares = append(middlewares, mw)
}
38 changes: 38 additions & 0 deletions internal/coordinator/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"github.com/runabol/tork/datastore"
"github.com/runabol/tork/internal/httpx"

"github.com/runabol/tork/middleware"

"github.com/runabol/tork"
"github.com/runabol/tork/mq"
"gopkg.in/yaml.v3"
Expand All @@ -34,6 +36,22 @@ type api struct {
ds datastore.Datastore
}

type apiContext struct {
ctx echo.Context
}

func (c *apiContext) Request() *http.Request {
return c.ctx.Request()
}

func (c *apiContext) String(code int, s string) error {
return c.ctx.String(code, s)
}

func (c *apiContext) JSON(code int, data any) error {
return c.ctx.JSON(code, data)
}

func newAPI(cfg Config) *api {
r := echo.New()

Expand All @@ -45,6 +63,11 @@ func newAPI(cfg Config) *api {
},
ds: cfg.DataStore,
}

for _, m := range cfg.Middlewares {
r.Use(middlewareAdapter(m))
}

r.GET("/health", s.health)
r.GET("/tasks/:id", s.getTask)
r.GET("/queues", s.listQueues)
Expand All @@ -58,6 +81,21 @@ func newAPI(cfg Config) *api {
return s
}

func middlewareAdapter(m middleware.MiddlewareFunc) echo.MiddlewareFunc {
nextAdapter := func(next echo.HandlerFunc, ec echo.Context) middleware.HandlerFunc {
return func(c middleware.Context) error {
return next(ec)
}
}
return func(next echo.HandlerFunc) echo.HandlerFunc {
return func(ec echo.Context) error {
return m(nextAdapter(next, ec))(&apiContext{
ctx: ec,
})
}
}
}

func (s *api) health(c echo.Context) error {
return c.JSON(http.StatusOK, map[string]string{
"status": "UP",
Expand Down
77 changes: 77 additions & 0 deletions internal/coordinator/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/runabol/tork"
"github.com/runabol/tork/datastore"
"github.com/runabol/tork/middleware"

"github.com/runabol/tork/mq"

Expand Down Expand Up @@ -484,3 +485,79 @@ func Test_restartRunningNoMoreTasksJob(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, http.StatusBadRequest, w.Code)
}

func Test_middleware(t *testing.T) {
mw := func(next middleware.HandlerFunc) middleware.HandlerFunc {
return func(c middleware.Context) error {
if strings.HasPrefix(c.Request().URL.Path, "/middleware") {
return c.String(http.StatusOK, "OK")
}
return next(c)
}
}
b := mq.NewInMemoryBroker()
api := newAPI(Config{
DataStore: datastore.NewInMemoryDatastore(),
Broker: b,
Middlewares: []middleware.MiddlewareFunc{mw},
})
assert.NotNil(t, api)
req, err := http.NewRequest("GET", "/middleware", nil)
assert.NoError(t, err)
w := httptest.NewRecorder()
api.server.Handler.ServeHTTP(w, req)
body, err := io.ReadAll(w.Body)
assert.NoError(t, err)

assert.NoError(t, err)
assert.Equal(t, http.StatusOK, w.Code)
assert.Equal(t, "OK", string(body))
}

func Test_middlewareMultiple(t *testing.T) {
mw1 := func(next middleware.HandlerFunc) middleware.HandlerFunc {
return func(c middleware.Context) error {
if strings.HasPrefix(c.Request().URL.Path, "/middleware1") {
return c.String(http.StatusOK, "OK1")
}
return next(c)
}
}
mw2 := func(next middleware.HandlerFunc) middleware.HandlerFunc {
return func(c middleware.Context) error {
if strings.HasPrefix(c.Request().URL.Path, "/middleware2") {
return c.String(http.StatusOK, "OK2")
}
return next(c)
}
}
b := mq.NewInMemoryBroker()
api := newAPI(Config{
DataStore: datastore.NewInMemoryDatastore(),
Broker: b,
Middlewares: []middleware.MiddlewareFunc{mw1, mw2},
})
assert.NotNil(t, api)

req, err := http.NewRequest("GET", "/middleware1", nil)
assert.NoError(t, err)
w := httptest.NewRecorder()
api.server.Handler.ServeHTTP(w, req)
body, err := io.ReadAll(w.Body)
assert.NoError(t, err)

assert.NoError(t, err)
assert.Equal(t, http.StatusOK, w.Code)
assert.Equal(t, "OK1", string(body))

req, err = http.NewRequest("GET", "/middleware2", nil)
assert.NoError(t, err)
w = httptest.NewRecorder()
api.server.Handler.ServeHTTP(w, req)
body, err = io.ReadAll(w.Body)
assert.NoError(t, err)

assert.NoError(t, err)
assert.Equal(t, http.StatusOK, w.Code)
assert.Equal(t, "OK2", string(body))
}
10 changes: 6 additions & 4 deletions internal/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/runabol/tork"
"github.com/runabol/tork/datastore"
"github.com/runabol/tork/internal/eval"
"github.com/runabol/tork/middleware"

"github.com/runabol/tork/mq"

Expand All @@ -31,10 +32,11 @@ type Coordinator struct {
}

type Config struct {
Broker mq.Broker
DataStore datastore.Datastore
Address string
Queues map[string]int
Broker mq.Broker
DataStore datastore.Datastore
Address string
Queues map[string]int
Middlewares []middleware.MiddlewareFunc
}

func NewCoordinator(cfg Config) (*Coordinator, error) {
Expand Down
20 changes: 20 additions & 0 deletions middleware/middleware.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package middleware

import (
"net/http"
)

type MiddlewareFunc func(next HandlerFunc) HandlerFunc

type HandlerFunc func(c Context) error

type Context interface {
// Request returns `*http.Request`.
Request() *http.Request

// String sends a string response with status code.
String(code int, s string) error

// JSON sends a JSON response with status code.
JSON(code int, data any) error
}

0 comments on commit 265d41d

Please sign in to comment.