Skip to content

Commit

Permalink
txn: stop more statement executions if pessimistic transaction ttl ma…
Browse files Browse the repository at this point in the history
…nager timed out (pingcap#15877)
  • Loading branch information
cfzjywxk authored Mar 31, 2020
1 parent 17530c4 commit 94fb1bf
Show file tree
Hide file tree
Showing 10 changed files with 52 additions and 9 deletions.
1 change: 1 addition & 0 deletions errno/errcode.go
Original file line number Diff line number Diff line change
Expand Up @@ -1069,6 +1069,7 @@ const (
ErrCannotCancelDDLJob = 8226
ErrSequenceUnsupportedTableOption = 8227
ErrColumnTypeUnsupportedNextValue = 8228
ErrLockExpire = 8229

// TiKV/PD errors.
ErrPDServerTimeout = 9001
Expand Down
1 change: 1 addition & 0 deletions errno/errname.go
Original file line number Diff line number Diff line change
Expand Up @@ -1066,6 +1066,7 @@ var MySQLErrName = map[uint16]string{
ErrInvalidWildCard: "Wildcard fields without any table name appears in wrong place",
ErrMixOfGroupFuncAndFieldsIncompatible: "In aggregated query without GROUP BY, expression #%d of SELECT list contains nonaggregated column '%s'; this is incompatible with sql_mode=only_full_group_by",
ErrUnsupportedSecondArgumentType: "JSON_OBJECTAGG: unsupported second argument type %v",
ErrLockExpire: "TTL manager has timed out, pessimistic locks may expire, please commit or rollback this transaction",

// TiKV/PD errors.
ErrPDServerTimeout: "PD server timeout",
Expand Down
1 change: 1 addition & 0 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -928,6 +928,7 @@ func newLockCtx(seVars *variable.SessionVars, lockWaitTime int64) *kv.LockCtx {
PessimisticLockWaited: &seVars.StmtCtx.PessimisticLockWaited,
LockKeysDuration: &seVars.StmtCtx.LockKeysDuration,
LockKeysCount: &seVars.StmtCtx.LockKeysCount,
LockExpired: &seVars.TxnCtx.LockExpire,
}
}

Expand Down
1 change: 1 addition & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ type LockCtx struct {
ReturnValues bool
Values map[string]ReturnedValue
ValuesLock sync.Mutex
LockExpired *uint32
}

// ReturnedValue pairs the Value and AlreadyLocked flag for PessimisticLock return values result.
Expand Down
26 changes: 26 additions & 0 deletions session/pessimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1229,3 +1229,29 @@ func (s *testPessimisticSuite) TestGenerateColPointGet(c *C) {
tk2.MustExec("commit")
}
}

func (s *testPessimisticSuite) TestTxnWithExpiredPessimisticLocks(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1 (c1 int primary key, c2 int, c3 int, unique key uk(c2))")
defer tk.MustExec("drop table if exists t1")
tk.MustExec("insert into t1 values (1, 1, 1)")
tk.MustExec("insert into t1 values (5, 5, 5)")

tk.MustExec("begin pessimistic")
tk.MustQuery("select * from t1 where c1 in(1, 5) for update").Check(testkit.Rows("1 1 1", "5 5 5"))
atomic.StoreUint32(&tk.Se.GetSessionVars().TxnCtx.LockExpire, 1)
err := tk.ExecToErr("select * from t1 where c1 in(1, 5)")
c.Assert(terror.ErrorEqual(err, tikv.ErrLockExpire), IsTrue)
tk.MustExec("commit")

tk.MustExec("begin pessimistic")
tk.MustQuery("select * from t1 where c1 in(1, 5) for update").Check(testkit.Rows("1 1 1", "5 5 5"))
atomic.StoreUint32(&tk.Se.GetSessionVars().TxnCtx.LockExpire, 1)
err = tk.ExecToErr("update t1 set c2 = c2 + 1")
c.Assert(terror.ErrorEqual(err, tikv.ErrLockExpire), IsTrue)
atomic.StoreUint32(&tk.Se.GetSessionVars().TxnCtx.LockExpire, 0)
tk.MustExec("update t1 set c2 = c2 + 1")
tk.MustExec("rollback")
}
9 changes: 7 additions & 2 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,12 @@ func (s *session) isTxnRetryableError(err error) bool {
}

func (s *session) checkTxnAborted(stmt sqlexec.Statement) error {
if s.txn.doNotCommit == nil {
var err error
if s.txn.doNotCommit != nil {
err = errors.New("current transaction is aborted, commands ignored until end of transaction block:" + s.txn.doNotCommit.Error())
} else if atomic.LoadUint32(&s.GetSessionVars().TxnCtx.LockExpire) > 0 {
err = tikv.ErrLockExpire
} else {
return nil
}
// If the transaction is aborted, the following statements do not need to execute, except `commit` and `rollback`,
Expand All @@ -609,7 +614,7 @@ func (s *session) checkTxnAborted(stmt sqlexec.Statement) error {
if _, ok := stmt.(*executor.ExecStmt).StmtNode.(*ast.RollbackStmt); ok {
return nil
}
return errors.New("current transaction is aborted, commands ignored until end of transaction block:" + s.txn.doNotCommit.Error())
return err
}

func (s *session) retry(ctx context.Context, maxCnt uint) (err error) {
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ type TransactionContext struct {
CouldRetry bool
IsPessimistic bool
Isolation string
LockExpire uint32
}

// AddUnchangedRowKey adds an unchanged row key in update statement for pessimistic lock.
Expand Down
18 changes: 12 additions & 6 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ func newTwoPhaseCommitter(txn *tikvTxn, connID uint64) (*twoPhaseCommitter, erro
ttlManager: ttlManager{
ch: make(chan struct{}),
},
isPessimistic: txn.IsPessimistic(),
}, nil
}

Expand Down Expand Up @@ -681,17 +682,17 @@ const (
)

type ttlManager struct {
state ttlManagerState
ch chan struct{}
killed *uint32
state ttlManagerState
ch chan struct{}
lockCtx *kv.LockCtx
}

func (tm *ttlManager) run(c *twoPhaseCommitter, killed *uint32) {
func (tm *ttlManager) run(c *twoPhaseCommitter, lockCtx *kv.LockCtx) {
// Run only once.
if !atomic.CompareAndSwapUint32((*uint32)(&tm.state), uint32(stateUninitialized), uint32(stateRunning)) {
return
}
tm.killed = killed
tm.lockCtx = lockCtx
go tm.keepAlive(c)
}

Expand All @@ -712,7 +713,7 @@ func (tm *ttlManager) keepAlive(c *twoPhaseCommitter) {
return
case <-ticker.C:
// If kill signal is received, the ttlManager should exit.
if tm.killed != nil && atomic.LoadUint32(tm.killed) != 0 {
if tm.lockCtx != nil && tm.lockCtx.Killed != nil && atomic.LoadUint32(tm.lockCtx.Killed) != 0 {
return
}
bo := NewBackoffer(context.Background(), pessimisticLockMaxBackoff).WithVars(c.txn.vars)
Expand All @@ -735,6 +736,11 @@ func (tm *ttlManager) keepAlive(c *twoPhaseCommitter) {
logutil.Logger(bo.ctx).Info("ttlManager live up to its lifetime",
zap.Uint64("txnStartTS", c.startTS))
metrics.TiKVTTLLifeTimeReachCounter.Inc()
// the pessimistic locks may expire if the ttl manager has timed out, set `LockExpired` flag
// so that this transaction could only commit or rollback with no more statement executions
if c.isPessimistic && tm.lockCtx != nil && tm.lockCtx.LockExpired != nil {
atomic.StoreUint32(tm.lockCtx.LockExpired, 1)
}
return
}

Expand Down
1 change: 1 addition & 0 deletions store/tikv/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ var (
ErrLockAcquireFailAndNoWaitSet = terror.ClassTiKV.New(mysql.ErrLockAcquireFailAndNoWaitSet, mysql.MySQLErrName[mysql.ErrLockAcquireFailAndNoWaitSet])
ErrLockWaitTimeout = terror.ClassTiKV.New(mysql.ErrLockWaitTimeout, mysql.MySQLErrName[mysql.ErrLockWaitTimeout])
ErrTokenLimit = terror.ClassTiKV.New(mysql.ErrTiKVStoreLimit, mysql.MySQLErrName[mysql.ErrTiKVStoreLimit])
ErrLockExpire = terror.ClassTiKV.New(mysql.ErrLockExpire, mysql.MySQLErrName[mysql.ErrLockExpire])
ErrUnknown = terror.ClassTiKV.New(mysql.ErrUnknown, mysql.MySQLErrName[mysql.ErrUnknown])
)

Expand Down
2 changes: 1 addition & 1 deletion store/tikv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keysInput
return err
}
if assignedPrimaryKey {
txn.committer.ttlManager.run(txn.committer, lockCtx.Killed)
txn.committer.ttlManager.run(txn.committer, lockCtx)
}
}
txn.mu.Lock()
Expand Down

0 comments on commit 94fb1bf

Please sign in to comment.