Skip to content

Commit

Permalink
Job listener
Browse files Browse the repository at this point in the history
  • Loading branch information
runabol committed Sep 8, 2023
1 parent 1cad9ee commit be671ae
Show file tree
Hide file tree
Showing 11 changed files with 347 additions and 68 deletions.
12 changes: 10 additions & 2 deletions input/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,25 @@ import (
)

type Job struct {
id string
Name string `json:"name,omitempty" yaml:"name,omitempty" validate:"required"`
Description string `json:"description,omitempty" yaml:"description,omitempty"`
Tasks []Task `json:"tasks,omitempty" yaml:"tasks,omitempty" validate:"required,min=1,dive"`
Inputs map[string]string `json:"inputs,omitempty" yaml:"inputs,omitempty"`
Output string `json:"output,omitempty" yaml:"output,omitempty" validate:"expr"`
}

func (ji Job) ToJob() *tork.Job {
func (ji *Job) ID() string {
if ji.id == "" {
ji.id = uuid.NewUUID()
}
return ji.id
}

func (ji *Job) ToJob() *tork.Job {
n := time.Now().UTC()
j := &tork.Job{}
j.ID = uuid.NewUUID()
j.ID = ji.ID()
j.Description = ji.Description
j.Inputs = ji.Inputs
j.Name = ji.Name
Expand Down
21 changes: 19 additions & 2 deletions internal/coordinator/api/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@ import (
"net/http"

"github.com/labstack/echo/v4"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
"github.com/runabol/tork"
"github.com/runabol/tork/input"
"github.com/runabol/tork/middleware"
"github.com/runabol/tork/mq"
)

type Context struct {
Expand All @@ -31,8 +35,21 @@ func (c *Context) JSON(code int, data any) error {
return c.ctx.JSON(code, data)
}

func (c *Context) SubmitJob(j *input.Job) (*tork.Job, error) {
job, err := c.api.submitJob(c.ctx.Request().Context(), j)
func (c *Context) SubmitJob(ij *input.Job, listeners ...middleware.JobListener) (*tork.Job, error) {
if err := c.api.broker.SubscribeForEvents(c.ctx.Request().Context(), mq.TOPIC_JOB, func(ev any) {
j, ok := ev.(*tork.Job)
if !ok {
log.Error().Msg("unable to cast event to *tork.Job")
}
if ij.ID() == j.ID {
for _, listener := range listeners {
listener(j)
}
}
}); err != nil {
return nil, errors.New("error subscribing for job events")
}
job, err := c.api.submitJob(c.ctx.Request().Context(), ij)
if err != nil {
return nil, err
}
Expand Down
57 changes: 57 additions & 0 deletions internal/coordinator/api/context_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package api

import (
"context"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/labstack/echo/v4"
"github.com/runabol/tork"
"github.com/runabol/tork/datastore"
"github.com/runabol/tork/input"

"github.com/runabol/tork/mq"
"github.com/stretchr/testify/assert"
)

func TestSubmitJob(t *testing.T) {
api, err := NewAPI(Config{
DataStore: datastore.NewInMemoryDatastore(),
Broker: mq.NewInMemoryBroker(),
})
assert.NoError(t, err)

req, err := http.NewRequest("GET", "/", nil)
assert.NoError(t, err)
w := httptest.NewRecorder()

ctx := Context{api: api, ctx: echo.New().NewContext(req, w)}

called := false
listener := func(j *tork.Job) {
called = true
assert.Equal(t, tork.JobStateCompleted, j.State)
}

j, err := ctx.SubmitJob(&input.Job{
Name: "test job",
Tasks: []input.Task{
{
Name: "first task",
Image: "some:image",
},
},
}, listener)
assert.NoError(t, err)

j.State = tork.JobStateCompleted

err = api.broker.PublishEvent(context.Background(), mq.TOPIC_JOB_COMPLETED, j)
assert.NoError(t, err)

time.Sleep(time.Millisecond * 100)

assert.True(t, called)
}
75 changes: 75 additions & 0 deletions internal/wildcard/wildcard.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package wildcard

// credit: https://github.com/vodkaslime/wildcard

const C = '*'

func isWildPattern(pattern string) bool {
for i := range pattern {
c := pattern[i]
if c == C {
return true
}
}

return false
}

func Match(pattern string, s string) bool {
// Edge cases.
if pattern == string(C) {
return true
}

if pattern == "" {
return s == ""
}

// If pattern does not contain wildcard chars, just compare the strings
// to avoid extra memory allocation.
if !isWildPattern(pattern) {
return pattern == s
}

// Initialize DP.
lp := len(pattern)
ls := len(s)
dp := make([][]bool, lp+1)
for i := 0; i < lp+1; i++ {
dp[i] = make([]bool, ls+1)
}

dp[0][0] = true

for i := 0; i < lp; i++ {
if pattern[i] == C {
dp[i+1][0] = dp[i][0]
} else {
dp[i+1][0] = false
}
}

for j := 0; j < ls; j++ {
dp[0][j+1] = false
}

// Start DP.
for i := 0; i < lp; i++ {
for j := 0; j < ls; j++ {
pc := pattern[i]
sc := s[j]
switch pattern[i] {
case C:
dp[i+1][j+1] = dp[i][j] || dp[i][j+1] || dp[i+1][j]
default:
if pc == sc {
dp[i+1][j+1] = dp[i][j]
} else {
dp[i+1][j+1] = false
}
}
}
}

return dp[lp][ls]
}
74 changes: 74 additions & 0 deletions internal/wildcard/wildcard_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package wildcard

import (
"testing"

"github.com/stretchr/testify/assert"
)

type wildPatternTestCase struct {
p string
m bool
}

type matchTestCase struct {
p string
s string
m bool
}

func TestIsWildPattern(t *testing.T) {
testCases1 := []wildPatternTestCase{
{"*", true},
{"**", true},
{".", false},
{"a", false},
}

for _, tc := range testCases1 {
b := isWildPattern(tc.p)
if !assert.Equal(t, b, tc.m) {
println(tc.p, tc.m)
}
}

}

func TestMatch(t *testing.T) {

testCases1 := []matchTestCase{
{"", "", true},
{"*", "", true},
{"", "a", false},
{"abc", "abc", true},
{"abc", "ac", false},
{"abc", "abd", false},
{"a*c", "abc", true},
{"a*c", "abcbc", true},
{"a*c", "abcbd", false},
{"a*b*c", "ajkembbcldkcedc", true},
}

for _, tc := range testCases1 {
m := Match(tc.p, tc.s)
if !assert.Equal(t, m, tc.m) {
println(tc.p, tc.s, tc.m)
}

}

}

func TestMatch2(t *testing.T) {
testCases1 := []matchTestCase{
{"jobs.*", "jobs.completed", true},
{"jobs.*", "jobs.long.completed", true},
{"tasks.*", "jobs.completed", false},
{"*.completed", "jobs.completed", true},
{"*.completed.thing", "jobs.completed", false},
}
for _, tc := range testCases1 {
m := Match(tc.p, tc.s)
assert.Equal(t, m, tc.m)
}
}
4 changes: 3 additions & 1 deletion middleware/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ type MiddlewareFunc func(next HandlerFunc) HandlerFunc

type HandlerFunc func(c Context) error

type JobListener func(j *tork.Job)

type Context interface {
// Request returns `*http.Request`.
Request() *http.Request
Expand All @@ -29,5 +31,5 @@ type Context interface {
Error(code int, err error)

// SubmitJob submits a job input for processing
SubmitJob(j *input.Job) (*tork.Job, error)
SubmitJob(j *input.Job, listeners ...JobListener) (*tork.Job, error)
}
3 changes: 2 additions & 1 deletion mq/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
const (
BROKER_INMEMORY = "inmemory"
BROKER_RABBITMQ = "rabbitmq"
TOPIC_JOB = "job.*"
TOPIC_JOB_COMPLETED = "job.completed"
TOPIC_JOB_FAILED = "job.failed"
)
Expand All @@ -24,5 +25,5 @@ type Broker interface {
SubscribeForJobs(handler func(j *tork.Job) error) error
Shutdown(ctx context.Context) error
PublishEvent(ctx context.Context, topic string, event any) error
SubscribeForEvents(ctx context.Context, topic string, handler func(event any)) error
SubscribeForEvents(ctx context.Context, pattern string, handler func(event any)) error
}
13 changes: 7 additions & 6 deletions mq/inmemory.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/runabol/tork"

"github.com/runabol/tork/internal/syncx"
"github.com/runabol/tork/internal/wildcard"
)

const defaultQueueSize = 1000
Expand Down Expand Up @@ -264,11 +265,11 @@ func (b *InMemoryBroker) SubscribeForEvents(ctx context.Context, topic string, h
return nil
}

func (b *InMemoryBroker) PublishEvent(ctx context.Context, topic string, event any) error {
t, ok := b.topics.Get(topic)
if !ok {
return nil
}
t.publish(event)
func (b *InMemoryBroker) PublishEvent(ctx context.Context, topicName string, event any) error {
b.topics.Iterate(func(name string, topic *topic) {
if wildcard.Match(name, topicName) {
topic.publish(event)
}
})
return nil
}
6 changes: 4 additions & 2 deletions mq/inmemory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func TestInMemorSubsribeForEvent(t *testing.T) {
ID: uuid.NewUUID(),
State: tork.JobStateCompleted,
}
err := b.SubscribeForEvents(ctx, mq.TOPIC_JOB_COMPLETED, func(event any) {
err := b.SubscribeForEvents(ctx, mq.TOPIC_JOB, func(event any) {
j2 := event.(*tork.Job)
assert.Equal(t, j1.ID, j2.ID)
processed1 = processed1 + 1
Expand All @@ -221,9 +221,11 @@ func TestInMemorSubsribeForEvent(t *testing.T) {
for i := 0; i < 10; i++ {
err = b.PublishEvent(ctx, mq.TOPIC_JOB_COMPLETED, j1)
assert.NoError(t, err)
err = b.PublishEvent(ctx, mq.TOPIC_JOB_FAILED, j1)
assert.NoError(t, err)
}
// wait for task to be processed
time.Sleep(time.Millisecond * 500)
assert.Equal(t, 10, processed1)
assert.Equal(t, 20, processed1)
assert.Equal(t, 10, processed2)
}
Loading

0 comments on commit be671ae

Please sign in to comment.