store/tikv: handle the large transaction commit dead lock (pingcap#15072)

tiancaiamao authored Mar 12, 2020
1 parent ab17254 commit d7a8eab
Showing 5 changed files with 106 additions and 25 deletions.
3 changes: 3 additions & 0 deletions store/mockstore/mocktikv/rpc.go
@@ -304,6 +304,9 @@ func (h *rpcHandler) handleKvScan(req *kvrpcpb.ScanRequest) *kvrpcpb.ScanRespons
}

func (h *rpcHandler) handleKvPrewrite(req *kvrpcpb.PrewriteRequest) *kvrpcpb.PrewriteResponse {
regionID := req.Context.RegionId
h.cluster.handleDelay(req.StartVersion, regionID)

for _, m := range req.Mutations {
if !h.checkKeyInRegion(m.Key) {
panic("KvPrewrite: key not in region")
13 changes: 6 additions & 7 deletions store/tikv/2pc.go
@@ -401,16 +401,16 @@ func txnLockTTL(startTime time.Time, txnSize int) uint64 {
// Increase lockTTL for large transactions.
// The formula is `ttl = ttlFactor * sqrt(sizeInMiB)`.
// When writeSize is less than 256KB, the base ttl is defaultTTL (3s);
// When writeSize is 1MiB, 100MiB, or 400MiB, ttl is 6s, 60s, 120s correspondingly;
// When writeSize is 1MiB, 4MiB, or 10MiB, ttl is 6s, 12s, 20s correspondingly;
lockTTL := defaultLockTTL
if txnSize >= txnCommitBatchSize {
sizeMiB := float64(txnSize) / bytesPerMiB
lockTTL = uint64(float64(ttlFactor) * math.Sqrt(sizeMiB))
if lockTTL < defaultLockTTL {
lockTTL = defaultLockTTL
}
if lockTTL > maxLockTTL {
lockTTL = maxLockTTL
if lockTTL > ManagedLockTTL {
lockTTL = ManagedLockTTL
}
}
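
For reference, a self-contained sketch of the formula above: ttl = ttlFactor * sqrt(sizeInMiB), clamped between defaultLockTTL and the managed cap. defaultLockTTL (3000ms) and ttlFactor (6000) appear elsewhere in this diff; the 20s value for ManagedLockTTL and the 16KiB txnCommitBatchSize threshold are assumptions for illustration, not taken from this page.

package main

import (
	"fmt"
	"math"
)

const (
	bytesPerMiB        = 1024 * 1024
	defaultLockTTL     = 3000      // 3s base TTL, as in the comment above
	ttlFactor          = 6000      // ttl = ttlFactor * sqrt(sizeInMiB)
	managedLockTTL     = 20000     // assumed value of ManagedLockTTL (20s)
	txnCommitBatchSize = 16 * 1024 // assumed "large write" threshold
)

// lockTTLForSize mirrors the clamping logic in txnLockTTL.
func lockTTLForSize(txnSize int) uint64 {
	ttl := uint64(defaultLockTTL)
	if txnSize >= txnCommitBatchSize {
		sizeMiB := float64(txnSize) / bytesPerMiB
		ttl = uint64(ttlFactor * math.Sqrt(sizeMiB))
		if ttl < defaultLockTTL {
			ttl = defaultLockTTL
		}
		if ttl > managedLockTTL {
			ttl = managedLockTTL
		}
	}
	return ttl
}

func main() {
	// 1MiB -> 6000ms, 4MiB -> 12000ms, 10MiB -> about 19000ms (the comment above rounds this to 20s).
	for _, size := range []int{256 * 1024, 1 * bytesPerMiB, 4 * bytesPerMiB, 10 * bytesPerMiB} {
		fmt.Printf("%8d bytes -> %dms\n", size, lockTTLForSize(size))
	}
}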

@@ -602,7 +602,7 @@ func (actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, bat
keyErrs := prewriteResp.GetErrors()
if len(keyErrs) == 0 {
if bytes.Equal(c.primary(), batch.mutations.keys[0]) {
// After writing the primary key, if the size of the transaction is large than 4M,
// After writing the primary key, if the size of the transaction is larger than 32M,
// start the ttlManager. The ttlManager will be closed in tikvTxn.Commit().
if c.txnSize > 32*1024*1024 {
c.run(c, nil)
@@ -627,14 +627,13 @@ func (actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, bat
if err1 != nil {
return errors.Trace(err1)
}
logutil.BgLogger().Debug("prewrite encounters lock",
logutil.BgLogger().Warn("prewrite encounters lock",
zap.Uint64("conn", c.connID),
zap.Stringer("lock", lock))
locks = append(locks, lock)
}
start := time.Now()
// Set callerStartTS to 0 so as not to update minCommitTS.
msBeforeExpired, _, err := c.store.lockResolver.ResolveLocks(bo, 0, locks)
msBeforeExpired, err := c.store.lockResolver.resolveLocksForWrite(bo, c.startTS, locks)
if err != nil {
return errors.Trace(err)
}
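
The two hunks above encode the large-transaction protocol on the prewrite path: once the batch containing the primary key is written, a transaction larger than 32MiB starts the ttlManager (which keeps the primary lock's TTL refreshed via heartbeats, see sendTxnHeartBeat in lock_test.go below), and locks encountered during prewrite are now resolved through resolveLocksForWrite with the committer's own startTS. A minimal sketch of the start condition, with ttlManager as a stand-in for the committer's field rather than the real type:

package main

import (
	"bytes"
	"fmt"
)

// largeTxnThreshold mirrors the 32*1024*1024 constant used in handleSingleBatch.
const largeTxnThreshold = 32 * 1024 * 1024

// ttlManager is a stand-in for the committer's ttlManager, which periodically
// refreshes the TTL of the primary lock in a background goroutine.
type ttlManager struct{ started bool }

func (t *ttlManager) run() { t.started = true }

// maybeStartTTLManager mirrors the check performed after a successful prewrite
// of one batch: only the batch whose first key is the primary key may start the
// manager, and only when the whole transaction is larger than 32MiB.
func maybeStartTTLManager(tm *ttlManager, primary, firstKeyInBatch []byte, txnSize int) {
	if bytes.Equal(primary, firstKeyInBatch) && txnSize > largeTxnThreshold {
		tm.run()
	}
}

func main() {
	var tm ttlManager
	maybeStartTTLManager(&tm, []byte("k1"), []byte("k1"), 100*1024*1024) // large txn, primary batch
	fmt.Println(tm.started)                                              // true
}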
60 changes: 58 additions & 2 deletions store/tikv/2pc_test.go
@@ -19,6 +19,7 @@ import (
"math"
"math/rand"
"strings"
"sync"
"sync/atomic"
"time"

@@ -34,8 +35,9 @@ import (

type testCommitterSuite struct {
OneByOneSuite
cluster *mocktikv.Cluster
store *tikvStore
cluster *mocktikv.Cluster
store *tikvStore
mvccStore mocktikv.MVCCStore
}

var _ = SerialSuites(&testCommitterSuite{})
@@ -50,6 +52,7 @@ func (s *testCommitterSuite) SetUpTest(c *C) {
mocktikv.BootstrapWithMultiRegions(s.cluster, []byte("a"), []byte("b"), []byte("c"))
mvccStore, err := mocktikv.NewMVCCLevelDB("")
c.Assert(err, IsNil)
s.mvccStore = mvccStore
client := mocktikv.NewRPCClient(s.cluster, mvccStore)
pdCli := &codecPDClient{mocktikv.NewPDClient(s.cluster)}
spkv := NewMockSafePointKV()
@@ -839,3 +842,56 @@ func (c *twoPhaseCommitter) mutationsOfKeys(keys [][]byte) committerMutations {
}
return res
}

func (s *testCommitterSuite) TestCommitDeadLock(c *C) {
// Split into two regions and let k1 and k2 fall in different regions.
s.cluster.SplitKeys(s.mvccStore, kv.Key("z"), kv.Key("a"), 2)
k1 := kv.Key("a_deadlock_k1")
k2 := kv.Key("y_deadlock_k2")

region1, _ := s.cluster.GetRegionByKey(k1)
region2, _ := s.cluster.GetRegionByKey(k2)
c.Assert(region1.Id != region2.Id, IsTrue)

txn1 := s.begin(c)
txn1.Set(k1, []byte("t1"))
txn1.Set(k2, []byte("t1"))
commit1, err := newTwoPhaseCommitterWithInit(txn1, 1)
c.Assert(err, IsNil)
commit1.primaryKey = k1
commit1.txnSize = 1000 * 1024 * 1024
commit1.lockTTL = txnLockTTL(txn1.startTime, commit1.txnSize)

txn2 := s.begin(c)
txn2.Set(k1, []byte("t2"))
txn2.Set(k2, []byte("t2"))
commit2, err := newTwoPhaseCommitterWithInit(txn2, 2)
c.Assert(err, IsNil)
commit2.primaryKey = k2
commit2.txnSize = 1000 * 1024 * 1024
commit2.lockTTL = txnLockTTL(txn1.startTime, commit2.txnSize)

s.cluster.ScheduleDelay(txn2.startTS, region1.Id, 5*time.Millisecond)
s.cluster.ScheduleDelay(txn1.startTS, region2.Id, 5*time.Millisecond)

// Txn1 prewrites k1, k2 and txn2 prewrites k2, k1. The large txn
// protocol runs ttlManager and keeps updating their TTLs, which causes a deadlock.
ch := make(chan error, 2)
var wg sync.WaitGroup
wg.Add(1)
go func() {
ch <- commit2.execute(context.Background())
wg.Done()
}()
ch <- commit1.execute(context.Background())
wg.Wait()
close(ch)

res := 0
for e := range ch {
if e != nil {
res++
}
}
c.Assert(res, Equals, 1)
}
40 changes: 33 additions & 7 deletions store/tikv/lock_resolver.go
@@ -27,6 +27,7 @@ import (
"github.com/pingcap/kvproto/pkg/kvrpcpb"
pd "github.com/pingcap/pd/v4/client"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/util/logutil"
@@ -45,6 +46,8 @@ var (
tikvLockResolverCountWithNotExpired = metrics.TiKVLockResolverCounter.WithLabelValues("not_expired")
tikvLockResolverCountWithWaitExpired = metrics.TiKVLockResolverCounter.WithLabelValues("wait_expired")
tikvLockResolverCountWithResolve = metrics.TiKVLockResolverCounter.WithLabelValues("resolve")
tikvLockResolverCountWithResolveForWrite = metrics.TiKVLockResolverCounter.WithLabelValues("resolve_for_write")
tikvLockResolverCountWithWriteConflict = metrics.TiKVLockResolverCounter.WithLabelValues("write_conflict")
tikvLockResolverCountWithQueryTxnStatus = metrics.TiKVLockResolverCounter.WithLabelValues("query_txn_status")
tikvLockResolverCountWithQueryTxnStatusCommitted = metrics.TiKVLockResolverCounter.WithLabelValues("query_txn_status_committed")
tikvLockResolverCountWithQueryTxnStatusRolledBack = metrics.TiKVLockResolverCounter.WithLabelValues("query_txn_status_rolled_back")
@@ -124,9 +127,6 @@ func (s TxnStatus) CommitTS() uint64 { return uint64(s.commitTS) }
// For locks created recently, we will do backoff and retry.
var defaultLockTTL uint64 = 3000

// TODO: Consider if it's appropriate.
var maxLockTTL uint64 = 120000

// ttl = ttlFactor * sqrt(writeSizeInMiB)
var ttlFactor = 6000

@@ -282,18 +282,31 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi
// 3) Send `ResolveLock` cmd to the lock's region to resolve all locks belong to
// the same transaction.
func (lr *LockResolver) ResolveLocks(bo *Backoffer, callerStartTS uint64, locks []*Lock) (int64, []uint64 /*pushed*/, error) {
return lr.resolveLocks(bo, callerStartTS, locks, false)
}

func (lr *LockResolver) resolveLocks(bo *Backoffer, callerStartTS uint64, locks []*Lock, forWrite bool) (int64, []uint64 /*pushed*/, error) {
var msBeforeTxnExpired txnExpireTime
if len(locks) == 0 {
return msBeforeTxnExpired.value(), nil, nil
}

tikvLockResolverCountWithResolve.Inc()
if forWrite {
tikvLockResolverCountWithResolveForWrite.Inc()
} else {
tikvLockResolverCountWithResolve.Inc()
}

var pushFail bool
// TxnID -> []Region, record resolved Regions.
// TODO: Maybe put it in LockResolver and share by all txns.
cleanTxns := make(map[uint64]map[RegionVerID]struct{})
pushed := make([]uint64, 0, len(locks))
var pushed []uint64
// pushed is only used in the read operation.
if !forWrite {
pushed = make([]uint64, 0, len(locks))
}

for _, l := range locks {
status, err := lr.getTxnStatusFromLock(bo, l, callerStartTS)
if err != nil {
@@ -327,8 +340,16 @@ func (lr *LockResolver) ResolveLocks(bo *Backoffer, callerStartTS uint64, locks
// Update the txn expire time.
msBeforeLockExpired := lr.store.GetOracle().UntilExpired(l.TxnID, status.ttl)
msBeforeTxnExpired.update(msBeforeLockExpired)
// In the write conflict scenes, callerStartTS is set to 0 to avoid unnecessary push minCommitTS operation.
if callerStartTS > 0 {
if forWrite {
// Write conflict detected!
// If it's an optimistic conflict and the current txn is earlier than the lock owner,
// abort the current transaction.
// This avoids the deadlock between two large transactions.
if l.LockType != kvrpcpb.Op_PessimisticLock && l.TxnID > callerStartTS {
tikvLockResolverCountWithWriteConflict.Inc()
return msBeforeTxnExpired.value(), nil, kv.ErrWriteConflict.GenWithStackByArgs(callerStartTS, l.TxnID, status.commitTS, l.Key)
}
} else {
if status.action != kvrpcpb.Action_MinCommitTSPushed {
pushFail = true
continue
@@ -348,6 +369,11 @@ func (lr *LockResolver) ResolveLocks(bo *Backoffer, callerStartTS uint64, locks
return msBeforeTxnExpired.value(), pushed, nil
}

func (lr *LockResolver) resolveLocksForWrite(bo *Backoffer, callerStartTS uint64, locks []*Lock) (int64, error) {
msBeforeTxnExpired, _, err := lr.resolveLocks(bo, callerStartTS, locks, true)
return msBeforeTxnExpired, err
}

type txnExpireTime struct {
initialized bool
txnExpire int64
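
The forWrite branch above is the heart of the deadlock fix: on the write path, a transaction that runs into an optimistic lock owned by a younger transaction aborts with a write-conflict error instead of waiting for the lock to expire, so two large transactions prewriting each other's keys can no longer block forever; the older one wins and the younger one retries. A minimal sketch of just that decision; lockInfo and shouldAbortOnWriteConflict are illustrative names, not TiDB's API:

package main

import "fmt"

// lockInfo is a stand-in for the fields of *Lock that the decision uses.
type lockInfo struct {
	txnID       uint64 // startTS of the transaction that owns the lock
	pessimistic bool   // true if the lock is a pessimistic lock
}

// shouldAbortOnWriteConflict mirrors the condition added in resolveLocks for
// the forWrite case: abort only for an optimistic lock held by a younger txn.
func shouldAbortOnWriteConflict(callerStartTS uint64, l lockInfo) bool {
	return !l.pessimistic && l.txnID > callerStartTS
}

func main() {
	caller := uint64(100)
	fmt.Println(shouldAbortOnWriteConflict(caller, lockInfo{txnID: 200}))                    // true: younger optimistic lock, abort and report write conflict
	fmt.Println(shouldAbortOnWriteConflict(caller, lockInfo{txnID: 50}))                     // false: older lock, wait for its TTL to expire
	fmt.Println(shouldAbortOnWriteConflict(caller, lockInfo{txnID: 200, pessimistic: true})) // false: pessimistic locks are never aborted this way
}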
15 changes: 6 additions & 9 deletions store/tikv/lock_test.go
@@ -248,21 +248,21 @@ func (s *testLockSuite) TestTxnHeartBeat(c *C) {
s.prewriteTxn(c, txn.(*tikvTxn))

bo := NewBackoffer(context.Background(), PrewriteMaxBackoff)
newTTL, err := sendTxnHeartBeat(bo, s.store, []byte("key"), txn.StartTS(), 666)
newTTL, err := sendTxnHeartBeat(bo, s.store, []byte("key"), txn.StartTS(), 6666)
c.Assert(err, IsNil)
c.Assert(newTTL, Equals, uint64(666))
c.Assert(newTTL, Equals, uint64(6666))

newTTL, err = sendTxnHeartBeat(bo, s.store, []byte("key"), txn.StartTS(), 555)
newTTL, err = sendTxnHeartBeat(bo, s.store, []byte("key"), txn.StartTS(), 5555)
c.Assert(err, IsNil)
c.Assert(newTTL, Equals, uint64(666))
c.Assert(newTTL, Equals, uint64(6666))

lock := s.mustGetLock(c, []byte("key"))
status := TxnStatus{ttl: newTTL}
cleanRegions := make(map[RegionVerID]struct{})
err = newLockResolver(s.store).resolveLock(bo, lock, status, cleanRegions)
c.Assert(err, IsNil)

newTTL, err = sendTxnHeartBeat(bo, s.store, []byte("key"), txn.StartTS(), 666)
newTTL, err = sendTxnHeartBeat(bo, s.store, []byte("key"), txn.StartTS(), 6666)
c.Assert(err, NotNil)
c.Assert(newTTL, Equals, uint64(0))
}
@@ -430,7 +430,7 @@ func (s *testLockSuite) TestLockTTL(c *C) {
c.Assert(err, IsNil)
txn.Set(kv.Key("key"), []byte("value"))
time.Sleep(time.Millisecond)
s.prewriteTxnWithTTL(c, txn.(*tikvTxn), 1000)
s.prewriteTxnWithTTL(c, txn.(*tikvTxn), 3100)
l := s.mustGetLock(c, []byte("key"))
c.Assert(l.TTL >= defaultLockTTL, IsTrue)

@@ -487,9 +487,6 @@ func (s *testLockSuite) TestNewLockZeroTTL(c *C) {

func init() {
// Speed up tests.
defaultLockTTL = 3
maxLockTTL = 120
ttlFactor = 6
oracleUpdateInterval = 2
}

