Skip to content

Commit

Permalink
Merge pull request lovoo#31 from lovoo/snapshot-removal
Browse files Browse the repository at this point in the history
snapshot and batching removal, recovery transactions
  • Loading branch information
SamiHiltunen authored Aug 25, 2017
2 parents 3e4d6b6 + a089490 commit 6849029
Show file tree
Hide file tree
Showing 21 changed files with 494 additions and 976 deletions.
2 changes: 1 addition & 1 deletion kafkamock.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ type Tester interface {
// NewKafkaMock returns a new testprocessor mocking every external service
func NewKafkaMock(t Tester, groupName Group) *KafkaMock {
kafkaMock := &KafkaMock{
storage: storage.NewMock(new(codec.Bytes)),
storage: storage.NewMemory(new(codec.Bytes)),
t: t,
incomingEvents: make(chan kafka.Event),
consumerEvents: make(chan kafka.Event),
Expand Down
10 changes: 10 additions & 0 deletions mock/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,16 @@ func (_mr *_MockStorageRecorder) Iterator() *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "Iterator")
}

func (_m *MockStorage) MarkRecovered() error {
ret := _m.ctrl.Call(_m, "MarkRecovered")
ret0, _ := ret[0].(error)
return ret0
}

func (_mr *_MockStorageRecorder) MarkRecovered() *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "MarkRecovered")
}

