Skip to content

Commit

Permalink
fixes restartable view
Browse files Browse the repository at this point in the history
  • Loading branch information
Diogo Behrens committed Mar 20, 2018
1 parent 984fc95 commit add7fb6
Show file tree
Hide file tree
Showing 4 changed files with 179 additions and 12 deletions.
4 changes: 2 additions & 2 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,8 @@ func WithViewClientID(clientID string) ViewOption {
}

// WithViewRestartable defines the view can be restarted, even when Start()
// returns errors. If the view is restartable, the client must call Stop() to
// release all resources.
// returns errors. If the view is restartable, the client must call Terminate()
// to release all resources, ie, close the local storage.
func WithViewRestartable() ViewOption {
return func(o *voptions) {
o.restartable = true
Expand Down
2 changes: 1 addition & 1 deletion processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ func (g *Processor) Start() (rerr error) {
}
return nil
})
defer func() { g.errors.Collect(v.Close()) }()
defer func() { g.errors.Collect(v.Terminate()) }()
}

// subscribe for streams
Expand Down
24 changes: 15 additions & 9 deletions view.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,16 +132,21 @@ func (v *View) Start() error {
return v.startWithContext(ctx)
}

// Stop stops the view.
func (v *View) Stop() {
if v.cancel != nil {
v.cancel()
}
}

func (v *View) startWithContext(ctx context.Context) error {
v.opts.log.Printf("view: starting")
defer v.opts.log.Printf("view: stopped")

if err := v.reinit(); err != nil {
return err
}

errg, ctx := multierr.NewErrGroup(ctx)
errg.Go(func() error { return v.run(ctx) })

Expand Down Expand Up @@ -172,7 +177,6 @@ func (v *View) startWithContext(ctx context.Context) error {
v.terminated = true
errs = errs.Merge(v.close())
}
v.opts.log.Printf("view: shutdown complete")

return errs.NilOrError()
}
Expand All @@ -183,13 +187,18 @@ func (v *View) close() *multierr.Errors {
for _, p := range v.partitions {
errs.Collect(p.st.Close())
}
return nil
v.partitions = nil
return errs
}

// Close closes storage partitions. Close should only be called if the view is
// restartable. Once Close is called, the view cannot be restarted anymore.
func (v *View) Close() error {
v.opts.log.Printf("View: stopping")
// Terminate closes storage partitions. Close must be called only if the view is
// restartable (see WithViewRestartable() option). Once Terminate() is called,
// the view cannot be restarted anymore.
func (v *View) Terminate() error {
if !v.opts.restartable {
return nil
}
v.opts.log.Printf("View: closing")

// do not allow any reinitialization
if v.terminated {
Expand Down Expand Up @@ -333,9 +342,6 @@ func (v *View) Evict(key string) error {
}

func (v *View) run(ctx context.Context) error {
v.opts.log.Printf("View: started")
defer v.opts.log.Printf("View: stopped")

for {
select {
case ev := <-v.consumer.Events():
Expand Down
161 changes: 161 additions & 0 deletions view_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,167 @@ func TestView_StartStopWithError(t *testing.T) {
ensure.Nil(t, err)
}

func TestView_RestartNonRestartable(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

var (
st = mock.NewMockStorage(ctrl)
sb = func(topic string, partition int32) (storage.Storage, error) {
return st, nil
}
consumer = mock.NewMockConsumer(ctrl)
tm = mock.NewMockTopicManager(ctrl)
v = createTestView(t, consumer, sb, tm)
initial = make(chan bool)
final = make(chan bool)
ch = make(chan kafka.Event)
chClose = func() { close(ch) }
initialClose = func() { close(initial) }

offset = int64(123)
par = int32(0)
)
v.opts.restartable = false

gomock.InOrder(
tm.EXPECT().Partitions(tableName(group)).Return([]int32{0}, nil),
tm.EXPECT().Close(),
consumer.EXPECT().Events().Do(initialClose).Return(ch),
)
gomock.InOrder(
st.EXPECT().Open(),
st.EXPECT().GetOffset(int64(-2)).Return(int64(123), nil),
consumer.EXPECT().AddPartition(tableName(group), int32(par), int64(offset)),
)
gomock.InOrder(
consumer.EXPECT().RemovePartition(tableName(group), int32(par)),
consumer.EXPECT().Close().Do(chClose).Return(nil),
st.EXPECT().Close(),
)

err := v.createPartitions(nil)
ensure.Nil(t, err)

go func() {
errs := v.Start()
ensure.Nil(t, errs)
close(final)
}()

err = doTimed(t, func() {
<-initial
v.Stop()
<-final
})
ensure.Nil(t, err)

// restart view
final = make(chan bool)

go func() {
err = v.Start()
ensure.NotNil(t, err)
ensure.StringContains(t, err.Error(), "terminated")
close(final)
}()

err = doTimed(t, func() {
<-final
})
ensure.Nil(t, err)

err = v.Terminate() // silent
ensure.Nil(t, err)
}

func TestView_Restart(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

var (
st = mock.NewMockStorage(ctrl)
sb = func(topic string, partition int32) (storage.Storage, error) {
return st, nil
}
consumer = mock.NewMockConsumer(ctrl)
tm = mock.NewMockTopicManager(ctrl)
v = createTestView(t, consumer, sb, tm)
initial = make(chan bool)
final = make(chan bool)
ch = make(chan kafka.Event)
chClose = func() { close(ch) }
initialClose = func() { close(initial) }

offset = int64(123)
par = int32(0)
)
v.opts.restartable = true

gomock.InOrder(
tm.EXPECT().Partitions(tableName(group)).Return([]int32{0}, nil),
tm.EXPECT().Close(),
consumer.EXPECT().Events().Do(initialClose).Return(ch),
)
gomock.InOrder(
st.EXPECT().Open(),
st.EXPECT().GetOffset(int64(-2)).Return(int64(123), nil),
consumer.EXPECT().AddPartition(tableName(group), int32(par), int64(offset)),
)
gomock.InOrder(
consumer.EXPECT().RemovePartition(tableName(group), int32(par)),
consumer.EXPECT().Close().Do(chClose).Return(nil),
)

err := v.createPartitions(nil)
ensure.Nil(t, err)

go func() {
errs := v.Start()
ensure.Nil(t, errs)
close(final)
}()

err = doTimed(t, func() {
<-initial
v.Stop()
<-final
})
ensure.Nil(t, err)

// restart view
final = make(chan bool)
initial = make(chan bool, 3)
initialPush := func() { initial <- true }
ch = make(chan kafka.Event)
chClose = func() { close(ch) }

// st.Open is not called because of openOnce in the storageProxy
st.EXPECT().GetOffset(int64(-2)).Return(int64(123), nil)
consumer.EXPECT().AddPartition(tableName(group), int32(0), int64(offset))
consumer.EXPECT().Events().Return(ch)
consumer.EXPECT().RemovePartition(tableName(group), int32(0))
consumer.EXPECT().Close().Do(chClose).Return(nil)

_ = initialPush
go func() {
err = v.Start()
ensure.Nil(t, err)
close(final)
}()
time.Sleep(2 * time.Second)

err = doTimed(t, func() {
v.Stop()
<-final
})
ensure.Nil(t, err)

st.EXPECT().Close()
err = v.Terminate()
ensure.Nil(t, err)
}

func TestView_GetErrors(t *testing.T) {
v := &View{opts: &voptions{hasher: DefaultHasher()}}
_, err := v.Get("hey")
Expand Down

0 comments on commit add7fb6

Please sign in to comment.