Skip to content

Commit

Permalink
plan: support limit pushdown (pingcap#1105)
Browse files Browse the repository at this point in the history
* plan: support limit pushdown
  • Loading branch information
hanfei1991 authored and coocood committed Apr 20, 2016
1 parent 8a3a59d commit b51ab76
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 11 deletions.
2 changes: 1 addition & 1 deletion context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (

// Context is an interface for transaction and executive args environment.
type Context interface {
// GetTxn gets a transaction for futher execution.
// GetTxn gets a transaction for further execution.
GetTxn(forceNew bool) (kv.Transaction, error)

// FinishTxn commits or rolls back the current transaction.
Expand Down
4 changes: 2 additions & 2 deletions executor/executor_xapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ func resultRowToRow(t table.Table, h int64, data []types.Datum) *Row {
}

func (e *XSelectTableExec) doRequest() error {
// TODO: add offset and limit.
txn, err := e.ctx.GetTxn(false)
if err != nil {
return errors.Trace(err)
Expand All @@ -114,6 +113,7 @@ func (e *XSelectTableExec) doRequest() error {
selReq.Fields = resultFieldsToPBExpression(e.tablePlan.Fields())
selReq.Where = e.where
selReq.Ranges = tableRangesToPBRanges(e.tablePlan.Ranges)
selReq.Limit = e.tablePlan.LimitCount

columns := make([]*model.ColumnInfo, 0, len(e.tablePlan.Fields()))
for _, v := range e.tablePlan.Fields() {
Expand Down Expand Up @@ -282,11 +282,11 @@ func (e *XSelectIndexExec) doIndexRequest() (*xapi.SelectResult, error) {
if err != nil {
return nil, errors.Trace(err)
}
// TODO: add offset and limit.
selIdxReq := new(tipb.SelectRequest)
startTs := txn.StartTS()
selIdxReq.StartTs = &startTs
selIdxReq.IndexInfo = tablecodec.IndexToProto(e.table.Meta(), e.indexPlan.Index)
selIdxReq.Limit = e.indexPlan.LimitCount
fieldTypes := make([]*types.FieldType, len(e.indexPlan.Index.Columns))
for i, v := range e.indexPlan.Index.Columns {
fieldTypes[i] = &(e.table.Cols()[v.Offset].FieldType)
Expand Down
2 changes: 1 addition & 1 deletion model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (db *DBInfo) Clone() *DBInfo {
return &newInfo
}

// CIStr is case insensitve string.
// CIStr is case insensitive string.
type CIStr struct {
O string `json:"O"` // Original string.
L string `json:"L"` // Lower case string.
Expand Down
58 changes: 56 additions & 2 deletions optimizer/plan/plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,31 @@ func (s *testPlanSuite) TestBestPlan(c *C) {
},
{
sql: "select * from t where a > 0 order by b limit 100",
best: "Index(t.b)->Fields->Limit",
best: "Index(t.b) + Limit(100)->Fields->Limit",
},
{
sql: "select * from t where a > 0 order by b DESC limit 100",
best: "Index(t.b) + Limit(100)->Fields->Limit",
},
{
sql: "select * from t where a > 0 order by b + a limit 100",
best: "Range(t)->Fields->Sort + Limit(100) + Offset(0)",
},
{
sql: "select count(*) from t where a > 0 order by b limit 100",
best: "Range(t)->Aggregate->Fields->Sort + Limit(100) + Offset(0)",
},
{
sql: "select count(*) from t where a > 0 limit 100",
best: "Range(t)->Aggregate->Fields->Limit",
},
{
sql: "select distinct a from t where a > 0 limit 100",
best: "Range(t)->Fields->Distinct->Limit",
},
{
sql: "select * from t where a > 0 order by a limit 100",
best: "Range(t) + Limit(100)->Fields->Limit",
},
{
sql: "select * from t where d = 0",
Expand Down Expand Up @@ -300,7 +324,7 @@ func (s *testPlanSuite) TestBestPlan(c *C) {
},
{
sql: "select a from t where a = 1 limit 1 for update",
best: "Range(t)->Lock->Fields->Limit",
best: "Range(t) + Limit(1)->Lock->Fields->Limit",
},
{
sql: "admin show ddl",
Expand Down Expand Up @@ -426,9 +450,15 @@ func mockResolve(node ast.Node) {
type mockResolver struct {
table *model.TableInfo
tableName *ast.TableName

contextStack [][]*ast.ResultField
}

func (b *mockResolver) Enter(in ast.Node) (ast.Node, bool) {
switch in.(type) {
case *ast.SelectStmt:
b.contextStack = append(b.contextStack, make([]*ast.ResultField, 0))
}
return in, false
}

Expand All @@ -447,6 +477,30 @@ func (b *mockResolver) Leave(in ast.Node) (ast.Node, bool) {
}
case *ast.TableName:
x.TableInfo = b.table
case *ast.FieldList:
for _, v := range x.Fields {
if v.WildCard == nil {
rf := &ast.ResultField{ColumnAsName: v.AsName}
switch k := v.Expr.(type) {
case *ast.ColumnNameExpr:
rf = &ast.ResultField{
Column: &model.ColumnInfo{
Name: k.Name.Name,
},
Table: b.table,
TableName: b.tableName,
}
case *ast.AggregateFuncExpr:
rf.Column = &model.ColumnInfo{} // Empty column info.
rf.Table = &model.TableInfo{} // Empty table info.
rf.Expr = k
b.contextStack[len(b.contextStack)-1] = append(b.contextStack[len(b.contextStack)-1], rf)
}
}
}
case *ast.SelectStmt:
x.SetResultFields(b.contextStack[len(b.contextStack)-1])
b.contextStack = b.contextStack[:len(b.contextStack)-1]
}
return in, true
}
Expand Down
22 changes: 22 additions & 0 deletions optimizer/plan/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ func (b *planBuilder) buildSubquery(n ast.Node) {
func (b *planBuilder) buildSelect(sel *ast.SelectStmt) Plan {
var aggFuncs []*ast.AggregateFuncExpr
hasAgg := b.detectSelectAgg(sel)
canPushLimit := !hasAgg
if hasAgg {
aggFuncs = b.extractSelectAgg(sel)
}
Expand All @@ -231,6 +232,7 @@ func (b *planBuilder) buildSelect(sel *ast.SelectStmt) Plan {
return nil
}
} else {
canPushLimit = false
if sel.Where != nil {
p = b.buildTableDual(sel)
}
Expand All @@ -249,18 +251,23 @@ func (b *planBuilder) buildSelect(sel *ast.SelectStmt) Plan {
}
}
if sel.Distinct {
canPushLimit = false
p = b.buildDistinct(p)
if b.err != nil {
return nil
}
}
if sel.OrderBy != nil && !pushOrder(p, sel.OrderBy.Items) {
canPushLimit = false
p = b.buildSort(p, sel.OrderBy.Items)
if b.err != nil {
return nil
}
}
if sel.Limit != nil {
if canPushLimit {
pushLimit(p, sel.Limit)
}
p = b.buildLimit(p, sel.Limit)
if b.err != nil {
return nil
Expand Down Expand Up @@ -578,6 +585,19 @@ func buildResultField(tableName, name string, tp byte, size int) *ast.ResultFiel
}
}

func pushLimit(p Plan, limit *ast.Limit) {
switch x := p.(type) {
case *IndexScan:
limitCount := int64(limit.Offset + limit.Count)
x.LimitCount = &limitCount
case *TableScan:
limitCount := int64(limit.Offset + limit.Count)
x.LimitCount = &limitCount
case WithSrcPlan:
pushLimit(x.Src(), limit)
}
}

// pushOrder tries to push order by items to the plan, returns true if
// order is pushed.
func pushOrder(p Plan, items []*ast.ByItem) bool {
Expand Down Expand Up @@ -774,6 +794,7 @@ func (b *planBuilder) buildUpdate(update *ast.UpdateStmt) Plan {
}
}
if sel.Limit != nil {
pushLimit(p, sel.Limit)
p = b.buildLimit(p, sel.Limit)
if b.err != nil {
return nil
Expand Down Expand Up @@ -812,6 +833,7 @@ func (b *planBuilder) buildDelete(del *ast.DeleteStmt) Plan {
}
}
if sel.Limit != nil {
pushLimit(p, sel.Limit)
p = b.buildLimit(p, sel.Limit)
if b.err != nil {
return nil
Expand Down
4 changes: 4 additions & 0 deletions optimizer/plan/plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ type TableScan struct {
// TableName is used to distinguish the same table selected multiple times in different place,
// like 'select * from t where exists(select 1 from t as x where t.c < x.c)'
TableName *ast.TableName

LimitCount *int64
}

// Accept implements Plan Accept interface.
Expand Down Expand Up @@ -161,6 +163,8 @@ type IndexScan struct {
// TableName is used to distinguish the same table selected multiple times in different place,
// like 'select * from t where exists(select 1 from t as x where t.c < x.c)'
TableName *ast.TableName

LimitCount *int64
}

// Accept implements Plan Accept interface.
Expand Down
21 changes: 17 additions & 4 deletions optimizer/plan/stringer.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ func (e *stringer) Leave(in Plan) (Plan, bool) {
str = "CheckTable"
case *IndexScan:
str = fmt.Sprintf("Index(%s.%s)", x.Table.Name.L, x.Index.Name.L)
if x.LimitCount != nil {
str += fmt.Sprintf(" + Limit(%v)", *x.LimitCount)
}
case *Limit:
str = "Limit"
case *SelectFields:
Expand All @@ -56,6 +59,9 @@ func (e *stringer) Leave(in Plan) (Plan, bool) {
str = "ShowDDL"
case *Sort:
str = "Sort"
if x.ExecLimit != nil {
str += fmt.Sprintf(" + Limit(%v) + Offset(%v)", x.ExecLimit.Count, x.ExecLimit.Offset)
}
case *TableScan:
if len(x.Ranges) > 0 {
ran := x.Ranges[0]
Expand All @@ -67,20 +73,27 @@ func (e *stringer) Leave(in Plan) (Plan, bool) {
} else {
str = fmt.Sprintf("Table(%s)", x.Table.Name.L)
}
if x.LimitCount != nil {
str += fmt.Sprintf(" + Limit(%v)", *x.LimitCount)
}
case *JoinOuter:
last := len(e.idxs) - 1
idx := e.idxs[last]
chilrden := e.strs[idx:]
children := e.strs[idx:]
e.strs = e.strs[:idx]
str = "OuterJoin{" + strings.Join(chilrden, "->") + "}"
str = "OuterJoin{" + strings.Join(children, "->") + "}"
e.idxs = e.idxs[:last]
case *JoinInner:
last := len(e.idxs) - 1
idx := e.idxs[last]
chilrden := e.strs[idx:]
children := e.strs[idx:]
e.strs = e.strs[:idx]
str = "InnerJoin{" + strings.Join(chilrden, "->") + "}"
str = "InnerJoin{" + strings.Join(children, "->") + "}"
e.idxs = e.idxs[:last]
case *Aggregate:
str = "Aggregate"
case *Distinct:
str = "Distinct"
default:
str = fmt.Sprintf("%T", in)
}
Expand Down
2 changes: 1 addition & 1 deletion session.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ import (
type Session interface {
Status() uint16 // Flag of current status, such as autocommit
LastInsertID() uint64 // Last inserted auto_increment id
AffectedRows() uint64 // Affected rows by lastest executed stmt
AffectedRows() uint64 // Affected rows by latest executed stmt
Execute(sql string) ([]ast.RecordSet, error) // Execute a sql statement
String() string // For debug
FinishTxn(rollback bool) error
Expand Down

0 comments on commit b51ab76

Please sign in to comment.