diff --git a/kv/interface_mock_test.go b/kv/interface_mock_test.go index 9e5c12fe0344b..aa6e2f4be3bf3 100644 --- a/kv/interface_mock_test.go +++ b/kv/interface_mock_test.go @@ -125,6 +125,10 @@ func (t *mockTxn) SetVars(vars *Variables) { } +func (t *mockTxn) GetVars() *Variables { + return nil +} + // newMockTxn new a mockTxn. func newMockTxn() Transaction { return &mockTxn{ diff --git a/kv/kv.go b/kv/kv.go index 96993cd438112..8868a8ef6d8ff 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -202,6 +202,8 @@ type Transaction interface { GetSnapshot() Snapshot // SetVars sets variables to the transaction. SetVars(vars *Variables) + // GetVars gets variables from the transaction. + GetVars() *Variables // BatchGet gets kv from the memory buffer of statement and transaction, and the kv storage. // Do not use len(value) == 0 or value == nil to represent non-exist. // If a key doesn't exist, there shouldn't be any corresponding entry in the result map. diff --git a/session/session.go b/session/session.go index 3d2fa0c1d013b..87a968a1a33ff 100644 --- a/session/session.go +++ b/session/session.go @@ -1450,6 +1450,7 @@ func (s *session) Txn(active bool) (kv.Transaction, error) { s.sessionVars.SetStatusFlag(mysql.ServerStatusInTrans, true) } s.sessionVars.TxnCtx.CouldRetry = s.isTxnRetryable() + s.txn.SetVars(s.sessionVars.KVVars) if s.sessionVars.GetReplicaRead().IsFollowerRead() { s.txn.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) } diff --git a/session/session_test.go b/session/session_test.go index 940280c1fe833..43730ea38984b 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -2627,52 +2627,25 @@ func (s *testSessionSuite2) TestDBUserNameLength(c *C) { } func (s *testSessionSerialSuite) TestKVVars(c *C) { - c.Skip("there is no backoff here in the large txn, so this test is stale") tk := testkit.NewTestKitWithInit(c, s.store) - tk.MustExec("create table kvvars (a int, b int)") - tk.MustExec("insert kvvars values (1, 1)") - tk2 := testkit.NewTestKitWithInit(c, s.store) - tk2.MustExec("set @@tidb_backoff_lock_fast = 1") - tk2.MustExec("set @@tidb_backoff_weight = 100") - backoffVal := new(int64) - backOffWeightVal := new(int32) - tk2.Se.GetSessionVars().KVVars.Hook = func(name string, vars *kv.Variables) { - atomic.StoreInt64(backoffVal, int64(vars.BackoffLockFast)) - atomic.StoreInt32(backOffWeightVal, int32(vars.BackOffWeight)) - } - wg := new(sync.WaitGroup) - wg.Add(2) - go func() { - for { - tk2.MustQuery("select * from kvvars") - if atomic.LoadInt64(backoffVal) != 0 { - break - } - } - wg.Done() - }() - - c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/mockSleepBetween2PC", "return"), IsNil) - go func() { - for { - tk.MustExec("update kvvars set b = b + 1 where a = 1") - if atomic.LoadInt64(backoffVal) != 0 { - break - } - } - wg.Done() - }() - wg.Wait() - c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/mockSleepBetween2PC"), IsNil) - - for { - tk2.MustQuery("select * from kvvars") - if atomic.LoadInt32(backOffWeightVal) != 0 { - break - } - } - c.Assert(atomic.LoadInt64(backoffVal), Equals, int64(1)) - c.Assert(atomic.LoadInt32(backOffWeightVal), Equals, int32(100)) + tk.MustExec("set @@tidb_backoff_lock_fast = 1") + tk.MustExec("set @@tidb_backoff_weight = 100") + tk.MustExec("create table if not exists kvvars (a int)") + tk.MustExec("begin") + txn, err := tk.Se.Txn(false) + c.Assert(err, IsNil) + vars := txn.GetVars() + c.Assert(vars.BackoffLockFast, Equals, 1) + c.Assert(vars.BackOffWeight, Equals, 100) + tk.MustExec("rollback") + tk.MustExec("set @@tidb_backoff_weight = 50") + tk.MustExec("set @@autocommit = 0") + tk.MustExec("select * from kvvars") + c.Assert(tk.Se.GetSessionVars().InTxn(), IsTrue) + txn, err = tk.Se.Txn(false) + c.Assert(err, IsNil) + vars = txn.GetVars() + c.Assert(vars.BackOffWeight, Equals, 50) } func (s *testSessionSuite2) TestCommitRetryCount(c *C) { diff --git a/store/tikv/txn.go b/store/tikv/txn.go index b148686fb8892..992aca1a5d23a 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -109,6 +109,10 @@ func (txn *tikvTxn) SetVars(vars *kv.Variables) { txn.snapshot.vars = vars } +func (txn *tikvTxn) GetVars() *kv.Variables { + return txn.vars +} + // tikvTxnStagingBuffer is the staging buffer returned to tikvTxn user. // Because tikvTxn needs to maintain dirty state when Flush staging data into txn. type tikvTxnStagingBuffer struct {