Skip to content

Commit

Permalink
removing codec from storage builder
Browse files Browse the repository at this point in the history
  • Loading branch information
Diogo Behrens committed Nov 1, 2017
1 parent 424d4f0 commit cd07875
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 26 deletions.
2 changes: 1 addition & 1 deletion kafkamock.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (km *KafkaMock) SetGroupTableCreator(creator func() (string, []byte)) {
// )
func (km *KafkaMock) ProcessorOptions() []ProcessorOption {
return []ProcessorOption{
WithStorageBuilder(func(topic string, partition int32, c Codec, reg metrics.Registry) (storage.Storage, error) {
WithStorageBuilder(func(topic string, partition int32, reg metrics.Registry) (storage.Storage, error) {
return km.storage, nil
}),
WithConsumer(km.consumerMock),
Expand Down
4 changes: 2 additions & 2 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type UpdateCallback func(s storage.Storage, partition int32, key string, value [

// StorageBuilder creates a local storage (a persistent cache) for a topic
// table. StorageBuilder creates one storage for each partition of the topic.
type StorageBuilder func(topic string, partition int32, codec Codec, reg metrics.Registry) (storage.Storage, error)
type StorageBuilder func(topic string, partition int32, reg metrics.Registry) (storage.Storage, error)

///////////////////////////////////////////////////////////////////////////////
// default values
Expand Down Expand Up @@ -60,7 +60,7 @@ func DefaultUpdate(s storage.Storage, partition int32, key string, value []byte)
// DefaultStorageBuilder builds a LevelDB storage with default configuration.
// The database will be stored in the given path.
func DefaultStorageBuilder(path string) StorageBuilder {
return func(topic string, partition int32, codec Codec, reg metrics.Registry) (storage.Storage, error) {
return func(topic string, partition int32, reg metrics.Registry) (storage.Storage, error) {
fp := filepath.Join(path, fmt.Sprintf("%s.%d", topic, partition))
db, err := leveldb.OpenFile(fp, nil)
if err != nil {
Expand Down
14 changes: 6 additions & 8 deletions processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,8 +457,8 @@ func (g *Processor) Stop() {
// partition management (rebalance)
///////////////////////////////////////////////////////////////////////////////

func (g *Processor) newJoinStorage(topic string, id int32, codec Codec, update UpdateCallback, reg metrics.Registry) (*storageProxy, error) {
st, err := g.opts.builders.storage(topic, id, codec, reg)
func (g *Processor) newJoinStorage(topic string, id int32, update UpdateCallback, reg metrics.Registry) (*storageProxy, error) {
st, err := g.opts.builders.storage(topic, id, reg)
if err != nil {
return nil, err
}
Expand All @@ -469,7 +469,7 @@ func (g *Processor) newJoinStorage(topic string, id int32, codec Codec, update U
}, nil
}

func (g *Processor) newStorage(topic string, id int32, codec Codec, update UpdateCallback, reg metrics.Registry) (*storageProxy, error) {
func (g *Processor) newStorage(topic string, id int32, update UpdateCallback, reg metrics.Registry) (*storageProxy, error) {
if g.isStateless() {
return &storageProxy{
Storage: storage.NewMemory(),
Expand All @@ -478,7 +478,7 @@ func (g *Processor) newStorage(topic string, id int32, codec Codec, update Updat
}, nil
}

st, err := g.opts.builders.storage(topic, id, codec, reg)
st, err := g.opts.builders.storage(topic, id, reg)
if err != nil {
return nil, err
}
Expand All @@ -504,7 +504,7 @@ func (g *Processor) createPartitionViews(id int32) error {
reg := metrics.NewPrefixedChildRegistry(g.opts.gokaRegistry,
fmt.Sprintf("%s.%d.", t.Topic(), id))

st, err := g.newJoinStorage(t.Topic(), id, t.Codec(), DefaultUpdate, reg)
st, err := g.newJoinStorage(t.Topic(), id, DefaultUpdate, reg)
if err != nil {
return fmt.Errorf("processor: error creating storage: %v", err)
}
Expand Down Expand Up @@ -542,15 +542,13 @@ func (g *Processor) createPartition(id int32) error {
}
// TODO(diogo) what name to use for stateless processors?
var groupTable string
var groupCodec Codec
if gt := g.graph.GroupTable(); gt != nil {
groupTable = gt.Topic()
groupCodec = gt.Codec()
}
reg := metrics.NewPrefixedChildRegistry(g.opts.gokaRegistry,
fmt.Sprintf("%s.%d.", groupTable, id))

st, err := g.newStorage(groupTable, id, groupCodec, g.opts.updateCallback, reg)
st, err := g.newStorage(groupTable, id, g.opts.updateCallback, reg)
if err != nil {
return fmt.Errorf("processor: error creating storage: %v", err)
}
Expand Down
16 changes: 8 additions & 8 deletions processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ var (
)

func nullStorageBuilder() StorageBuilder {
return func(topic string, partition int32, codec Codec, reg metrics.Registry) (storage.Storage, error) {
return func(topic string, partition int32, reg metrics.Registry) (storage.Storage, error) {
return &storage.Null{}, nil
}
}
Expand Down Expand Up @@ -524,7 +524,7 @@ func TestProcessor_StartWithErrorBeforeRebalance(t *testing.T) {
err error
consumer = mock.NewMockConsumer(ctrl)
st = mock.NewMockStorage(ctrl)
sb = func(topic string, par int32, c Codec, r metrics.Registry) (storage.Storage, error) {
sb = func(topic string, par int32, r metrics.Registry) (storage.Storage, error) {
return st, nil
}
final = make(chan bool)
Expand Down Expand Up @@ -564,7 +564,7 @@ func TestProcessor_StartWithErrorAfterRebalance(t *testing.T) {
err error
consumer = mock.NewMockConsumer(ctrl)
st = mock.NewMockStorage(ctrl)
sb = func(topic string, par int32, c Codec, r metrics.Registry) (storage.Storage, error) {
sb = func(topic string, par int32, r metrics.Registry) (storage.Storage, error) {
return st, nil
}
final = make(chan bool)
Expand Down Expand Up @@ -645,7 +645,7 @@ func TestProcessor_StartWithTableWithErrorAfterRebalance(t *testing.T) {
err error
consumer = mock.NewMockConsumer(ctrl)
st = mock.NewMockStorage(ctrl)
sb = func(topic string, par int32, c Codec, r metrics.Registry) (storage.Storage, error) {
sb = func(topic string, par int32, r metrics.Registry) (storage.Storage, error) {
return st, nil
}
final = make(chan bool)
Expand Down Expand Up @@ -765,7 +765,7 @@ func TestProcessor_Start(t *testing.T) {
err error
consumer = mock.NewMockConsumer(ctrl)
st = mock.NewMockStorage(ctrl)
sb = func(topic string, par int32, c Codec, r metrics.Registry) (storage.Storage, error) {
sb = func(topic string, par int32, r metrics.Registry) (storage.Storage, error) {
return st, nil
}
final = make(chan bool)
Expand Down Expand Up @@ -914,7 +914,7 @@ func TestProcessor_StartWithTable(t *testing.T) {
err error
consumer = mock.NewMockConsumer(ctrl)
st = mock.NewMockStorage(ctrl)
sb = func(topic string, par int32, c Codec, r metrics.Registry) (storage.Storage, error) {
sb = func(topic string, par int32, r metrics.Registry) (storage.Storage, error) {
return st, nil
}
final = make(chan bool)
Expand Down Expand Up @@ -1038,7 +1038,7 @@ func TestProcessor_rebalanceError(t *testing.T) {
wait = make(chan bool)
ch = make(chan kafka.Event)
p = createProcessor(t, ctrl, consumer, 1,
func(topic string, partition int32, c Codec, r metrics.Registry) (storage.Storage, error) {
func(topic string, partition int32, r metrics.Registry) (storage.Storage, error) {
return nil, errors.New("some error")
})
)
Expand Down Expand Up @@ -1072,7 +1072,7 @@ func TestProcessor_HasGet(t *testing.T) {

var (
st = mock.NewMockStorage(ctrl)
sb = func(topic string, partition int32, c Codec, r metrics.Registry) (storage.Storage, error) {
sb = func(topic string, partition int32, r metrics.Registry) (storage.Storage, error) {
return st, nil
}
consumer = mock.NewMockConsumer(ctrl)
Expand Down
2 changes: 1 addition & 1 deletion view.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (v *View) createPartitions(brokers []string) (err error) {
reg := metrics.NewPrefixedChildRegistry(v.opts.gokaRegistry,
fmt.Sprintf("%s.%d.", v.topic, p))

st, err := v.opts.builders.storage(v.topic, p, v.opts.tableCodec, reg)
st, err := v.opts.builders.storage(v.topic, p, reg)
if err != nil {
// TODO(diogo): gracefully terminate all partitions
return fmt.Errorf("Error creating local storage for partition %d: %v", p, err)
Expand Down
12 changes: 6 additions & 6 deletions view_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestView_createPartitions(t *testing.T) {
var (
consumer = mock.NewMockConsumer(ctrl)
st = mock.NewMockStorage(ctrl)
sb = func(topic string, partition int32, c Codec, r metrics.Registry) (storage.Storage, error) {
sb = func(topic string, partition int32, r metrics.Registry) (storage.Storage, error) {
return st, nil
}
tm = mock.NewMockTopicManager(ctrl)
Expand All @@ -83,7 +83,7 @@ func TestView_createPartitions(t *testing.T) {
err = v.createPartitions(nil)
ensure.NotNil(t, err)

sb = func(topic string, partition int32, c Codec, r metrics.Registry) (storage.Storage, error) {
sb = func(topic string, partition int32, r metrics.Registry) (storage.Storage, error) {
return nil, errors.New("some error")
}
tm.EXPECT().Partitions(tableName(group)).Return([]int32{0, 1}, nil)
Expand All @@ -100,7 +100,7 @@ func TestView_HasGet(t *testing.T) {

var (
st = mock.NewMockStorage(ctrl)
sb = func(topic string, partition int32, c Codec, r metrics.Registry) (storage.Storage, error) {
sb = func(topic string, partition int32, r metrics.Registry) (storage.Storage, error) {
return st, nil
}
consumer = mock.NewMockConsumer(ctrl)
Expand Down Expand Up @@ -133,7 +133,7 @@ func TestView_StartStop(t *testing.T) {

var (
st = mock.NewMockStorage(ctrl)
sb = func(topic string, partition int32, c Codec, r metrics.Registry) (storage.Storage, error) {
sb = func(topic string, partition int32, r metrics.Registry) (storage.Storage, error) {
return st, nil
}
consumer = mock.NewMockConsumer(ctrl)
Expand Down Expand Up @@ -185,7 +185,7 @@ func TestView_StartStopWithError(t *testing.T) {

var (
st = mock.NewMockStorage(ctrl)
sb = func(topic string, partition int32, c Codec, r metrics.Registry) (storage.Storage, error) {
sb = func(topic string, partition int32, r metrics.Registry) (storage.Storage, error) {
return st, nil
}
consumer = mock.NewMockConsumer(ctrl)
Expand Down Expand Up @@ -235,7 +235,7 @@ func TestView_GetErrors(t *testing.T) {

var (
st = mock.NewMockStorage(ctrl)
sb = func(topic string, partition int32, c Codec, r metrics.Registry) (storage.Storage, error) {
sb = func(topic string, partition int32, r metrics.Registry) (storage.Storage, error) {
return st, nil
}
consumer = mock.NewMockConsumer(ctrl)
Expand Down

0 comments on commit cd07875

Please sign in to comment.