From b51ab763dfdb757ee675a894721ad2b1355a1e63 Mon Sep 17 00:00:00 2001 From: Han Fei Date: Wed, 20 Apr 2016 13:32:20 +0800 Subject: [PATCH] plan: support limit pushdown (#1105) * plan: support limit pushdown --- context/context.go | 2 +- executor/executor_xapi.go | 4 +-- model/model.go | 2 +- optimizer/plan/plan_test.go | 58 +++++++++++++++++++++++++++++++++-- optimizer/plan/planbuilder.go | 22 +++++++++++++ optimizer/plan/plans.go | 4 +++ optimizer/plan/stringer.go | 21 ++++++++++--- session.go | 2 +- 8 files changed, 104 insertions(+), 11 deletions(-) diff --git a/context/context.go b/context/context.go index 502c80cb9c72b..cc2d23192d5d8 100644 --- a/context/context.go +++ b/context/context.go @@ -21,7 +21,7 @@ import ( // Context is an interface for transaction and executive args environment. type Context interface { - // GetTxn gets a transaction for futher execution. + // GetTxn gets a transaction for further execution. GetTxn(forceNew bool) (kv.Transaction, error) // FinishTxn commits or rolls back the current transaction. diff --git a/executor/executor_xapi.go b/executor/executor_xapi.go index 56da04780a2a0..d7094f84d2da6 100644 --- a/executor/executor_xapi.go +++ b/executor/executor_xapi.go @@ -103,7 +103,6 @@ func resultRowToRow(t table.Table, h int64, data []types.Datum) *Row { } func (e *XSelectTableExec) doRequest() error { - // TODO: add offset and limit. txn, err := e.ctx.GetTxn(false) if err != nil { return errors.Trace(err) @@ -114,6 +113,7 @@ func (e *XSelectTableExec) doRequest() error { selReq.Fields = resultFieldsToPBExpression(e.tablePlan.Fields()) selReq.Where = e.where selReq.Ranges = tableRangesToPBRanges(e.tablePlan.Ranges) + selReq.Limit = e.tablePlan.LimitCount columns := make([]*model.ColumnInfo, 0, len(e.tablePlan.Fields())) for _, v := range e.tablePlan.Fields() { @@ -282,11 +282,11 @@ func (e *XSelectIndexExec) doIndexRequest() (*xapi.SelectResult, error) { if err != nil { return nil, errors.Trace(err) } - // TODO: add offset and limit. selIdxReq := new(tipb.SelectRequest) startTs := txn.StartTS() selIdxReq.StartTs = &startTs selIdxReq.IndexInfo = tablecodec.IndexToProto(e.table.Meta(), e.indexPlan.Index) + selIdxReq.Limit = e.indexPlan.LimitCount fieldTypes := make([]*types.FieldType, len(e.indexPlan.Index.Columns)) for i, v := range e.indexPlan.Index.Columns { fieldTypes[i] = &(e.table.Cols()[v.Offset].FieldType) diff --git a/model/model.go b/model/model.go index 972970ccc9aea..a90863151aa43 100644 --- a/model/model.go +++ b/model/model.go @@ -182,7 +182,7 @@ func (db *DBInfo) Clone() *DBInfo { return &newInfo } -// CIStr is case insensitve string. +// CIStr is case insensitive string. type CIStr struct { O string `json:"O"` // Original string. L string `json:"L"` // Lower case string. diff --git a/optimizer/plan/plan_test.go b/optimizer/plan/plan_test.go index 8147e04fa6cce..699f31eece104 100644 --- a/optimizer/plan/plan_test.go +++ b/optimizer/plan/plan_test.go @@ -264,7 +264,31 @@ func (s *testPlanSuite) TestBestPlan(c *C) { }, { sql: "select * from t where a > 0 order by b limit 100", - best: "Index(t.b)->Fields->Limit", + best: "Index(t.b) + Limit(100)->Fields->Limit", + }, + { + sql: "select * from t where a > 0 order by b DESC limit 100", + best: "Index(t.b) + Limit(100)->Fields->Limit", + }, + { + sql: "select * from t where a > 0 order by b + a limit 100", + best: "Range(t)->Fields->Sort + Limit(100) + Offset(0)", + }, + { + sql: "select count(*) from t where a > 0 order by b limit 100", + best: "Range(t)->Aggregate->Fields->Sort + Limit(100) + Offset(0)", + }, + { + sql: "select count(*) from t where a > 0 limit 100", + best: "Range(t)->Aggregate->Fields->Limit", + }, + { + sql: "select distinct a from t where a > 0 limit 100", + best: "Range(t)->Fields->Distinct->Limit", + }, + { + sql: "select * from t where a > 0 order by a limit 100", + best: "Range(t) + Limit(100)->Fields->Limit", }, { sql: "select * from t where d = 0", @@ -300,7 +324,7 @@ func (s *testPlanSuite) TestBestPlan(c *C) { }, { sql: "select a from t where a = 1 limit 1 for update", - best: "Range(t)->Lock->Fields->Limit", + best: "Range(t) + Limit(1)->Lock->Fields->Limit", }, { sql: "admin show ddl", @@ -426,9 +450,15 @@ func mockResolve(node ast.Node) { type mockResolver struct { table *model.TableInfo tableName *ast.TableName + + contextStack [][]*ast.ResultField } func (b *mockResolver) Enter(in ast.Node) (ast.Node, bool) { + switch in.(type) { + case *ast.SelectStmt: + b.contextStack = append(b.contextStack, make([]*ast.ResultField, 0)) + } return in, false } @@ -447,6 +477,30 @@ func (b *mockResolver) Leave(in ast.Node) (ast.Node, bool) { } case *ast.TableName: x.TableInfo = b.table + case *ast.FieldList: + for _, v := range x.Fields { + if v.WildCard == nil { + rf := &ast.ResultField{ColumnAsName: v.AsName} + switch k := v.Expr.(type) { + case *ast.ColumnNameExpr: + rf = &ast.ResultField{ + Column: &model.ColumnInfo{ + Name: k.Name.Name, + }, + Table: b.table, + TableName: b.tableName, + } + case *ast.AggregateFuncExpr: + rf.Column = &model.ColumnInfo{} // Empty column info. + rf.Table = &model.TableInfo{} // Empty table info. + rf.Expr = k + b.contextStack[len(b.contextStack)-1] = append(b.contextStack[len(b.contextStack)-1], rf) + } + } + } + case *ast.SelectStmt: + x.SetResultFields(b.contextStack[len(b.contextStack)-1]) + b.contextStack = b.contextStack[:len(b.contextStack)-1] } return in, true } diff --git a/optimizer/plan/planbuilder.go b/optimizer/plan/planbuilder.go index 960b50224dfae..9450d31a11b5a 100644 --- a/optimizer/plan/planbuilder.go +++ b/optimizer/plan/planbuilder.go @@ -205,6 +205,7 @@ func (b *planBuilder) buildSubquery(n ast.Node) { func (b *planBuilder) buildSelect(sel *ast.SelectStmt) Plan { var aggFuncs []*ast.AggregateFuncExpr hasAgg := b.detectSelectAgg(sel) + canPushLimit := !hasAgg if hasAgg { aggFuncs = b.extractSelectAgg(sel) } @@ -231,6 +232,7 @@ func (b *planBuilder) buildSelect(sel *ast.SelectStmt) Plan { return nil } } else { + canPushLimit = false if sel.Where != nil { p = b.buildTableDual(sel) } @@ -249,18 +251,23 @@ func (b *planBuilder) buildSelect(sel *ast.SelectStmt) Plan { } } if sel.Distinct { + canPushLimit = false p = b.buildDistinct(p) if b.err != nil { return nil } } if sel.OrderBy != nil && !pushOrder(p, sel.OrderBy.Items) { + canPushLimit = false p = b.buildSort(p, sel.OrderBy.Items) if b.err != nil { return nil } } if sel.Limit != nil { + if canPushLimit { + pushLimit(p, sel.Limit) + } p = b.buildLimit(p, sel.Limit) if b.err != nil { return nil @@ -578,6 +585,19 @@ func buildResultField(tableName, name string, tp byte, size int) *ast.ResultFiel } } +func pushLimit(p Plan, limit *ast.Limit) { + switch x := p.(type) { + case *IndexScan: + limitCount := int64(limit.Offset + limit.Count) + x.LimitCount = &limitCount + case *TableScan: + limitCount := int64(limit.Offset + limit.Count) + x.LimitCount = &limitCount + case WithSrcPlan: + pushLimit(x.Src(), limit) + } +} + // pushOrder tries to push order by items to the plan, returns true if // order is pushed. func pushOrder(p Plan, items []*ast.ByItem) bool { @@ -774,6 +794,7 @@ func (b *planBuilder) buildUpdate(update *ast.UpdateStmt) Plan { } } if sel.Limit != nil { + pushLimit(p, sel.Limit) p = b.buildLimit(p, sel.Limit) if b.err != nil { return nil @@ -812,6 +833,7 @@ func (b *planBuilder) buildDelete(del *ast.DeleteStmt) Plan { } } if sel.Limit != nil { + pushLimit(p, sel.Limit) p = b.buildLimit(p, sel.Limit) if b.err != nil { return nil diff --git a/optimizer/plan/plans.go b/optimizer/plan/plans.go index 52d908d3d5ed6..1c251ebf9ac9f 100644 --- a/optimizer/plan/plans.go +++ b/optimizer/plan/plans.go @@ -61,6 +61,8 @@ type TableScan struct { // TableName is used to distinguish the same table selected multiple times in different place, // like 'select * from t where exists(select 1 from t as x where t.c < x.c)' TableName *ast.TableName + + LimitCount *int64 } // Accept implements Plan Accept interface. @@ -161,6 +163,8 @@ type IndexScan struct { // TableName is used to distinguish the same table selected multiple times in different place, // like 'select * from t where exists(select 1 from t as x where t.c < x.c)' TableName *ast.TableName + + LimitCount *int64 } // Accept implements Plan Accept interface. diff --git a/optimizer/plan/stringer.go b/optimizer/plan/stringer.go index 9b2c8ed3c0c04..290d570e88cbc 100644 --- a/optimizer/plan/stringer.go +++ b/optimizer/plan/stringer.go @@ -46,6 +46,9 @@ func (e *stringer) Leave(in Plan) (Plan, bool) { str = "CheckTable" case *IndexScan: str = fmt.Sprintf("Index(%s.%s)", x.Table.Name.L, x.Index.Name.L) + if x.LimitCount != nil { + str += fmt.Sprintf(" + Limit(%v)", *x.LimitCount) + } case *Limit: str = "Limit" case *SelectFields: @@ -56,6 +59,9 @@ func (e *stringer) Leave(in Plan) (Plan, bool) { str = "ShowDDL" case *Sort: str = "Sort" + if x.ExecLimit != nil { + str += fmt.Sprintf(" + Limit(%v) + Offset(%v)", x.ExecLimit.Count, x.ExecLimit.Offset) + } case *TableScan: if len(x.Ranges) > 0 { ran := x.Ranges[0] @@ -67,20 +73,27 @@ func (e *stringer) Leave(in Plan) (Plan, bool) { } else { str = fmt.Sprintf("Table(%s)", x.Table.Name.L) } + if x.LimitCount != nil { + str += fmt.Sprintf(" + Limit(%v)", *x.LimitCount) + } case *JoinOuter: last := len(e.idxs) - 1 idx := e.idxs[last] - chilrden := e.strs[idx:] + children := e.strs[idx:] e.strs = e.strs[:idx] - str = "OuterJoin{" + strings.Join(chilrden, "->") + "}" + str = "OuterJoin{" + strings.Join(children, "->") + "}" e.idxs = e.idxs[:last] case *JoinInner: last := len(e.idxs) - 1 idx := e.idxs[last] - chilrden := e.strs[idx:] + children := e.strs[idx:] e.strs = e.strs[:idx] - str = "InnerJoin{" + strings.Join(chilrden, "->") + "}" + str = "InnerJoin{" + strings.Join(children, "->") + "}" e.idxs = e.idxs[:last] + case *Aggregate: + str = "Aggregate" + case *Distinct: + str = "Distinct" default: str = fmt.Sprintf("%T", in) } diff --git a/session.go b/session.go index 3f13a9cf88a07..a7b1965d6aee6 100644 --- a/session.go +++ b/session.go @@ -52,7 +52,7 @@ import ( type Session interface { Status() uint16 // Flag of current status, such as autocommit LastInsertID() uint64 // Last inserted auto_increment id - AffectedRows() uint64 // Affected rows by lastest executed stmt + AffectedRows() uint64 // Affected rows by latest executed stmt Execute(sql string) ([]ast.RecordSet, error) // Execute a sql statement String() string // For debug FinishTxn(rollback bool) error