Skip to content

Commit

Permalink
kv, stmt: add kv.Option, optimize table scan
Browse files Browse the repository at this point in the history
  • Loading branch information
disksing committed Nov 16, 2015
1 parent 9d4230f commit e48a462
Show file tree
Hide file tree
Showing 11 changed files with 95 additions and 9 deletions.
17 changes: 16 additions & 1 deletion kv/cache_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,34 @@ var _ Snapshot = (*cacheSnapshot)(nil)
type cacheSnapshot struct {
cache MemBuffer
snapshot Snapshot
opts Options
}

// NewCacheSnapshot creates a new snapshot with cache embedded.
func NewCacheSnapshot(snapshot Snapshot) Snapshot {
func NewCacheSnapshot(snapshot Snapshot, opts Options) Snapshot {
return &cacheSnapshot{
cache: p.Get().(MemBuffer),
snapshot: snapshot,
opts: opts,
}
}

// Get gets value from snapshot and saves it in cache.
func (c *cacheSnapshot) Get(k Key) ([]byte, error) {
v, err := c.cache.Get(k)
if IsErrNotFound(err) {
if opt, ok := c.opts.Get(RangePrefetchOnCacheMiss); ok {
if limit, ok := opt.(int); ok && limit > 0 {
vals, err2 := c.RangeGet(k, nil, limit)
if err2 != nil {
return nil, errors.Trace(err2)
}
if val, ok := vals[string(k)]; ok {
v, err = val, nil
}
}
}
}
if IsErrNotFound(err) {
v, err = c.snapshot.Get(k)
if err == nil {
Expand Down
8 changes: 7 additions & 1 deletion kv/cache_snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type testCacheSnapshotSuite struct {

func (s *testCacheSnapshotSuite) SetUpTest(c *C) {
s.store = NewMemDbBuffer()
s.cache = NewCacheSnapshot(&mockSnapshot{s.store})
s.cache = NewCacheSnapshot(&mockSnapshot{s.store}, &mockOptions{})
}

func (s *testCacheSnapshotSuite) TearDownTest(c *C) {
Expand Down Expand Up @@ -147,3 +147,9 @@ func (s *mockSnapshot) NewIterator(param interface{}) Iterator {
func (s *mockSnapshot) Release() {
s.store.Release()
}

type mockOptions struct{}

func (opts *mockOptions) Get(opt Option) (interface{}, bool) {
return nil, false
}
21 changes: 21 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,23 @@ type EncodeFn func(raw interface{}) (interface{}, error)
// transaction is not committed.
var ErrNotCommitted = errors.New("this transaction is not committed")

// Option is used for customizing kv store's behaviors during a transaction.
type Option int

// Options is an interface of a set of options. Each option is associated with a value.
type Options interface {
// Get gets an option value.
Get(opt Option) (v interface{}, ok bool)
}

const (
// RangePrefetchOnCacheMiss directives that when dealing with a Get operation and failed to read data from cache,
// it will launch a RangePrefetch to underlying storage instead of Get. The range starts from requested key and
// has a limit of the option value. The feature is disabled if option value <= 0 or value type is not int.
// This option is particularly useful when we have to do sequential Gets, e.g. table scans.
RangePrefetchOnCacheMiss Option = 1 + iota
)

// Transaction defines the interface for operations inside a Transaction.
// This is not thread safe.
type Transaction interface {
Expand Down Expand Up @@ -127,6 +144,10 @@ type Transaction interface {
String() string
// LockKeys tries to lock the entries with the keys in KV store.
LockKeys(keys ...Key) error
// EnableOption enables an option and bind a value to it.
EnableOption(opt Option, val interface{})
// DisableOption disables an option.
DisableOption(opt Option)
}

// MvccSnapshot is used to get/seek a specific version in a snapshot.
Expand Down
4 changes: 2 additions & 2 deletions kv/union_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ type UnionStore struct {
}

// NewUnionStore builds a new UnionStore.
func NewUnionStore(snapshot Snapshot) UnionStore {
func NewUnionStore(snapshot Snapshot, opts Options) UnionStore {
buffer := p.Get().(MemBuffer)
return UnionStore{
WBuffer: buffer,
Snapshot: NewCacheSnapshot(snapshot),
Snapshot: NewCacheSnapshot(snapshot, opts),
}
}

Expand Down
2 changes: 1 addition & 1 deletion kv/union_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type testUnionStoreSuite struct {

func (s *testUnionStoreSuite) SetUpTest(c *C) {
s.store = NewMemDbBuffer()
s.us = NewUnionStore(&mockSnapshot{s.store})
s.us = NewUnionStore(&mockSnapshot{s.store}, &mockOptions{})
}

func (s *testUnionStoreSuite) TearDownTest(c *C) {
Expand Down
10 changes: 10 additions & 0 deletions plan/plans/from.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,16 @@ func (r *TableDefaultPlan) Next(ctx context.Context) (row *plan.Row, err error)
return nil, errors.Trace(err)
}

txn, err := ctx.GetTxn(false)
if err != nil {
return nil, errors.Trace(err)
}
// It is very likely that we will fetch rows after current row later, enable the RangePrefetchOnCacheMiss may
// help reducing RPC calls.
// TODO: choose a wiser option value.
txn.EnableOption(kv.RangePrefetchOnCacheMiss, 1024)
defer txn.DisableOption(kv.RangePrefetchOnCacheMiss)

// TODO: we could just fetch mentioned columns' values
row = &plan.Row{}
row.Data, err = r.T.Row(ctx, handle)
Expand Down
1 change: 0 additions & 1 deletion store/hbase/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ func (s *hbaseStore) Begin() (kv.Transaction, error) {
hbaseCli := s.conns[time.Now().UnixNano()%hbaseConnPoolSize]
t := themis.NewTxn(hbaseCli)
txn := newHbaseTxn(t, s.storeName)
txn.UnionStore = kv.NewUnionStore(newHbaseSnapshot(t, s.storeName))
return txn, nil
}

Expand Down
20 changes: 19 additions & 1 deletion store/hbase/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,18 @@ type hbaseTxn struct {
tid uint64
valid bool
version kv.Version // commit version
opts map[kv.Option]interface{}
}

func newHbaseTxn(t *themis.Txn, storeName string) *hbaseTxn {
opts := make(map[kv.Option]interface{})
return &hbaseTxn{
Txn: t,
valid: true,
storeName: storeName,
tid: t.GetStartTS(),
UnionStore: kv.NewUnionStore(newHbaseSnapshot(t, storeName)),
UnionStore: kv.NewUnionStore(newHbaseSnapshot(t, storeName), options(opts)),
opts: opts,
}
}

Expand Down Expand Up @@ -257,3 +260,18 @@ func (txn *hbaseTxn) LockKeys(keys ...kv.Key) error {
}
return nil
}

func (txn *hbaseTxn) EnableOption(opt kv.Option, val interface{}) {
txn.opts[opt] = val
}

func (txn *hbaseTxn) DisableOption(opt kv.Option) {
delete(txn.opts, opt)
}

type options map[kv.Option]interface{}

func (opts options) Get(opt kv.Option) (interface{}, bool) {
v, ok := opts[opt]
return v, ok
}
3 changes: 2 additions & 1 deletion store/localstore/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,13 +157,14 @@ func (s *dbStore) Begin() (kv.Transaction, error) {
store: s,
version: kv.MinVersion,
snapshotVals: make(map[string]struct{}),
opts: make(map[kv.Option]interface{}),
}
log.Debugf("Begin txn:%d", txn.tid)
txn.UnionStore = kv.NewUnionStore(&dbSnapshot{
store: s,
db: s.db,
version: beginVer,
})
}, options(txn.opts))
return txn, nil
}

Expand Down
2 changes: 1 addition & 1 deletion store/localstore/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (s *dbSnapshot) BatchGet(keys []kv.Key) (map[string][]byte, error) {

func (s *dbSnapshot) RangeGet(start, end kv.Key, limit int) (map[string][]byte, error) {
m := make(map[string][]byte)
it := s.NewIterator(start)
it := s.NewIterator([]byte(start))
defer it.Close()
endKey := string(end)
for i := 0; i < limit; i++ {
Expand Down
16 changes: 16 additions & 0 deletions store/localstore/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type dbTxn struct {
valid bool
version kv.Version // commit version
snapshotVals map[string]struct{} // origin version in snapshot
opts map[kv.Option]interface{}
}

func (txn *dbTxn) markOrigin(k []byte) {
Expand Down Expand Up @@ -278,3 +279,18 @@ func (txn *dbTxn) LockKeys(keys ...kv.Key) error {
}
return nil
}

func (txn *dbTxn) EnableOption(opt kv.Option, val interface{}) {
txn.opts[opt] = val
}

func (txn *dbTxn) DisableOption(opt kv.Option) {
delete(txn.opts, opt)
}

type options map[kv.Option]interface{}

func (opts options) Get(opt kv.Option) (interface{}, bool) {
v, ok := opts[opt]
return v, ok
}

0 comments on commit e48a462

Please sign in to comment.