Skip to content

Commit

Permalink
ticlient: use arrays to store mutations (pingcap#15053)
Browse files Browse the repository at this point in the history
  • Loading branch information
sticnarf authored Mar 8, 2020
1 parent 0693abc commit 15e9ea1
Show file tree
Hide file tree
Showing 9 changed files with 311 additions and 203 deletions.
355 changes: 198 additions & 157 deletions store/tikv/2pc.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion store/tikv/2pc_fail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,6 @@ func (s *testCommitterSuite) TestFailPrewriteRegionError(c *C) {
c.Assert(err, IsNil)

ctx := context.Background()
err = committer.prewriteKeys(NewBackoffer(ctx, 1000), committer.keys)
err = committer.prewriteMutations(NewBackoffer(ctx, 1000), committer.mutations)
c.Assert(err, NotNil)
}
55 changes: 34 additions & 21 deletions store/tikv/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ func (s *testCommitterSuite) TestPrewriteRollback(c *C) {
"a": "a0",
"b": "b0",
})

ctx := context.Background()
txn1 := s.begin(c)
err := txn1.Set([]byte("a"), []byte("a1"))
Expand All @@ -145,15 +144,15 @@ func (s *testCommitterSuite) TestPrewriteRollback(c *C) {
c.Assert(err, IsNil)
committer, err := newTwoPhaseCommitterWithInit(txn1, 0)
c.Assert(err, IsNil)
err = committer.prewriteKeys(NewBackoffer(ctx, PrewriteMaxBackoff), committer.keys)
err = committer.prewriteMutations(NewBackoffer(ctx, PrewriteMaxBackoff), committer.mutations)
c.Assert(err, IsNil)

txn2 := s.begin(c)
v, err := txn2.Get(context.TODO(), []byte("a"))
c.Assert(err, IsNil)
c.Assert(v, BytesEquals, []byte("a0"))

err = committer.prewriteKeys(NewBackoffer(ctx, PrewriteMaxBackoff), committer.keys)
err = committer.prewriteMutations(NewBackoffer(ctx, PrewriteMaxBackoff), committer.mutations)
if err != nil {
// Retry.
txn1 = s.begin(c)
Expand All @@ -163,12 +162,12 @@ func (s *testCommitterSuite) TestPrewriteRollback(c *C) {
c.Assert(err, IsNil)
committer, err = newTwoPhaseCommitterWithInit(txn1, 0)
c.Assert(err, IsNil)
err = committer.prewriteKeys(NewBackoffer(ctx, PrewriteMaxBackoff), committer.keys)
err = committer.prewriteMutations(NewBackoffer(ctx, PrewriteMaxBackoff), committer.mutations)
c.Assert(err, IsNil)
}
committer.commitTS, err = s.store.oracle.GetTimestamp(ctx)
c.Assert(err, IsNil)
err = committer.commitKeys(NewBackoffer(ctx, CommitMaxBackoff), [][]byte{[]byte("a")})
err = committer.commitMutations(NewBackoffer(ctx, CommitMaxBackoff), committerMutations{keys: [][]byte{[]byte("a")}})
c.Assert(err, IsNil)

txn3 := s.begin(c)
Expand All @@ -189,7 +188,7 @@ func (s *testCommitterSuite) TestContextCancel(c *C) {
bo := NewBackoffer(context.Background(), PrewriteMaxBackoff)
backoffer, cancel := bo.Fork()
cancel() // cancel the context
err = committer.prewriteKeys(backoffer, committer.keys)
err = committer.prewriteMutations(backoffer, committer.mutations)
c.Assert(errors.Cause(err), Equals, context.Canceled)
}

Expand All @@ -215,7 +214,7 @@ func (s *testCommitterSuite) TestContextCancelRetryable(c *C) {
c.Assert(err, IsNil)
committer, err := newTwoPhaseCommitterWithInit(txn1, 0)
c.Assert(err, IsNil)
err = committer.prewriteKeys(NewBackoffer(context.Background(), PrewriteMaxBackoff), committer.keys)
err = committer.prewriteMutations(NewBackoffer(context.Background(), PrewriteMaxBackoff), committer.mutations)
c.Assert(err, IsNil)
// txn3 writes "c"
err = txn3.Set([]byte("c"), []byte("c3"))
Expand Down Expand Up @@ -351,9 +350,9 @@ func (s *testCommitterSuite) TestCommitBeforePrewrite(c *C) {
committer, err := newTwoPhaseCommitterWithInit(txn, 0)
c.Assert(err, IsNil)
ctx := context.Background()
err = committer.cleanupKeys(NewBackoffer(ctx, cleanupMaxBackoff), committer.keys)
err = committer.cleanupMutations(NewBackoffer(ctx, cleanupMaxBackoff), committer.mutations)
c.Assert(err, IsNil)
err = committer.prewriteKeys(NewBackoffer(ctx, PrewriteMaxBackoff), committer.keys)
err = committer.prewriteMutations(NewBackoffer(ctx, PrewriteMaxBackoff), committer.mutations)
c.Assert(err, NotNil)
errMsgMustContain(c, err, "conflictCommitTS")
}
Expand Down Expand Up @@ -395,7 +394,7 @@ func (s *testCommitterSuite) TestPrewritePrimaryKeyFailed(c *C) {
ctx := context.Background()
committer, err := newTwoPhaseCommitterWithInit(txn2, 0)
c.Assert(err, IsNil)
err = committer.cleanupKeys(NewBackoffer(ctx, cleanupMaxBackoff), committer.keys)
err = committer.cleanupMutations(NewBackoffer(ctx, cleanupMaxBackoff), committer.mutations)
c.Assert(err, IsNil)

// check the data after rollback twice.
Expand Down Expand Up @@ -466,7 +465,7 @@ func (s *testCommitterSuite) TestPrewriteTxnSize(c *C) {
c.Assert(err, IsNil)

ctx := context.Background()
err = committer.prewriteKeys(NewBackoffer(ctx, PrewriteMaxBackoff), committer.keys)
err = committer.prewriteMutations(NewBackoffer(ctx, PrewriteMaxBackoff), committer.mutations)
c.Assert(err, IsNil)

// Check the written locks in the first region (50 keys)
Expand All @@ -491,11 +490,12 @@ func (s *testCommitterSuite) TestRejectCommitTS(c *C) {
bo := NewBackoffer(context.Background(), getMaxBackoff)
loc, err := s.store.regionCache.LocateKey(bo, []byte("x"))
c.Assert(err, IsNil)
batch := batchKeys{region: loc.Region, keys: [][]byte{[]byte("x")}}
mutations := make([]*kvrpcpb.Mutation, len(batch.keys))
for i, k := range batch.keys {
tmp := committer.mutations[string(k)]
mutations[i] = &tmp.Mutation
mutations := []*kvrpcpb.Mutation{
{
Op: committer.mutations.ops[0],
Key: committer.mutations.keys[0],
Value: committer.mutations.values[0],
},
}
prewrite := &kvrpcpb.PrewriteRequest{
Mutations: mutations,
Expand All @@ -512,7 +512,7 @@ func (s *testCommitterSuite) TestRejectCommitTS(c *C) {
committer.commitTS = committer.startTS + 1
// Ensure that the new commit ts is greater than minCommitTS when retry
time.Sleep(3 * time.Millisecond)
err = committer.commitKeys(bo, committer.keys)
err = committer.commitMutations(bo, committer.mutations)
c.Assert(err, IsNil)

// Use startTS+2 to read the data and get nothing.
Expand Down Expand Up @@ -540,8 +540,8 @@ func (s *testCommitterSuite) TestPessimisticPrewriteRequest(c *C) {
committer, err := newTwoPhaseCommitterWithInit(txn, 0)
c.Assert(err, IsNil)
committer.forUpdateTS = 100
var batch batchKeys
batch.keys = append(batch.keys, []byte("t1"))
var batch batchMutations
batch.mutations = committer.mutations.subRange(0, 1)
batch.region = RegionVerID{1, 1, 1}
req := committer.buildPrewriteRequest(batch, 1)
c.Assert(len(req.Prewrite().IsPessimisticLock), Greater, 0)
Expand Down Expand Up @@ -718,7 +718,7 @@ func (s *testCommitterSuite) getLockInfo(c *C, key []byte) *kvrpcpb.LockInfo {
bo := NewBackoffer(context.Background(), getMaxBackoff)
loc, err := s.store.regionCache.LocateKey(bo, key)
c.Assert(err, IsNil)
batch := batchKeys{region: loc.Region, keys: [][]byte{key}}
batch := batchMutations{region: loc.Region, mutations: committer.mutations.subRange(0, 1)}
req := committer.buildPrewriteRequest(batch, 1)
resp, err := s.store.SendReq(bo, req, loc.Region, readTimeoutShort)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -754,7 +754,7 @@ func (s *testCommitterSuite) TestPkNotFound(c *C) {
// while the secondary lock operation succeeded
bo := NewBackoffer(context.Background(), pessimisticLockMaxBackoff)
txn1.committer.ttlManager.close()
err = txn1.committer.pessimisticRollbackKeys(bo, [][]byte{k1})
err = txn1.committer.pessimisticRollbackMutations(bo, committerMutations{keys: [][]byte{k1}})
c.Assert(err, IsNil)

// Txn2 tries to lock the secondary key k2, dead loop if the left secondary lock by txn1 not resolved
Expand Down Expand Up @@ -826,3 +826,16 @@ func (s *testCommitterSuite) TestPessimisticLockPrimary(c *C) {
waitErr := <-doneCh
c.Assert(ErrLockWaitTimeout.Equal(waitErr), IsTrue)
}

func (c *twoPhaseCommitter) mutationsOfKeys(keys [][]byte) committerMutations {
var res committerMutations
for i := range c.mutations.keys {
for _, key := range keys {
if bytes.Equal(c.mutations.keys[i], key) {
res.push(c.mutations.ops[i], c.mutations.keys[i], c.mutations.values[i], c.mutations.isPessimisticLock[i])
break
}
}
}
return res
}
19 changes: 7 additions & 12 deletions store/tikv/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package tikv

import (
"bytes"
"context"
"fmt"
"math"
Expand Down Expand Up @@ -61,20 +60,16 @@ func (s *testLockSuite) lockKey(c *C, key, value, primaryKey, primaryValue []byt
c.Assert(err, IsNil)
tpc, err := newTwoPhaseCommitterWithInit(txn, 0)
c.Assert(err, IsNil)
if bytes.Equal(key, primaryKey) {
tpc.keys = [][]byte{primaryKey}
} else {
tpc.keys = [][]byte{primaryKey, key}
}
tpc.primaryKey = primaryKey

ctx := context.Background()
err = tpc.prewriteKeys(NewBackoffer(ctx, PrewriteMaxBackoff), tpc.keys)
err = tpc.prewriteMutations(NewBackoffer(ctx, PrewriteMaxBackoff), tpc.mutations)
c.Assert(err, IsNil)

if commitPrimary {
tpc.commitTS, err = s.store.oracle.GetTimestamp(ctx)
c.Assert(err, IsNil)
err = tpc.commitKeys(NewBackoffer(ctx, CommitMaxBackoff), [][]byte{primaryKey})
err = tpc.commitMutations(NewBackoffer(ctx, CommitMaxBackoff), tpc.mutationsOfKeys([][]byte{primaryKey}))
c.Assert(err, IsNil)
}
return txn.startTS, tpc.commitTS
Expand Down Expand Up @@ -335,7 +330,7 @@ func (s *testLockSuite) TestCheckTxnStatusNoWait(c *C) {

// Only prewrite the secondary key to simulate a concurrent prewrite case:
// prewrite secondary regions success and prewrite the primary region is pending.
err = committer.prewriteKeys(NewBackoffer(context.Background(), PrewriteMaxBackoff), [][]byte{[]byte("second")})
err = committer.prewriteMutations(NewBackoffer(context.Background(), PrewriteMaxBackoff), committer.mutationsOfKeys([][]byte{[]byte("second")}))
c.Assert(err, IsNil)

oracle := s.store.GetOracle()
Expand All @@ -352,7 +347,7 @@ func (s *testLockSuite) TestCheckTxnStatusNoWait(c *C) {

errCh := make(chan error)
go func() {
errCh <- committer.prewriteKeys(NewBackoffer(context.Background(), PrewriteMaxBackoff), [][]byte{[]byte("key")})
errCh <- committer.prewriteMutations(NewBackoffer(context.Background(), PrewriteMaxBackoff), committer.mutationsOfKeys([][]byte{[]byte("key")}))
}()

lock := &Lock{
Expand All @@ -366,7 +361,7 @@ func (s *testLockSuite) TestCheckTxnStatusNoWait(c *C) {
c.Assert(err, IsNil)
c.Assert(status.ttl, Greater, uint64(0))
c.Assert(<-errCh, IsNil)
c.Assert(committer.cleanupKeys(bo, committer.keys), IsNil)
c.Assert(committer.cleanupMutations(bo, committer.mutations), IsNil)

// Call getTxnStatusFromLock to cover TxnNotFound and retry timeout.
startTS, err := oracle.GetTimestamp(context.Background())
Expand Down Expand Up @@ -395,7 +390,7 @@ func (s *testLockSuite) prewriteTxnWithTTL(c *C, txn *tikvTxn, ttl uint64) {
elapsed := time.Since(txn.startTime) / time.Millisecond
committer.lockTTL = uint64(elapsed) + ttl
}
err = committer.prewriteKeys(NewBackoffer(context.Background(), PrewriteMaxBackoff), committer.keys)
err = committer.prewriteMutations(NewBackoffer(context.Background(), PrewriteMaxBackoff), committer.mutations)
c.Assert(err, IsNil)
}

Expand Down
8 changes: 4 additions & 4 deletions store/tikv/rawkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,9 +280,9 @@ func (c *RawKVClient) DeleteRange(startKey []byte, endKey []byte) error {

// Scan queries continuous kv pairs in range [startKey, endKey), up to limit pairs.
// If endKey is empty, it means unbounded.
// If you want to exclude the startKey or include the endKey, append a '\0' to the key. For example, to scan
// If you want to exclude the startKey or include the endKey, push a '\0' to the key. For example, to scan
// (startKey, endKey], you can write:
// `Scan(append(startKey, '\0'), append(endKey, '\0'), limit)`.
// `Scan(push(startKey, '\0'), push(endKey, '\0'), limit)`.
func (c *RawKVClient) Scan(startKey, endKey []byte, limit int) (keys [][]byte, values [][]byte, err error) {
start := time.Now()
defer func() { tikvRawkvCmdHistogramWithRawScan.Observe(time.Since(start).Seconds()) }()
Expand Down Expand Up @@ -320,9 +320,9 @@ func (c *RawKVClient) Scan(startKey, endKey []byte, limit int) (keys [][]byte, v
// ReverseScan queries continuous kv pairs in range [endKey, startKey), up to limit pairs.
// Direction is different from Scan, upper to lower.
// If endKey is empty, it means unbounded.
// If you want to include the startKey or exclude the endKey, append a '\0' to the key. For example, to scan
// If you want to include the startKey or exclude the endKey, push a '\0' to the key. For example, to scan
// (endKey, startKey], you can write:
// `ReverseScan(append(startKey, '\0'), append(endKey, '\0'), limit)`.
// `ReverseScan(push(startKey, '\0'), push(endKey, '\0'), limit)`.
// It doesn't support Scanning from "", because locating the last Region is not yet implemented.
func (c *RawKVClient) ReverseScan(startKey, endKey []byte, limit int) (keys [][]byte, values [][]byte, err error) {
start := time.Now()
Expand Down
37 changes: 37 additions & 0 deletions store/tikv/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,43 @@ func (c *RegionCache) GroupKeysByRegion(bo *Backoffer, keys [][]byte, filter fun
return groups, first, nil
}

type groupedMutations struct {
region RegionVerID
mutations committerMutations
}

// GroupSortedMutationsByRegion separates keys into groups by their belonging Regions.
func (c *RegionCache) GroupSortedMutationsByRegion(bo *Backoffer, m committerMutations) ([]groupedMutations, error) {
var (
groups []groupedMutations
lastLoc *KeyLocation
)
lastUpperBound := 0
for i := range m.keys {
if lastLoc == nil || !lastLoc.Contains(m.keys[i]) {
if lastLoc != nil {
groups = append(groups, groupedMutations{
region: lastLoc.Region,
mutations: m.subRange(lastUpperBound, i),
})
lastUpperBound = i
}
var err error
lastLoc, err = c.LocateKey(bo, m.keys[i])
if err != nil {
return nil, errors.Trace(err)
}
}
}
if lastLoc != nil {
groups = append(groups, groupedMutations{
region: lastLoc.Region,
mutations: m.subRange(lastUpperBound, m.len()),
})
}
return groups, nil
}

// ListRegionIDsInKeyRange lists ids of regions in [start_key,end_key].
func (c *RegionCache) ListRegionIDsInKeyRange(bo *Backoffer, startKey, endKey []byte) (regionIDs []uint64, err error) {
for {
Expand Down
24 changes: 23 additions & 1 deletion store/tikv/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,28 @@ func (s *tikvSnapshot) BatchGet(ctx context.Context, keys []kv.Key) (map[string]
return m, nil
}

type batchKeys struct {
region RegionVerID
keys [][]byte
}

// appendBatchKeysBySize appends keys to b. It may split the keys to make
// sure each batch's size does not exceed the limit.
func appendBatchKeysBySize(b []batchKeys, region RegionVerID, keys [][]byte, sizeFn func([]byte) int, limit int) []batchKeys {
var start, end int
for start = 0; start < len(keys); start = end {
var size int
for end = start; end < len(keys) && size < limit; end++ {
size += sizeFn(keys[end])
}
b = append(b, batchKeys{
region: region,
keys: keys[start:end],
})
}
return b
}

func (s *tikvSnapshot) batchGetKeysByRegions(bo *Backoffer, keys [][]byte, collectF func(k, v []byte)) error {
groups, _, err := s.store.regionCache.GroupKeysByRegion(bo, keys, nil)
if err != nil {
Expand All @@ -161,7 +183,7 @@ func (s *tikvSnapshot) batchGetKeysByRegions(bo *Backoffer, keys [][]byte, colle

var batches []batchKeys
for id, g := range groups {
batches = appendBatchBySize(batches, id, g, func([]byte) int { return 1 }, batchGetSize)
batches = appendBatchKeysBySize(batches, id, g, func([]byte) int { return 1 }, batchGetSize)
}

if len(batches) == 0 {
Expand Down
4 changes: 2 additions & 2 deletions store/tikv/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func (s *testSnapshotSuite) TestSkipLargeTxnLock(c *C) {
committer, err := newTwoPhaseCommitterWithInit(txn, 0)
c.Assert(err, IsNil)
committer.lockTTL = 3000
c.Assert(committer.prewriteKeys(bo, committer.keys), IsNil)
c.Assert(committer.prewriteMutations(bo, committer.mutations), IsNil)

txn1 := s.beginTxn(c)
// txn1 is not blocked by txn in the large txn protocol.
Expand All @@ -234,7 +234,7 @@ func (s *testSnapshotSuite) TestSkipLargeTxnLock(c *C) {

// Commit txn, check the final commit ts is pushed.
committer.commitTS = txn.StartTS() + 1
c.Assert(committer.commitKeys(bo, committer.keys), IsNil)
c.Assert(committer.commitMutations(bo, committer.mutations), IsNil)
status, err := s.store.lockResolver.GetTxnStatus(txn.StartTS(), 0, x)
c.Assert(err, IsNil)
c.Assert(status.IsCommitted(), IsTrue)
Expand Down
Loading

0 comments on commit 15e9ea1

Please sign in to comment.