Skip to content

Commit

Permalink
executor: Add insert/replace ignore/on duplicate key support for loca…
Browse files Browse the repository at this point in the history
…l temporary table (pingcap#26636)
  • Loading branch information
lcwangchao authored Aug 4, 2021
1 parent 29f0037 commit 6a1e521
Show file tree
Hide file tree
Showing 10 changed files with 212 additions and 34 deletions.
4 changes: 2 additions & 2 deletions executor/batch_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,9 @@ func formatDataForDupError(data []types.Datum) (string, error) {

// getOldRow gets the table record row from storage for batch check.
// t could be a normal table or a partition, but it must not be a PartitionedTable.
func getOldRow(ctx context.Context, sctx sessionctx.Context, txn kv.Transaction, t table.Table, handle kv.Handle,
func getOldRow(ctx context.Context, sctx sessionctx.Context, kvGetter kv.Getter, t table.Table, handle kv.Handle,
genExprs []expression.Expression) ([]types.Datum, error) {
oldValue, err := txn.Get(ctx, tablecodec.EncodeRecordKey(t.RecordPrefix(), handle))
oldValue, err := kvGetter.Get(ctx, tablecodec.EncodeRecordKey(t.RecordPrefix(), handle))
if err != nil {
return nil, err
}
Expand Down
10 changes: 10 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8463,9 +8463,19 @@ func (s testSerialSuite) assertTemporaryTableNoNetwork(c *C, temporaryTableType
tk.MustQuery("select /*+ USE_INDEX(tmp_t, a) */ b from tmp_t where a = 1").Check(testkit.Rows("1"))
tk.MustExec("rollback")

// prepare some data for local temporary table, when for global temporary table, the below operations have no effect.
tk.MustExec("insert into tmp_t value(10, 10, 10)")
tk.MustExec("insert into tmp_t value(11, 11, 11)")

// Pessimistic lock
tk.MustExec("begin pessimistic")
tk.MustExec("insert into tmp_t values (3, 3, 3)")
tk.MustExec("insert ignore into tmp_t values (4, 4, 4)")
tk.MustExec("insert into tmp_t values (5, 5, 5) on duplicate key update a=100")
tk.MustExec("insert into tmp_t values (10, 10, 10) on duplicate key update a=100")
tk.MustExec("insert ignore into tmp_t values (10, 10, 10) on duplicate key update id=11")
tk.MustExec("replace into tmp_t values(6, 6, 6)")
tk.MustExec("replace into tmp_t values(11, 100, 100)")
tk.MustExec("update tmp_t set id = id + 1 where a = 1")
tk.MustExec("delete from tmp_t where a > 1")
tk.MustQuery("select count(*) from tmp_t where a >= 1 for update")
Expand Down
23 changes: 16 additions & 7 deletions executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"runtime/trace"
"time"

"github.com/pingcap/parser/model"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/expression"
Expand Down Expand Up @@ -166,7 +168,12 @@ func prefetchConflictedOldRows(ctx context.Context, txn kv.Transaction, rows []t
return err
}

func prefetchDataCache(ctx context.Context, txn kv.Transaction, rows []toBeCheckedRow) error {
func (e *InsertValues) prefetchDataCache(ctx context.Context, txn kv.Transaction, rows []toBeCheckedRow) error {
// Temporary table need not to do prefetch because its all data are stored in the memory.
if e.Table.Meta().TempTableType != model.TempTableNone {
return nil
}

if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("prefetchDataCache", opentracing.ChildOf(span.Context()))
defer span1.Finish()
Expand All @@ -180,8 +187,8 @@ func prefetchDataCache(ctx context.Context, txn kv.Transaction, rows []toBeCheck
}

// updateDupRow updates a duplicate row to a new row.
func (e *InsertExec) updateDupRow(ctx context.Context, idxInBatch int, txn kv.Transaction, row toBeCheckedRow, handle kv.Handle, onDuplicate []*expression.Assignment) error {
oldRow, err := getOldRow(ctx, e.ctx, txn, row.t, handle, e.GenExprs)
func (e *InsertExec) updateDupRow(ctx context.Context, idxInBatch int, kvGetter kv.Getter, row toBeCheckedRow, handle kv.Handle, onDuplicate []*expression.Assignment) error {
oldRow, err := getOldRow(ctx, e.ctx, kvGetter, row.t, handle, e.GenExprs)
if err != nil {
return err
}
Expand Down Expand Up @@ -222,20 +229,22 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
prefetchStart := time.Now()
// Use BatchGet to fill cache.
// It's an optimization and could be removed without affecting correctness.
if err = prefetchDataCache(ctx, txn, toBeCheckedRows); err != nil {
if err = e.prefetchDataCache(ctx, txn, toBeCheckedRows); err != nil {
return err
}
if e.stats != nil {
e.stats.Prefetch += time.Since(prefetchStart)
}

txnValueGetter := e.txnValueGetter(txn)
for i, r := range toBeCheckedRows {
if r.handleKey != nil {
handle, err := tablecodec.DecodeRowKey(r.handleKey.newKey)
if err != nil {
return err
}

err = e.updateDupRow(ctx, i, txn, r, handle, e.OnDuplicate)
err = e.updateDupRow(ctx, i, txnValueGetter, r, handle, e.OnDuplicate)
if err == nil {
continue
}
Expand All @@ -245,7 +254,7 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
}

for _, uk := range r.uniqueKeys {
val, err := txn.Get(ctx, uk.newKey)
val, err := txnValueGetter.Get(ctx, uk.newKey)
if err != nil {
if kv.IsErrNotFound(err) {
continue
Expand All @@ -257,7 +266,7 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
return err
}

err = e.updateDupRow(ctx, i, txn, r, handle, e.OnDuplicate)
err = e.updateDupRow(ctx, i, txnValueGetter, r, handle, e.OnDuplicate)
if err != nil {
if kv.IsErrNotFound(err) {
// Data index inconsistent? A unique key provide the handle information, but the
Expand Down
22 changes: 18 additions & 4 deletions executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -1051,21 +1051,26 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D
}
prefetchStart := time.Now()
// Fill cache using BatchGet, the following Get requests don't need to visit TiKV.
if _, err = prefetchUniqueIndices(ctx, txn, toBeCheckedRows); err != nil {
return err
// Temporary table need not to do prefetch because its all data are stored in the memory.
if e.Table.Meta().TempTableType == model.TempTableNone {
if _, err = prefetchUniqueIndices(ctx, txn, toBeCheckedRows); err != nil {
return err
}
}

if e.stats != nil {
e.stats.Prefetch += time.Since(prefetchStart)
}

txnValueGetter := e.txnValueGetter(txn)
// append warnings and get no duplicated error rows
for i, r := range toBeCheckedRows {
if r.ignored {
continue
}
skip := false
if r.handleKey != nil {
_, err := txn.Get(ctx, r.handleKey.newKey)
_, err := txnValueGetter.Get(ctx, r.handleKey.newKey)
if err == nil {
e.ctx.GetSessionVars().StmtCtx.AppendWarning(r.handleKey.dupErr)
continue
Expand All @@ -1075,7 +1080,7 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D
}
}
for _, uk := range r.uniqueKeys {
_, err := txn.Get(ctx, uk.newKey)
_, err := txnValueGetter.Get(ctx, uk.newKey)
if err == nil {
// If duplicate keys were found in BatchGet, mark row = nil.
e.ctx.GetSessionVars().StmtCtx.AppendWarning(uk.dupErr)
Expand Down Expand Up @@ -1104,6 +1109,15 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D
return nil
}

func (e *InsertValues) txnValueGetter(txn kv.Transaction) kv.Getter {
tblInfo := e.Table.Meta()
if tblInfo.TempTableType == model.TempTableNone {
return txn
}

return e.ctx.GetSessionVars().TemporaryTableTxnReader(txn, tblInfo)
}

func (e *InsertValues) addRecord(ctx context.Context, row []types.Datum) error {
return e.addRecordWithAutoIDHint(ctx, row, 0)
}
Expand Down
2 changes: 1 addition & 1 deletion executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ func (e *PointGetExecutor) get(ctx context.Context, key kv.Key) ([]byte, error)

// Local temporary table always get snapshot value from session
if e.tblInfo.TempTableType == model.TempTableLocal {
return e.ctx.GetSessionVars().GetTemporaryTableSnapshotValue(ctx, key)
return e.ctx.GetSessionVars().TemporaryTableSnapshotReader(e.tblInfo).Get(ctx, key)
}

lock := e.tblInfo.Lock
Expand Down
20 changes: 11 additions & 9 deletions executor/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ func (e *ReplaceExec) Open(ctx context.Context) error {

// removeRow removes the duplicate row and cleanup its keys in the key-value map,
// but if the to-be-removed row equals to the to-be-added row, no remove or add things to do.
func (e *ReplaceExec) removeRow(ctx context.Context, txn kv.Transaction, handle kv.Handle, r toBeCheckedRow) (bool, error) {
func (e *ReplaceExec) removeRow(ctx context.Context, kvGetter kv.Getter, handle kv.Handle, r toBeCheckedRow) (bool, error) {
newRow := r.row
oldRow, err := getOldRow(ctx, e.ctx, txn, r.t, handle, e.GenExprs)
oldRow, err := getOldRow(ctx, e.ctx, kvGetter, r.t, handle, e.GenExprs)
if err != nil {
logutil.BgLogger().Error("get old row failed when replace",
zap.String("handle", handle.String()),
Expand Down Expand Up @@ -119,14 +119,15 @@ func (e *ReplaceExec) replaceRow(ctx context.Context, r toBeCheckedRow) error {
return err
}

txnValueGetter := e.txnValueGetter(txn)
if r.handleKey != nil {
handle, err := tablecodec.DecodeRowKey(r.handleKey.newKey)
if err != nil {
return err
}

if _, err := txn.Get(ctx, r.handleKey.newKey); err == nil {
rowUnchanged, err := e.removeRow(ctx, txn, handle, r)
if _, err := txnValueGetter.Get(ctx, r.handleKey.newKey); err == nil {
rowUnchanged, err := e.removeRow(ctx, txnValueGetter, handle, r)
if err != nil {
return err
}
Expand All @@ -142,7 +143,7 @@ func (e *ReplaceExec) replaceRow(ctx context.Context, r toBeCheckedRow) error {

// Keep on removing duplicated rows.
for {
rowUnchanged, foundDupKey, err := e.removeIndexRow(ctx, txn, r)
rowUnchanged, foundDupKey, err := e.removeIndexRow(ctx, txnValueGetter, r)
if err != nil {
return err
}
Expand All @@ -169,9 +170,9 @@ func (e *ReplaceExec) replaceRow(ctx context.Context, r toBeCheckedRow) error {
// 2. bool: true when found the duplicated key. This only means that duplicated key was found,
// and the row was removed.
// 3. error: the error.
func (e *ReplaceExec) removeIndexRow(ctx context.Context, txn kv.Transaction, r toBeCheckedRow) (bool, bool, error) {
func (e *ReplaceExec) removeIndexRow(ctx context.Context, kvGetter kv.Getter, r toBeCheckedRow) (bool, bool, error) {
for _, uk := range r.uniqueKeys {
val, err := txn.Get(ctx, uk.newKey)
val, err := kvGetter.Get(ctx, uk.newKey)
if err != nil {
if kv.IsErrNotFound(err) {
continue
Expand All @@ -182,7 +183,7 @@ func (e *ReplaceExec) removeIndexRow(ctx context.Context, txn kv.Transaction, r
if err != nil {
return false, true, err
}
rowUnchanged, err := e.removeRow(ctx, txn, handle, r)
rowUnchanged, err := e.removeRow(ctx, kvGetter, handle, r)
if err != nil {
return false, true, err
}
Expand Down Expand Up @@ -228,9 +229,10 @@ func (e *ReplaceExec) exec(ctx context.Context, newRows [][]types.Datum) error {
prefetchStart := time.Now()
// Use BatchGet to fill cache.
// It's an optimization and could be removed without affecting correctness.
if err = prefetchDataCache(ctx, txn, toBeCheckedRows); err != nil {
if err = e.prefetchDataCache(ctx, txn, toBeCheckedRows); err != nil {
return err
}

if e.stats != nil {
e.stats.Prefetch = time.Since(prefetchStart)
}
Expand Down
117 changes: 117 additions & 0 deletions session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4994,6 +4994,123 @@ func (s *testSessionSuite) TestLocalTemporaryTableInsert(c *C) {
tk.MustQuery("select * from tmp1 where id=5").Check(testkit.Rows())
}

func (s *testSessionSuite) TestLocalTemporaryTableInsertIgnore(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("set @@tidb_enable_noop_functions=1")
tk.MustExec("use test")
tk.MustExec("create temporary table tmp1 (id int primary key auto_increment, u int unique, v int)")
tk.MustExec("insert into tmp1 values(1, 11, 101)")
tk.MustExec("insert into tmp1 values(2, 12, 102)")

// test outside transaction
tk.MustExec("insert ignore into tmp1 values(1, 100, 1000)")
tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1062 Duplicate entry '1' for key 'PRIMARY'"))
tk.MustQuery("select * from tmp1 where id=1").Check(testkit.Rows("1 11 101"))
tk.MustExec("insert ignore into tmp1 values(5, 15, 105)")
tk.MustQuery("show warnings").Check(testkit.Rows())
tk.MustQuery("select * from tmp1 where id=5").Check(testkit.Rows("5 15 105"))

// test in transaction and rollback
tk.MustExec("begin")
tk.MustExec("insert ignore into tmp1 values(1, 100, 1000)")
tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1062 Duplicate entry '1' for key 'PRIMARY'"))
tk.MustQuery("select * from tmp1 where id=1").Check(testkit.Rows("1 11 101"))
tk.MustExec("insert ignore into tmp1 values(3, 13, 103)")
tk.MustQuery("show warnings").Check(testkit.Rows())
tk.MustQuery("select * from tmp1 where id=3").Check(testkit.Rows("3 13 103"))
tk.MustExec("insert ignore into tmp1 values(3, 100, 1000)")
tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1062 Duplicate entry '3' for key 'PRIMARY'"))
tk.MustQuery("select * from tmp1 where id=3").Check(testkit.Rows("3 13 103"))
tk.MustExec("rollback")
tk.MustQuery("select * from tmp1").Check(testkit.Rows("1 11 101", "2 12 102", "5 15 105"))

// test commit
tk.MustExec("begin")
tk.MustExec("insert ignore into tmp1 values(1, 100, 1000)")
tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1062 Duplicate entry '1' for key 'PRIMARY'"))
tk.MustExec("insert ignore into tmp1 values(3, 13, 103)")
tk.MustQuery("show warnings").Check(testkit.Rows())
tk.MustExec("insert ignore into tmp1 values(3, 100, 1000)")
tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1062 Duplicate entry '3' for key 'PRIMARY'"))
tk.MustExec("commit")
tk.MustQuery("select * from tmp1").Check(testkit.Rows("1 11 101", "2 12 102", "3 13 103", "5 15 105"))
}

func (s *testSessionSuite) TestLocalTemporaryTableInsertOnDuplicateKeyUpdate(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("set @@tidb_enable_noop_functions=1")
tk.MustExec("use test")
tk.MustExec("create temporary table tmp1 (id int primary key auto_increment, u int unique, v int)")
tk.MustExec("insert into tmp1 values(1, 11, 101)")
tk.MustExec("insert into tmp1 values(2, 12, 102)")

// test outside transaction
tk.MustExec("insert ignore into tmp1 values(1, 100, 1000) on duplicate key update u=12")
tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1062 Duplicate entry '12' for key 'u'"))
tk.MustQuery("select * from tmp1 where id=1").Check(testkit.Rows("1 11 101"))
tk.MustExec("insert into tmp1 values(2, 100, 1000) on duplicate key update v=202")
tk.MustQuery("show warnings").Check(testkit.Rows())
tk.MustQuery("select * from tmp1 where id=2").Check(testkit.Rows("2 12 202"))
tk.MustExec("insert into tmp1 values(3, 13, 103) on duplicate key update v=203")
tk.MustQuery("show warnings").Check(testkit.Rows())
tk.MustQuery("select * from tmp1 where id=3").Check(testkit.Rows("3 13 103"))

// test in transaction and rollback
tk.MustExec("begin")
tk.MustExec("insert ignore into tmp1 values(1, 100, 1000) on duplicate key update u=12")
tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1062 Duplicate entry '12' for key 'u'"))
tk.MustQuery("select * from tmp1 where id=1").Check(testkit.Rows("1 11 101"))
tk.MustExec("insert into tmp1 values(2, 100, 1000) on duplicate key update v=302")
tk.MustQuery("show warnings").Check(testkit.Rows())
tk.MustQuery("select * from tmp1 where id=2").Check(testkit.Rows("2 12 302"))
tk.MustExec("insert into tmp1 values(4, 14, 104) on duplicate key update v=204")
tk.MustQuery("show warnings").Check(testkit.Rows())
tk.MustQuery("select * from tmp1 where id=4").Check(testkit.Rows("4 14 104"))
tk.MustExec("rollback")
tk.MustQuery("select * from tmp1").Check(testkit.Rows("1 11 101", "2 12 202", "3 13 103"))

// test commit
tk.MustExec("begin")
tk.MustExec("insert ignore into tmp1 values(1, 100, 1000) on duplicate key update u=12")
tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1062 Duplicate entry '12' for key 'u'"))
tk.MustExec("insert into tmp1 values(2, 100, 1000) on duplicate key update v=302")
tk.MustExec("insert into tmp1 values(4, 14, 104) on duplicate key update v=204")
tk.MustExec("commit")
tk.MustQuery("select * from tmp1").Check(testkit.Rows("1 11 101", "2 12 302", "3 13 103", "4 14 104"))
}

func (s *testSessionSuite) TestLocalTemporaryTableReplace(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("set @@tidb_enable_noop_functions=1")
tk.MustExec("use test")
tk.MustExec("create temporary table tmp1 (id int primary key auto_increment, u int unique, v int)")
tk.MustExec("insert into tmp1 values(1, 11, 101)")
tk.MustExec("insert into tmp1 values(2, 12, 102)")
tk.MustExec("insert into tmp1 values(3, 13, 103)")

// out of transaction
tk.MustExec("replace into tmp1 values(1, 12, 1000)")
tk.MustQuery("select * from tmp1").Check(testkit.Rows("1 12 1000", "3 13 103"))
tk.MustExec("replace into tmp1 values(4, 14, 104)")
tk.MustQuery("select * from tmp1 where id=4").Check(testkit.Rows("4 14 104"))

// in transaction and rollback
tk.MustExec("begin")
tk.MustExec("replace into tmp1 values(1, 13, 999)")
tk.MustQuery("select * from tmp1").Check(testkit.Rows("1 13 999", "4 14 104"))
tk.MustExec("replace into tmp1 values(5, 15, 105)")
tk.MustQuery("select * from tmp1 where id=5").Check(testkit.Rows("5 15 105"))
tk.MustExec("rollback")
tk.MustQuery("select * from tmp1").Check(testkit.Rows("1 12 1000", "3 13 103", "4 14 104"))

// out of transaction
tk.MustExec("begin")
tk.MustExec("replace into tmp1 values(1, 13, 999)")
tk.MustExec("replace into tmp1 values(5, 15, 105)")
tk.MustExec("commit")
tk.MustQuery("select * from tmp1").Check(testkit.Rows("1 13 999", "4 14 104", "5 15 105"))
}

func (s *testSessionSuite) TestLocalTemporaryTableDelete(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("set @@tidb_enable_noop_functions=1")
Expand Down
Loading

0 comments on commit 6a1e521

Please sign in to comment.