Skip to content

Commit

Permalink
Feature: On Broker Init callback
Browse files Browse the repository at this point in the history
  • Loading branch information
runabol committed Dec 1, 2023
1 parent fdb858a commit 188d8c1
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 14 deletions.
5 changes: 5 additions & 0 deletions engine/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ func (e *Engine) initBroker() error {
if err != nil {
return err
}
for _, cb := range e.onBrokerInit {
if err := cb(broker); err != nil {
return err
}
}
e.broker = broker
return nil
}
Expand Down
4 changes: 4 additions & 0 deletions engine/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ func SubmitJob(ctx context.Context, ij *input.Job, listeners ...web.JobListener)
return defaultEngine.SubmitJob(ctx, ij, listeners...)
}

func OnBrokerInit(fn func(b mq.Broker) error) {
defaultEngine.OnBrokerInit(fn)
}

func Start() error {
return defaultEngine.Start()
}
Expand Down
36 changes: 22 additions & 14 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,21 @@ const (
)

type Engine struct {
quit chan os.Signal
terminate chan any
terminated chan any
cfg Config
state string
mu sync.Mutex
broker mq.Broker
ds datastore.Datastore
mounters map[string]*runtime.MultiMounter
runtime runtime.Runtime
coordinator *coordinator.Coordinator
worker *worker.Worker
dsProviders map[string]datastore.Provider
mqProviders map[string]mq.Provider
quit chan os.Signal
terminate chan any
terminated chan any
cfg Config
state string
mu sync.Mutex
broker mq.Broker
ds datastore.Datastore
mounters map[string]*runtime.MultiMounter
runtime runtime.Runtime
coordinator *coordinator.Coordinator
worker *worker.Worker
dsProviders map[string]datastore.Provider
mqProviders map[string]mq.Provider
onBrokerInit []func(b mq.Broker) error
}

type Config struct {
Expand Down Expand Up @@ -337,6 +338,13 @@ func (e *Engine) SubmitJob(ctx context.Context, ij *input.Job, listeners ...web.
return job.Clone(), nil
}

func (e *Engine) OnBrokerInit(fn func(b mq.Broker) error) {
e.mu.Lock()
defer e.mu.Unlock()
e.mustState(StateIdle)
e.onBrokerInit = append(e.onBrokerInit, fn)
}

func (e *Engine) awaitTerm() {
signal.Notify(e.quit, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
select {
Expand Down
21 changes: 21 additions & 0 deletions engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,3 +255,24 @@ func TestRegisterDatastoreProvider(t *testing.T) {
err = eng.Terminate()
assert.NoError(t, err)
}

func TestOnBrokerInit(t *testing.T) {
eng := New(Config{Mode: ModeStandalone})
assert.Equal(t, StateIdle, eng.state)

c := make(chan any)
eng.OnBrokerInit(func(b mq.Broker) error {
assert.NotNil(t, b)
close(c)
return nil
})

err := eng.Start()
assert.NoError(t, err)
assert.Equal(t, StateRunning, eng.state)

<-c

err = eng.Terminate()
assert.NoError(t, err)
}

0 comments on commit 188d8c1

Please sign in to comment.