Skip to content

Commit

Permalink
executor: fix some bugs about new plan. (pingcap#3231)
Browse files Browse the repository at this point in the history
  • Loading branch information
hanfei1991 authored and zimulala committed May 10, 2017
1 parent ad52a24 commit c5e8645
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 17 deletions.
22 changes: 14 additions & 8 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,15 +234,21 @@ func IsPointGetWithPKOrUniqueKeyByAutoCommit(ctx context.Context, p plan.Plan) b
p = proj.Children()[0]
}

// get by index key
if indexScan, ok := p.(*plan.PhysicalIndexScan); ok {
switch v := p.(type) {
case *plan.PhysicalIndexScan:
return v.IsPointGetByUniqueKey(ctx.GetSessionVars().StmtCtx)
case *plan.PhysicalIndexReader:
indexScan := v.IndexPlans[0].(*plan.PhysicalIndexScan)
return indexScan.IsPointGetByUniqueKey(ctx.GetSessionVars().StmtCtx)
}

// get by primary key
if tableScan, ok := p.(*plan.PhysicalTableScan); ok {
case *plan.PhysicalIndexLookUpReader:
indexScan := v.IndexPlans[0].(*plan.PhysicalIndexScan)
return indexScan.IsPointGetByUniqueKey(ctx.GetSessionVars().StmtCtx)
case *plan.PhysicalTableScan:
return len(v.Ranges) == 1 && v.Ranges[0].IsPoint()
case *plan.PhysicalTableReader:
tableScan := v.TablePlans[0].(*plan.PhysicalTableScan)
return len(tableScan.Ranges) == 1 && tableScan.Ranges[0].IsPoint()
default:
return false
}

return false
}
39 changes: 39 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,13 @@ func (b *executorBuilder) buildUnionScanExec(v *plan.PhysicalUnionScan) Executor
us.desc = x.desc
us.dirty = getDirtyDB(b.ctx).getDirtyTable(x.table.Meta().ID)
us.conditions = v.Conditions
us.columns = x.Columns
us.buildAndSortAddedRows(x.table, x.asName)
case *TableReaderExecutor:
us.desc = x.desc
us.dirty = getDirtyDB(b.ctx).getDirtyTable(x.table.Meta().ID)
us.conditions = v.Conditions
us.columns = x.columns
us.buildAndSortAddedRows(x.table, x.asName)
case *XSelectIndexExec:
us.desc = x.desc
Expand All @@ -359,6 +366,35 @@ func (b *executorBuilder) buildUnionScanExec(v *plan.PhysicalUnionScan) Executor
}
us.dirty = getDirtyDB(b.ctx).getDirtyTable(x.table.Meta().ID)
us.conditions = v.Conditions
us.columns = x.columns
us.buildAndSortAddedRows(x.table, x.asName)
case *IndexReaderExecutor:
us.desc = x.desc
for _, ic := range x.index.Columns {
for i, col := range x.schema.Columns {
if col.ColName.L == ic.Name.L {
us.usedIndex = append(us.usedIndex, i)
break
}
}
}
us.dirty = getDirtyDB(b.ctx).getDirtyTable(x.table.Meta().ID)
us.conditions = v.Conditions
us.columns = x.columns
us.buildAndSortAddedRows(x.table, x.asName)
case *IndexLookUpExecutor:
us.desc = x.desc
for _, ic := range x.index.Columns {
for i, col := range x.schema.Columns {
if col.ColName.L == ic.Name.L {
us.usedIndex = append(us.usedIndex, i)
break
}
}
}
us.dirty = getDirtyDB(b.ctx).getDirtyTable(x.table.Meta().ID)
us.conditions = v.Conditions
us.columns = x.columns
us.buildAndSortAddedRows(x.table, x.asName)
default:
// The mem table will not be written by sql directly, so we can omit the union scan to avoid err reporting.
Expand Down Expand Up @@ -849,6 +885,7 @@ func (b *executorBuilder) buildTableReader(v *plan.PhysicalTableReader) Executor
keepOrder: ts.KeepOrder,
desc: ts.Desc,
ranges: ts.Ranges,
columns: ts.Columns,
}

