Skip to content

Commit

Permalink
executor: add missing update table delta for TxnCtx (pingcap#20982)
Browse files Browse the repository at this point in the history
  • Loading branch information
blacktear23 authored Nov 19, 2020
1 parent f55aa04 commit e6e894d
Show file tree
Hide file tree
Showing 5 changed files with 188 additions and 1 deletion.
9 changes: 8 additions & 1 deletion executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@ func (e *BatchPointGetExec) Next(ctx context.Context, req *chunk.Chunk) error {
if err := e.initialize(ctx); err != nil {
return err
}
if e.lock {
e.updateDeltaForTableID(e.tblInfo.ID)
}
}

if e.index >= len(e.values) {
Expand Down Expand Up @@ -232,7 +235,11 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error {
}
e.handles = append(e.handles, handle)
if e.tblInfo.Partition != nil {
e.physIDs = append(e.physIDs, tablecodec.DecodeTableID(key))
pid := tablecodec.DecodeTableID(key)
e.physIDs = append(e.physIDs, pid)
if e.lock {
e.updateDeltaForTableID(pid)
}
}
}

Expand Down
18 changes: 18 additions & 0 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,12 @@ func (e *baseExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
return nil
}

func (e *baseExecutor) updateDeltaForTableID(id int64) {
txnCtx := e.ctx.GetSessionVars().TxnCtx
udpp := e.ctx.GetSessionVars().UseDynamicPartitionPrune()
txnCtx.UpdateDeltaForTable(id, id, 0, 0, map[int64]int64{}, udpp)
}

func newBaseExecutor(ctx sessionctx.Context, schema *expression.Schema, id int, children ...Executor) baseExecutor {
e := baseExecutor{
children: children,
Expand Down Expand Up @@ -943,6 +949,18 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error {
lockWaitTime = int64(e.Lock.WaitSec) * 1000
}

if len(e.tblID2Handle) > 0 {
for id := range e.tblID2Handle {
e.updateDeltaForTableID(id)
}
}
if len(e.partitionedTable) > 0 {
for _, p := range e.partitionedTable {
pid := p.Meta().ID
e.updateDeltaForTableID(pid)
}
}

return doLockKeys(ctx, e.ctx, newLockCtx(e.ctx.GetSessionVars(), lockWaitTime), e.keys...)
}

Expand Down
150 changes: 150 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6865,3 +6865,153 @@ func (s *testSuite) TestIssue19667(c *C) {
tk.MustExec("INSERT INTO t VALUES('1988-04-17 01:59:59')")
tk.MustQuery(`SELECT DATE_ADD(a, INTERVAL 1 SECOND) FROM t`).Check(testkit.Rows("1988-04-17 02:00:00"))
}

func issue20975Prepare(c *C, store kv.Storage) (*testkit.TestKit, *testkit.TestKit) {
tk1 := testkit.NewTestKit(c, store)
tk2 := testkit.NewTestKit(c, store)
tk1.MustExec("use test")
tk1.MustExec("drop table if exists t1, t2")
tk2.MustExec("use test")
tk1.MustExec("create table t1(id int primary key, c int)")
tk1.MustExec("insert into t1 values(1, 10), (2, 20)")
return tk1, tk2
}

func (s *testSuite) TestIssue20975UpdateNoChange(c *C) {
tk1, tk2 := issue20975Prepare(c, s.store)
tk1.MustExec("begin pessimistic")
tk1.MustExec("update t1 set c=c")
tk2.MustExec("create table t2(a int)")
tk1.MustExec("commit")
}

func (s *testSuite) TestIssue20975SelectForUpdate(c *C) {
tk1, tk2 := issue20975Prepare(c, s.store)
tk1.MustExec("begin")
tk1.MustExec("select * from t1 for update")
tk2.MustExec("create table t2(a int)")
tk1.MustExec("commit")

tk1.MustExec("begin pessimistic")
tk1.MustExec("select * from t1 for update")
tk2.MustExec("drop table t2")
tk1.MustExec("commit")
}

func (s *testSuite) TestIssue20975SelectForUpdatePointGet(c *C) {
tk1, tk2 := issue20975Prepare(c, s.store)
tk1.MustExec("begin")
tk1.MustExec("select * from t1 where id=1 for update")
tk2.MustExec("create table t2(a int)")
tk1.MustExec("commit")

tk1.MustExec("begin pessimistic")
tk1.MustExec("select * from t1 where id=1 for update")
tk2.MustExec("drop table t2")
tk1.MustExec("commit")
}

func (s *testSuite) TestIssue20975SelectForUpdateBatchPointGet(c *C) {
tk1, tk2 := issue20975Prepare(c, s.store)
tk1.MustExec("begin")
tk1.MustExec("select * from t1 where id in (1, 2) for update")
tk2.MustExec("create table t2(a int)")
tk1.MustExec("commit")

tk1.MustExec("begin pessimistic")
tk1.MustExec("select * from t1 where id in (1, 2) for update")
tk2.MustExec("drop table t2")
tk1.MustExec("commit")
}

func issue20975PreparePartitionTable(c *C, store kv.Storage) (*testkit.TestKit, *testkit.TestKit) {
tk1 := testkit.NewTestKit(c, store)
tk2 := testkit.NewTestKit(c, store)
tk1.MustExec("use test")
tk1.MustExec("drop table if exists t1, t2")
tk2.MustExec("use test")
tk1.MustExec(`create table t1(id int primary key, c int) partition by range (id) (
partition p1 values less than (10),
partition p2 values less than (20)
)`)
tk1.MustExec("insert into t1 values(1, 10), (2, 20), (11, 30), (12, 40)")
return tk1, tk2
}

func (s *testSuite) TestIssue20975UpdateNoChangeWithPartitionTable(c *C) {
tk1, tk2 := issue20975PreparePartitionTable(c, s.store)
tk1.MustExec("begin pessimistic")
tk1.MustExec("update t1 set c=c")
tk2.MustExec("create table t2(a int)")
tk1.MustExec("commit")
}

func (s *testSuite) TestIssue20975SelectForUpdateWithPartitionTable(c *C) {
tk1, tk2 := issue20975PreparePartitionTable(c, s.store)
tk1.MustExec("begin")
tk1.MustExec("select * from t1 for update")
tk2.MustExec("create table t2(a int)")
tk1.MustExec("commit")

tk1.MustExec("begin pessimistic")
tk1.MustExec("select * from t1 for update")
tk2.MustExec("drop table t2")
tk1.MustExec("commit")
}

func (s *testSuite) TestIssue20975SelectForUpdatePointGetWithPartitionTable(c *C) {
tk1, tk2 := issue20975PreparePartitionTable(c, s.store)
tk1.MustExec("begin")
tk1.MustExec("select * from t1 where id=1 for update")
tk2.MustExec("create table t2(a int)")
tk1.MustExec("commit")

tk1.MustExec("begin")
tk1.MustExec("select * from t1 where id=12 for update")
tk2.MustExec("drop table t2")
tk1.MustExec("commit")

tk1.MustExec("begin pessimistic")
tk1.MustExec("select * from t1 where id=1 for update")
tk2.MustExec("create table t2(a int)")
tk1.MustExec("commit")

tk1.MustExec("begin pessimistic")
tk1.MustExec("select * from t1 where id=12 for update")
tk2.MustExec("drop table t2")
tk1.MustExec("commit")
}

func (s *testSuite) TestIssue20975SelectForUpdateBatchPointGetWithPartitionTable(c *C) {
tk1, tk2 := issue20975PreparePartitionTable(c, s.store)
tk1.MustExec("begin")
tk1.MustExec("select * from t1 where id in (1, 2) for update")
tk2.MustExec("create table t2(a int)")
tk1.MustExec("commit")

tk1.MustExec("begin")
tk1.MustExec("select * from t1 where id in (11, 12) for update")
tk2.MustExec("drop table t2")
tk1.MustExec("commit")

tk1.MustExec("begin")
tk1.MustExec("select * from t1 where id in (1, 11) for update")
tk2.MustExec("create table t2(a int)")
tk1.MustExec("commit")

tk1.MustExec("begin pessimistic")
tk1.MustExec("select * from t1 where id in (1, 2) for update")
tk2.MustExec("drop table t2")
tk1.MustExec("commit")

tk1.MustExec("begin pessimistic")
tk1.MustExec("select * from t1 where id in (11, 12) for update")
tk2.MustExec("create table t2(a int)")
tk1.MustExec("commit")

tk1.MustExec("begin pessimistic")
tk1.MustExec("select * from t1 where id in (1, 11) for update")
tk2.MustExec("drop table t2")
tk1.MustExec("commit")

}
3 changes: 3 additions & 0 deletions executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,9 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
} else {
tblID = e.tblInfo.ID
}
if e.lock {
e.updateDeltaForTableID(tblID)
}
if e.idxInfo != nil {
if isCommonHandleRead(e.tblInfo, e.idxInfo) {
handleBytes, err := EncodeUniqueIndexValuesForKey(e.ctx, e.tblInfo, e.idxInfo, e.idxVals)
Expand Down
9 changes: 9 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,9 @@ type TransactionContext struct {
LockExpire uint32
ForUpdate uint32

// TableDeltaMap lock to prevent potential data race
tdmLock sync.Mutex

// TxnScope stores the value of 'txn_scope'.
TxnScope string
}
Expand Down Expand Up @@ -221,6 +224,8 @@ func (tc *TransactionContext) CollectUnchangedRowKeys(buf []kv.Key) []kv.Key {

// UpdateDeltaForTable updates the delta info for some table.
func (tc *TransactionContext) UpdateDeltaForTable(logicalTableID, physicalTableID int64, delta int64, count int64, colSize map[int64]int64, saveAsLogicalTblID bool) {
tc.tdmLock.Lock()
defer tc.tdmLock.Unlock()
if tc.TableDeltaMap == nil {
tc.TableDeltaMap = make(map[int64]TableDelta)
}
Expand Down Expand Up @@ -265,13 +270,17 @@ func (tc *TransactionContext) Cleanup() {
// tc.InfoSchema = nil; we cannot do it now, because some operation like handleFieldList depend on this.
tc.Binlog = nil
tc.History = nil
tc.tdmLock.Lock()
tc.TableDeltaMap = nil
tc.tdmLock.Unlock()
tc.pessimisticLockCache = nil
}

// ClearDelta clears the delta map.
func (tc *TransactionContext) ClearDelta() {
tc.tdmLock.Lock()
tc.TableDeltaMap = nil
tc.tdmLock.Unlock()
}

// GetForUpdateTS returns the ts for update.
Expand Down

0 comments on commit e6e894d

Please sign in to comment.