Skip to content

Commit

Permalink
*: Improve test for push down. (pingcap#1927)
Browse files Browse the repository at this point in the history
  • Loading branch information
hanfei1991 authored Nov 7, 2016
1 parent 1b96db9 commit 88a66a5
Show file tree
Hide file tree
Showing 8 changed files with 217 additions and 195 deletions.
10 changes: 5 additions & 5 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,10 +543,10 @@ func (b *executorBuilder) buildTableScan(v *plan.PhysicalTableScan) Executor {
keepOrder: v.KeepOrder,
where: v.ConditionPBExpr,
aggregate: v.Aggregated,
aggFuncs: v.AggFuncs,
aggFuncs: v.AggFuncsPB,
aggFields: v.AggFields,
byItems: v.GbyItems,
orderByList: v.SortItems,
byItems: v.GbyItemsPB,
orderByList: v.SortItemsPB,
}
st.scanConcurrency, b.err = getScanConcurrency(b.ctx)
return st
Expand Down Expand Up @@ -592,9 +592,9 @@ func (b *executorBuilder) buildIndexScan(v *plan.PhysicalIndexScan) Executor {
startTS: startTS,
where: v.ConditionPBExpr,
aggregate: v.Aggregated,
aggFuncs: v.AggFuncs,
aggFuncs: v.AggFuncsPB,
aggFields: v.AggFields,
byItems: v.GbyItems,
byItems: v.GbyItemsPB,
}
st.scanConcurrency, b.err = getScanConcurrency(b.ctx)
return st
Expand Down
4 changes: 2 additions & 2 deletions executor/executor_distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,8 +563,8 @@ func (e *XSelectIndexExec) doIndexRequest() (distsql.SelectResult, error) {
selIdxReq.StartTs = e.startTS
selIdxReq.TimeZoneOffset = proto.Int64(timeZoneOffset())
selIdxReq.IndexInfo = distsql.IndexToProto(e.table.Meta(), e.indexPlan.Index)
if len(e.indexPlan.SortItems) > 0 {
selIdxReq.OrderBy = e.indexPlan.SortItems
if len(e.indexPlan.SortItemsPB) > 0 {
selIdxReq.OrderBy = e.indexPlan.SortItemsPB
} else if e.indexPlan.Desc {
selIdxReq.OrderBy = []*tipb.ByItem{{Desc: e.indexPlan.Desc}}
}
Expand Down
3 changes: 2 additions & 1 deletion plan/expr_to_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ import (
"github.com/pingcap/tipb/go-tipb"
)

func expressionsToPB(exprs []expression.Expression, client kv.Client) (pbExpr *tipb.Expr, remained []expression.Expression) {
func expressionsToPB(exprs []expression.Expression, client kv.Client) (pbExpr *tipb.Expr, pushed []expression.Expression, remained []expression.Expression) {
for _, expr := range exprs {
v := exprToPB(client, expr)
if v == nil {
remained = append(remained, expr)
continue
}
pushed = append(pushed, expr)
if pbExpr == nil {
pbExpr = v
} else {
Expand Down
5 changes: 5 additions & 0 deletions plan/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,11 @@ type ByItems struct {
Desc bool
}

// String implements fmt.Stringer interface.
func (by *ByItems) String() string {
return fmt.Sprintf("(%s, %v)", by.Expr, by.Desc)
}

func (b *planBuilder) buildSort(p LogicalPlan, byItems []*ast.ByItem, aggMapper map[*ast.AggregateFuncExpr]int) LogicalPlan {
var exprs []*ByItems
sort := &Sort{baseLogicalPlan: newBaseLogicalPlan(Srt, b.allocator)}
Expand Down
33 changes: 10 additions & 23 deletions plan/match_property.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,10 @@ package plan
import (
"math"

"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/util/types"
)

func tryToAddUnionScan(readOnly bool, conds []expression.Expression, resultPlan PhysicalPlan) PhysicalPlan {
if !readOnly {
us := &PhysicalUnionScan{
Condition: expression.ComposeCNFCondition(conds),
}
us.SetChildren(resultPlan)
us.SetSchema(resultPlan.GetSchema())
return us
}
return resultPlan
}

// matchProperty implements PhysicalPlan matchProperty interface.
func (ts *PhysicalTableScan) matchProperty(prop *requiredProperty, infos ...*physicalPlanInfo) *physicalPlanInfo {
rowCount := float64(infos[0].count)
Expand All @@ -43,15 +30,15 @@ func (ts *PhysicalTableScan) matchProperty(prop *requiredProperty, infos ...*phy
if len(prop.props) == 0 {
newTS := *ts
newTS.addLimit(prop.limit)
p := tryToAddUnionScan(newTS.readOnly, newTS.conditions, &newTS)
p := newTS.tryToAddUnionScan(&newTS)
return enforceProperty(prop, &physicalPlanInfo{p: p, cost: cost, count: infos[0].count})
}
if len(prop.props) == 1 && ts.pkCol != nil && ts.pkCol == prop.props[0].col {
sortedTs := *ts
sortedTs.Desc = prop.props[0].desc
sortedTs.KeepOrder = true
sortedTs.addLimit(prop.limit)
p := tryToAddUnionScan(ts.readOnly, ts.conditions, &sortedTs)
sortedTS := *ts
sortedTS.Desc = prop.props[0].desc
sortedTS.KeepOrder = true
sortedTS.addLimit(prop.limit)
p := sortedTS.tryToAddUnionScan(&sortedTS)
return enforceProperty(&requiredProperty{limit: prop.limit}, &physicalPlanInfo{
p: p,
cost: cost,
Expand All @@ -66,7 +53,7 @@ func (ts *PhysicalTableScan) matchProperty(prop *requiredProperty, infos ...*phy
cost = rowCount * netWorkFactor
}
sortedTS.KeepOrder = true
p := tryToAddUnionScan(ts.readOnly, ts.conditions, &sortedTS)
p := sortedTS.tryToAddUnionScan(&sortedTS)
return enforceProperty(prop, &physicalPlanInfo{
p: p,
cost: cost,
Expand Down Expand Up @@ -117,7 +104,7 @@ func (is *PhysicalIndexScan) matchProperty(prop *requiredProperty, infos ...*phy
cost *= 2
}
if len(prop.props) == 0 {
p := tryToAddUnionScan(is.readOnly, is.conditions, is)
p := is.tryToAddUnionScan(is)
return enforceProperty(&requiredProperty{limit: prop.limit}, &physicalPlanInfo{p: p, cost: cost, count: infos[0].count})
}
matchedIdx := 0
Expand Down Expand Up @@ -148,7 +135,7 @@ func (is *PhysicalIndexScan) matchProperty(prop *requiredProperty, infos ...*phy
sortedIS.OutOfOrder = false
sortedIS.Desc = allDesc && !allAsc
sortedIS.addLimit(prop.limit)
p := tryToAddUnionScan(is.readOnly, is.conditions, &sortedIS)
p := sortedIS.tryToAddUnionScan(&sortedIS)
return enforceProperty(&requiredProperty{limit: prop.limit}, &physicalPlanInfo{
p: p,
cost: sortedCost,
Expand All @@ -164,7 +151,7 @@ func (is *PhysicalIndexScan) matchProperty(prop *requiredProperty, infos ...*phy
cost = float64(infos[0].count) * netWorkFactor
}
sortedIS.OutOfOrder = true
p := tryToAddUnionScan(is.readOnly, is.conditions, &sortedIS)
p := sortedIS.tryToAddUnionScan(&sortedIS)
return enforceProperty(prop, &physicalPlanInfo{
p: p,
cost: cost,
Expand Down
8 changes: 3 additions & 5 deletions plan/physical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ func (p *DataSource) convert2TableScan(prop *requiredProperty) (*physicalPlanInf
ts.SetSchema(p.GetSchema())
resultPlan = ts
if sel, ok := p.GetParentByIndex(0).(*Selection); ok {
ts.conditions = sel.Conditions
newSel := *sel
conds := make([]expression.Expression, 0, len(sel.Conditions))
for _, cond := range sel.Conditions {
Expand All @@ -144,7 +143,7 @@ func (p *DataSource) convert2TableScan(prop *requiredProperty) (*physicalPlanInf
memDB = true
}
if !memDB && client.SupportRequestType(kv.ReqTypeSelect, 0) {
ts.ConditionPBExpr, newSel.Conditions = expressionsToPB(newSel.Conditions, client)
ts.ConditionPBExpr, ts.conditions, newSel.Conditions = expressionsToPB(newSel.Conditions, client)
}
}
err := buildTableRange(ts)
Expand Down Expand Up @@ -212,7 +211,6 @@ func (p *DataSource) convert2IndexScan(prop *requiredProperty, index *model.Inde
rowCount := uint64(statsTbl.Count)
resultPlan = is
if sel, ok := p.GetParentByIndex(0).(*Selection); ok {
is.conditions = sel.Conditions
rowCount = 0
newSel := *sel
conds := make([]expression.Expression, 0, len(sel.Conditions))
Expand All @@ -226,8 +224,8 @@ func (p *DataSource) convert2IndexScan(prop *requiredProperty, index *model.Inde
case "information_schema", "performance_schema":
memDB = true
}
if !memDB && client.SupportRequestType(kv.ReqTypeSelect, 0) {
is.ConditionPBExpr, newSel.Conditions = expressionsToPB(newSel.Conditions, client)
if !memDB && client.SupportRequestType(kv.ReqTypeIndex, 0) {
is.ConditionPBExpr, is.conditions, newSel.Conditions = expressionsToPB(newSel.Conditions, client)
}
}
err := buildIndexRange(is)
Expand Down
91 changes: 63 additions & 28 deletions plan/physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ type PhysicalIndexScan struct {
basePlan
physicalTableSource

readOnly bool
Table *model.TableInfo
Index *model.IndexInfo
Ranges []*IndexRange
Expand All @@ -52,8 +51,6 @@ type PhysicalIndexScan struct {
DoubleRead bool

accessEqualCount int
AccessCondition []expression.Expression
conditions []expression.Expression

TableAsName *model.CIStr
}
Expand All @@ -70,22 +67,41 @@ type physicalTableSource struct {
client kv.Client

Aggregated bool
readOnly bool
AggFields []*types.FieldType
AggFuncs []*tipb.Expr
GbyItems []*tipb.ByItem
AggFuncsPB []*tipb.Expr
GbyItemsPB []*tipb.ByItem

// ConditionPBExpr is the pb structure of conditions that be pushed down.
ConditionPBExpr *tipb.Expr

LimitCount *int64
SortItems []*tipb.ByItem
// AccessCondition is used to calculate range.
AccessCondition []expression.Expression

LimitCount *int64
SortItemsPB []*tipb.ByItem

// The following fields are used for explaining and testing. Because pb structures are not human-readable.
aggFuncs []expression.AggregationFunction
gbyItems []expression.Expression
sortItems []*ByItems
conditions []expression.Expression
}

func (p *physicalTableSource) clear() {
func (p *physicalTableSource) clearForAggPushDown() {
p.AggFields = nil
p.AggFuncs = nil
p.GbyItems = nil
p.AggFuncsPB = nil
p.GbyItemsPB = nil
p.Aggregated = false

p.aggFuncs = nil
p.gbyItems = nil
}

func (p *physicalTableSource) clearForTopnPushDown() {
p.sortItems = nil
p.SortItemsPB = nil
p.LimitCount = nil
}

func needCount(af expression.AggregationFunction) bool {
Expand All @@ -97,6 +113,18 @@ func needValue(af expression.AggregationFunction) bool {
af.GetName() == ast.AggFuncMax || af.GetName() == ast.AggFuncMin || af.GetName() == ast.AggFuncGroupConcat
}

func (p *physicalTableSource) tryToAddUnionScan(resultPlan PhysicalPlan) PhysicalPlan {
if p.readOnly {
return resultPlan
}
us := &PhysicalUnionScan{
Condition: expression.ComposeCNFCondition(append(p.conditions, p.AccessCondition...)),
}
us.SetChildren(resultPlan)
us.SetSchema(resultPlan.GetSchema())
return us
}

func (p *physicalTableSource) addLimit(l *Limit) {
if l != nil {
count := int64(l.Count + l.Offset)
Expand All @@ -117,8 +145,15 @@ func (p *physicalTableSource) addTopN(prop *requiredProperty) bool {
}
count := int64(prop.limit.Count + prop.limit.Offset)
p.LimitCount = &count
for _, item := range prop.props {
p.SortItems = append(p.SortItems, sortByItemToPB(p.client, item.col, item.desc))
for _, prop := range prop.props {
item := sortByItemToPB(p.client, prop.col, prop.desc)
if item == nil {
// When we fail to convert any sortItem to PB struct, we should clear the environments.
p.clearForTopnPushDown()
return false
}
p.SortItemsPB = append(p.SortItemsPB, item)
p.sortItems = append(p.sortItems, &ByItems{Expr: prop.col, Desc: prop.desc})
}
return true
}
Expand All @@ -130,18 +165,22 @@ func (p *physicalTableSource) addAggregation(agg *PhysicalAggregation) expressio
for _, f := range agg.AggFuncs {
pb := aggFuncToPBExpr(p.client, f)
if pb == nil {
p.clear()
// When we fail to convert any agg function to PB struct, we should clear the environments.
p.clearForAggPushDown()
return nil
}
p.AggFuncs = append(p.AggFuncs, pb)
p.AggFuncsPB = append(p.AggFuncsPB, pb)
p.aggFuncs = append(p.aggFuncs, f.Clone())
}
for _, item := range agg.GroupByItems {
pb := groupByItemToPB(p.client, item)
if pb == nil {
p.clear()
// When we fail to convert any group-by item to PB struct, we should clear the environments.
p.clearForAggPushDown()
return nil
}
p.GbyItems = append(p.GbyItems, pb)
p.GbyItemsPB = append(p.GbyItemsPB, pb)
p.gbyItems = append(p.gbyItems, item.Clone())
}
p.Aggregated = true
gk := types.NewFieldType(mysql.TypeBlob)
Expand Down Expand Up @@ -186,16 +225,12 @@ type PhysicalTableScan struct {
basePlan
physicalTableSource

readOnly bool
Table *model.TableInfo
Columns []*model.ColumnInfo
DBName *model.CIStr
Desc bool
Ranges []TableRange
pkCol *expression.Column

AccessCondition []expression.Expression
conditions []expression.Expression
Table *model.TableInfo
Columns []*model.ColumnInfo
DBName *model.CIStr
Desc bool
Ranges []TableRange
pkCol *expression.Column

TableAsName *model.CIStr

Expand Down Expand Up @@ -303,7 +338,7 @@ func (p *PhysicalIndexScan) MarshalJSON() ([]byte, error) {
"\n \"access condition\": %s,"+
"\n \"count of pushed aggregate functions\": %d,"+
"\n \"limit\": %d\n}",
p.DBName.O, p.Table.Name.O, p.Index.Name.O, p.Ranges, p.Desc, p.OutOfOrder, p.DoubleRead, access, len(p.AggFuncs), limit))
p.DBName.O, p.Table.Name.O, p.Index.Name.O, p.Ranges, p.Desc, p.OutOfOrder, p.DoubleRead, access, len(p.AggFuncsPB), limit))
return buffer.Bytes(), nil
}

Expand Down Expand Up @@ -332,7 +367,7 @@ func (p *PhysicalTableScan) MarshalJSON() ([]byte, error) {
"\n \"access condition\": %s,"+
"\n \"count of pushed aggregate functions\": %d,"+
"\n \"limit\": %d}",
p.DBName.O, p.Table.Name.O, p.Desc, p.KeepOrder, access, len(p.AggFuncs), limit))
p.DBName.O, p.Table.Name.O, p.Desc, p.KeepOrder, access, len(p.AggFuncsPB), limit))
return buffer.Bytes(), nil
}

Expand Down
Loading

0 comments on commit 88a66a5

Please sign in to comment.