diff --git a/errno/errcode.go b/errno/errcode.go index bcf326aa57a4f..1651494fd9a2a 100644 --- a/errno/errcode.go +++ b/errno/errcode.go @@ -1069,6 +1069,7 @@ const ( ErrCannotCancelDDLJob = 8226 ErrSequenceUnsupportedTableOption = 8227 ErrColumnTypeUnsupportedNextValue = 8228 + ErrLockExpire = 8229 // TiKV/PD errors. ErrPDServerTimeout = 9001 diff --git a/errno/errname.go b/errno/errname.go index 5d0998f021f68..e04b43cf57efb 100644 --- a/errno/errname.go +++ b/errno/errname.go @@ -1066,6 +1066,7 @@ var MySQLErrName = map[uint16]string{ ErrInvalidWildCard: "Wildcard fields without any table name appears in wrong place", ErrMixOfGroupFuncAndFieldsIncompatible: "In aggregated query without GROUP BY, expression #%d of SELECT list contains nonaggregated column '%s'; this is incompatible with sql_mode=only_full_group_by", ErrUnsupportedSecondArgumentType: "JSON_OBJECTAGG: unsupported second argument type %v", + ErrLockExpire: "TTL manager has timed out, pessimistic locks may expire, please commit or rollback this transaction", // TiKV/PD errors. ErrPDServerTimeout: "PD server timeout", diff --git a/executor/executor.go b/executor/executor.go index e9904e9755a89..e3834929cb9fc 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -928,6 +928,7 @@ func newLockCtx(seVars *variable.SessionVars, lockWaitTime int64) *kv.LockCtx { PessimisticLockWaited: &seVars.StmtCtx.PessimisticLockWaited, LockKeysDuration: &seVars.StmtCtx.LockKeysDuration, LockKeysCount: &seVars.StmtCtx.LockKeysCount, + LockExpired: &seVars.TxnCtx.LockExpire, } } diff --git a/kv/kv.go b/kv/kv.go index 79684ba9e0fbb..0c722a8ff46ac 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -213,6 +213,7 @@ type LockCtx struct { ReturnValues bool Values map[string]ReturnedValue ValuesLock sync.Mutex + LockExpired *uint32 } // ReturnedValue pairs the Value and AlreadyLocked flag for PessimisticLock return values result. diff --git a/session/pessimistic_test.go b/session/pessimistic_test.go index 03317154d8911..8974ff51b04e9 100644 --- a/session/pessimistic_test.go +++ b/session/pessimistic_test.go @@ -1229,3 +1229,29 @@ func (s *testPessimisticSuite) TestGenerateColPointGet(c *C) { tk2.MustExec("commit") } } + +func (s *testPessimisticSuite) TestTxnWithExpiredPessimisticLocks(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t1") + tk.MustExec("create table t1 (c1 int primary key, c2 int, c3 int, unique key uk(c2))") + defer tk.MustExec("drop table if exists t1") + tk.MustExec("insert into t1 values (1, 1, 1)") + tk.MustExec("insert into t1 values (5, 5, 5)") + + tk.MustExec("begin pessimistic") + tk.MustQuery("select * from t1 where c1 in(1, 5) for update").Check(testkit.Rows("1 1 1", "5 5 5")) + atomic.StoreUint32(&tk.Se.GetSessionVars().TxnCtx.LockExpire, 1) + err := tk.ExecToErr("select * from t1 where c1 in(1, 5)") + c.Assert(terror.ErrorEqual(err, tikv.ErrLockExpire), IsTrue) + tk.MustExec("commit") + + tk.MustExec("begin pessimistic") + tk.MustQuery("select * from t1 where c1 in(1, 5) for update").Check(testkit.Rows("1 1 1", "5 5 5")) + atomic.StoreUint32(&tk.Se.GetSessionVars().TxnCtx.LockExpire, 1) + err = tk.ExecToErr("update t1 set c2 = c2 + 1") + c.Assert(terror.ErrorEqual(err, tikv.ErrLockExpire), IsTrue) + atomic.StoreUint32(&tk.Se.GetSessionVars().TxnCtx.LockExpire, 0) + tk.MustExec("update t1 set c2 = c2 + 1") + tk.MustExec("rollback") +} diff --git a/session/session.go b/session/session.go index 6afbabdbb8ee8..94f57f663bf02 100644 --- a/session/session.go +++ b/session/session.go @@ -598,7 +598,12 @@ func (s *session) isTxnRetryableError(err error) bool { } func (s *session) checkTxnAborted(stmt sqlexec.Statement) error { - if s.txn.doNotCommit == nil { + var err error + if s.txn.doNotCommit != nil { + err = errors.New("current transaction is aborted, commands ignored until end of transaction block:" + s.txn.doNotCommit.Error()) + } else if atomic.LoadUint32(&s.GetSessionVars().TxnCtx.LockExpire) > 0 { + err = tikv.ErrLockExpire + } else { return nil } // If the transaction is aborted, the following statements do not need to execute, except `commit` and `rollback`, @@ -609,7 +614,7 @@ func (s *session) checkTxnAborted(stmt sqlexec.Statement) error { if _, ok := stmt.(*executor.ExecStmt).StmtNode.(*ast.RollbackStmt); ok { return nil } - return errors.New("current transaction is aborted, commands ignored until end of transaction block:" + s.txn.doNotCommit.Error()) + return err } func (s *session) retry(ctx context.Context, maxCnt uint) (err error) { diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 2ddc02b4ad263..a726e1dfd601a 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -156,6 +156,7 @@ type TransactionContext struct { CouldRetry bool IsPessimistic bool Isolation string + LockExpire uint32 } // AddUnchangedRowKey adds an unchanged row key in update statement for pessimistic lock. diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index d613d3b4d5acd..f2a6f63d083e8 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -223,6 +223,7 @@ func newTwoPhaseCommitter(txn *tikvTxn, connID uint64) (*twoPhaseCommitter, erro ttlManager: ttlManager{ ch: make(chan struct{}), }, + isPessimistic: txn.IsPessimistic(), }, nil } @@ -681,17 +682,17 @@ const ( ) type ttlManager struct { - state ttlManagerState - ch chan struct{} - killed *uint32 + state ttlManagerState + ch chan struct{} + lockCtx *kv.LockCtx } -func (tm *ttlManager) run(c *twoPhaseCommitter, killed *uint32) { +func (tm *ttlManager) run(c *twoPhaseCommitter, lockCtx *kv.LockCtx) { // Run only once. if !atomic.CompareAndSwapUint32((*uint32)(&tm.state), uint32(stateUninitialized), uint32(stateRunning)) { return } - tm.killed = killed + tm.lockCtx = lockCtx go tm.keepAlive(c) } @@ -712,7 +713,7 @@ func (tm *ttlManager) keepAlive(c *twoPhaseCommitter) { return case <-ticker.C: // If kill signal is received, the ttlManager should exit. - if tm.killed != nil && atomic.LoadUint32(tm.killed) != 0 { + if tm.lockCtx != nil && tm.lockCtx.Killed != nil && atomic.LoadUint32(tm.lockCtx.Killed) != 0 { return } bo := NewBackoffer(context.Background(), pessimisticLockMaxBackoff).WithVars(c.txn.vars) @@ -735,6 +736,11 @@ func (tm *ttlManager) keepAlive(c *twoPhaseCommitter) { logutil.Logger(bo.ctx).Info("ttlManager live up to its lifetime", zap.Uint64("txnStartTS", c.startTS)) metrics.TiKVTTLLifeTimeReachCounter.Inc() + // the pessimistic locks may expire if the ttl manager has timed out, set `LockExpired` flag + // so that this transaction could only commit or rollback with no more statement executions + if c.isPessimistic && tm.lockCtx != nil && tm.lockCtx.LockExpired != nil { + atomic.StoreUint32(tm.lockCtx.LockExpired, 1) + } return } diff --git a/store/tikv/error.go b/store/tikv/error.go index db7e092ecdcf4..cc5c87acc0c35 100644 --- a/store/tikv/error.go +++ b/store/tikv/error.go @@ -42,6 +42,7 @@ var ( ErrLockAcquireFailAndNoWaitSet = terror.ClassTiKV.New(mysql.ErrLockAcquireFailAndNoWaitSet, mysql.MySQLErrName[mysql.ErrLockAcquireFailAndNoWaitSet]) ErrLockWaitTimeout = terror.ClassTiKV.New(mysql.ErrLockWaitTimeout, mysql.MySQLErrName[mysql.ErrLockWaitTimeout]) ErrTokenLimit = terror.ClassTiKV.New(mysql.ErrTiKVStoreLimit, mysql.MySQLErrName[mysql.ErrTiKVStoreLimit]) + ErrLockExpire = terror.ClassTiKV.New(mysql.ErrLockExpire, mysql.MySQLErrName[mysql.ErrLockExpire]) ErrUnknown = terror.ClassTiKV.New(mysql.ErrUnknown, mysql.MySQLErrName[mysql.ErrUnknown]) ) diff --git a/store/tikv/txn.go b/store/tikv/txn.go index 83d800102156b..c30ef15afd8ad 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -395,7 +395,7 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keysInput return err } if assignedPrimaryKey { - txn.committer.ttlManager.run(txn.committer, lockCtx.Killed) + txn.committer.ttlManager.run(txn.committer, lockCtx) } } txn.mu.Lock()