Skip to content

Commit

Permalink
Merge pull request pingcap#797 from pingcap/disksing/remove-rangeget
Browse files Browse the repository at this point in the history
kv: remove RangePrefetchOnCacheMiss option and RangeGet interface
  • Loading branch information
disksing committed Dec 24, 2015
2 parents 5391b3e + e9298e4 commit 7226d00
Show file tree
Hide file tree
Showing 10 changed files with 1 addition and 178 deletions.
29 changes: 0 additions & 29 deletions kv/cache_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,6 @@ func (c *cacheSnapshot) Get(k Key) ([]byte, error) {
return nil, errors.Trace(ErrNotExist)
}
}
if IsErrNotFound(err) {
if opt, ok := c.opts.Get(RangePrefetchOnCacheMiss); ok {
if limit, ok := opt.(int); ok && limit > 0 {
vals, err2 := c.RangeGet(k, nil, limit)
if err2 != nil {
return nil, errors.Trace(err2)
}
if val, ok := vals[string(k)]; ok {
v, err = val, nil
}
}
}
}
if IsErrNotFound(err) {
v, err = c.snapshot.Get(k)
if err == nil {
Expand Down Expand Up @@ -110,22 +97,6 @@ func (c *cacheSnapshot) BatchGet(keys []Key) (map[string][]byte, error) {
return m, nil
}

// RangeGet gets values from snapshot and saves them in cache.
// The range should be [start, end] as Snapshot.RangeGet() indicated.
func (c *cacheSnapshot) RangeGet(start, end Key, limit int) (map[string][]byte, error) {
values, err := c.snapshot.RangeGet(start, end, limit)
if err != nil {
return nil, errors.Trace(err)
}
for k, v := range values {
err = cachePut(c.cache, []byte(k), v)
if err != nil {
return nil, errors.Trace(err)
}
}
return values, nil
}

// Seek creates an iterator of snapshot.
func (c *cacheSnapshot) Seek(k Key) (Iterator, error) {
cacheIter, err := c.cache.Seek(k)
Expand Down
42 changes: 0 additions & 42 deletions kv/cache_snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,24 +81,6 @@ func (s *testCacheSnapshotSuite) TestBatchGet(c *C) {
c.Assert(exist, IsFalse)
}

func (s *testCacheSnapshotSuite) TestRangeGet(c *C) {
s.store.Set([]byte("1"), []byte("1"))
s.store.Set([]byte("2"), []byte("2"))
s.store.Set([]byte("3"), []byte("3"))

m, err := s.cache.RangeGet([]byte("1"), []byte("2"), 100)
c.Assert(err, IsNil)
c.Assert(m, HasLen, 2)
c.Assert(m["1"], BytesEquals, []byte("1"))
c.Assert(m["2"], BytesEquals, []byte("2"))

// result should be saved in cache
s.store.Set([]byte("1"), []byte("4"))
v, err := s.cache.Get([]byte("1"))
c.Assert(err, IsNil)
c.Assert(v, BytesEquals, []byte("1"))
}

type mockSnapshot struct {
store MemBuffer
}
Expand All @@ -122,30 +104,6 @@ func (s *mockSnapshot) BatchGet(keys []Key) (map[string][]byte, error) {
return m, nil
}

func (s *mockSnapshot) RangeGet(start, end Key, limit int) (map[string][]byte, error) {
m := make(map[string][]byte)
it, err := s.Seek(start)
if err != nil {
return nil, errors.Trace(err)
}
defer it.Close()
endKey := string(end)
for i := 0; i < limit; i++ {
if !it.Valid() {
break
}
if it.Key() > endKey {
break
}
m[string(it.Key())] = it.Value()
err := it.Next()
if err != nil {
return nil, err
}
}
return m, nil
}

func (s *mockSnapshot) Seek(k Key) (Iterator, error) {
return s.store.Seek(k)
}
Expand Down
14 changes: 1 addition & 13 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,11 @@
package kv

const (
// RangePrefetchOnCacheMiss directives that when dealing with a Get operation but failing to read data from cache,
// it will launch a RangePrefetch to underlying storage instead of Get. The range starts from requested key and
// has a limit of the option value. The feature is disabled if option value <= 0 or value type is not int.
// This option is particularly useful when we have to do sequential Gets, e.g. table scans.
RangePrefetchOnCacheMiss Option = iota + 1

// PresumeKeyNotExists directives that when dealing with a Get operation but failing to read data from cache,
// we presume that the key does not exist in Store. The actual existence will be checked before the
// transaction's commit.
// This option is an optimization for frequent checks during a transaction, e.g. batch inserts.
PresumeKeyNotExists
PresumeKeyNotExists Option = iota + 1
)

// Retriever is the interface wraps the basic Get and Seek methods.
Expand Down Expand Up @@ -66,9 +60,6 @@ type Transaction interface {
RetrieverMutator
// BatchPrefetch fetches values from KV storage to cache for later use.
BatchPrefetch(keys []Key) error
// RangePrefetch fetches values in the range [start, end] from KV storage
// to cache for later use. Maximum number of values is up to limit.
RangePrefetch(start, end Key, limit int) error
// Commit commits the transaction operations to KV store.
Commit() error
// Rollback undoes the transaction operations to KV store.
Expand All @@ -89,9 +80,6 @@ type Snapshot interface {
Retriever
// BatchGet gets a batch of values from snapshot.
BatchGet(keys []Key) (map[string][]byte, error)
// RangeGet gets values in the range [start, end] from snapshot. Maximum
// number of values is up to limit.
RangeGet(start, end Key, limit int) (map[string][]byte, error)
// Release releases the snapshot to store.
Release()
}
Expand Down
9 changes: 0 additions & 9 deletions kv/union_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@ type UnionStore interface {
CheckLazyConditionPairs() error
// BatchPrefetch fetches values from KV storage to cache for later use.
BatchPrefetch(keys []Key) error
// RangePrefetch fetches values in the range [start, end] from KV storage
// to cache for later use. Maximum number of values is up to limit.
RangePrefetch(start, end Key, limit int) error
// WalkBuffer iterates all buffered kv pairs.
WalkBuffer(f func(k Key, v []byte) error) error
// SetOption sets an option with a value, when val is nil, uses the default
Expand Down Expand Up @@ -133,12 +130,6 @@ func (us *unionStore) BatchPrefetch(keys []Key) error {
return errors.Trace(err)
}

// RangePrefetch implements the UnionStore interface.
func (us *unionStore) RangePrefetch(start, end Key, limit int) error {
_, err := us.snapshot.RangeGet(start, end, limit)
return errors.Trace(err)
}

// CheckLazyConditionPairs implements the UnionStore interface.
func (us *unionStore) CheckLazyConditionPairs() error {
var keys []Key
Expand Down
10 changes: 0 additions & 10 deletions plan/plans/from.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,16 +301,6 @@ func (r *TableDefaultPlan) Next(ctx context.Context) (row *plan.Row, err error)
return nil, errors.Trace(err)
}

txn, err := ctx.GetTxn(false)
if err != nil {
return nil, errors.Trace(err)
}
// It is very likely that we will fetch rows after current row later, enable the RangePrefetchOnCacheMiss
// option may help reducing RPC calls.
// TODO: choose a wiser option value.
txn.SetOption(kv.RangePrefetchOnCacheMiss, nil)
defer txn.DelOption(kv.RangePrefetchOnCacheMiss)

// TODO: we could just fetch mentioned columns' values
row = &plan.Row{}
row.Data, err = r.T.Row(ctx, handle)
Expand Down
8 changes: 0 additions & 8 deletions plan/plans/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,15 +385,7 @@ func (r *indexPlan) Next(ctx context.Context) (*plan.Row, error) {
}
var row *plan.Row

txn, err := ctx.GetTxn(false)
if err != nil {
return nil, errors.Trace(err)
}
// Because we're in a range query, we can prefetch next few lines
// in one RPC call fill the cache, for reducing RPC calls.
txn.SetOption(kv.RangePrefetchOnCacheMiss, nil)
row, err = r.lookupRow(ctx, h)
txn.DelOption(kv.RangePrefetchOnCacheMiss)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
21 changes: 0 additions & 21 deletions store/hbase/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,27 +75,6 @@ func (s *hbaseSnapshot) BatchGet(keys []kv.Key) (map[string][]byte, error) {
return m, nil
}

// RangeGet implements kv.Snapshot.RangeGet interface.
// The range should be [start, end] as Snapshot.RangeGet() indicated.
func (s *hbaseSnapshot) RangeGet(start, end kv.Key, limit int) (map[string][]byte, error) {
scanner := s.txn.GetScanner([]byte(s.storeName), start, end, limit)
defer scanner.Close()

m := make(map[string][]byte)
for i := 0; i < limit; i++ {
r := scanner.Next()
if r != nil && len(r.Columns) > 0 {
k := string(r.Row)
v := r.Columns[hbaseFmlAndQual].Value
m[k] = v
} else {
break
}
}

return m, nil
}

func internalGet(s *hbaseSnapshot, g *hbase.Get) ([]byte, error) {
r, err := s.txn.Get(s.storeName, g)
if err != nil {
Expand Down
18 changes: 0 additions & 18 deletions store/hbase/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,6 @@ var (
_ kv.Transaction = (*hbaseTxn)(nil)
)

var (
// default values for txn.SetOption
optionDefaultVals = map[kv.Option]interface{}{
kv.RangePrefetchOnCacheMiss: 1024,
}
)

func getOptionDefaultVal(opt kv.Option) interface{} {
if v, ok := optionDefaultVals[opt]; ok {
return v
}
return nil
}

// dbTxn implements kv.Transacton. It is not thread safe.
type hbaseTxn struct {
us kv.UnionStore
Expand Down Expand Up @@ -92,10 +78,6 @@ func (txn *hbaseTxn) BatchPrefetch(keys []kv.Key) error {
return txn.us.BatchPrefetch(keys)
}

func (txn *hbaseTxn) RangePrefetch(start, end kv.Key, limit int) error {
return txn.us.RangePrefetch(start, end, limit)
}

func (txn *hbaseTxn) SetOption(opt kv.Option, val interface{}) {
txn.us.SetOption(opt, val)
}
Expand Down
24 changes: 0 additions & 24 deletions store/localstore/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,30 +120,6 @@ func (s *dbSnapshot) BatchGet(keys []kv.Key) (map[string][]byte, error) {
return m, nil
}

func (s *dbSnapshot) RangeGet(start, end kv.Key, limit int) (map[string][]byte, error) {
m := make(map[string][]byte)
it, err := s.Seek(start)
if err != nil {
return nil, errors.Trace(err)
}
defer it.Close()
endKey := string(end)
for i := 0; i < limit; i++ {
if !it.Valid() {
break
}
if it.Key() > endKey {
break
}
m[it.Key()] = it.Value()
err := it.Next()
if err != nil {
return nil, err
}
}
return m, nil
}

func (s *dbSnapshot) Seek(k kv.Key) (kv.Iterator, error) {
it, err := newDBIter(s, k)
return it, errors.Trace(err)
Expand Down
4 changes: 0 additions & 4 deletions store/localstore/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,6 @@ func (txn *dbTxn) BatchPrefetch(keys []kv.Key) error {
return txn.us.BatchPrefetch(keys)
}

func (txn *dbTxn) RangePrefetch(start, end kv.Key, limit int) error {
return txn.us.RangePrefetch(start, end, limit)
}

func (txn *dbTxn) SetOption(opt kv.Option, val interface{}) {
txn.us.SetOption(opt, val)
}
Expand Down

0 comments on commit 7226d00

Please sign in to comment.