Skip to content

Commit

Permalink
txnkv/txnlock: init Lock in txnlock package (#241)
Browse files Browse the repository at this point in the history
Signed-off-by: shirly <[email protected]>
  • Loading branch information
AndreMouche authored Jul 21, 2021
1 parent ce04aa0 commit 654864d
Show file tree
Hide file tree
Showing 17 changed files with 281 additions and 204 deletions.
21 changes: 12 additions & 9 deletions integration_tests/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ import (
"github.com/tikv/client-go/v2/testutils"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/txnkv"
"github.com/tikv/client-go/v2/txnkv/txnlock"
)

var (
Expand Down Expand Up @@ -842,9 +844,9 @@ func (s *testCommitterSuite) TestDeleteAllYourWritesWithSFU() {
s.store.ClearTxnLatches()
err = txn2.Set(k3, []byte{33})
s.Nil(err)
var meetLocks []*tikv.Lock
resolver := tikv.LockResolverProbe{LockResolver: s.store.GetLockResolver()}
resolver.SetMeetLockCallback(func(locks []*tikv.Lock) {
var meetLocks []*txnkv.Lock
resolver := tikv.NewLockResolverProb(s.store.GetLockResolver())
resolver.SetMeetLockCallback(func(locks []*txnkv.Lock) {
meetLocks = append(meetLocks, locks...)
})
err = txn2.Commit(context.Background())
Expand Down Expand Up @@ -940,9 +942,9 @@ func (s *testCommitterSuite) TestPkNotFound() {
// while the secondary lock operation succeeded.
txn1.GetCommitter().CloseTTLManager()

var status tikv.TxnStatus
var status txnkv.TxnStatus
bo := tikv.NewBackofferWithVars(ctx, 5000, nil)
lockKey2 := &tikv.Lock{
lockKey2 := &txnkv.Lock{
Key: k2,
Primary: k1,
TxnID: txn1.StartTS(),
Expand All @@ -951,7 +953,8 @@ func (s *testCommitterSuite) TestPkNotFound() {
LockType: kvrpcpb.Op_PessimisticLock,
LockForUpdateTS: txn1.StartTS(),
}
resolver := tikv.LockResolverProbe{LockResolver: s.store.GetLockResolver()}

resolver := tikv.NewLockResolverProb(s.store.GetLockResolver())
status, err = resolver.GetTxnStatusFromLock(bo, lockKey2, oracle.GoTimeToTS(time.Now().Add(200*time.Millisecond)), false)
s.Nil(err)
s.Equal(status.Action(), kvrpcpb.Action_TTLExpirePessimisticRollback)
Expand All @@ -969,7 +972,7 @@ func (s *testCommitterSuite) TestPkNotFound() {
s.Nil(err)

// Pessimistic rollback using smaller forUpdateTS does not take effect.
lockKey3 := &tikv.Lock{
lockKey3 := &txnkv.Lock{
Key: k3,
Primary: k1,
TxnID: txn1.StartTS(),
Expand Down Expand Up @@ -1201,8 +1204,8 @@ func (s *testCommitterSuite) TestResolveMixed() {
// try to resolve the left optimistic locks, use clean whole region
time.Sleep(time.Duration(atomic.LoadUint64(&tikv.ManagedLockTTL)) * time.Millisecond)
optimisticLockInfo := s.getLockInfo(optimisticLockKey)
lock := tikv.NewLock(optimisticLockInfo)
resolver := tikv.LockResolverProbe{LockResolver: s.store.GetLockResolver()}
lock := txnlock.NewLock(optimisticLockInfo)
resolver := tikv.NewLockResolverProb(s.store.GetLockResolver())
err = resolver.ResolveLock(ctx, lock)
s.Nil(err)

Expand Down
2 changes: 1 addition & 1 deletion integration_tests/async_commit_fail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (s *testAsyncCommitFailSuite) TestSecondaryListInPrimaryLock() {

primary := txn.GetCommitter().GetPrimaryKey()
bo := tikv.NewBackofferWithVars(context.Background(), 5000, nil)
lockResolver := tikv.LockResolverProbe{LockResolver: s.store.GetLockResolver()}
lockResolver := tikv.NewLockResolverProb(s.store.GetLockResolver())
txnStatus, err := lockResolver.GetTxnStatus(bo, txn.StartTS(), primary, 0, 0, false, false, nil)
s.Nil(err)
s.False(txnStatus.IsCommitted())
Expand Down
13 changes: 7 additions & 6 deletions integration_tests/async_commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ import (
"github.com/tikv/client-go/v2/testutils"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/txnkv"
"github.com/tikv/client-go/v2/txnkv/txnlock"
"github.com/tikv/client-go/v2/util"
)

Expand Down Expand Up @@ -108,7 +110,7 @@ func (s *testAsyncCommitCommon) mustGetFromTxn(txn tikv.TxnProbe, key, expectedV
s.Equal(v, expectedValue)
}

func (s *testAsyncCommitCommon) mustGetLock(key []byte) *tikv.Lock {
func (s *testAsyncCommitCommon) mustGetLock(key []byte) *txnkv.Lock {
ver, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)
s.Nil(err)
req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{
Expand All @@ -123,8 +125,7 @@ func (s *testAsyncCommitCommon) mustGetLock(key []byte) *tikv.Lock {
s.NotNil(resp.Resp)
keyErr := resp.Resp.(*kvrpcpb.GetResponse).GetError()
s.NotNil(keyErr)
var lockutil tikv.LockProbe
lock, err := lockutil.ExtractLockFromKeyErr(keyErr)
lock, err := tikv.ExtractLockFromKeyErr(keyErr)
s.Nil(err)
return lock
}
Expand Down Expand Up @@ -247,10 +248,10 @@ func (s *testAsyncCommitSuite) TestCheckSecondaries() {
lock.UseAsyncCommit = true
ts, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
s.Nil(err)
var lockutil tikv.LockProbe
var lockutil txnlock.LockProbe
status := lockutil.NewLockStatus(nil, true, ts)

resolver := tikv.LockResolverProbe{LockResolver: s.store.GetLockResolver()}
resolver := tikv.NewLockResolverProb(s.store.GetLockResolver())
err = resolver.ResolveLockAsync(s.bo, lock, status)
s.Nil(err)
currentTS, err := s.store.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
Expand Down Expand Up @@ -319,7 +320,7 @@ func (s *testAsyncCommitSuite) TestCheckSecondaries() {
s.store.SetTiKVClient(&mock)

status = lockutil.NewLockStatus([][]byte{[]byte("a"), []byte("i")}, true, 0)
lock = &tikv.Lock{
lock = &txnkv.Lock{
Key: []byte("a"),
Primary: []byte("z"),
TxnID: ts,
Expand Down
30 changes: 16 additions & 14 deletions integration_tests/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ import (
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/txnkv"
"github.com/tikv/client-go/v2/txnkv/txnlock"
)

var getMaxBackoff = tikv.ConfigProbe{}.GetGetMaxBackoff()
Expand Down Expand Up @@ -317,13 +319,13 @@ func (s *testLockSuite) TestCheckTxnStatus() {

// Test the ResolveLocks API
lock := s.mustGetLock([]byte("second"))
timeBeforeExpire, _, err := resolver.ResolveLocks(bo, currentTS, []*tikv.Lock{lock})
timeBeforeExpire, _, err := resolver.ResolveLocks(bo, currentTS, []*txnkv.Lock{lock})
s.Nil(err)
s.True(timeBeforeExpire > int64(0))

// Force rollback the lock using lock.TTL = 0.
lock.TTL = uint64(0)
timeBeforeExpire, _, err = resolver.ResolveLocks(bo, currentTS, []*tikv.Lock{lock})
timeBeforeExpire, _, err = resolver.ResolveLocks(bo, currentTS, []*txnkv.Lock{lock})
s.Nil(err)
s.Equal(timeBeforeExpire, int64(0))

Expand Down Expand Up @@ -375,7 +377,7 @@ func (s *testLockSuite) TestCheckTxnStatusNoWait() {
errCh <- committer.PrewriteMutations(context.Background(), committer.MutationsOfKeys([][]byte{[]byte("key")}))
}()

lock := &tikv.Lock{
lock := &txnkv.Lock{
Key: []byte("second"),
Primary: []byte("key"),
TxnID: txn.StartTS(),
Expand All @@ -391,7 +393,7 @@ func (s *testLockSuite) TestCheckTxnStatusNoWait() {
// Call getTxnStatusFromLock to cover TxnNotFound and retry timeout.
startTS, err := o.GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
s.Nil(err)
lock = &tikv.Lock{
lock = &txnkv.Lock{
Key: []byte("second"),
Primary: []byte("key_not_exist"),
TxnID: startTS,
Expand Down Expand Up @@ -419,7 +421,7 @@ func (s *testLockSuite) prewriteTxnWithTTL(txn tikv.TxnProbe, ttl uint64) {
s.Nil(err)
}

func (s *testLockSuite) mustGetLock(key []byte) *tikv.Lock {
func (s *testLockSuite) mustGetLock(key []byte) *txnkv.Lock {
ver, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)
s.Nil(err)
bo := tikv.NewBackofferWithVars(context.Background(), getMaxBackoff, nil)
Expand All @@ -434,7 +436,7 @@ func (s *testLockSuite) mustGetLock(key []byte) *tikv.Lock {
s.NotNil(resp.Resp)
keyErr := resp.Resp.(*kvrpcpb.GetResponse).GetError()
s.NotNil(keyErr)
lock, err := tikv.LockProbe{}.ExtractLockFromKeyErr(keyErr)
lock, err := tikv.ExtractLockFromKeyErr(keyErr)
s.Nil(err)
return lock
}
Expand Down Expand Up @@ -506,7 +508,7 @@ func (s *testLockSuite) TestBatchResolveLocks() {
committer.PrewriteAllMutations(context.Background())
s.Nil(err)

var locks []*tikv.Lock
var locks []*txnkv.Lock
for _, key := range []string{"k1", "k2", "k3", "k4"} {
l := s.mustGetLock([]byte(key))
locks = append(locks, l)
Expand Down Expand Up @@ -544,7 +546,7 @@ func (s *testLockSuite) TestBatchResolveLocks() {
}

func (s *testLockSuite) TestNewLockZeroTTL() {
l := tikv.NewLock(&kvrpcpb.LockInfo{})
l := txnlock.NewLock(&kvrpcpb.LockInfo{})
s.Equal(l.TTL, uint64(0))
}

Expand All @@ -565,19 +567,19 @@ func (s *testLockSuite) TestZeroMinCommitTS() {
s.Nil(failpoint.Disable("tikvclient/mockZeroCommitTS"))

lock := s.mustGetLock([]byte("key"))
expire, pushed, err := s.store.NewLockResolver().ResolveLocks(bo, 0, []*tikv.Lock{lock})
expire, pushed, err := s.store.NewLockResolver().ResolveLocks(bo, 0, []*txnkv.Lock{lock})
s.Nil(err)
s.Len(pushed, 0)
s.Greater(expire, int64(0))

expire, pushed, err = s.store.NewLockResolver().ResolveLocks(bo, math.MaxUint64, []*tikv.Lock{lock})
expire, pushed, err = s.store.NewLockResolver().ResolveLocks(bo, math.MaxUint64, []*txnkv.Lock{lock})
s.Nil(err)
s.Len(pushed, 1)
s.Greater(expire, int64(0))

// Clean up this test.
lock.TTL = uint64(0)
expire, _, err = s.store.NewLockResolver().ResolveLocks(bo, 0, []*tikv.Lock{lock})
expire, _, err = s.store.NewLockResolver().ResolveLocks(bo, 0, []*txnkv.Lock{lock})
s.Nil(err)
s.Equal(expire, int64(0))
}
Expand Down Expand Up @@ -617,7 +619,7 @@ func (s *testLockSuite) TestCheckLocksFallenBackFromAsyncCommit() {
lr := s.store.NewLockResolver()
status, err := lr.GetTxnStatusFromLock(bo, lock, 0, false)
s.Nil(err)
s.Equal(tikv.LockProbe{}.GetPrimaryKeyFromTxnStatus(status), []byte("fb1"))
s.Equal(txnlock.LockProbe{}.GetPrimaryKeyFromTxnStatus(status), []byte("fb1"))

err = lr.CheckAllSecondaries(bo, lock, &status)
s.True(lr.IsNonAsyncCommitLock(err))
Expand All @@ -634,7 +636,7 @@ func (s *testLockSuite) TestResolveTxnFallenBackFromAsyncCommit() {
lock := s.mustGetLock([]byte("fb1"))
s.True(lock.UseAsyncCommit)
bo := tikv.NewBackoffer(context.Background(), getMaxBackoff)
expire, pushed, err := s.store.NewLockResolver().ResolveLocks(bo, 0, []*tikv.Lock{lock})
expire, pushed, err := s.store.NewLockResolver().ResolveLocks(bo, 0, []*txnkv.Lock{lock})
s.Nil(err)
s.Equal(expire, int64(0))
s.Equal(len(pushed), 0)
Expand All @@ -655,7 +657,7 @@ func (s *testLockSuite) TestBatchResolveTxnFallenBackFromAsyncCommit() {
bo := tikv.NewBackoffer(context.Background(), getMaxBackoff)
loc, err := s.store.GetRegionCache().LocateKey(bo, []byte("fb1"))
s.Nil(err)
ok, err := s.store.NewLockResolver().BatchResolveLocks(bo, []*tikv.Lock{lock}, loc.Region)
ok, err := s.store.NewLockResolver().BatchResolveLocks(bo, []*txnkv.Lock{lock}, loc.Region)
s.Nil(err)
s.True(ok)

Expand Down
8 changes: 8 additions & 0 deletions tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,14 @@ func (c *twoPhaseCommitter) asyncSecondaries() [][]byte {

const bytesPerMiB = 1024 * 1024

// ttl = ttlFactor * sqrt(writeSizeInMiB)
var ttlFactor = 6000

// By default, locks after 3000ms is considered unusual (the client created the
// lock might be dead). Other client may cleanup this kind of lock.
// For locks created recently, we will do backoff and retry.
var defaultLockTTL uint64 = 3000

func txnLockTTL(startTime time.Time, txnSize int) uint64 {
// Increase lockTTL for large transactions.
// The formula is `ttl = ttlFactor * sqrt(sizeInMiB)`.
Expand Down
5 changes: 3 additions & 2 deletions tikv/client_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (

"github.com/tikv/client-go/v2/internal/locate"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/txnkv/txnlock"
"github.com/tikv/client-go/v2/util"
)

Expand All @@ -48,7 +49,7 @@ import (
// the request. If there is no context information about the resolved locks, we'll
// meet the secondary lock again and run into a deadloop.
type ClientHelper struct {
lockResolver *LockResolver
lockResolver *txnlock.LockResolver
regionCache *locate.RegionCache
resolvedLocks *util.TSSet
client Client
Expand All @@ -67,7 +68,7 @@ func NewClientHelper(store *KVStore, resolvedLocks *util.TSSet) *ClientHelper {
}

// ResolveLocks wraps the ResolveLocks function and store the resolved result.
func (ch *ClientHelper) ResolveLocks(bo *Backoffer, callerStartTS uint64, locks []*Lock) (int64, error) {
func (ch *ClientHelper) ResolveLocks(bo *Backoffer, callerStartTS uint64, locks []*txnlock.Lock) (int64, error) {
var err error
var resolvedLocks []uint64
var msBeforeTxnExpired int64
Expand Down
11 changes: 6 additions & 5 deletions tikv/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/tikv/client-go/v2/metrics"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/txnkv/rangetask"
"github.com/tikv/client-go/v2/txnkv/txnlock"
zap "go.uber.org/zap"
)

Expand Down Expand Up @@ -70,7 +71,7 @@ func (s *KVStore) resolveLocks(ctx context.Context, safePoint uint64, concurrenc
}

// We don't want gc to sweep out the cached info belong to other processes, like coprocessor.
const gcScanLockLimit = ResolvedCacheSize / 2
const gcScanLockLimit = txnlock.ResolvedCacheSize / 2

func (s *KVStore) resolveLocksForRange(ctx context.Context, safePoint uint64, startKey []byte, endKey []byte) (rangetask.TaskStat, error) {
// for scan lock request, we must return all locks even if they are generated
Expand Down Expand Up @@ -122,7 +123,7 @@ func (s *KVStore) resolveLocksForRange(ctx context.Context, safePoint uint64, st
return stat, nil
}

func (s *KVStore) scanLocksInRegionWithStartKey(bo *retry.Backoffer, startKey []byte, maxVersion uint64, limit uint32) (locks []*Lock, loc *locate.KeyLocation, err error) {
func (s *KVStore) scanLocksInRegionWithStartKey(bo *retry.Backoffer, startKey []byte, maxVersion uint64, limit uint32) (locks []*txnlock.Lock, loc *locate.KeyLocation, err error) {
for {
loc, err := s.GetRegionCache().LocateKey(bo, startKey)
if err != nil {
Expand Down Expand Up @@ -157,9 +158,9 @@ func (s *KVStore) scanLocksInRegionWithStartKey(bo *retry.Backoffer, startKey []
return nil, loc, errors.Errorf("unexpected scanlock error: %s", locksResp)
}
locksInfo := locksResp.GetLocks()
locks = make([]*Lock, len(locksInfo))
locks = make([]*txnlock.Lock, len(locksInfo))
for i := range locksInfo {
locks[i] = NewLock(locksInfo[i])
locks[i] = txnlock.NewLock(locksInfo[i])
}
return locks, loc, nil
}
Expand All @@ -170,7 +171,7 @@ func (s *KVStore) scanLocksInRegionWithStartKey(bo *retry.Backoffer, startKey []
// It returns error when meet an unretryable error.
// When the locks are not in one region, resolve locks should be failed, it returns with nil resolveLocation and nil err.
// Used it in gcworker only!
func (s *KVStore) batchResolveLocksInARegion(bo *Backoffer, locks []*Lock, expectedLoc *locate.KeyLocation) (resolvedLocation *locate.KeyLocation, err error) {
func (s *KVStore) batchResolveLocksInARegion(bo *Backoffer, locks []*txnlock.Lock, expectedLoc *locate.KeyLocation) (resolvedLocation *locate.KeyLocation, err error) {
resolvedLocation = expectedLoc
for {
ok, err := s.GetLockResolver().BatchResolveLocks(bo, locks, resolvedLocation.Region)
Expand Down
3 changes: 2 additions & 1 deletion tikv/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/tikv/client-go/v2/internal/locate"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/txnkv/txnlock"
)

// Storage represent the kv.Storage runs on TiKV.
Expand All @@ -49,7 +50,7 @@ type Storage interface {
SendReq(bo *Backoffer, req *tikvrpc.Request, regionID locate.RegionVerID, timeout time.Duration) (*tikvrpc.Response, error)

// GetLockResolver gets the LockResolver.
GetLockResolver() *LockResolver
GetLockResolver() *txnlock.LockResolver

// GetSafePointKV gets the SafePointKV.
GetSafePointKV() SafePointKV
Expand Down
Loading

0 comments on commit 654864d

Please sign in to comment.