func (_m *MockStorage) Open() error {
ret := _m.ctrl.Call(_m, "Open")
ret0, _ := ret[0].(error)
Expand Down
112 changes: 40 additions & 72 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ package goka
import (
"fmt"
"path/filepath"
"time"

"github.com/lovoo/goka/kafka"
"github.com/lovoo/goka/logger"
"github.com/lovoo/goka/storage"

metrics "github.com/rcrowley/go-metrics"
"github.com/syndtr/goleveldb/leveldb"
)

// UpdateCallback is invoked upon arrival of a message for a table partition.
Expand All @@ -25,9 +25,22 @@ type StorageBuilder func(topic string, partition int32, codec Codec, reg metrics
///////////////////////////////////////////////////////////////////////////////

const (
defaultBaseStoragePath = "/tmp/goka"

defaultClientID = "goka"
)

// DefaultProcessorStoragePath is the default path where processor state
// will be stored.
func DefaultProcessorStoragePath(group Group) string {
return filepath.Join(defaultBaseStoragePath, "processor", string(group))
}

// DefaultViewStoragePath returns the default path where view state will be stored.
func DefaultViewStoragePath() string {
return filepath.Join(defaultBaseStoragePath, "view")
}

// DefaultUpdate is the default callback used to update the local storage with
// from the table topic in Kafka. It is called for every message received
// during recovery of processors and during the normal operation of views.
Expand All @@ -37,6 +50,20 @@ func DefaultUpdate(s storage.Storage, partition int32, key string, value []byte)
return s.SetEncoded(key, value)
}

// DefaultStorageBuilder builds a LevelDB storage with default configuration.
// The database will be stored in the given path.
func DefaultStorageBuilder(path string) StorageBuilder {
return func(topic string, partition int32, codec Codec, reg metrics.Registry) (storage.Storage, error) {
fp := filepath.Join(path, fmt.Sprintf("%s.%d", topic, partition))
db, err := leveldb.OpenFile(fp, nil)
if err != nil {
return nil, fmt.Errorf("error opening leveldb: %v", err)
}

return storage.New(db, codec)
}
}

type consumerBuilder func(brokers []string, group string, registry metrics.Registry) (kafka.Consumer, error)
type producerBuilder func(brokers []string, registry metrics.Registry) (kafka.Producer, error)
type topicmgrBuilder func(brokers []string) (kafka.TopicManager, error)
Expand Down Expand Up @@ -67,11 +94,9 @@ type poptions struct {
log logger.Logger
clientID string

updateCallback UpdateCallback
storagePath string
storageSnapshotInterval time.Duration
registry metrics.Registry
partitionChannelSize int
updateCallback UpdateCallback
registry metrics.Registry
partitionChannelSize int

builders struct {
storage StorageBuilder
Expand Down Expand Up @@ -141,13 +166,6 @@ func WithProducer(p kafka.Producer) ProcessorOption {
}
}

// WithStoragePath defines the base path for the local storage on disk
func WithStoragePath(storagePath string) ProcessorOption {
return func(o *poptions) {
o.storagePath = storagePath
}
}

// WithKafkaMetrics sets a go-metrics registry to collect
// kafka metrics.
// The metric-points are https://godoc.org/github.com/Shopify/sarama
Expand All @@ -166,15 +184,6 @@ func WithPartitionChannelSize(size int) ProcessorOption {
}
}

// WithStorageSnapshotInterval sets the interval in which the storage will snapshot to disk (if it is supported by the storage at all)
// Greater interval -> less writes to disk, more memory usage
// Smaller interval -> more writes to disk, less memory usage
func WithStorageSnapshotInterval(interval time.Duration) ProcessorOption {
return func(o *poptions) {
o.storageSnapshotInterval = interval
}
}

// WithLogger sets the logger the processor should use. By default, processors
// use the standard library logger.
func WithLogger(log logger.Logger) ProcessorOption {
Expand All @@ -199,9 +208,9 @@ func (opt *poptions) applyOptions(group string, opts ...ProcessorOption) error {
o(opt)
}

// config not set, use default one
// StorageBuilder should always be set as a default option in NewProcessor
if opt.builders.storage == nil {
opt.builders.storage = opt.defaultStorageBuilder
return fmt.Errorf("StorageBuilder not set")
}
if opt.builders.consumer == nil {
opt.builders.consumer = defaultConsumerBuilder
Expand All @@ -212,9 +221,6 @@ func (opt *poptions) applyOptions(group string, opts ...ProcessorOption) error {
if opt.builders.topicmgr == nil {
opt.builders.topicmgr = defaultTopicManagerBuilder
}
if opt.storageSnapshotInterval == 0 {
opt.storageSnapshotInterval = storage.DefaultStorageSnapshotInterval
}

// prefix registry
opt.gokaRegistry = metrics.NewPrefixedChildRegistry(opt.registry, fmt.Sprintf("goka.processor-%s.", group))
Expand All @@ -227,14 +233,6 @@ func (opt *poptions) applyOptions(group string, opts ...ProcessorOption) error {
return nil
}

func (opt *poptions) storagePathForPartition(topic string, partitionID int32) string {
return filepath.Join(opt.storagePath, "processor", fmt.Sprintf("%s.%d", topic, partitionID))
}

func (opt *poptions) defaultStorageBuilder(topic string, partition int32, codec Codec, reg metrics.Registry) (storage.Storage, error) {
return storage.New(opt.log, opt.storagePathForPartition(topic, partition), codec, reg, opt.storageSnapshotInterval)
}

///////////////////////////////////////////////////////////////////////////////
// view options
///////////////////////////////////////////////////////////////////////////////
Expand All @@ -243,13 +241,11 @@ func (opt *poptions) defaultStorageBuilder(topic string, partition int32, codec
type ViewOption func(*voptions)

type voptions struct {
log logger.Logger
tableCodec Codec
updateCallback UpdateCallback
storagePath string
storageSnapshotInterval time.Duration
registry metrics.Registry
partitionChannelSize int
log logger.Logger
tableCodec Codec
updateCallback UpdateCallback
registry metrics.Registry
partitionChannelSize int

builders struct {
storage StorageBuilder
Expand Down Expand Up @@ -316,22 +312,6 @@ func WithViewTopicManager(tm kafka.TopicManager) ViewOption {
}
}

// WithViewStoragePath defines the base path for the local storage on disk
func WithViewStoragePath(storagePath string) ViewOption {
return func(o *voptions) {
o.storagePath = storagePath
}
}

// WithViewStorageSnapshotInterval sets the interval in which the storage will snapshot to disk (if it is supported by the storage at all)
// Greater interval -> less writes to disk, more memory usage
// Smaller interval -> more writes to disk, less memory usage
func WithViewStorageSnapshotInterval(interval time.Duration) ViewOption {
return func(o *voptions) {
o.storageSnapshotInterval = interval
}
}

// WithViewKafkaMetrics sets a go-metrics registry to collect
// kafka metrics.
// The metric-points are https://godoc.org/github.com/Shopify/sarama
Expand All @@ -355,9 +335,9 @@ func (opt *voptions) applyOptions(topic Table, opts ...ViewOption) error {
o(opt)
}

// config not set, use default one
// StorageBuilder should always be set as a default option in NewView
if opt.builders.storage == nil {
opt.builders.storage = opt.defaultStorageBuilder
return fmt.Errorf("StorageBuilder not set")
}
if opt.builders.consumer == nil {
opt.builders.consumer = defaultConsumerBuilder
Expand All @@ -374,21 +354,9 @@ func (opt *voptions) applyOptions(topic Table, opts ...ViewOption) error {
// prefix registry
opt.gokaRegistry = metrics.NewPrefixedChildRegistry(opt.registry, fmt.Sprintf("goka.view-%s.", topic))

if opt.storageSnapshotInterval == 0 {
opt.storageSnapshotInterval = storage.DefaultStorageSnapshotInterval
}

return nil
}

func (opt *voptions) storagePathForPartition(topic string, partitionID int32) string {
return filepath.Join(opt.storagePath, "view", fmt.Sprintf("%s.%d", topic, partitionID))
}

func (opt *voptions) defaultStorageBuilder(topic string, partition int32, codec Codec, reg metrics.Registry) (storage.Storage, error) {
return storage.New(opt.log, opt.storagePathForPartition(topic, partition), codec, reg, opt.storageSnapshotInterval)
}

///////////////////////////////////////////////////////////////////////////////
// emitter options
///////////////////////////////////////////////////////////////////////////////
Expand Down
14 changes: 5 additions & 9 deletions options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package goka

import (
"fmt"
"regexp"
"testing"

"github.com/facebookgo/ensure"
Expand All @@ -10,17 +11,12 @@ import (
func newMockOptions(t *testing.T) *poptions {
opts := new(poptions)
err := opts.applyOptions("")
ensure.Err(t, err, regexp.MustCompile("StorageBuilder not set$"))

opts.builders.storage = nullStorageBuilder()
err = opts.applyOptions("")
ensure.Nil(t, err)
opts.storagePath = "/tmp/goka-test"

fmt.Printf("%+v\n", opts)
return opts
}

func TestOptions_storagePathForPartition(t *testing.T) {
topic := "test"
var id int32
opts := newMockOptions(t)
path := opts.storagePathForPartition(topic, id)
ensure.DeepEqual(t, path, fmt.Sprintf("/tmp/goka-test/processor/%s.%d", topic, id))
}
28 changes: 22 additions & 6 deletions partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,10 @@ type partition struct {
readyFlag int32
initialHwm int64

st *storageProxy
proxy kafkaProxy
process processCallback
st *storageProxy
recoveredOnce sync.Once
proxy kafkaProxy
process processCallback

// metrics
registry metrics.Registry
Expand Down Expand Up @@ -89,9 +90,10 @@ func newPartition(log logger.Logger, topic string, cb processCallback, st *stora
dying: make(chan bool),
done: make(chan bool),

st: st,
proxy: proxy,
process: cb,
st: st,
recoveredOnce: sync.Once{},
proxy: proxy,
process: cb,

// metrics
registry: reg,
Expand Down Expand Up @@ -289,6 +291,13 @@ func (p *partition) load(catchup bool) error {
p.log.Printf("readyFlag was false when EOF arrived")
p.mxStatus.Update(partitionRecovered)
atomic.StoreInt32(&p.readyFlag, 1)
var err error
p.recoveredOnce.Do(func() {
err = p.st.MarkRecovered()
})
if err != nil {
return fmt.Errorf("error setting recovered: %v", err)
}
}
if catchup {
continue
Expand Down Expand Up @@ -318,6 +327,13 @@ func (p *partition) load(catchup bool) error {
p.mxStatus.Update(partitionRecovering)
} else {
p.mxStatus.Update(partitionRecovered)
var err error
p.recoveredOnce.Do(func() {
err = p.st.MarkRecovered()
})
if err != nil {
return fmt.Errorf("error setting recovered: %v", err)
}
}
case *kafka.NOP:
// don't do anything
Expand Down
6 changes: 5 additions & 1 deletion partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func newStorageProxy(st storage.Storage, id int32, update UpdateCallback) *stora

func newNullStorageProxy(id int32) *storageProxy {
return &storageProxy{
Storage: storage.NewMock(nil),
Storage: storage.NewMemory(nil),
partition: id,
stateless: true,
}
Expand Down Expand Up @@ -344,6 +344,7 @@ func TestPartition_runStatefulWithError(t *testing.T) {
st.EXPECT().Open().Return(nil),
st.EXPECT().GetOffset(int64(-2)).Return(int64(offset), nil),
proxy.EXPECT().Add(topic, offset),
st.EXPECT().MarkRecovered(),
proxy.EXPECT().Remove(topic),
proxy.EXPECT().AddGroup(),
st.EXPECT().Sync(),
Expand Down Expand Up @@ -427,6 +428,7 @@ func TestPartition_loadStateful(t *testing.T) {
st.EXPECT().SetEncoded(key, value),
st.EXPECT().SetOffset(int64(offset)).Return(nil),
st.EXPECT().Sync(),
st.EXPECT().MarkRecovered(),
proxy.EXPECT().Remove(topic),
proxy.EXPECT().AddGroup(),
st.EXPECT().Sync(),
Expand Down Expand Up @@ -629,6 +631,7 @@ func TestPartition_catchupStateful(t *testing.T) {
st.EXPECT().SetEncoded(key, value),
st.EXPECT().SetOffset(offset+1).Return(nil),
st.EXPECT().Sync(),
st.EXPECT().MarkRecovered(),
st.EXPECT().SetEncoded(key, value),
st.EXPECT().SetOffset(offset+2).Return(nil),
st.EXPECT().Sync(),
Expand Down Expand Up @@ -749,6 +752,7 @@ func TestPartition_catchupStatefulWithError(t *testing.T) {
st.EXPECT().SetEncoded(key, value),
st.EXPECT().SetOffset(offset+1).Return(nil),
st.EXPECT().Sync(),
st.EXPECT().MarkRecovered(),
proxy.EXPECT().Remove(topic),
st.EXPECT().Close().Return(nil),
proxy.EXPECT().Stop(),
Expand Down
4 changes: 2 additions & 2 deletions processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ func NewProcessor(brokers []string, gg *GroupGraph, options ...ProcessorOption)
WithRegistry(metrics.NewRegistry()),
WithLogger(logger.Default()),
WithUpdateCallback(DefaultUpdate),
WithStoragePath("/tmp/goka"),
WithPartitionChannelSize(defaultPartitionChannelSize),
WithStorageBuilder(DefaultStorageBuilder(DefaultProcessorStoragePath(gg.Group()))),
},

// user-defined options (may overwrite default ones)
Expand Down Expand Up @@ -470,7 +470,7 @@ func (g *Processor) newJoinStorage(topic string, id int32, codec Codec, update U
func (g *Processor) newStorage(topic string, id int32, codec Codec, update UpdateCallback, reg metrics.Registry) (*storageProxy, error) {
if g.isStateless() {
return &storageProxy{
Storage: storage.NewMock(codec),
Storage: storage.NewMemory(codec),
partition: id,
stateless: true,
}, nil
Expand Down
Loading

0 comments on commit 6849029

Please sign in to comment.