Skip to content

Commit

Permalink
executor: analyze executor construct new distsql exec. (pingcap#3410)
Browse files Browse the repository at this point in the history
  • Loading branch information
hanfei1991 authored and shenli committed Jun 7, 2017
1 parent 0fec425 commit e1debd4
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 27 deletions.
44 changes: 24 additions & 20 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/context"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessionctx/varsutil"
Expand All @@ -34,7 +35,7 @@ var _ Executor = &AnalyzeExec{}
// AnalyzeExec represents Analyze executor.
// It holds the session context and the list of analyze tasks; Next pushes
// every entry of tasks onto a channel consumed by the worker goroutines.
// NOTE(review): this span is a rendered diff without +/- markers, so the
// tasks field is listed twice. Presumably the value-slice line is the removed
// version and the pointer-slice line is the added one; confirm against the
// repository.
type AnalyzeExec struct {
ctx context.Context
tasks []analyzeTask
tasks []*analyzeTask
}

const (
Expand Down Expand Up @@ -76,10 +77,10 @@ func (e *AnalyzeExec) Next() (*Row, error) {
if err != nil {
return nil, errors.Trace(err)
}
taskCh := make(chan analyzeTask, len(e.tasks))
taskCh := make(chan *analyzeTask, len(e.tasks))
resultCh := make(chan analyzeResult, len(e.tasks))
for i := 0; i < concurrency; i++ {
go analyzeWorker(taskCh, resultCh)
go e.analyzeWorker(taskCh, resultCh)
}
for _, task := range e.tasks {
taskCh <- task
Expand Down Expand Up @@ -134,8 +135,11 @@ const (
)

// analyzeTask describes one unit of analyze work handed to a worker.
// NOTE(review): this span is a rendered diff without +/- markers. The first
// two field lines (taskType, src) appear to be the removed struct body and the
// five lines after them the added one; confirm against the repository.
type analyzeTask struct {
taskType taskType
src Executor
// taskType selects which analyze routine the worker runs (pkTask, colTask or idxTask).
taskType taskType
// tableInfo is the table the task belongs to; its ID is copied into the result's tableID.
tableInfo *model.TableInfo
// indexInfo is set for index tasks; its ID is passed to statistics.BuildIndex.
indexInfo *model.IndexInfo
// Columns lists the columns to analyze; it is set for primary-key and column tasks.
Columns []*model.ColumnInfo
// src is the executor that supplies the rows; the analyze routines wrap it in a recordSet.
src Executor
}

type analyzeResult struct {
Expand All @@ -146,32 +150,32 @@ type analyzeResult struct {
err error
}

// analyzeWorker receives tasks from taskCh until the channel is closed,
// dispatches each one on its taskType, and sends the resulting analyzeResult
// to resultCh. The switch has no default case, so a task whose taskType
// matches none of pkTask, colTask or idxTask produces no result.
// NOTE(review): this span is a rendered diff without +/- markers. The
// signature and each send statement appear twice: presumably the free-function
// form with type assertions on task.src is the removed version and the
// method form that passes the whole task is the added one; confirm against
// the repository.
func analyzeWorker(taskCh <-chan analyzeTask, resultCh chan<- analyzeResult) {
func (e *AnalyzeExec) analyzeWorker(taskCh <-chan *analyzeTask, resultCh chan<- analyzeResult) {
for task := range taskCh {
switch task.taskType {
case pkTask:
resultCh <- analyzePK(task.src.(*XSelectTableExec))
resultCh <- e.analyzePK(task)
case colTask:
resultCh <- analyzeColumns(task.src.(*XSelectTableExec))
resultCh <- e.analyzeColumns(task)
case idxTask:
resultCh <- analyzeIndex(task.src.(*XSelectIndexExec))
resultCh <- e.analyzeIndex(task)
}
}
}

// analyzePK builds the histogram for a primary-key task. It calls
// statistics.BuildPK with defaultBucketCount, the ID of the first column and a
// recordSet wrapping the source executor, then returns a result carrying the
// table ID, the single histogram, the row count, isIndex set to 0 and any
// error reported by BuildPK (the error is returned in the result, not handled here).
// NOTE(review): this span is a rendered diff without +/- markers. The function
// appears twice: presumably the version taking *XSelectTableExec is the
// removed one and the method taking *analyzeTask is the added one; confirm
// against the repository.
func analyzePK(exec *XSelectTableExec) analyzeResult {
count, hg, err := statistics.BuildPK(exec.ctx, defaultBucketCount, exec.Columns[0].ID, &recordSet{executor: exec})
return analyzeResult{tableID: exec.tableInfo.ID, hist: []*statistics.Histogram{hg}, count: count, isIndex: 0, err: err}
func (e *AnalyzeExec) analyzePK(task *analyzeTask) analyzeResult {
count, hg, err := statistics.BuildPK(e.ctx, defaultBucketCount, task.Columns[0].ID, &recordSet{executor: task.src})
return analyzeResult{tableID: task.tableInfo.ID, hist: []*statistics.Histogram{hg}, count: count, isIndex: 0, err: err}
}

func analyzeColumns(exec *XSelectTableExec) analyzeResult {
collectors, err := CollectSamplesAndEstimateNDVs(&recordSet{executor: exec}, len(exec.Columns))
func (e *AnalyzeExec) analyzeColumns(task *analyzeTask) analyzeResult {
collectors, err := CollectSamplesAndEstimateNDVs(&recordSet{executor: task.src}, len(task.Columns))
if err != nil {
return analyzeResult{err: err}
}
result := analyzeResult{tableID: exec.tableInfo.ID, count: collectors[0].Count + collectors[0].NullCount, isIndex: 0}
for i, col := range exec.Columns {
hg, err := statistics.BuildColumn(exec.ctx, defaultBucketCount, col.ID, collectors[i].Sketch.NDV(), collectors[i].Count, collectors[i].NullCount, collectors[i].samples)
result := analyzeResult{tableID: task.tableInfo.ID, count: collectors[0].Count + collectors[0].NullCount, isIndex: 0}
for i, col := range task.Columns {
hg, err := statistics.BuildColumn(e.ctx, defaultBucketCount, col.ID, collectors[i].Sketch.NDV(), collectors[i].Count, collectors[i].NullCount, collectors[i].samples)
result.hist = append(result.hist, hg)
if err != nil && result.err == nil {
result.err = err
Expand All @@ -180,9 +184,9 @@ func analyzeColumns(exec *XSelectTableExec) analyzeResult {
return result
}

// analyzeIndex builds the histogram for an index task. It calls
// statistics.BuildIndex with defaultBucketCount, the index ID and a recordSet
// wrapping the source executor, then returns a result carrying the table ID,
// the single histogram, the row count, isIndex set to 1 and any error
// reported by BuildIndex (the error is returned in the result, not handled here).
// NOTE(review): this span is a rendered diff without +/- markers. The function
// appears twice: presumably the version taking *XSelectIndexExec is the
// removed one and the method taking *analyzeTask is the added one; confirm
// against the repository.
func analyzeIndex(exec *XSelectIndexExec) analyzeResult {
count, hg, err := statistics.BuildIndex(exec.ctx, defaultBucketCount, exec.index.ID, &recordSet{executor: exec})
return analyzeResult{tableID: exec.tableInfo.ID, hist: []*statistics.Histogram{hg}, count: count, isIndex: 1, err: err}
func (e *AnalyzeExec) analyzeIndex(task *analyzeTask) analyzeResult {
count, hg, err := statistics.BuildIndex(e.ctx, defaultBucketCount, task.indexInfo.ID, &recordSet{executor: task.src})
return analyzeResult{tableID: task.tableInfo.ID, hist: []*statistics.Histogram{hg}, count: count, isIndex: 1, err: err}
}

// SampleCollector will collect samples and calculate the count and ndv of an attribute.
Expand Down
83 changes: 76 additions & 7 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/juju/errors"
"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/context"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/inspectkv"
Expand Down Expand Up @@ -813,6 +814,33 @@ func (b *executorBuilder) buildTableScanForAnalyze(tblInfo *model.TableInfo, col
table, _ := b.is.TableByID(tblInfo.ID)
schema := expression.NewSchema(expression.ColumnInfos2Columns(tblInfo.Name, cols)...)
ranges := []types.IntColumnRange{{math.MinInt64, math.MaxInt64}}
if b.ctx.GetClient().IsRequestTypeSupported(kv.ReqTypeDAG, kv.ReqSubTypeBasic) {
e := &TableReaderExecutor{
table: table,
tableID: tblInfo.ID,
ranges: ranges,
dagPB: &tipb.DAGRequest{
StartTs: b.getStartTS(),
TimeZoneOffset: timeZoneOffset(b.ctx),
Flags: statementContextToFlags(b.ctx.GetSessionVars().StmtCtx),
},
schema: schema,
columns: cols,
ctx: b.ctx,
}
for i := range schema.Columns {
e.dagPB.OutputOffsets = append(e.dagPB.OutputOffsets, uint32(i))
}
e.dagPB.Executors = append(e.dagPB.Executors, &tipb.Executor{
Tp: tipb.ExecType_TypeTableScan,
TblScan: &tipb.TableScan{
TableId: tblInfo.ID,
Columns: distsql.ColumnsToProto(cols, tblInfo.PKIsHandle),
},
})
b.err = setPBColumnsDefaultValue(b.ctx, e.dagPB.Executors[0].TblScan.Columns, cols)
return e
}
e := &XSelectTableExec{
tableInfo: tblInfo,
ctx: b.ctx,
Expand All @@ -838,6 +866,35 @@ func (b *executorBuilder) buildIndexScanForAnalyze(tblInfo *model.TableInfo, idx
schema := expression.NewSchema(expression.ColumnInfos2Columns(tblInfo.Name, cols)...)
idxRange := &types.IndexRange{LowVal: []types.Datum{types.MinNotNullDatum()}, HighVal: []types.Datum{types.MaxValueDatum()}}
scanConcurrency := b.ctx.GetSessionVars().IndexSerialScanConcurrency
if b.ctx.GetClient().IsRequestTypeSupported(kv.ReqTypeDAG, kv.ReqSubTypeBasic) {
e := &IndexReaderExecutor{
table: table,
index: idxInfo,
tableID: tblInfo.ID,
ranges: []*types.IndexRange{idxRange},
dagPB: &tipb.DAGRequest{
StartTs: b.getStartTS(),
TimeZoneOffset: timeZoneOffset(b.ctx),
Flags: statementContextToFlags(b.ctx.GetSessionVars().StmtCtx),
},
schema: schema,
columns: cols,
ctx: b.ctx,
}
for i := range schema.Columns {
e.dagPB.OutputOffsets = append(e.dagPB.OutputOffsets, uint32(i))
}
e.dagPB.Executors = append(e.dagPB.Executors, &tipb.Executor{
Tp: tipb.ExecType_TypeIndexScan,
IdxScan: &tipb.IndexScan{
TableId: tblInfo.ID,
IndexId: idxInfo.ID,
Columns: distsql.ColumnsToProto(cols, tblInfo.PKIsHandle),
},
})
b.err = setPBColumnsDefaultValue(b.ctx, e.dagPB.Executors[0].IdxScan.Columns, cols)
return e
}
e := &XSelectIndexExec{
tableInfo: tblInfo,
ctx: b.ctx,
Expand All @@ -858,19 +915,31 @@ func (b *executorBuilder) buildIndexScanForAnalyze(tblInfo *model.TableInfo, idx
// buildAnalyze creates the AnalyzeExec for an Analyze plan. It appends one
// task per entry of v.PkTasks, then v.ColTasks, then v.IdxTasks; each task's
// src comes from buildTableScanForAnalyze (primary-key and column tasks) or
// buildIndexScanForAnalyze (index tasks).
// NOTE(review): the tasks slice capacity is taken from len(v.Children()),
// while the entries come from the three task lists. It is only a capacity
// hint, but confirm the two counts are meant to match.
// NOTE(review): this span is a rendered diff without +/- markers. The make
// call and each append appear twice: presumably the value-literal
// analyzeTask{...} forms are the removed lines and the &analyzeTask{...}
// forms, which also record tableInfo, Columns and indexInfo, are the added
// ones; confirm against the repository.
func (b *executorBuilder) buildAnalyze(v *plan.Analyze) Executor {
e := &AnalyzeExec{
ctx: b.ctx,
tasks: make([]analyzeTask, 0, len(v.Children())),
tasks: make([]*analyzeTask, 0, len(v.Children())),
}
for _, task := range v.PkTasks {
e.tasks = append(e.tasks, analyzeTask{taskType: pkTask,
src: b.buildTableScanForAnalyze(task.TableInfo, []*model.ColumnInfo{task.PKInfo})})
e.tasks = append(e.tasks, &analyzeTask{
taskType: pkTask,
src: b.buildTableScanForAnalyze(task.TableInfo, []*model.ColumnInfo{task.PKInfo}),
tableInfo: task.TableInfo,
Columns: []*model.ColumnInfo{task.PKInfo},
})
}
for _, task := range v.ColTasks {
e.tasks = append(e.tasks, analyzeTask{taskType: colTask,
src: b.buildTableScanForAnalyze(task.TableInfo, task.ColsInfo)})
e.tasks = append(e.tasks, &analyzeTask{
taskType: colTask,
src: b.buildTableScanForAnalyze(task.TableInfo, task.ColsInfo),
tableInfo: task.TableInfo,
Columns: task.ColsInfo,
})
}
for _, task := range v.IdxTasks {
e.tasks = append(e.tasks, analyzeTask{taskType: idxTask,
src: b.buildIndexScanForAnalyze(task.TableInfo, task.IndexInfo)})
e.tasks = append(e.tasks, &analyzeTask{
taskType: idxTask,
src: b.buildIndexScanForAnalyze(task.TableInfo, task.IndexInfo),
indexInfo: task.IndexInfo,
tableInfo: task.TableInfo,
})
}
return e
}
Expand Down

0 comments on commit e1debd4

Please sign in to comment.