Skip to content

Commit

Permalink
store/tikv: encode keys before save to mvcc store. (pingcap#2108)
Browse files Browse the repository at this point in the history
* store/tikv: encode keys before save to mvcc store.
  • Loading branch information
disksing authored and zimulala committed Nov 30, 2016
1 parent bb203af commit d5afd1a
Show file tree
Hide file tree
Showing 12 changed files with 87 additions and 63 deletions.
3 changes: 2 additions & 1 deletion store/tikv/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ func (s *testCommitterSuite) SetUpTest(c *C) {
mocktikv.BootstrapWithMultiRegions(s.cluster, []byte("a"), []byte("b"), []byte("c"))
mvccStore := mocktikv.NewMvccStore()
client := mocktikv.NewRPCClient(s.cluster, mvccStore)
store, err := newTikvStore("mock-tikv-store", mocktikv.NewPDClient(s.cluster), client, false)
pdCli := &codecPDClient{mocktikv.NewPDClient(s.cluster)}
store, err := newTikvStore("mock-tikv-store", pdCli, client, false)
c.Assert(err, IsNil)
s.store = store
}
Expand Down
3 changes: 2 additions & 1 deletion store/tikv/coprocessor_slow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ func (s *testCoprocessorSuite) TestBuildHugeTasks(c *C) {
mocktikv.BootstrapWithMultiRegions(cluster, splitKeys...)

bo := NewBackoffer(3000, context.Background())
cache := NewRegionCache(mocktikv.NewPDClient(cluster))
pdCli := &codecPDClient{mocktikv.NewPDClient(cluster)}
cache := NewRegionCache(pdCli)

const rangesPerRegion = 1e6
ranges := make([]kv.KeyRange, 0, 26*rangesPerRegion)
Expand Down
6 changes: 4 additions & 2 deletions store/tikv/coprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) {
// <- 0 -> <- 1 -> <- 2 -> <- 3 ->
cluster := mocktikv.NewCluster()
_, regionIDs, _ := mocktikv.BootstrapWithMultiRegions(cluster, []byte("g"), []byte("n"), []byte("t"))
cache := NewRegionCache(mocktikv.NewPDClient(cluster))
pdCli := &codecPDClient{mocktikv.NewPDClient(cluster)}
cache := NewRegionCache(pdCli)

bo := NewBackoffer(3000, context.Background())

Expand Down Expand Up @@ -90,7 +91,8 @@ func (s *testCoprocessorSuite) TestRebuild(c *C) {
// <- 0 -> <- 1 ->
cluster := mocktikv.NewCluster()
storeID, regionIDs, peerIDs := mocktikv.BootstrapWithMultiRegions(cluster, []byte("m"))
cache := NewRegionCache(mocktikv.NewPDClient(cluster))
pdCli := &codecPDClient{mocktikv.NewPDClient(cluster)}
cache := NewRegionCache(pdCli)
bo := NewBackoffer(3000, context.Background())

tasks, err := buildCopTasks(bo, cache, s.buildKeyRanges("a", "z"), false)
Expand Down
3 changes: 2 additions & 1 deletion store/tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ func NewMockTikvStore() (kv.Storage, error) {
mvccStore := mocktikv.NewMvccStore()
client := mocktikv.NewRPCClient(cluster, mvccStore)
uuid := fmt.Sprintf("mock-tikv-store-:%v", time.Now().Unix())
return newTikvStore(uuid, mocktikv.NewPDClient(cluster), client, false)
pdCli := &codecPDClient{mocktikv.NewPDClient(cluster)}
return newTikvStore(uuid, pdCli, client, false)
}

func (s *tikvStore) Begin() (kv.Transaction, error) {
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/mock-tikv/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (c *Cluster) Split(regionID, newRegionID uint64, key []byte, peerIDs []uint
c.Lock()
defer c.Unlock()

newRegion := c.regions[regionID].split(newRegionID, key, peerIDs, leaderPeerID)
newRegion := c.regions[regionID].split(newRegionID, []byte(newMvccKey(key)), peerIDs, leaderPeerID)
c.regions[newRegionID] = newRegion
}

Expand Down
2 changes: 1 addition & 1 deletion store/tikv/mock-tikv/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import "fmt"
// ErrLocked is returned when trying to Read/Write on a locked key. Client should
// backoff or cleanup the lock then retry.
type ErrLocked struct {
Key []byte
Key mvccKey
Primary []byte
StartTS uint64
TTL uint64
Expand Down
53 changes: 41 additions & 12 deletions store/tikv/mock-tikv/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/juju/errors"
"github.com/petar/GoLLRB/llrb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/util/codec"
)

type mvccValueType int
Expand All @@ -46,12 +47,12 @@ type mvccLock struct {
}

type mvccEntry struct {
key []byte
key mvccKey
values []mvccValue
lock *mvccLock
}

func newEntry(key []byte) *mvccEntry {
func newEntry(key mvccKey) *mvccEntry {
return &mvccEntry{
key: key,
}
Expand Down Expand Up @@ -199,10 +200,10 @@ func (s *MvccStore) Get(key []byte, startTS uint64) ([]byte, error) {
s.RLock()
defer s.RUnlock()

return s.get(key, startTS)
return s.get(newMvccKey(key), startTS)
}

func (s *MvccStore) get(key []byte, startTS uint64) ([]byte, error) {
func (s *MvccStore) get(key mvccKey, startTS uint64) ([]byte, error) {
entry := s.tree.Get(newEntry(key))
if entry == nil {
return nil, nil
Expand All @@ -224,7 +225,7 @@ func (s *MvccStore) BatchGet(ks [][]byte, startTS uint64) []Pair {

var pairs []Pair
for _, k := range ks {
val, err := s.get(k, startTS)
val, err := s.get(newMvccKey(k), startTS)
if val == nil && err == nil {
continue
}
Expand All @@ -247,6 +248,9 @@ func (s *MvccStore) Scan(startKey, endKey []byte, limit int, startTS uint64) []P
s.RLock()
defer s.RUnlock()

startKey = newMvccKey(startKey)
endKey = newMvccKey(endKey)

var pairs []Pair
iterator := func(item llrb.Item) bool {
if len(pairs) >= limit {
Expand All @@ -259,7 +263,7 @@ func (s *MvccStore) Scan(startKey, endKey []byte, limit int, startTS uint64) []P
val, err := s.get(k, startTS)
if val != nil || err != nil {
pairs = append(pairs, Pair{
Key: k,
Key: k.Raw(),
Value: val,
Err: err,
})
Expand All @@ -276,6 +280,9 @@ func (s *MvccStore) ReverseScan(startKey, endKey []byte, limit int, startTS uint
s.RLock()
defer s.RUnlock()

startKey = newMvccKey(startKey)
endKey = newMvccKey(endKey)

var pairs []Pair
iterator := func(item llrb.Item) bool {
if len(pairs) >= limit {
Expand All @@ -291,7 +298,7 @@ func (s *MvccStore) ReverseScan(startKey, endKey []byte, limit int, startTS uint
val, err := s.get(k, startTS)
if val != nil || err != nil {
pairs = append(pairs, Pair{
Key: k,
Key: k.Raw(),
Value: val,
Err: err,
})
Expand Down Expand Up @@ -323,7 +330,7 @@ func (s *MvccStore) Prewrite(mutations []*kvrpcpb.Mutation, primary []byte, star

var errs []error
for _, m := range mutations {
entry := s.getOrNewEntry(m.Key)
entry := s.getOrNewEntry(newMvccKey(m.Key))
err := entry.Prewrite(m, startTS, primary, ttl)
s.submit(entry)
errs = append(errs, err)
Expand All @@ -338,7 +345,7 @@ func (s *MvccStore) Commit(keys [][]byte, startTS, commitTS uint64) error {

var ents []*mvccEntry
for _, k := range keys {
entry := s.getOrNewEntry(k)
entry := s.getOrNewEntry(newMvccKey(k))
err := entry.Commit(startTS, commitTS)
if err != nil {
return err
Expand All @@ -354,7 +361,7 @@ func (s *MvccStore) Cleanup(key []byte, startTS uint64) error {
s.Lock()
defer s.Unlock()

entry := s.getOrNewEntry(key)
entry := s.getOrNewEntry(newMvccKey(key))
err := entry.Rollback(startTS)
if err != nil {
return err
Expand All @@ -370,7 +377,7 @@ func (s *MvccStore) Rollback(keys [][]byte, startTS uint64) error {

var ents []*mvccEntry
for _, k := range keys {
entry := s.getOrNewEntry(k)
entry := s.getOrNewEntry(newMvccKey(k))
err := entry.Rollback(startTS)
if err != nil {
return err
Expand All @@ -396,7 +403,7 @@ func (s *MvccStore) ScanLock(startKey, endKey []byte, maxTS uint64) ([]*kvrpcpb.
locks = append(locks, &kvrpcpb.LockInfo{
PrimaryLock: ent.lock.primary,
LockVersion: ent.lock.startTS,
Key: ent.key,
Key: ent.key.Raw(),
})
}
return true
Expand Down Expand Up @@ -461,3 +468,25 @@ func (s *MvccStore) RawDelete(key []byte) {
defer s.Unlock()
delete(s.rawkv, string(key))
}

// On TiKV, keys are encoded before they are saved into storage engine.
type mvccKey []byte

func newMvccKey(key []byte) mvccKey {
if len(key) == 0 {
return nil
}
return codec.EncodeBytes(nil, key)
}

// Raw decodes a mvccKey to original key.
func (key mvccKey) Raw() []byte {
if len(key) == 0 {
return nil
}
_, k, err := codec.DecodeBytes(key)
if err != nil {
panic(err)
}
return k
}
61 changes: 24 additions & 37 deletions store/tikv/mock-tikv/mvcc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (

. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/util/codec"
)

func TestT(t *testing.T) {
Expand All @@ -31,24 +30,12 @@ type testMockTiKVSuite struct {

var _ = Suite(&testMockTiKVSuite{})

func encodeKey(s string) []byte {
return codec.EncodeBytes(nil, []byte(s))
}

func encodeKeys(ss []string) [][]byte {
var keys [][]byte
for _, s := range ss {
keys = append(keys, encodeKey(s))
}
return keys
}

func putMutations(kvpairs ...string) []*kvrpcpb.Mutation {
var mutations []*kvrpcpb.Mutation
for i := 0; i < len(kvpairs); i += 2 {
mutations = append(mutations, &kvrpcpb.Mutation{
Op: kvrpcpb.Op_Put,
Key: encodeKey(kvpairs[i]),
Key: []byte(kvpairs[i]),
Value: []byte(kvpairs[i+1]),
})
}
Expand All @@ -57,8 +44,8 @@ func putMutations(kvpairs ...string) []*kvrpcpb.Mutation {

func lock(key, primary string, ts uint64) *kvrpcpb.LockInfo {
return &kvrpcpb.LockInfo{
Key: encodeKey(key),
PrimaryLock: encodeKey(primary),
Key: []byte(key),
PrimaryLock: []byte(primary),
LockVersion: ts,
}
}
Expand All @@ -68,81 +55,81 @@ func (s *testMockTiKVSuite) SetUpTest(c *C) {
}

func (s *testMockTiKVSuite) mustGetNone(c *C, key string, ts uint64) {
val, err := s.store.Get(encodeKey(key), ts)
val, err := s.store.Get([]byte(key), ts)
c.Assert(err, IsNil)
c.Assert(val, IsNil)
}

func (s *testMockTiKVSuite) mustGetErr(c *C, key string, ts uint64) {
val, err := s.store.Get(encodeKey(key), ts)
val, err := s.store.Get([]byte(key), ts)
c.Assert(err, NotNil)
c.Assert(val, IsNil)
}

func (s *testMockTiKVSuite) mustGetOK(c *C, key string, ts uint64, expect string) {
val, err := s.store.Get(encodeKey(key), ts)
val, err := s.store.Get([]byte(key), ts)
c.Assert(err, IsNil)
c.Assert(string(val), Equals, expect)
}

func (s *testMockTiKVSuite) mustPutOK(c *C, key, value string, startTS, commitTS uint64) {
errs := s.store.Prewrite(putMutations(key, value), encodeKey(key), startTS, 0)
errs := s.store.Prewrite(putMutations(key, value), []byte(key), startTS, 0)
for _, err := range errs {
c.Assert(err, IsNil)
}
err := s.store.Commit([][]byte{encodeKey(key)}, startTS, commitTS)
err := s.store.Commit([][]byte{[]byte(key)}, startTS, commitTS)
c.Assert(err, IsNil)
}

func (s *testMockTiKVSuite) mustDeleteOK(c *C, key string, startTS, commitTS uint64) {
mutations := []*kvrpcpb.Mutation{
{
Op: kvrpcpb.Op_Del,
Key: encodeKey(key),
Key: []byte(key),
},
}
errs := s.store.Prewrite(mutations, encodeKey(key), startTS, 0)
errs := s.store.Prewrite(mutations, []byte(key), startTS, 0)
for _, err := range errs {
c.Assert(err, IsNil)
}
err := s.store.Commit([][]byte{encodeKey(key)}, startTS, commitTS)
err := s.store.Commit([][]byte{[]byte(key)}, startTS, commitTS)
c.Assert(err, IsNil)
}

func (s *testMockTiKVSuite) mustScanOK(c *C, start string, limit int, ts uint64, expect ...string) {
pairs := s.store.Scan(encodeKey(start), nil, limit, ts)
pairs := s.store.Scan([]byte(start), nil, limit, ts)
c.Assert(len(pairs)*2, Equals, len(expect))
for i := 0; i < len(pairs); i++ {
c.Assert(pairs[i].Err, IsNil)
c.Assert(pairs[i].Key, BytesEquals, encodeKey(expect[i*2]))
c.Assert(pairs[i].Key, BytesEquals, []byte(expect[i*2]))
c.Assert(string(pairs[i].Value), Equals, expect[i*2+1])
}
}

func (s *testMockTiKVSuite) mustPrewriteOK(c *C, mutations []*kvrpcpb.Mutation, primary string, startTS uint64) {
errs := s.store.Prewrite(mutations, encodeKey(primary), startTS, 0)
errs := s.store.Prewrite(mutations, []byte(primary), startTS, 0)
for _, err := range errs {
c.Assert(err, IsNil)
}
}

func (s *testMockTiKVSuite) mustCommitOK(c *C, keys []string, startTS, commitTS uint64) {
err := s.store.Commit(encodeKeys(keys), startTS, commitTS)
func (s *testMockTiKVSuite) mustCommitOK(c *C, keys [][]byte, startTS, commitTS uint64) {
err := s.store.Commit(keys, startTS, commitTS)
c.Assert(err, IsNil)
}

func (s *testMockTiKVSuite) mustCommitErr(c *C, keys []string, startTS, commitTS uint64) {
err := s.store.Commit(encodeKeys(keys), startTS, commitTS)
func (s *testMockTiKVSuite) mustCommitErr(c *C, keys [][]byte, startTS, commitTS uint64) {
err := s.store.Commit(keys, startTS, commitTS)
c.Assert(err, NotNil)
}

func (s *testMockTiKVSuite) mustRollbackOK(c *C, keys []string, startTS uint64) {
err := s.store.Rollback(encodeKeys(keys), startTS)
func (s *testMockTiKVSuite) mustRollbackOK(c *C, keys [][]byte, startTS uint64) {
err := s.store.Rollback(keys, startTS)
c.Assert(err, IsNil)
}

func (s *testMockTiKVSuite) mustRollbackErr(c *C, keys []string, startTS uint64) {
err := s.store.Rollback(encodeKeys(keys), startTS)
func (s *testMockTiKVSuite) mustRollbackErr(c *C, keys [][]byte, startTS uint64) {
err := s.store.Rollback(keys, startTS)
c.Assert(err, NotNil)
}

Expand Down Expand Up @@ -180,8 +167,8 @@ func (s *testMockTiKVSuite) TestCleanupRollback(c *C) {
s.mustPrewriteOK(c, putMutations("primary", "p-5", "secondary", "s-5"), "primary", 5)
s.mustGetErr(c, "secondary", 8)
s.mustGetErr(c, "secondary", 12)
s.mustCommitOK(c, []string{"primary"}, 5, 10)
s.mustRollbackErr(c, []string{"primary"}, 5)
s.mustCommitOK(c, [][]byte{[]byte("primary")}, 5, 10)
s.mustRollbackErr(c, [][]byte{[]byte("primary")}, 5)
}

func (s *testMockTiKVSuite) TestScan(c *C) {
Expand Down
Loading

0 comments on commit d5afd1a

Please sign in to comment.