Skip to content

Commit

Permalink
Merge pull request pingcap#669 from pingcap/disksing/unify-iterator
Browse files Browse the repository at this point in the history
kv: unify NewIterator and Seek
  • Loading branch information
disksing committed Dec 2, 2015
2 parents 52b13f5 + e1851c9 commit 2eb7b31
Show file tree
Hide file tree
Showing 9 changed files with 121 additions and 69 deletions.
33 changes: 23 additions & 10 deletions kv/btree_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
package kv

import (
"io"

"github.com/juju/errors"
"github.com/ngaut/log"
"github.com/pingcap/tidb/kv/memkv"
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/util/types"
)

Expand All @@ -43,11 +45,17 @@ func (b *btreeBuffer) Get(k Key) ([]byte, error) {
}

// Set associates the key with the value.
func (b *btreeBuffer) Set(k []byte, v []byte) error {
func (b *btreeBuffer) Set(k Key, v []byte) error {
b.tree.Set(toIfaces(k), toIfaces(v))
return nil
}

// Delete removes the entry from buffer with provided key.
func (b *btreeBuffer) Delete(k Key) error {
err := b.Set(k, nil)
return errors.Trace(err)
}

// Release clear the whole buffer.
func (b *btreeBuffer) Release() {
b.tree.Clear()
Expand All @@ -60,27 +68,29 @@ type btreeIter struct {
ok bool
}

// NewIterator creates a new Iterator based on the provided param
func (b *btreeBuffer) NewIterator(param interface{}) Iterator {
// Seek creates a new Iterator based on the provided key.
func (b *btreeBuffer) Seek(k Key) (Iterator, error) {
var e *memkv.Enumerator
var err error
if param == nil {
if k == nil {
e, err = b.tree.SeekFirst()
if err != nil {
return &btreeIter{ok: false}
if terror.ErrorEqual(err, io.EOF) {
return &btreeIter{ok: false}, nil
}
return &btreeIter{ok: false}, errors.Trace(err)
}
} else {
key := toIfaces(param.([]byte))
key := toIfaces([]byte(k))
e, _ = b.tree.Seek(key)
}
iter := &btreeIter{e: e}
// the initial push...
err = iter.Next()
if err != nil {
log.Error(err)
return &btreeIter{ok: false}
return &btreeIter{ok: false}, errors.Trace(err)
}
return iter
return iter, nil
}

// Close implements Iterator Close.
Expand All @@ -103,6 +113,9 @@ func (i *btreeIter) Next() error {
k, v, err := i.e.Next()
if err != nil {
i.ok = false
if terror.ErrorEqual(err, io.EOF) {
return nil
}
return errors.Trace(err)
}
i.k, i.v, i.ok = string(fromIfaces(k)), fromIfaces(v), true
Expand Down
14 changes: 11 additions & 3 deletions kv/cache_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,17 @@ func (c *cacheSnapshot) RangeGet(start, end Key, limit int) (map[string][]byte,
return values, nil
}

// NewIterator creates an iterator of snapshot.
func (c *cacheSnapshot) NewIterator(param interface{}) Iterator {
return newUnionIter(c.cache.NewIterator(param), c.snapshot.NewIterator(param))
// Seek creates an iterator of snapshot.
func (c *cacheSnapshot) Seek(k Key) (Iterator, error) {
cacheIter, err := c.cache.Seek(k)
if err != nil {
return nil, errors.Trace(err)
}
snapshotIter, err := c.snapshot.Seek(k)
if err != nil {
return nil, errors.Trace(err)
}
return newUnionIter(cacheIter, snapshotIter), nil
}

// Release reset membuffer and release snapshot.
Expand Down
9 changes: 6 additions & 3 deletions kv/cache_snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,10 @@ func (s *mockSnapshot) BatchGet(keys []Key) (map[string][]byte, error) {

func (s *mockSnapshot) RangeGet(start, end Key, limit int) (map[string][]byte, error) {
m := make(map[string][]byte)
it := s.NewIterator([]byte(start))
it, err := s.Seek(start)
if err != nil {
return nil, errors.Trace(err)
}
defer it.Close()
endKey := string(end)
for i := 0; i < limit; i++ {
Expand All @@ -143,8 +146,8 @@ func (s *mockSnapshot) RangeGet(start, end Key, limit int) (map[string][]byte, e
return m, nil
}

func (s *mockSnapshot) NewIterator(param interface{}) Iterator {
return s.store.NewIterator(param)
func (s *mockSnapshot) Seek(k Key) (Iterator, error) {
return s.store.Seek(k)
}

func (s *mockSnapshot) Release() {
Expand Down
12 changes: 2 additions & 10 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,27 +201,19 @@ type MvccSnapshot interface {

// Snapshot defines the interface for the snapshot fetched from KV store.
type Snapshot interface {
// Get gets the value for key k from snapshot.
Get(k Key) ([]byte, error)
Retriever
// BatchGet gets a batch of values from snapshot.
BatchGet(keys []Key) (map[string][]byte, error)
// RangeGet gets values in the range [start, end] from snapshot. Maximum
// number of values is up to limit.
RangeGet(start, end Key, limit int) (map[string][]byte, error)
// NewIterator gets a new iterator on the snapshot.
NewIterator(param interface{}) Iterator
// Release releases the snapshot to store.
Release()
}

// MemBuffer is the interface for transaction buffer of update in a transaction
type MemBuffer interface {
// Get gets the value for key k from buffer. If not found, it returns ErrNotExist.
Get(k Key) ([]byte, error)
// Set associates key with value
Set([]byte, []byte) error
// NewIterator gets a new iterator on the buffer.
NewIterator(param interface{}) Iterator
RetrieverMutator
// Release releases the buffer.
Release()
}
Expand Down
31 changes: 21 additions & 10 deletions kv/mem_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ func valToStr(c *C, iter Iterator) string {
func checkNewIterator(c *C, buffer MemBuffer) {
for i := startIndex; i < testCount; i++ {
val := encodeInt(i * indexStep)
iter := buffer.NewIterator(val)
iter, err := buffer.Seek(val)
c.Assert(err, IsNil)
c.Assert(iter.Key(), Equals, string(val))
c.Assert(decodeInt([]byte(valToStr(c, iter))), Equals, i*indexStep)
iter.Close()
Expand All @@ -86,11 +87,12 @@ func checkNewIterator(c *C, buffer MemBuffer) {
// Test iterator Next()
for i := startIndex; i < testCount-1; i++ {
val := encodeInt(i * indexStep)
iter := buffer.NewIterator(val)
iter, err := buffer.Seek(val)
c.Assert(err, IsNil)
c.Assert(iter.Key(), Equals, string(val))
c.Assert(valToStr(c, iter), Equals, string(val))

err := iter.Next()
err = iter.Next()
c.Assert(err, IsNil)
c.Assert(iter.Valid(), IsTrue)

Expand All @@ -101,14 +103,16 @@ func checkNewIterator(c *C, buffer MemBuffer) {
}

// Non exist and beyond maximum seek test
iter := buffer.NewIterator(encodeInt(testCount * indexStep))
iter, err := buffer.Seek(encodeInt(testCount * indexStep))
c.Assert(err, IsNil)
c.Assert(iter.Valid(), IsFalse)

// Non exist but between existing keys seek test,
// it returns the smallest key that larger than the one we are seeking
inBetween := encodeInt((testCount-1)*indexStep - 1)
last := encodeInt((testCount - 1) * indexStep)
iter = buffer.NewIterator(inBetween)
iter, err = buffer.Seek(inBetween)
c.Assert(err, IsNil)
c.Assert(iter.Valid(), IsTrue)
c.Assert(iter.Key(), Not(Equals), string(inBetween))
c.Assert(iter.Key(), Equals, string(last))
Expand Down Expand Up @@ -143,7 +147,8 @@ func (s *testKVSuite) TestGetSet(c *C) {
func (s *testKVSuite) TestNewIterator(c *C) {
for _, buffer := range s.bs {
// should be invalid
iter := buffer.NewIterator(nil)
iter, err := buffer.Seek(nil)
c.Assert(err, IsNil)
c.Assert(iter.Valid(), IsFalse)

insertData(c, buffer)
Expand All @@ -154,7 +159,8 @@ func (s *testKVSuite) TestNewIterator(c *C) {

func (s *testKVSuite) TestBasicNewIterator(c *C) {
for _, buffer := range s.bs {
it := buffer.NewIterator([]byte("2"))
it, err := buffer.Seek([]byte("2"))
c.Assert(err, IsNil)
c.Assert(it.Valid(), IsFalse)
buffer.Release()
}
Expand All @@ -178,14 +184,16 @@ func (s *testKVSuite) TestNewIteratorMin(c *C) {
}

cnt := 0
it := buffer.NewIterator(nil)
it, err := buffer.Seek(nil)
c.Assert(err, IsNil)
for it.Valid() {
cnt++
it.Next()
}
c.Assert(cnt, Equals, 6)

it = buffer.NewIterator([]byte("DATA_test_main_db_tbl_tbl_test_record__00000000000000000000"))
it, err = buffer.Seek([]byte("DATA_test_main_db_tbl_tbl_test_record__00000000000000000000"))
c.Assert(err, IsNil)
c.Assert(string(it.Key()), Equals, "DATA_test_main_db_tbl_tbl_test_record__00000000000000000001")

buffer.Release()
Expand Down Expand Up @@ -296,7 +304,10 @@ func benchIterator(b *testing.B, buffer MemBuffer) {
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
iter := buffer.NewIterator(nil)
iter, err := buffer.Seek(nil)
if err != nil {
b.Error(err)
}
for iter.Valid() {
iter.Next()
}
Expand Down
19 changes: 13 additions & 6 deletions kv/memdb_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package kv

import (
"github.com/juju/errors"
"github.com/pingcap/tidb/terror"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/comparer"
Expand All @@ -37,16 +38,16 @@ func NewMemDbBuffer() MemBuffer {
return &memDbBuffer{db: memdb.New(comparer.DefaultComparer, 4*1024)}
}

// NewIterator creates an Iterator.
func (m *memDbBuffer) NewIterator(param interface{}) Iterator {
// Seek creates an Iterator.
func (m *memDbBuffer) Seek(k Key) (Iterator, error) {
var i Iterator
if param == nil {
if k == nil {
i = &memDbIter{iter: m.db.NewIterator(&util.Range{})}
} else {
i = &memDbIter{iter: m.db.NewIterator(&util.Range{Start: param.([]byte)})}
i = &memDbIter{iter: m.db.NewIterator(&util.Range{Start: []byte(k)})}
}
i.Next()
return i
return i, nil
}

// Get returns the value associated with key.
Expand All @@ -59,10 +60,16 @@ func (m *memDbBuffer) Get(k Key) ([]byte, error) {
}

// Set associates key with value.
func (m *memDbBuffer) Set(k []byte, v []byte) error {
func (m *memDbBuffer) Set(k Key, v []byte) error {
return m.db.Put(k, v)
}

// Delete removes the entry from buffer with provided key.
func (m *memDbBuffer) Delete(k Key) error {
err := m.Set(k, nil)
return errors.Trace(err)
}

// Release reset the buffer.
func (m *memDbBuffer) Release() {
m.db.Reset()
Expand Down
44 changes: 36 additions & 8 deletions kv/union_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,20 +70,28 @@ func (lmb *lazyMemBuffer) Get(k Key) ([]byte, error) {
return lmb.mb.Get(k)
}

func (lmb *lazyMemBuffer) Set(key []byte, value []byte) error {
func (lmb *lazyMemBuffer) Set(key Key, value []byte) error {
if lmb.mb == nil {
lmb.mb = p.Get().(MemBuffer)
}

return lmb.mb.Set(key, value)
}

func (lmb *lazyMemBuffer) NewIterator(param interface{}) Iterator {
func (lmb *lazyMemBuffer) Delete(k Key) error {
if lmb.mb == nil {
lmb.mb = p.Get().(MemBuffer)
}

return lmb.mb.NewIterator(param)
return lmb.mb.Delete(k)
}

func (lmb *lazyMemBuffer) Seek(k Key) (Iterator, error) {
if lmb.mb == nil {
lmb.mb = p.Get().(MemBuffer)
}

return lmb.mb.Seek(k)
}

func (lmb *lazyMemBuffer) Release() {
Expand Down Expand Up @@ -166,8 +174,14 @@ func (us *unionStore) GetInt64(k Key) (int64, error) {

// Seek implements the Retriever interface.
func (us *unionStore) Seek(key Key) (Iterator, error) {
bufferIt := us.wbuffer.NewIterator([]byte(key))
cacheIt := us.snapshot.NewIterator([]byte(key))
bufferIt, err := us.wbuffer.Seek([]byte(key))
if err != nil {
return nil, errors.Trace(err)
}
cacheIt, err := us.snapshot.Seek([]byte(key))
if err != nil {
return nil, errors.Trace(err)
}
return newUnionIter(bufferIt, cacheIt), nil
}

Expand Down Expand Up @@ -199,7 +213,10 @@ func (us *unionStore) Delete(k Key) error {

// WalkWriteBuffer implements the UnionStore interface.
func (us *unionStore) WalkWriteBuffer(f func(Iterator) error) error {
iter := us.wbuffer.NewIterator(nil)
iter, err := us.wbuffer.Seek(nil)
if err != nil {
return errors.Trace(err)
}
defer iter.Close()
for ; iter.Valid(); iter.Next() {
if err := f(iter); err != nil {
Expand All @@ -224,17 +241,28 @@ func (us *unionStore) RangePrefetch(start, end Key, limit int) error {
// CheckLazyConditionPairs implements the UnionStore interface.
func (us *unionStore) CheckLazyConditionPairs() error {
var keys []Key
for it := us.lazyConditionPairs.NewIterator(nil); it.Valid(); it.Next() {
it, err := us.lazyConditionPairs.Seek(nil)
if err != nil {
return errors.Trace(err)
}
for ; it.Valid(); it.Next() {
keys = append(keys, []byte(it.Key()))
}
it.Close()

if len(keys) == 0 {
return nil
}
values, err := us.snapshot.BatchGet(keys)
if err != nil {
return errors.Trace(err)
}
for it := us.lazyConditionPairs.NewIterator(nil); it.Valid(); it.Next() {
it, err = us.lazyConditionPairs.Seek(nil)
if err != nil {
return errors.Trace(err)
}
defer it.Close()
for ; it.Valid(); it.Next() {
if len(it.Value()) == 0 {
if _, exist := values[it.Key()]; exist {
return errors.Trace(ErrKeyExists)
Expand Down
Loading

0 comments on commit 2eb7b31

Please sign in to comment.