for i := range v.Schema().Columns {
Expand Down Expand Up @@ -876,6 +913,7 @@ func (b *executorBuilder) buildIndexReader(v *plan.PhysicalIndexReader) Executor
keepOrder: !is.OutOfOrder,
desc: is.Desc,
ranges: is.Ranges,
columns: is.Columns,
}

for _, col := range v.Schema().Columns {
Expand Down Expand Up @@ -913,6 +951,7 @@ func (b *executorBuilder) buildIndexLookUpReader(v *plan.PhysicalIndexLookUpRead
desc: is.Desc,
ranges: is.Ranges,
tableRequest: tableReq,
columns: is.Columns,
}
return e
}
8 changes: 6 additions & 2 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1131,7 +1131,11 @@ func (s *testSuite) TestAdapterStatement(c *C) {
}

func (s *testSuite) TestPointGet(c *C) {
defer testleak.AfterTest(c)()
plan.UseDAGPlanBuilder = true
defer func() {
plan.UseDAGPlanBuilder = false
testleak.AfterTest(c)()
}()
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use mysql")
ctx := tk.Se.(context.Context)
Expand Down Expand Up @@ -1293,6 +1297,7 @@ func (s *testSuite) TestScanControlSelection(c *C) {
}

func (s *testSuite) TestSimpleDAG(c *C) {
plan.UseDAGPlanBuilder = true
defer func() {
plan.UseDAGPlanBuilder = false
s.cleanEnv(c)
Expand All @@ -1303,7 +1308,6 @@ func (s *testSuite) TestSimpleDAG(c *C) {
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int primary key, b int, c int)")
tk.MustExec("insert into t values (1, 1, 1), (2, 1, 1), (3, 1, 2), (4, 2, 3)")
plan.UseDAGPlanBuilder = true
tk.MustQuery("select a from t").Check(testkit.Rows("1", "2", "3", "4"))
tk.MustQuery("select * from t where a = 4").Check(testkit.Rows("4 2 3"))
tk.MustQuery("select a from t limit 1").Check(testkit.Rows("1"))
Expand Down
6 changes: 6 additions & 0 deletions executor/new_distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ type TableReaderExecutor struct {
dagPB *tipb.DAGRequest
ctx context.Context
schema *expression.Schema
// columns are only required by union scan.
columns []*model.ColumnInfo

// result returns one or more distsql.PartialResult and each PartialResult is returned by one region.
result distsql.SelectResult
Expand Down Expand Up @@ -141,6 +143,8 @@ type IndexReaderExecutor struct {
// result returns one or more distsql.PartialResult and each PartialResult is returned by one region.
result distsql.SelectResult
partialResult distsql.PartialResult
// columns are only required by union scan.
columns []*model.ColumnInfo
}

// Schema implements the Executor Schema interface.
Expand Down Expand Up @@ -234,6 +238,8 @@ type IndexLookUpExecutor struct {
taskCurr *lookupTableTask

tableRequest *tipb.DAGRequest
// columns are only required by union scan.
columns []*model.ColumnInfo
}

// Open implements the Executor Open interface.
Expand Down
9 changes: 2 additions & 7 deletions executor/union_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ type UnionScanExec struct {
usedIndex []int
desc bool
conditions []expression.Expression
columns []*model.ColumnInfo

addedRows []*Row
cursor int
Expand Down Expand Up @@ -229,13 +230,7 @@ func (us *UnionScanExec) buildAndSortAddedRows(t table.Table, asName *model.CISt
newData = data
} else {
newData = make([]types.Datum, 0, us.schema.Len())
var columns []*model.ColumnInfo
if t, ok := us.children[0].(*XSelectTableExec); ok {
columns = t.Columns
} else {
columns = us.children[0].(*XSelectIndexExec).columns
}
for _, col := range columns {
for _, col := range us.columns {
newData = append(newData, data[col.Offset])
}
}
Expand Down
3 changes: 3 additions & 0 deletions executor/union_scan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@ package executor_test

import (
. "github.com/pingcap/check"
"github.com/pingcap/tidb/plan"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testleak"
)

func (s *testSuite) TestDirtyTransaction(c *C) {
plan.UseDAGPlanBuilder = true
defer func() {
plan.UseDAGPlanBuilder = false
s.cleanEnv(c)
testleak.AfterTest(c)
}()
Expand Down

0 comments on commit c5e8645

Please sign in to comment.