executor: support index merge on cluster index (pingcap#18699)
* executor: support index merge on cluster index

* fix fmt

* fix

* fix

* fix

* fix

* address comments

* address comments

* address comments

* fix

* fix

* fix

Co-authored-by: ti-srebot <[email protected]>
lzmhhh123 and ti-srebot authored Jul 27, 2020
1 parent f355424 commit 56fd348
Showing 9 changed files with 150 additions and 57 deletions.
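
Context for the diffs below: a TiDB clustered-index ("common handle") table identifies each row by its encoded primary key rather than by a hidden int64 row ID, so an executor that collects row handles from partial scans can no longer assume the handle is a single integer column. A rough, standalone sketch of the two row-key shapes (illustrative formatting and made-up table/row values, not TiDB's actual key codec):

// Standalone sketch (not TiDB's codec): the two kinds of row identity the
// IndexMerge executor now has to bridge.
package main

import "fmt"

// intHandleRowKey: a non-clustered table addresses rows by one int64 row ID.
func intHandleRowKey(tableID, rowID int64) string {
	return fmt.Sprintf("t%d_r%d", tableID, rowID)
}

// commonHandleRowKey: a clustered table addresses rows by the encoded
// primary key itself, which may span several columns.
func commonHandleRowKey(tableID int64, encodedPK string) string {
	return fmt.Sprintf("t%d_r%s", tableID, encodedPK)
}

func main() {
	fmt.Println(intHandleRowKey(45, 7))                 // t45_r7
	fmt.Println(commonHandleRowKey(45, `"alice"-42`))   // t45_r"alice"-42
}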
12 changes: 8 additions & 4 deletions executor/builder.go
@@ -2677,8 +2677,7 @@ func buildNoRangeIndexMergeReader(b *executorBuilder, v *plannercore.PhysicalInd
feedbacks = append(feedbacks, feedback)

if is, ok := v.PartialPlans[i][0].(*plannercore.PhysicalIndexScan); ok {
// TODO: handle length for cluster index.
tempReq, tempStreaming, err = buildIndexReq(b, len(is.Index.Columns), 0, v.PartialPlans[i])
tempReq, tempStreaming, err = buildIndexReq(b, len(is.Index.Columns), ts.HandleCols.NumCols(), v.PartialPlans[i])
keepOrders = append(keepOrders, is.KeepOrder)
descs = append(descs, is.Desc)
indexes = append(indexes, is.Index)
@@ -2697,7 +2696,7 @@ func buildNoRangeIndexMergeReader(b *executorBuilder, v *plannercore.PhysicalInd
partialReqs = append(partialReqs, tempReq)
partialStreamings = append(partialStreamings, tempStreaming)
}
tableReq, tableStreaming, table, err := buildTableReq(b, v.Schema().Len(), v.TablePlans)
tableReq, tableStreaming, tblInfo, err := buildTableReq(b, v.Schema().Len(), v.TablePlans)
if err != nil {
return nil, err
}
@@ -2709,7 +2708,7 @@ func buildNoRangeIndexMergeReader(b *executorBuilder, v *plannercore.PhysicalInd
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
dagPBs: partialReqs,
startTS: startTS,
table: table,
table: tblInfo,
indexes: indexes,
descs: descs,
tableRequest: tableReq,
@@ -2720,6 +2719,7 @@ func buildNoRangeIndexMergeReader(b *executorBuilder, v *plannercore.PhysicalInd
tblPlans: v.TablePlans,
dataReaderBuilder: &dataReaderBuilder{executorBuilder: b},
feedbacks: feedbacks,
handleCols: ts.HandleCols,
}
collectTable := false
e.tableRequest.CollectRangeCounts = &collectTable
@@ -2740,6 +2740,10 @@ func (b *executorBuilder) buildIndexMergeReader(v *plannercore.PhysicalIndexMerg
sctx.IndexNames = append(sctx.IndexNames, is.Table.Name.O+":"+is.Index.Name.O)
} else {
ret.ranges = append(ret.ranges, v.PartialPlans[i][0].(*plannercore.PhysicalTableScan).Ranges)
if ret.table.Meta().IsCommonHandle {
tblInfo := ret.table.Meta()
sctx.IndexNames = append(sctx.IndexNames, tblInfo.Name.O+":"+tables.FindPrimaryIndex(tblInfo).Name.O)
}
}
}
ts := v.TablePlans[0].(*plannercore.PhysicalTableScan)
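The builder.go hunks above retire the "TODO: handle length for cluster index": the handle-column count passed to buildIndexReq is now ts.HandleCols.NumCols() instead of the hard-coded 0, the reader keeps ts.HandleCols for its workers, and a common-handle table records its primary index in IndexNames via tables.FindPrimaryIndex. A standalone sketch of what NumCols()/GetFieldsTypes() amount to, with stand-in types and illustrative column names ("_tidb_rowid", "pk_a", "pk_b" are examples, not taken from this diff):

// Standalone sketch (not TiDB types): the handle-column bookkeeping that
// HandleCols provides to the builder and the readers. The method names mirror
// NumCols/GetFieldsTypes used in the diff; the bodies are illustrative.
package main

import "fmt"

type fieldType string

type handleCols struct {
	cols  []string
	types []fieldType
}

// numCols: 1 for an int handle, one per primary-key column for a common handle.
func (h handleCols) numCols() int             { return len(h.cols) }
// fieldsTypes: the chunk schema the partial index worker decodes results with.
func (h handleCols) fieldsTypes() []fieldType { return h.types }

func main() {
	// Non-clustered table: the handle is the hidden int64 row ID.
	intHandle := handleCols{cols: []string{"_tidb_rowid"}, types: []fieldType{"bigint"}}
	// Clustered table: the handle is the primary key itself.
	commonHandle := handleCols{
		cols:  []string{"pk_a", "pk_b"},
		types: []fieldType{"varchar", "bigint"},
	}

	fmt.Println(intHandle.numCols(), intHandle.fieldsTypes())       // 1 [bigint]
	fmt.Println(commonHandle.numCols(), commonHandle.fieldsTypes()) // 2 [varchar bigint]
}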
78 changes: 44 additions & 34 deletions executor/index_merge_reader.go
@@ -23,15 +23,14 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
@@ -102,6 +101,8 @@ type IndexMergeReaderExecutor struct {
corColInAccess bool
idxCols [][]*expression.Column
colLens [][]int

handleCols plannercore.HandleCols
}

// Open implements the Executor Open interface
@@ -110,7 +111,15 @@ func (e *IndexMergeReaderExecutor) Open(ctx context.Context) error {
for i, plan := range e.partialPlans {
_, ok := plan[0].(*plannercore.PhysicalIndexScan)
if !ok {
e.keyRanges = append(e.keyRanges, nil)
if e.table.Meta().IsCommonHandle {
keyRanges, err := distsql.CommonHandleRangesToKVRanges(e.ctx.GetSessionVars().StmtCtx, getPhysicalTableID(e.table), e.ranges[i])
if err != nil {
return err
}
e.keyRanges = append(e.keyRanges, keyRanges)
} else {
e.keyRanges = append(e.keyRanges, nil)
}
continue
}
keyRange, err := distsql.IndexRangesToKVRanges(e.ctx.GetSessionVars().StmtCtx, getPhysicalTableID(e.table), e.indexes[i].ID, e.ranges[i], e.feedbacks[i])
@@ -194,13 +203,14 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context,
return err
}

result, err := distsql.SelectWithRuntimeStats(ctx, e.ctx, kvReq, []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}, e.feedbacks[workID], getPhysicalPlanIDs(e.partialPlans[workID]), e.id)
result, err := distsql.SelectWithRuntimeStats(ctx, e.ctx, kvReq, e.handleCols.GetFieldsTypes(), e.feedbacks[workID], getPhysicalPlanIDs(e.partialPlans[workID]), e.id)
if err != nil {
return err
}

result.Fetch(ctx)
worker := &partialIndexWorker{
sc: e.ctx,
batchSize: e.maxChunkSize,
maxBatchSize: e.ctx.GetSessionVars().IndexLookupSize,
maxChunkSize: e.maxChunkSize,
@@ -220,7 +230,7 @@
var err error
util.WithRecovery(
func() {
_, err = worker.fetchHandles(ctx1, result, exitCh, fetchCh, e.resultCh, e.finished)
_, err = worker.fetchHandles(ctx1, result, exitCh, fetchCh, e.resultCh, e.finished, e.handleCols)
},
e.handleHandlesFetcherPanic(ctx, e.resultCh, "partialIndexWorker"),
)
@@ -261,6 +271,7 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context,
}
tableInfo := e.partialPlans[workID][0].(*plannercore.PhysicalTableScan).Table
worker := &partialTableWorker{
sc: e.ctx,
batchSize: e.maxChunkSize,
maxBatchSize: e.ctx.GetSessionVars().IndexLookupSize,
maxChunkSize: e.maxChunkSize,
@@ -277,7 +288,7 @@
var err error
util.WithRecovery(
func() {
_, err = worker.fetchHandles(ctx1, exitCh, fetchCh, e.resultCh, e.finished)
_, err = worker.fetchHandles(ctx1, exitCh, fetchCh, e.resultCh, e.finished, e.handleCols)
},
e.handleHandlesFetcherPanic(ctx, e.resultCh, "partialTableWorker"),
)
@@ -294,6 +305,7 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context,
}

type partialTableWorker struct {
sc sessionctx.Context
batchSize int
maxBatchSize int
maxChunkSize int
@@ -302,25 +314,10 @@ type partialTableWorker struct {
}

func (w *partialTableWorker) fetchHandles(ctx context.Context, exitCh <-chan struct{}, fetchCh chan<- *lookupTableTask, resultCh chan<- *lookupTableTask,
finished <-chan struct{}) (count int64, err error) {
var chk *chunk.Chunk
handleOffset := -1
if w.tableInfo.PKIsHandle {
handleCol := w.tableInfo.GetPkColInfo()
columns := w.tableInfo.Columns
for i := 0; i < len(columns); i++ {
if columns[i].Name.L == handleCol.Name.L {
handleOffset = i
break
}
}
} else {
return 0, errors.Errorf("cannot find the column for handle")
}

chk = chunk.NewChunkWithCapacity(retTypes(w.tableReader), w.maxChunkSize)
finished <-chan struct{}, handleCols plannercore.HandleCols) (count int64, err error) {
chk := chunk.NewChunkWithCapacity(retTypes(w.tableReader), w.maxChunkSize)
for {
handles, retChunk, err := w.extractTaskHandles(ctx, chk, handleOffset)
handles, retChunk, err := w.extractTaskHandles(ctx, chk, handleCols)
if err != nil {
doneCh := make(chan error, 1)
doneCh <- err
@@ -346,7 +343,7 @@ func (w *partialTableWorker) fetchHandles(ctx context.Context, exitCh <-chan str
}
}

func (w *partialTableWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, handleOffset int) (
func (w *partialTableWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, handleCols plannercore.HandleCols) (
handles []kv.Handle, retChk *chunk.Chunk, err error) {
handles = make([]kv.Handle, 0, w.batchSize)
for len(handles) < w.batchSize {
@@ -359,8 +356,11 @@ func (w *partialTableWorker) extractTaskHandles(ctx context.Context, chk *chunk.
return handles, retChk, nil
}
for i := 0; i < chk.NumRows(); i++ {
h := kv.IntHandle(chk.GetRow(i).GetInt64(handleOffset))
handles = append(handles, h)
handle, err := handleCols.BuildHandle(chk.GetRow(i))
if err != nil {
return nil, nil, err
}
handles = append(handles, handle)
}
}
w.batchSize *= 2
@@ -557,15 +557,23 @@ func (w *indexMergeProcessWorker) handleLoopFetcherPanic(ctx context.Context, re
}

type partialIndexWorker struct {
sc sessionctx.Context
batchSize int
maxBatchSize int
maxChunkSize int
}

func (w *partialIndexWorker) fetchHandles(ctx context.Context, result distsql.SelectResult, exitCh <-chan struct{}, fetchCh chan<- *lookupTableTask, resultCh chan<- *lookupTableTask, finished <-chan struct{}) (count int64, err error) {
chk := chunk.NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}, w.maxChunkSize)
func (w *partialIndexWorker) fetchHandles(
ctx context.Context,
result distsql.SelectResult,
exitCh <-chan struct{},
fetchCh chan<- *lookupTableTask,
resultCh chan<- *lookupTableTask,
finished <-chan struct{},
handleCols plannercore.HandleCols) (count int64, err error) {
chk := chunk.NewChunkWithCapacity(handleCols.GetFieldsTypes(), w.maxChunkSize)
for {
handles, retChunk, err := w.extractTaskHandles(ctx, chk, result)
handles, retChunk, err := w.extractTaskHandles(ctx, chk, result, handleCols)
if err != nil {
doneCh := make(chan error, 1)
doneCh <- err
@@ -591,9 +599,8 @@ func (w *partialIndexWorker) fetchHandles(ctx context.Context, result distsql.Se
}
}

func (w *partialIndexWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, idxResult distsql.SelectResult) (
func (w *partialIndexWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, idxResult distsql.SelectResult, handleCols plannercore.HandleCols) (
handles []kv.Handle, retChk *chunk.Chunk, err error) {
handleOffset := chk.NumCols() - 1
handles = make([]kv.Handle, 0, w.batchSize)
for len(handles) < w.batchSize {
chk.SetRequiredRows(w.batchSize-len(handles), w.maxChunkSize)
@@ -605,8 +612,11 @@ func (w *partialIndexWorker) extractTaskHandles(ctx context.Context, chk *chunk.
return handles, retChk, nil
}
for i := 0; i < chk.NumRows(); i++ {
h := kv.IntHandle(chk.GetRow(i).GetInt64(handleOffset))
handles = append(handles, h)
handle, err := handleCols.BuildHandleFromIndexRow(chk.GetRow(i))
if err != nil {
return nil, nil, err
}
handles = append(handles, handle)
}
}
w.batchSize *= 2
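The worker changes above drop the assumption that the handle is one int64 column at a fixed offset (kv.IntHandle(chk.GetRow(i).GetInt64(handleOffset))) and instead ask handleCols to build it: BuildHandle for partial table-scan rows, BuildHandleFromIndexRow for partial index-scan rows. A self-contained sketch of that delegation, using stand-in types rather than TiDB's kv.Handle and plannercore.HandleCols:

// Self-contained sketch (not TiDB code): delegating handle construction to a
// builder lets the same fetch loop produce single-column int handles and
// multi-column common handles.
package main

import (
	"fmt"
	"strings"
)

// handle stands in for kv.Handle: whatever the table-lookup step needs in
// order to fetch a full row.
type handle interface{ String() string }

type intHandle int64

func (h intHandle) String() string { return fmt.Sprintf("%d", int64(h)) }

type commonHandle struct{ keyParts []string }

func (h commonHandle) String() string { return strings.Join(h.keyParts, ",") }

// handleBuilder plays the role of HandleCols.BuildHandle /
// BuildHandleFromIndexRow in the diff: it knows which columns of a scan row
// form the handle and how to assemble them.
type handleBuilder interface {
	buildHandle(row []string) (handle, error)
}

type intHandleBuilder struct{ offset int }

func (b intHandleBuilder) buildHandle(row []string) (handle, error) {
	var v int64
	if _, err := fmt.Sscan(row[b.offset], &v); err != nil {
		return nil, err
	}
	return intHandle(v), nil
}

type commonHandleBuilder struct{ offsets []int }

func (b commonHandleBuilder) buildHandle(row []string) (handle, error) {
	parts := make([]string, 0, len(b.offsets))
	for _, off := range b.offsets {
		parts = append(parts, row[off])
	}
	return commonHandle{keyParts: parts}, nil
}

func main() {
	row := []string{"alice", "2020-07-27", "42"}

	// Old behaviour: a single int64 handle column at a known offset.
	var b handleBuilder = intHandleBuilder{offset: 2}
	h, _ := b.buildHandle(row)
	fmt.Println("int handle:", h) // int handle: 42

	// Clustered index: the handle spans several primary-key columns.
	b = commonHandleBuilder{offsets: []int{0, 1}}
	h, _ = b.buildHandle(row)
	fmt.Println("common handle:", h) // common handle: alice,2020-07-27
}

Either way, the fetch loop just appends whatever the builder returns, which is the shape of the edited extractTaskHandles loops above.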
24 changes: 20 additions & 4 deletions planner/core/find_best_task.go
@@ -777,7 +777,10 @@ func (ds *DataSource) convertToIndexMergeScan(prop *property.PhysicalProperty, c
totalRowCount += rowCount
}

ts, partialCost := ds.buildIndexMergeTableScan(prop, path.TableFilters, totalRowCount)
ts, partialCost, err := ds.buildIndexMergeTableScan(prop, path.TableFilters, totalRowCount)
if err != nil {
return nil, err
}
totalCost += partialCost
cop.tablePlan = ts
cop.idxMergePartPlans = scans
@@ -842,7 +845,7 @@ func (ds *DataSource) convertToPartialTableScan(prop *property.PhysicalProperty,
return tablePlan, partialCost, rowCount
}

func (ds *DataSource) buildIndexMergeTableScan(prop *property.PhysicalProperty, tableFilters []expression.Expression, totalRowCount float64) (PhysicalPlan, float64) {
func (ds *DataSource) buildIndexMergeTableScan(prop *property.PhysicalProperty, tableFilters []expression.Expression, totalRowCount float64) (PhysicalPlan, float64, error) {
var partialCost float64
sessVars := ds.ctx.GetSessionVars()
ts := PhysicalTableScan{
@@ -852,8 +855,21 @@ func (ds *DataSource) buildIndexMergeTableScan(prop *property.PhysicalProperty,
DBName: ds.DBName,
isPartition: ds.isPartition,
physicalTableID: ds.physicalTableID,
HandleCols: ds.handleCols,
}.Init(ds.ctx, ds.blockOffset)
ts.SetSchema(ds.schema.Clone())
if ts.HandleCols == nil {
handleCol := ds.getPKIsHandleCol()
if handleCol == nil {
handleCol, _ = ts.appendExtraHandleCol(ds)
}
ts.HandleCols = NewIntHandleCols(handleCol)
}
var err error
ts.HandleCols, err = ts.HandleCols.ResolveIndices(ts.schema)
if err != nil {
return nil, 0, err
}
ts.Columns = ExpandVirtualColumn(ts.Columns, ts.schema, ts.Table.Columns)
if ts.Table.PKIsHandle {
if pkColInfo := ts.Table.GetPkColInfo(); pkColInfo != nil {
@@ -877,9 +893,9 @@ func (ds *DataSource) buildIndexMergeTableScan(prop *property.PhysicalProperty,
}
sel := PhysicalSelection{Conditions: tableFilters}.Init(ts.ctx, ts.stats.ScaleByExpectCnt(selectivity*totalRowCount), ts.blockOffset)
sel.SetChildren(ts)
return sel, partialCost
return sel, partialCost, nil
}
return ts, partialCost
return ts, partialCost, nil
}

func indexCoveringCol(col *expression.Column, indexCols []*expression.Column, idxColLens []int) bool {
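On the planner side, buildIndexMergeTableScan now returns an error as well, seeds ts.HandleCols when the data source provides none (an int-handle column, appending the extra handle column if the table has no integer primary key), and re-resolves the handle columns against the scan's cloned schema. A toy sketch of what that ResolveIndices step does, with hypothetical column names and stand-in types (not TiDB's expression.Column or Schema):

// Standalone sketch (not TiDB code): a column reference carries an offset into
// a particular schema; after the table scan clones the data source's schema,
// the handle columns must be rebound so their offsets point into the scan's
// own output, which is what HandleCols.ResolveIndices does in the diff.
package main

import (
	"errors"
	"fmt"
)

type column struct {
	name  string
	index int // offset into the schema the column was last resolved against
}

type schema struct{ cols []string }

// resolveIndices rebinds a column to its position in the given schema.
func resolveIndices(c column, s schema) (column, error) {
	for i, name := range s.cols {
		if name == c.name {
			return column{name: c.name, index: i}, nil
		}
	}
	return column{}, errors.New("handle column not found in schema: " + c.name)
}

func main() {
	tsSchema := schema{cols: []string{"c_extra", "pk_a", "pk_b"}}
	pkA := column{name: "pk_a", index: 0} // offset inherited from the data source's schema
	rebound, err := resolveIndices(pkA, tsSchema)
	if err != nil {
		panic(err)
	}
	fmt.Printf("pk_a now reads offset %d of the table scan's output\n", rebound.index)
}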