From 42d999f00aa59a52e25327a4b14c3f2bd03fee54 Mon Sep 17 00:00:00 2001 From: disksing Date: Thu, 24 Dec 2015 15:52:50 +0800 Subject: [PATCH 1/2] kv: remove RangePrefetchOnCacheMiss option --- kv/cache_snapshot.go | 13 ------------- kv/kv.go | 8 +------- plan/plans/from.go | 10 ---------- plan/plans/index.go | 8 -------- store/hbase/txn.go | 14 -------------- 5 files changed, 1 insertion(+), 52 deletions(-) diff --git a/kv/cache_snapshot.go b/kv/cache_snapshot.go index 8961271941e2d..ed74bdb6c58db 100644 --- a/kv/cache_snapshot.go +++ b/kv/cache_snapshot.go @@ -47,19 +47,6 @@ func (c *cacheSnapshot) Get(k Key) ([]byte, error) { return nil, errors.Trace(ErrNotExist) } } - if IsErrNotFound(err) { - if opt, ok := c.opts.Get(RangePrefetchOnCacheMiss); ok { - if limit, ok := opt.(int); ok && limit > 0 { - vals, err2 := c.RangeGet(k, nil, limit) - if err2 != nil { - return nil, errors.Trace(err2) - } - if val, ok := vals[string(k)]; ok { - v, err = val, nil - } - } - } - } if IsErrNotFound(err) { v, err = c.snapshot.Get(k) if err == nil { diff --git a/kv/kv.go b/kv/kv.go index 3299638bd6b85..e813cd771f137 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -14,17 +14,11 @@ package kv const ( - // RangePrefetchOnCacheMiss directives that when dealing with a Get operation but failing to read data from cache, - // it will launch a RangePrefetch to underlying storage instead of Get. The range starts from requested key and - // has a limit of the option value. The feature is disabled if option value <= 0 or value type is not int. - // This option is particularly useful when we have to do sequential Gets, e.g. table scans. - RangePrefetchOnCacheMiss Option = iota + 1 - // PresumeKeyNotExists directives that when dealing with a Get operation but failing to read data from cache, // we presume that the key does not exist in Store. The actual existence will be checked before the // transaction's commit. // This option is an optimization for frequent checks during a transaction, e.g. batch inserts. - PresumeKeyNotExists + PresumeKeyNotExists Option = iota + 1 ) // Retriever is the interface wraps the basic Get and Seek methods. diff --git a/plan/plans/from.go b/plan/plans/from.go index 1f3e5fddb8843..343dfe30f6a1c 100644 --- a/plan/plans/from.go +++ b/plan/plans/from.go @@ -301,16 +301,6 @@ func (r *TableDefaultPlan) Next(ctx context.Context) (row *plan.Row, err error) return nil, errors.Trace(err) } - txn, err := ctx.GetTxn(false) - if err != nil { - return nil, errors.Trace(err) - } - // It is very likely that we will fetch rows after current row later, enable the RangePrefetchOnCacheMiss - // option may help reducing RPC calls. - // TODO: choose a wiser option value. - txn.SetOption(kv.RangePrefetchOnCacheMiss, nil) - defer txn.DelOption(kv.RangePrefetchOnCacheMiss) - // TODO: we could just fetch mentioned columns' values row = &plan.Row{} row.Data, err = r.T.Row(ctx, handle) diff --git a/plan/plans/index.go b/plan/plans/index.go index 645efa44bf06c..f837cc6ab7599 100644 --- a/plan/plans/index.go +++ b/plan/plans/index.go @@ -385,15 +385,7 @@ func (r *indexPlan) Next(ctx context.Context) (*plan.Row, error) { } var row *plan.Row - txn, err := ctx.GetTxn(false) - if err != nil { - return nil, errors.Trace(err) - } - // Because we're in a range query, we can prefetch next few lines - // in one RPC call fill the cache, for reducing RPC calls. - txn.SetOption(kv.RangePrefetchOnCacheMiss, nil) row, err = r.lookupRow(ctx, h) - txn.DelOption(kv.RangePrefetchOnCacheMiss) if err != nil { return nil, errors.Trace(err) } diff --git a/store/hbase/txn.go b/store/hbase/txn.go index 17c12edd3a58e..7c90a1f4c5419 100644 --- a/store/hbase/txn.go +++ b/store/hbase/txn.go @@ -27,20 +27,6 @@ var ( _ kv.Transaction = (*hbaseTxn)(nil) ) -var ( - // default values for txn.SetOption - optionDefaultVals = map[kv.Option]interface{}{ - kv.RangePrefetchOnCacheMiss: 1024, - } -) - -func getOptionDefaultVal(opt kv.Option) interface{} { - if v, ok := optionDefaultVals[opt]; ok { - return v - } - return nil -} - // dbTxn implements kv.Transacton. It is not thread safe. type hbaseTxn struct { us kv.UnionStore From e9298e4826bbd3ec7a2aad9194bb646818235f7f Mon Sep 17 00:00:00 2001 From: disksing Date: Thu, 24 Dec 2015 15:57:51 +0800 Subject: [PATCH 2/2] kv: remove RangePrefetch and RangeGet --- kv/cache_snapshot.go | 16 -------------- kv/cache_snapshot_test.go | 42 ------------------------------------ kv/kv.go | 6 ------ kv/union_store.go | 9 -------- store/hbase/snapshot.go | 21 ------------------ store/hbase/txn.go | 4 ---- store/localstore/snapshot.go | 24 --------------------- store/localstore/txn.go | 4 ---- 8 files changed, 126 deletions(-) diff --git a/kv/cache_snapshot.go b/kv/cache_snapshot.go index ed74bdb6c58db..61bef188f4053 100644 --- a/kv/cache_snapshot.go +++ b/kv/cache_snapshot.go @@ -97,22 +97,6 @@ func (c *cacheSnapshot) BatchGet(keys []Key) (map[string][]byte, error) { return m, nil } -// RangeGet gets values from snapshot and saves them in cache. -// The range should be [start, end] as Snapshot.RangeGet() indicated. -func (c *cacheSnapshot) RangeGet(start, end Key, limit int) (map[string][]byte, error) { - values, err := c.snapshot.RangeGet(start, end, limit) - if err != nil { - return nil, errors.Trace(err) - } - for k, v := range values { - err = cachePut(c.cache, []byte(k), v) - if err != nil { - return nil, errors.Trace(err) - } - } - return values, nil -} - // Seek creates an iterator of snapshot. func (c *cacheSnapshot) Seek(k Key) (Iterator, error) { cacheIter, err := c.cache.Seek(k) diff --git a/kv/cache_snapshot_test.go b/kv/cache_snapshot_test.go index fdb3d38e88819..7cc73d0670f94 100644 --- a/kv/cache_snapshot_test.go +++ b/kv/cache_snapshot_test.go @@ -81,24 +81,6 @@ func (s *testCacheSnapshotSuite) TestBatchGet(c *C) { c.Assert(exist, IsFalse) } -func (s *testCacheSnapshotSuite) TestRangeGet(c *C) { - s.store.Set([]byte("1"), []byte("1")) - s.store.Set([]byte("2"), []byte("2")) - s.store.Set([]byte("3"), []byte("3")) - - m, err := s.cache.RangeGet([]byte("1"), []byte("2"), 100) - c.Assert(err, IsNil) - c.Assert(m, HasLen, 2) - c.Assert(m["1"], BytesEquals, []byte("1")) - c.Assert(m["2"], BytesEquals, []byte("2")) - - // result should be saved in cache - s.store.Set([]byte("1"), []byte("4")) - v, err := s.cache.Get([]byte("1")) - c.Assert(err, IsNil) - c.Assert(v, BytesEquals, []byte("1")) -} - type mockSnapshot struct { store MemBuffer } @@ -122,30 +104,6 @@ func (s *mockSnapshot) BatchGet(keys []Key) (map[string][]byte, error) { return m, nil } -func (s *mockSnapshot) RangeGet(start, end Key, limit int) (map[string][]byte, error) { - m := make(map[string][]byte) - it, err := s.Seek(start) - if err != nil { - return nil, errors.Trace(err) - } - defer it.Close() - endKey := string(end) - for i := 0; i < limit; i++ { - if !it.Valid() { - break - } - if it.Key() > endKey { - break - } - m[string(it.Key())] = it.Value() - err := it.Next() - if err != nil { - return nil, err - } - } - return m, nil -} - func (s *mockSnapshot) Seek(k Key) (Iterator, error) { return s.store.Seek(k) } diff --git a/kv/kv.go b/kv/kv.go index e813cd771f137..5db39c55af4a7 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -60,9 +60,6 @@ type Transaction interface { RetrieverMutator // BatchPrefetch fetches values from KV storage to cache for later use. BatchPrefetch(keys []Key) error - // RangePrefetch fetches values in the range [start, end] from KV storage - // to cache for later use. Maximum number of values is up to limit. - RangePrefetch(start, end Key, limit int) error // Commit commits the transaction operations to KV store. Commit() error // Rollback undoes the transaction operations to KV store. @@ -83,9 +80,6 @@ type Snapshot interface { Retriever // BatchGet gets a batch of values from snapshot. BatchGet(keys []Key) (map[string][]byte, error) - // RangeGet gets values in the range [start, end] from snapshot. Maximum - // number of values is up to limit. - RangeGet(start, end Key, limit int) (map[string][]byte, error) // Release releases the snapshot to store. Release() } diff --git a/kv/union_store.go b/kv/union_store.go index a1a2777e79cfb..c4b3bb1ab7c50 100644 --- a/kv/union_store.go +++ b/kv/union_store.go @@ -30,9 +30,6 @@ type UnionStore interface { CheckLazyConditionPairs() error // BatchPrefetch fetches values from KV storage to cache for later use. BatchPrefetch(keys []Key) error - // RangePrefetch fetches values in the range [start, end] from KV storage - // to cache for later use. Maximum number of values is up to limit. - RangePrefetch(start, end Key, limit int) error // WalkBuffer iterates all buffered kv pairs. WalkBuffer(f func(k Key, v []byte) error) error // SetOption sets an option with a value, when val is nil, uses the default @@ -133,12 +130,6 @@ func (us *unionStore) BatchPrefetch(keys []Key) error { return errors.Trace(err) } -// RangePrefetch implements the UnionStore interface. -func (us *unionStore) RangePrefetch(start, end Key, limit int) error { - _, err := us.snapshot.RangeGet(start, end, limit) - return errors.Trace(err) -} - // CheckLazyConditionPairs implements the UnionStore interface. func (us *unionStore) CheckLazyConditionPairs() error { var keys []Key diff --git a/store/hbase/snapshot.go b/store/hbase/snapshot.go index 5b529aa42fbf4..3a083d7436a28 100644 --- a/store/hbase/snapshot.go +++ b/store/hbase/snapshot.go @@ -75,27 +75,6 @@ func (s *hbaseSnapshot) BatchGet(keys []kv.Key) (map[string][]byte, error) { return m, nil } -// RangeGet implements kv.Snapshot.RangeGet interface. -// The range should be [start, end] as Snapshot.RangeGet() indicated. -func (s *hbaseSnapshot) RangeGet(start, end kv.Key, limit int) (map[string][]byte, error) { - scanner := s.txn.GetScanner([]byte(s.storeName), start, end, limit) - defer scanner.Close() - - m := make(map[string][]byte) - for i := 0; i < limit; i++ { - r := scanner.Next() - if r != nil && len(r.Columns) > 0 { - k := string(r.Row) - v := r.Columns[hbaseFmlAndQual].Value - m[k] = v - } else { - break - } - } - - return m, nil -} - func internalGet(s *hbaseSnapshot, g *hbase.Get) ([]byte, error) { r, err := s.txn.Get(s.storeName, g) if err != nil { diff --git a/store/hbase/txn.go b/store/hbase/txn.go index 7c90a1f4c5419..7a9c3bf496ac7 100644 --- a/store/hbase/txn.go +++ b/store/hbase/txn.go @@ -78,10 +78,6 @@ func (txn *hbaseTxn) BatchPrefetch(keys []kv.Key) error { return txn.us.BatchPrefetch(keys) } -func (txn *hbaseTxn) RangePrefetch(start, end kv.Key, limit int) error { - return txn.us.RangePrefetch(start, end, limit) -} - func (txn *hbaseTxn) SetOption(opt kv.Option, val interface{}) { txn.us.SetOption(opt, val) } diff --git a/store/localstore/snapshot.go b/store/localstore/snapshot.go index e1bdd24fc8c04..7ca70be0a0476 100644 --- a/store/localstore/snapshot.go +++ b/store/localstore/snapshot.go @@ -120,30 +120,6 @@ func (s *dbSnapshot) BatchGet(keys []kv.Key) (map[string][]byte, error) { return m, nil } -func (s *dbSnapshot) RangeGet(start, end kv.Key, limit int) (map[string][]byte, error) { - m := make(map[string][]byte) - it, err := s.Seek(start) - if err != nil { - return nil, errors.Trace(err) - } - defer it.Close() - endKey := string(end) - for i := 0; i < limit; i++ { - if !it.Valid() { - break - } - if it.Key() > endKey { - break - } - m[it.Key()] = it.Value() - err := it.Next() - if err != nil { - return nil, err - } - } - return m, nil -} - func (s *dbSnapshot) Seek(k kv.Key) (kv.Iterator, error) { it, err := newDBIter(s, k) return it, errors.Trace(err) diff --git a/store/localstore/txn.go b/store/localstore/txn.go index 44d19c67a42aa..4c0108042977e 100644 --- a/store/localstore/txn.go +++ b/store/localstore/txn.go @@ -78,10 +78,6 @@ func (txn *dbTxn) BatchPrefetch(keys []kv.Key) error { return txn.us.BatchPrefetch(keys) } -func (txn *dbTxn) RangePrefetch(start, end kv.Key, limit int) error { - return txn.us.RangePrefetch(start, end, limit) -} - func (txn *dbTxn) SetOption(opt kv.Option, val interface{}) { txn.us.SetOption(opt, val) }