Skip to content

Commit

Permalink
planner: refactor handle columns (pingcap#18391)
Browse files Browse the repository at this point in the history
  • Loading branch information
coocood authored Jul 7, 2020
1 parent ef355f8 commit 658132f
Show file tree
Hide file tree
Showing 26 changed files with 429 additions and 228 deletions.
2 changes: 1 addition & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -905,7 +905,7 @@ func (b *executorBuilder) buildUnionScanExec(v *plannercore.PhysicalUnionScan) E
func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannercore.PhysicalUnionScan) Executor {
us := &UnionScanExec{baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), reader)}
// Get the handle column index of the below Plan.
us.belowHandleIndex = v.HandleCol.Index
us.belowHandleCols = v.HandleCols
us.mutableRow = chunk.MutRowFromTypes(retTypes(us))

// If the push-downed condition contains virtual column, we may build a selection upon reader
Expand Down
39 changes: 13 additions & 26 deletions executor/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package executor
import (
"context"

"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
plannercore "github.com/pingcap/tidb/planner/core"
Expand Down Expand Up @@ -49,22 +50,16 @@ func (e *DeleteExec) Next(ctx context.Context, req *chunk.Chunk) error {
return e.deleteSingleTableByChunk(ctx)
}

func (e *DeleteExec) deleteOneRow(tbl table.Table, handleIndex []int, isExtraHandle bool, row []types.Datum) error {
func (e *DeleteExec) deleteOneRow(tbl table.Table, handleCols plannercore.HandleCols, isExtraHandle bool, row []types.Datum) error {
end := len(row)
if isExtraHandle {
end--
}
var handle kv.Handle
if !tbl.Meta().IsCommonHandle {
handle = kv.IntHandle(row[handleIndex[0]].GetInt64())
} else {
var err error
handle, err = kv.BuildHandleFromDatumRow(e.ctx.GetSessionVars().StmtCtx, row, handleIndex)
if err != nil {
return err
}
handle, err := handleCols.BuildHandleByDatums(row)
if err != nil {
return err
}
err := e.removeRow(e.ctx, tbl, handle, row[:end])
err = e.removeRow(e.ctx, tbl, handle, row[:end])
if err != nil {
return err
}
Expand All @@ -75,16 +70,14 @@ func (e *DeleteExec) deleteSingleTableByChunk(ctx context.Context) error {
var (
tbl table.Table
isExtrahandle bool
handleIndex []int
handleCols plannercore.HandleCols
rowCount int
)
for _, info := range e.tblColPosInfos {
tbl = e.tblID2Table[info.TblID]
handleCols = info.HandleCols
if !tbl.Meta().IsCommonHandle {
handleIndex = []int{info.HandleOrdinal[0]}
isExtrahandle = handleIsExtra(e.children[0].Schema().Columns[info.HandleOrdinal[0]])
} else {
handleIndex = info.HandleOrdinal
isExtrahandle = handleCols.IsInt() && handleCols.GetCol(0).ID == model.ExtraHandleID
}
}

Expand Down Expand Up @@ -119,7 +112,7 @@ func (e *DeleteExec) deleteSingleTableByChunk(ctx context.Context) error {
}

datumRow := chunkRow.GetDatumRow(fields)
err = e.deleteOneRow(tbl, handleIndex, isExtrahandle, datumRow)
err = e.deleteOneRow(tbl, handleCols, isExtrahandle, datumRow)
if err != nil {
return err
}
Expand All @@ -137,15 +130,9 @@ func (e *DeleteExec) composeTblRowMap(tblRowMap tableRowMapType, colPosInfos []p
if tblRowMap[info.TblID] == nil {
tblRowMap[info.TblID] = kv.NewHandleMap()
}
var handle kv.Handle
if !info.IsCommonHandle {
handle = kv.IntHandle(joinedRow[info.HandleOrdinal[0]].GetInt64())
} else {
var err error
handle, err = kv.BuildHandleFromDatumRow(e.ctx.GetSessionVars().StmtCtx, joinedRow, info.HandleOrdinal)
if err != nil {
return err
}
handle, err := info.HandleCols.BuildHandleByDatums(joinedRow)
if err != nil {
return err
}
// tblRowMap[info.TblID][handle] hold the row datas binding to this table and this handle.
tblRowMap[info.TblID].Set(handle, joinedRow[info.Start:info.End])
Expand Down
8 changes: 6 additions & 2 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -870,7 +870,7 @@ type SelectLockExec struct {
Lock ast.SelectLockType
keys []kv.Key

tblID2Handle map[int64][]*expression.Column
tblID2Handle map[int64][]plannercore.HandleCols
partitionedTable []table.PartitionedTable

// tblID2Table is cached to reduce cost.
Expand Down Expand Up @@ -924,7 +924,11 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error {
}

for _, col := range cols {
e.keys = append(e.keys, tablecodec.EncodeRowKeyWithHandle(physicalID, kv.IntHandle(row.GetInt64(col.Index))))
handle, err := col.BuildHandle(row)
if err != nil {
return err
}
e.keys = append(e.keys, tablecodec.EncodeRowKeyWithHandle(physicalID, handle))
}
}
}
Expand Down
43 changes: 22 additions & 21 deletions executor/mem_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/table"
Expand All @@ -41,8 +42,8 @@ type memIndexReader struct {
addedRowsLen int
retFieldTypes []*types.FieldType
outputOffset []int
// belowHandleIndex is the handle's position of the below scan plan.
belowHandleIndex int
// belowHandleCols is the handle's position of the below scan plan.
belowHandleCols plannercore.HandleCols
}

func buildMemIndexReader(us *UnionScanExec, idxReader *IndexReaderExecutor) *memIndexReader {
Expand All @@ -52,16 +53,16 @@ func buildMemIndexReader(us *UnionScanExec, idxReader *IndexReaderExecutor) *mem
outputOffset = append(outputOffset, col.Index)
}
return &memIndexReader{
ctx: us.ctx,
index: idxReader.index,
table: idxReader.table.Meta(),
kvRanges: kvRanges,
desc: us.desc,
conditions: us.conditions,
addedRows: make([][]types.Datum, 0, us.dirty.addedRows.Len()),
retFieldTypes: retTypes(us),
outputOffset: outputOffset,
belowHandleIndex: us.belowHandleIndex,
ctx: us.ctx,
index: idxReader.index,
table: idxReader.table.Meta(),
kvRanges: kvRanges,
desc: us.desc,
conditions: us.conditions,
addedRows: make([][]types.Datum, 0, us.dirty.addedRows.Len()),
retFieldTypes: retTypes(us),
outputOffset: outputOffset,
belowHandleCols: us.belowHandleCols,
}
}

Expand Down Expand Up @@ -369,15 +370,15 @@ func buildMemIndexLookUpReader(us *UnionScanExec, idxLookUpReader *IndexLookUpEx
kvRanges := idxLookUpReader.kvRanges
outputOffset := []int{len(idxLookUpReader.index.Columns)}
memIdxReader := &memIndexReader{
ctx: us.ctx,
index: idxLookUpReader.index,
table: idxLookUpReader.table.Meta(),
kvRanges: kvRanges,
desc: idxLookUpReader.desc,
addedRowsLen: us.dirty.addedRows.Len(),
retFieldTypes: retTypes(us),
outputOffset: outputOffset,
belowHandleIndex: us.belowHandleIndex,
ctx: us.ctx,
index: idxLookUpReader.index,
table: idxLookUpReader.table.Meta(),
kvRanges: kvRanges,
desc: idxLookUpReader.desc,
addedRowsLen: us.dirty.addedRows.Len(),
retFieldTypes: retTypes(us),
outputOffset: outputOffset,
belowHandleCols: us.belowHandleCols,
}

return &memIndexLookUpReader{
Expand Down
23 changes: 9 additions & 14 deletions executor/union_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -103,8 +104,8 @@ type UnionScanExec struct {
conditionsWithVirCol []expression.Expression
columns []*model.ColumnInfo
table table.Table
// belowHandleIndex is the handle's position of the below scan plan.
belowHandleIndex int
// belowHandleCols is the handle's position of the below scan plan.
belowHandleCols plannercore.HandleCols

addedRows [][]types.Datum
cursor4AddRows int
Expand Down Expand Up @@ -244,7 +245,11 @@ func (us *UnionScanExec) getSnapshotRow(ctx context.Context) ([]types.Datum, err
}
iter := chunk.NewIterator4Chunk(us.snapshotChunkBuffer)
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
snapshotHandle := kv.IntHandle(row.GetInt64(us.belowHandleIndex))
var snapshotHandle kv.Handle
snapshotHandle, err = us.belowHandleCols.BuildHandle(row)
if err != nil {
return nil, err
}
if _, ok := us.dirty.deletedRows.Get(snapshotHandle); ok {
continue
}
Expand Down Expand Up @@ -301,15 +306,5 @@ func (us *UnionScanExec) compare(a, b []types.Datum) (int, error) {
return cmp, nil
}
}
aHandle := a[us.belowHandleIndex].GetInt64()
bHandle := b[us.belowHandleIndex].GetInt64()
var cmp int
if aHandle == bHandle {
cmp = 0
} else if aHandle > bHandle {
cmp = 1
} else {
cmp = -1
}
return cmp, nil
return us.belowHandleCols.Compare(a, b)
}
24 changes: 3 additions & 21 deletions executor/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/memory"
)

Expand Down Expand Up @@ -64,26 +63,9 @@ func (e *UpdateExec) exec(ctx context.Context, schema *expression.Schema, row, n
e.updatedRowKeys[content.TblID] = kv.NewHandleMap()
}
var handle kv.Handle
if !content.IsCommonHandle {
handleDatum := row[content.HandleOrdinal[0]]
if e.canNotUpdate(handleDatum) {
continue
}
handle = kv.IntHandle(row[content.HandleOrdinal[0]].GetInt64())
} else {
// TODO: Redesign update join for cluster index table.
pkDts := make([]types.Datum, 0, len(content.HandleOrdinal))
for _, ordinal := range content.HandleOrdinal {
pkDts = append(pkDts, row[ordinal])
}
handleBytes, err := codec.EncodeKey(e.ctx.GetSessionVars().StmtCtx, nil, pkDts...)
if err != nil {
return err
}
handle, err = kv.NewCommonHandle(handleBytes)
if err != nil {
return err
}
handle, err = content.HandleCols.BuildHandleByDatums(row)
if err != nil {
return err
}

oldData := row[content.Start:content.End]
Expand Down
12 changes: 12 additions & 0 deletions executor/update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,12 @@ func (s *testSuite11) TestUpdateClusterIndex(c *C) {
tk.MustExec("insert into t values('a', 'b');")
tk.MustExec("update t set a='c' where t.a='a' and b='b';")
tk.MustQuery("select * from t").Check(testkit.Rows("c b"))

tk.MustExec("drop table if exists s")
tk.MustExec("create table s (a int, b int, c int, primary key (a, b))")
tk.MustExec("insert s values (3, 3, 3), (5, 5, 5)")
tk.MustExec("update s set c = 10 where a = 3")
tk.MustQuery("select * from s").Check(testkit.Rows("3 3 10", "5 5 5"))
}

func (s *testSuite11) TestDeleteClusterIndex(c *C) {
Expand Down Expand Up @@ -314,6 +320,12 @@ func (s *testSuite11) TestDeleteClusterIndex(c *C) {
tk.MustExec(`delete from dt3pku where id = 'a'`)
tk.MustQuery(`select * from dt3pku`).Check(testkit.Rows())
tk.MustExec(`insert into dt3pku(id, uk, v) values('a', 1, 2)`)

tk.MustExec("drop table if exists s1")
tk.MustExec("create table s1 (a int, b int, c int, primary key (a, b))")
tk.MustExec("insert s1 values (3, 3, 3), (5, 5, 5)")
tk.MustExec("delete from s1 where a = 3")
tk.MustQuery("select * from s1").Check(testkit.Rows("5 5 5"))
}

func (s *testSuite11) TestReplaceClusterIndex(c *C) {
Expand Down
2 changes: 1 addition & 1 deletion planner/cascades/implementation_rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ type ImplTableScan struct {
// Match implements ImplementationRule Match interface.
func (r *ImplTableScan) Match(expr *memo.GroupExpr, prop *property.PhysicalProperty) (matched bool) {
ts := expr.ExprNode.(*plannercore.LogicalTableScan)
return prop.IsEmpty() || (len(prop.Items) == 1 && ts.Handle != nil && prop.Items[0].Col.Equal(nil, ts.Handle))
return prop.IsEmpty() || (len(prop.Items) == 1 && ts.HandleCols != nil && prop.Items[0].Col.Equal(nil, ts.HandleCols.GetCol(0)))
}

// OnImplement implements ImplementationRule OnImplement interface.
Expand Down
20 changes: 10 additions & 10 deletions planner/cascades/testdata/integration_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -487,8 +487,8 @@
{
"SQL": "select /*+ HASH_AGG() */ avg(distinct a) from t;",
"Plan": [
"HashAgg_16 1.00 root funcs:avg(distinct Column#8)->Column#5",
"└─Projection_17 8000.00 root cast(test.t.a, decimal(65,4) BINARY)->Column#8",
"HashAgg_16 1.00 root funcs:avg(distinct Column#7)->Column#5",
"└─Projection_17 8000.00 root cast(test.t.a, decimal(65,4) BINARY)->Column#7",
" └─TableReader_18 8000.00 root data:HashAgg_19",
" └─HashAgg_19 8000.00 cop[tikv] group by:test.t.a, ",
" └─TableFullScan_15 10000.00 cop[tikv] table:t keep order:false, stats:pseudo"
Expand All @@ -501,9 +501,9 @@
"SQL": "select /*+ HASH_AGG() */ a, count(distinct a) from t;",
"Plan": [
"Projection_8 1.00 root test.t.a, Column#5",
"└─HashAgg_12 1.00 root funcs:count(distinct test.t.a)->Column#5, funcs:firstrow(Column#7)->test.t.a",
"└─HashAgg_12 1.00 root funcs:count(distinct test.t.a)->Column#5, funcs:firstrow(Column#6)->test.t.a",
" └─TableReader_13 8000.00 root data:HashAgg_14",
" └─HashAgg_14 8000.00 cop[tikv] group by:test.t.a, funcs:firstrow(test.t.a)->Column#7",
" └─HashAgg_14 8000.00 cop[tikv] group by:test.t.a, funcs:firstrow(test.t.a)->Column#6",
" └─TableFullScan_11 10000.00 cop[tikv] table:t keep order:false, stats:pseudo"
],
"Result": [
Expand All @@ -514,9 +514,9 @@
"SQL": "select /*+ HASH_AGG() */ avg(b), c, avg(b), count(distinct A, B), count(distinct A), count(distinct c), sum(b) from t group by c;",
"Plan": [
"Projection_10 8000.00 root Column#5, test.t.c, Column#5, Column#6, Column#7, Column#8, Column#9",
"└─HashAgg_15 8000.00 root group by:test.t.c, funcs:avg(Column#11, Column#12)->Column#5, funcs:count(distinct test.t.a, test.t.b)->Column#6, funcs:count(distinct test.t.a)->Column#7, funcs:count(distinct test.t.c)->Column#8, funcs:sum(Column#13)->Column#9, funcs:firstrow(test.t.c)->test.t.c",
"└─HashAgg_15 8000.00 root group by:test.t.c, funcs:avg(Column#10, Column#11)->Column#5, funcs:count(distinct test.t.a, test.t.b)->Column#6, funcs:count(distinct test.t.a)->Column#7, funcs:count(distinct test.t.c)->Column#8, funcs:sum(Column#12)->Column#9, funcs:firstrow(test.t.c)->test.t.c",
" └─TableReader_16 8000.00 root data:HashAgg_17",
" └─HashAgg_17 8000.00 cop[tikv] group by:test.t.a, test.t.b, test.t.c, funcs:count(test.t.b)->Column#11, funcs:sum(test.t.b)->Column#12, funcs:sum(test.t.b)->Column#13",
" └─HashAgg_17 8000.00 cop[tikv] group by:test.t.a, test.t.b, test.t.c, funcs:count(test.t.b)->Column#10, funcs:sum(test.t.b)->Column#11, funcs:sum(test.t.b)->Column#12",
" └─TableFullScan_14 10000.00 cop[tikv] table:t keep order:false, stats:pseudo"
],
"Result": [
Expand Down Expand Up @@ -597,8 +597,8 @@
{
"SQL": "select /*+ HASH_AGG(), AGG_TO_COP() */ avg(distinct a) from t;",
"Plan": [
"HashAgg_8 1.00 root funcs:avg(distinct Column#7)->Column#5",
"└─Projection_9 10000.00 root cast(test.t.a, decimal(65,4) BINARY)->Column#7",
"HashAgg_8 1.00 root funcs:avg(distinct Column#6)->Column#5",
"└─Projection_9 10000.00 root cast(test.t.a, decimal(65,4) BINARY)->Column#6",
" └─TableReader_10 10000.00 root data:TableFullScan_11",
" └─TableFullScan_11 10000.00 cop[tikv] table:t keep order:false, stats:pseudo"
],
Expand All @@ -622,8 +622,8 @@
"SQL": "select /*+ HASH_AGG(), AGG_TO_COP() */ avg(b), c, avg(b), count(distinct A, B), count(distinct A), count(distinct c), sum(b) from t group by c;",
"Plan": [
"Projection_8 8000.00 root Column#5, test.t.c, Column#5, Column#6, Column#7, Column#8, Column#9",
"└─HashAgg_9 8000.00 root group by:test.t.c, funcs:avg(Column#11)->Column#5, funcs:count(distinct test.t.a, test.t.b)->Column#6, funcs:count(distinct test.t.a)->Column#7, funcs:count(distinct test.t.c)->Column#8, funcs:sum(Column#12)->Column#9, funcs:firstrow(test.t.c)->test.t.c",
" └─Projection_10 10000.00 root cast(test.t.b, decimal(65,4) BINARY)->Column#11, test.t.a, test.t.b, test.t.a, test.t.c, cast(test.t.b, decimal(65,0) BINARY)->Column#12, test.t.c, test.t.c",
"└─HashAgg_9 8000.00 root group by:test.t.c, funcs:avg(Column#10)->Column#5, funcs:count(distinct test.t.a, test.t.b)->Column#6, funcs:count(distinct test.t.a)->Column#7, funcs:count(distinct test.t.c)->Column#8, funcs:sum(Column#11)->Column#9, funcs:firstrow(test.t.c)->test.t.c",
" └─Projection_10 10000.00 root cast(test.t.b, decimal(65,4) BINARY)->Column#10, test.t.a, test.t.b, test.t.a, test.t.c, cast(test.t.b, decimal(65,0) BINARY)->Column#11, test.t.c, test.t.c",
" └─TableReader_11 10000.00 root data:TableFullScan_12",
" └─TableFullScan_12 10000.00 cop[tikv] table:t keep order:false, stats:pseudo"
],
Expand Down
6 changes: 3 additions & 3 deletions planner/cascades/transformation_rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,16 +187,16 @@ func NewRulePushSelDownTableScan() Transformation {
func (r *PushSelDownTableScan) OnTransform(old *memo.ExprIter) (newExprs []*memo.GroupExpr, eraseOld bool, eraseAll bool, err error) {
sel := old.GetExpr().ExprNode.(*plannercore.LogicalSelection)
ts := old.Children[0].GetExpr().ExprNode.(*plannercore.LogicalTableScan)
if ts.Handle == nil {
if ts.HandleCols == nil {
return nil, false, false, nil
}
accesses, remained := ranger.DetachCondsForColumn(ts.SCtx(), sel.Conditions, ts.Handle)
accesses, remained := ranger.DetachCondsForColumn(ts.SCtx(), sel.Conditions, ts.HandleCols.GetCol(0))
if accesses == nil {
return nil, false, false, nil
}
newTblScan := plannercore.LogicalTableScan{
Source: ts.Source,
Handle: ts.Handle,
HandleCols: ts.HandleCols,
AccessConds: ts.AccessConds.Shallow(),
}.Init(ts.SCtx(), ts.SelectBlockOffset())
newTblScan.AccessConds = append(newTblScan.AccessConds, accesses...)
Expand Down
Loading

0 comments on commit 658132f

Please sign in to comment.