Skip to content

Commit

Permalink
*: support index where and aggregate push down. (pingcap#1567)
Browse files Browse the repository at this point in the history
  • Loading branch information
coocood authored Aug 15, 2016
1 parent ac4b8c9 commit 39e4d52
Show file tree
Hide file tree
Showing 15 changed files with 324 additions and 173 deletions.
3 changes: 1 addition & 2 deletions executor/new_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,9 +335,8 @@ func (b *executorBuilder) buildNewIndexScan(v *plan.PhysicalIndexScan, s *plan.S
ret = b.buildNewUnionScanExec(ret, expression.ComposeCNFCondition(v.AccessCondition))
}
}
// TODO: IndexScan doesn't support where condition push down.
// It will forbid limit and aggregation to push down.
if s != nil && v.DoubleRead {
if s != nil {
st.where, s.Conditions = b.toPBExpr(s.Conditions, st.tableInfo)
}
return ret
Expand Down
20 changes: 17 additions & 3 deletions executor/new_executor_xapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,15 @@ func (e *NewXSelectIndexExec) AddAggregate(funcs []*tipb.Expr, byItems []*tipb.B
e.byItems = byItems
e.aggFields = fields
e.aggregate = true
e.indexPlan.DoubleRead = true
txn, err := e.ctx.GetTxn(false)
if err != nil {
e.indexPlan.DoubleRead = true
return
}
client := txn.GetClient()
if !client.SupportRequestType(kv.ReqTypeIndex, kv.ReqSubTypeGroupBy) {
e.indexPlan.DoubleRead = true
}
}

// AddLimit implements NewXExecutor interface.
Expand Down Expand Up @@ -152,6 +160,10 @@ func (e *NewXSelectIndexExec) nextForSingleRead() (*Row, error) {
if err != nil {
return nil, errors.Trace(err)
}
if e.aggregate {
// The returned rows should be aggregate partial result.
e.result.SetFields(e.aggFields)
}
}
for {
if e.subResult == nil {
Expand All @@ -173,8 +185,7 @@ func (e *NewXSelectIndexExec) nextForSingleRead() (*Row, error) {
continue
}
if e.aggregate {
// TODO: Implement aggregation push down in single read index
return nil, errors.New("Can't push aggr in a single read index executor!")
return &Row{Data: rowData}, nil
}
rowData = e.indexRowToTableRow(h, rowData)
return resultRowToRow(e.table, h, rowData, e.asName), nil
Expand Down Expand Up @@ -288,6 +299,9 @@ func (e *NewXSelectIndexExec) doIndexRequest() (*xapi.SelectResult, error) {
concurrency := 1
if !e.indexPlan.DoubleRead {
concurrency = defaultConcurrency
selIdxReq.Aggregates = e.aggFuncs
selIdxReq.GroupBy = e.byItems
selIdxReq.Where = e.where
} else if e.indexPlan.OutOfOrder {
concurrency = defaultConcurrency
}
Expand Down
29 changes: 18 additions & 11 deletions plan/new_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,15 +271,15 @@ func (s *testPlanSuite) TestCBO(c *C) {
},
{
sql: "select * from t a where 1 = a.c and a.d > 1 order by a.d desc limit 2",
best: "Index(t.c_d_e)[(1 1,1 <nil>]]->Projection",
best: "Index(t.c_d_e)[(1 1,1 +inf]]->Projection",
},
{
sql: "select * from t a where a.c < 10000 order by a.a limit 2",
best: "Table(t)->Selection->Limit->Projection",
},
{
sql: "select * from (select * from t) a left outer join (select * from t) b on 1 order by a.c",
best: "LeftHashJoin{Index(t.c_d_e)[[<nil>,<nil>]]->Projection->Table(t)->Projection}->Projection",
best: "LeftHashJoin{Index(t.c_d_e)[[<nil>,+inf]]->Projection->Table(t)->Projection}->Projection",
},
{
sql: "select * from (select * from t) a left outer join (select * from t) b on 1 order by b.c",
Expand All @@ -291,19 +291,19 @@ func (s *testPlanSuite) TestCBO(c *C) {
},
{
sql: "select * from (select * from t) a right outer join (select * from t) b on 1 order by b.c",
best: "RightHashJoin{Table(t)->Projection->Index(t.c_d_e)[[<nil>,<nil>]]->Projection}->Projection",
best: "RightHashJoin{Table(t)->Projection->Index(t.c_d_e)[[<nil>,+inf]]->Projection}->Projection",
},
{
sql: "select * from t a where exists(select * from t b where a.a = b.a) and a.c = 1 order by a.d limit 3",
best: "SemiJoin{Index(t.c_d_e)[[1,1]]->Table(t)}->Limit->Projection",
},
{
sql: "select exists(select * from t b where a.a = b.a and b.c = 1) from t a order by a.c limit 3",
best: "SemiJoinWithAux{Index(t.c_d_e)[[<nil>,<nil>]]->Index(t.c_d_e)[[1,1]]}->Projection->Trim",
best: "SemiJoinWithAux{Index(t.c_d_e)[[<nil>,+inf]]->Index(t.c_d_e)[[1,1]]}->Projection->Trim",
},
{
sql: "select * from (select t.a from t union select t.d from t where t.c = 1 union select t.c from t) k order by a limit 1",
best: "UnionAll{Table(t)->Projection->Index(t.c_d_e)[[1,1]]->Projection->Index(t.c_d_e)[[<nil>,<nil>]]->Projection}->Distinct->Limit->Projection",
best: "UnionAll{Table(t)->Projection->Index(t.c_d_e)[[1,1]]->Projection->Index(t.c_d_e)[[<nil>,+inf]]->Projection}->Distinct->Limit->Projection",
},
{
sql: "select * from (select t.a from t union select t.d from t union select t.c from t) k order by a limit 1",
Expand Down Expand Up @@ -349,15 +349,15 @@ func (s *testPlanSuite) TestRefine(c *C) {
}{
{
sql: "select a from t where c is not null",
best: "Table(t)->Selection->Projection",
best: "Index(t.c_d_e)[[-inf,+inf]]->Projection",
},
{
sql: "select a from t where c >= 4",
best: "Index(t.c_d_e)[[4,<nil>]]->Projection",
best: "Index(t.c_d_e)[[4,+inf]]->Projection",
},
{
sql: "select a from t where c <= 4",
best: "Index(t.c_d_e)[[<nil>,4]]->Projection",
best: "Index(t.c_d_e)[[-inf,4]]->Projection",
},
{
sql: "select a from t where c = 4 and d = 5 and e = 6",
Expand Down Expand Up @@ -388,7 +388,7 @@ func (s *testPlanSuite) TestRefine(c *C) {
best: "Index(t.c_d_e)[[1,1] [2,2] [3,3]]->Projection",
},
{
sql: "select a from t where c = 1 or c = 2 or c = 3 or c = 4 or c = 5",
sql: "select b from t where c = 1 or c = 2 or c = 3 or c = 4 or c = 5",
best: "Table(t)->Selection->Projection",
},
{
Expand Down Expand Up @@ -924,19 +924,26 @@ func (s *testPlanSuite) TestCoveringIndex(c *C) {
{[]string{"a", "b"}, []string{"b", "c"}, []int{-1, -1}, false},
{[]string{"a", "b"}, []string{"a", "b"}, []int{50, -1}, false},
{[]string{"a", "b"}, []string{"a", "c"}, []int{-1, -1}, false},
{[]string{"id", "a"}, []string{"a", "b"}, []int{-1, -1}, true},
}
for _, ca := range cases {
var columns []*model.ColumnInfo
var pkIsHandle bool
for _, cn := range ca.columnNames {
columns = append(columns, &model.ColumnInfo{Name: model.NewCIStr(cn)})
col := &model.ColumnInfo{Name: model.NewCIStr(cn)}
if cn == "id" {
pkIsHandle = true
col.Flag = mysql.PriKeyFlag
}
columns = append(columns, col)
}
var indexCols []*model.IndexColumn
for i := range ca.indexNames {
icn := ca.indexNames[i]
icl := ca.indexLens[i]
indexCols = append(indexCols, &model.IndexColumn{Name: model.NewCIStr(icn), Length: icl})
}
covering := isCoveringIndex(columns, indexCols)
covering := isCoveringIndex(columns, indexCols, pkIsHandle)
c.Assert(covering, Equals, ca.isCovering)
}
}
Expand Down
7 changes: 5 additions & 2 deletions plan/physical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,13 +182,16 @@ func (p *DataSource) handleIndexScan(prop requiredProperty, index *model.IndexIn
rb := rangeBuilder{}
is.Ranges = rb.buildIndexRanges(fullRange)
}
is.DoubleRead = !isCoveringIndex(is.Columns, is.Index.Columns)
is.DoubleRead = !isCoveringIndex(is.Columns, is.Index.Columns, is.Table.PKIsHandle)
rowCounts := []uint64{rowCount}
return resultPlan.matchProperty(prop, rowCounts), resultPlan.matchProperty(nil, rowCounts), nil
}

func isCoveringIndex(columns []*model.ColumnInfo, indexColumns []*model.IndexColumn) bool {
func isCoveringIndex(columns []*model.ColumnInfo, indexColumns []*model.IndexColumn, pkIsHandle bool) bool {
for _, colInfo := range columns {
if pkIsHandle && mysql.HasPriKeyFlag(colInfo.Flag) {
continue
}
isIndexColumn := false
for _, indexCol := range indexColumns {
if colInfo.Name.L == indexCol.Name.L && indexCol.Length == types.UnspecifiedLength {
Expand Down
12 changes: 10 additions & 2 deletions plan/plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,19 @@ func (ir *IndexRange) IsPoint() bool {
func (ir *IndexRange) String() string {
lowStrs := make([]string, 0, len(ir.LowVal))
for _, d := range ir.LowVal {
lowStrs = append(lowStrs, fmt.Sprintf("%v", d.GetValue()))
if d.Kind() == types.KindMinNotNull {
lowStrs = append(lowStrs, "-inf")
} else {
lowStrs = append(lowStrs, fmt.Sprintf("%v", d.GetValue()))
}
}
highStrs := make([]string, 0, len(ir.LowVal))
for _, d := range ir.HighVal {
highStrs = append(highStrs, fmt.Sprintf("%v", d.GetValue()))
if d.Kind() == types.KindMaxValue {
highStrs = append(highStrs, "+inf")
} else {
highStrs = append(highStrs, fmt.Sprintf("%v", d.GetValue()))
}
}
l, r := "[", "]"
if ir.LowExclude {
Expand Down
20 changes: 16 additions & 4 deletions session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2223,11 +2223,20 @@ func (s *testSessionSuite) TestMultiColumnIndex(c *C) {

// Test varchar type.
mustExecSQL(c, se, "drop table t;")
mustExecSQL(c, se, "create table t (c1 varchar(64), c2 varchar(64), index c1_c2 (c1, c2));")
mustExecSQL(c, se, "insert into t values ('abc', 'def')")
mustExecSQL(c, se, "create table t (id int unsigned primary key auto_increment, c1 varchar(64), c2 varchar(64), index c1_c2 (c1, c2));")
mustExecSQL(c, se, "insert into t (c1, c2) values ('abc', 'def')")
sql = "select c1 from t where c1 = 'abc'"
mustExecMatch(c, se, sql, [][]interface{}{{[]byte("abc")}})

mustExecSQL(c, se, "insert into t (c1, c2) values ('abc', 'xyz')")
mustExecSQL(c, se, "insert into t (c1, c2) values ('abd', 'abc')")
mustExecSQL(c, se, "insert into t (c1, c2) values ('abd', 'def')")
sql = "select c1 from t where c1 >= 'abc' and c2 = 'def'"
mustExecMatch(c, se, sql, [][]interface{}{{[]byte("abc")}, {[]byte("abd")}})

sql = "select c1, c2 from t where c1 = 'abc' and id < 2"
mustExecMatch(c, se, sql, [][]interface{}{{[]byte("abc"), []byte("def")}})

err := se.Close()
c.Assert(err, IsNil)
err = store.Close()
Expand Down Expand Up @@ -2487,7 +2496,7 @@ func (s *testSessionSuite) TestXAggregateWithIndexScan(c *C) {
store := newStore(c, s.dbName)
se := newSession(c, store, s.dbName)
mustExecMultiSQL(c, se, initSQL)
sql := "SELECT COUNT(c) FROM t WHERE c IS NOT NULL;"
sql := "SELECT COUNT(c) FROM t WHERE c > 0;"
mustExecMatch(c, se, sql, [][]interface{}{{"2"}})

initSQL = `
Expand All @@ -2497,7 +2506,10 @@ func (s *testSessionSuite) TestXAggregateWithIndexScan(c *C) {
INSERT INTO tab1 VALUES(0,656,638.70,'zsiag',614,231.92,'dkfhp');
`
mustExecMultiSQL(c, se, initSQL)
sql = "SELECT DISTINCT + - COUNT( col3 ) AS col1 FROM tab1 AS cor0 WHERE col3 IS NOT NULL;"
sql = "SELECT DISTINCT + - COUNT( col3 ) AS col1 FROM tab1 AS cor0 WHERE col3 > 0;"
mustExecMatch(c, se, sql, [][]interface{}{{"-1"}})

sql = "SELECT DISTINCT + - COUNT( col3 ) AS col1 FROM tab1 AS cor0 WHERE col3 > 0 group by col0;"
mustExecMatch(c, se, sql, [][]interface{}{{"-1"}})
}

Expand Down
9 changes: 2 additions & 7 deletions store/localstore/local_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,13 @@ func (c *dbClient) Send(req *kv.Request) kv.Response {

func (c *dbClient) SupportRequestType(reqType, subType int64) bool {
switch reqType {
case kv.ReqTypeSelect:
case kv.ReqTypeSelect, kv.ReqTypeIndex:
switch subType {
case kv.ReqSubTypeGroupBy:
case kv.ReqSubTypeGroupBy, kv.ReqSubTypeBasic:
return true
default:
return supportExpr(tipb.ExprType(subType))
}
case kv.ReqTypeIndex:
switch subType {
case kv.ReqSubTypeDesc, kv.ReqSubTypeBasic:
return true
}
}
return false
}
Expand Down
Loading

0 comments on commit 39e4d52

Please sign in to comment.