Skip to content

Commit

Permalink
support in subquery and compare subquery. (pingcap#1355)
Browse files Browse the repository at this point in the history
  • Loading branch information
hanfei1991 authored Jun 29, 2016
1 parent 9bf6e03 commit 87bbb12
Show file tree
Hide file tree
Showing 8 changed files with 240 additions and 20 deletions.
14 changes: 14 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1400,8 +1400,22 @@ func (s *testSuite) TestNewSubquery(c *C) {
tk.MustExec("commit")
result := tk.MustQuery("select 1 = (select count(*) from t where t.c = k.d) from t k")
result.Check(testkit.Rows("1", "1", "0"))
result = tk.MustQuery("select t.c = any (select count(*) from t) from t")
result.Check(testkit.Rows("0", "0", "1"))
result = tk.MustQuery("select * from t where (t.c, 6) = all (select count(*), sum(t.c) from t)")
result.Check(testkit.Rows("3 4"))
result = tk.MustQuery("select t.c from t where (t.c) < all (select count(*) from t)")
result.Check(testkit.Rows("1", "2"))
result = tk.MustQuery("select t.c from t where (t.c, t.d) != any (select * from t)")
result.Check(testkit.Rows("1", "2", "3"))
result = tk.MustQuery("select t.c from t where (t.c, t.d) = all (select * from t)")
result.Check(testkit.Rows())
result = tk.MustQuery("select (select count(*) from t where t.c = k.d) from t k")
result.Check(testkit.Rows("1", "1", "0"))
result = tk.MustQuery("select t.c from t where (t.c, t.d) in (select * from t)")
result.Check(testkit.Rows("1", "2", "3"))
result = tk.MustQuery("select t.c from t where (t.c, t.d) not in (select * from t)")
result.Check(testkit.Rows())
plan.UseNewPlanner = false
}

Expand Down
11 changes: 10 additions & 1 deletion executor/new_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,12 +184,21 @@ func (b *executorBuilder) buildNewSort(v *plan.NewSort) Executor {

func (b *executorBuilder) buildApply(v *plan.Apply) Executor {
src := b.build(v.GetChildByIndex(0))
return &ApplyExec{
apply := &ApplyExec{
schema: v.GetSchema(),
innerExec: b.build(v.InnerPlan),
outerSchema: v.OuterSchema,
Src: src,
}
if v.Checker != nil {
apply.checker = &conditionChecker{
all: v.Checker.All,
cond: v.Checker.Condition,
trimLen: len(src.Schema()),
ctx: b.ctx,
}
}
return apply
}

func (b *executorBuilder) buildExists(v *plan.Exists) Executor {
Expand Down
66 changes: 56 additions & 10 deletions executor/new_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -851,6 +851,30 @@ type ApplyExec struct {
Src Executor
outerSchema expression.Schema
innerExec Executor
checker *conditionChecker
}

// conditionChecker checks if all or any of the row match this condition.
type conditionChecker struct {
cond expression.Expression
trimLen int
ctx context.Context
all bool
matched bool
}

func (c *conditionChecker) Exec(row *Row) (*Row, error) {
var err error
c.matched, err = expression.EvalBool(c.cond, row.Data, c.ctx)
if err != nil {
return nil, errors.Trace(err)
}
row.Data = row.Data[:c.trimLen]
if c.matched != c.all {
row.Data = append(row.Data, types.NewDatum(c.matched))
return row, nil
}
return nil, nil
}

// Schema implements Executor Schema interface.
Expand All @@ -865,6 +889,9 @@ func (e *ApplyExec) Fields() []*ast.ResultField {

// Close implements Executor Close interface.
func (e *ApplyExec) Close() error {
if e.checker != nil {
e.checker.matched = false
}
return e.Src.Close()
}

Expand All @@ -877,17 +904,36 @@ func (e *ApplyExec) Next() (*Row, error) {
if srcRow == nil {
return nil, nil
}
for _, col := range e.outerSchema {
idx := col.Index
col.SetValue(&srcRow.Data[idx])
}
outerRow, err := e.innerExec.Next()
e.innerExec.Close()
if err != nil {
return nil, errors.Trace(err)
for {
for _, col := range e.outerSchema {
idx := col.Index
col.SetValue(&srcRow.Data[idx])
}
innerRow, err := e.innerExec.Next()
if err != nil {
return nil, errors.Trace(err)
}
if innerRow != nil {
srcRow.Data = append(srcRow.Data, innerRow.Data...)
}
if e.checker == nil {
e.innerExec.Close()
return srcRow, nil
}
if innerRow == nil {
srcRow.Data = append(srcRow.Data, types.NewDatum(e.checker.matched))
e.innerExec.Close()
return srcRow, nil
}
resultRow, err := e.checker.Exec(srcRow)
if err != nil {
return nil, errors.Trace(err)
}
if resultRow != nil {
e.innerExec.Close()
return resultRow, nil
}
}
srcRow.Data = append(srcRow.Data, outerRow.Data...)
return srcRow, nil
}

// ExistsExec represents exists executor.
Expand Down
20 changes: 19 additions & 1 deletion plan/column_pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,14 +287,32 @@ func pruneApply(v *Apply, parentUsedCols []*expression.Column) ([]*expression.Co
newUsedCols = append(newUsedCols, used)
}
}
if v.Checker != nil {
condUsedCols, _ := extractColumn(v.Checker.Condition, nil, nil)
for _, used := range condUsedCols {
if v.GetChildByIndex(0).GetSchema().GetIndex(used) != -1 {
newUsedCols = append(newUsedCols, used)
}
}
}
outer, err = pruneColumnsAndResolveIndices(v.GetChildByIndex(0), newUsedCols)
for _, col := range v.OuterSchema {
col.Index = v.GetChildByIndex(0).GetSchema().GetIndex(col)
}
if err != nil {
return nil, errors.Trace(err)
}
v.schema = append(v.GetChildByIndex(0).GetSchema().DeepCopy(), v.InnerPlan.GetSchema().DeepCopy()...)
combinedSchema := append(v.GetChildByIndex(0).GetSchema().DeepCopy(), v.InnerPlan.GetSchema().DeepCopy()...)
if v.Checker == nil {
v.schema = combinedSchema
} else {
combinedSchema.InitIndices()
v.Checker.Condition, err = retrieveColumnsInExpression(v.Checker.Condition, combinedSchema)
if err != nil {
return nil, errors.Trace(err)
}
v.schema = append(v.GetChildByIndex(0).GetSchema().DeepCopy(), v.schema[len(v.schema)-1])
}
v.schema.InitIndices()
return outer, nil
}
108 changes: 102 additions & 6 deletions plan/expression_rewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func getRowArg(e expression.Expression, idx int) expression.Expression {
return &expression.Constant{Value: d, RetType: types.NewFieldType(d.Kind())}
}

// constructBinaryOpFunctions converts (a0,a1,a2) op (b0,b1,b2) to (a0 op b0) and (a1 op b1) and (a2 op b2).
func constructBinaryOpFunction(l expression.Expression, r expression.Expression, op string) (expression.Expression, error) {
lLen, rLen := getRowLen(l), getRowLen(r)
if lLen == 1 && rLen == 1 {
Expand Down Expand Up @@ -119,6 +120,59 @@ func (er *expressionRewriter) Enter(inNode ast.Node) (ast.Node, bool) {
er.ctxStack = append(er.ctxStack, col)
return inNode, true
}
case *ast.CompareSubqueryExpr:
v.L.Accept(er)
if er.err != nil {
return inNode, true
}
lexpr := er.ctxStack[len(er.ctxStack)-1]
subq, ok := v.R.(*ast.SubqueryExpr)
if !ok {
er.err = errors.Errorf("Unknown compare type %T.", v.R)
return inNode, true
}
np, outerSchema := er.buildSubquery(subq)
if er.err != nil {
return inNode, true
}
// Only (a,b,c) = all (...) and (a,b,c) != any () can use row expression.
canMultiCol := (v.All && v.Op == opcode.EQ) || (!v.All && v.Op == opcode.NE)
if !canMultiCol && (getRowLen(lexpr) != 1 || len(np.GetSchema()) != 1) {
er.err = errors.New("Operand should contain 1 column(s)")
return inNode, true
}
if getRowLen(lexpr) != len(np.GetSchema()) {
er.err = errors.Errorf("Operand should contain %d column(s)", getRowLen(lexpr))
return inNode, true
}
var checkCondition expression.Expression
var rexpr expression.Expression
if len(np.GetSchema()) == 1 {
rexpr = np.GetSchema()[0].DeepCopy()
} else {
args := make([]expression.Expression, 0, len(np.GetSchema()))
for _, col := range np.GetSchema() {
args = append(args, col.DeepCopy())
}
rexpr = expression.NewFunction(ast.RowFunc, types.NewFieldType(types.KindRow), args...)
}
switch v.Op {
// Only EQ, NE and NullEQ can be composed with and.
case opcode.EQ, opcode.NE, opcode.NullEQ:
var err error
checkCondition, err = constructBinaryOpFunction(lexpr, rexpr, opcode.Ops[v.Op])
if err != nil {
er.err = errors.Trace(err)
return inNode, true
}
// If op is not EQ, NE, NullEQ, say LT, it will remain as row(a,b) < row(c,d), and be compared as row datum.
default:
checkCondition = expression.NewFunction(opcode.Ops[v.Op], types.NewFieldType(mysql.TypeTiny), lexpr, rexpr)
}
er.p = er.b.buildApply(er.p, np, outerSchema, &ApplyConditionChecker{Condition: checkCondition, All: v.All})
// The parent expression only use the last column in schema, which represents whether the condition is matched.
er.ctxStack[len(er.ctxStack)-1] = er.p.GetSchema()[len(er.p.GetSchema())-1]
return inNode, true
case *ast.ExistsSubqueryExpr:
subq, ok := v.Sel.(*ast.SubqueryExpr)
if !ok {
Expand All @@ -131,7 +185,7 @@ func (er *expressionRewriter) Enter(inNode ast.Node) (ast.Node, bool) {
}
np = er.b.buildExists(np)
if np.IsCorrelated() {
er.p = er.b.buildApply(er.p, np, outerSchema)
er.p = er.b.buildApply(er.p, np, outerSchema, nil)
er.ctxStack = append(er.ctxStack, er.p.GetSchema()[len(er.p.GetSchema())-1])
} else {
_, err := pruneColumnsAndResolveIndices(np, np.GetSchema())
Expand All @@ -150,9 +204,49 @@ func (er *expressionRewriter) Enter(inNode ast.Node) (ast.Node, bool) {
}
return inNode, true
case *ast.PatternInExpr:
// TODO: support in subquery
if v.Sel != nil {
er.err = errors.New("In subquery doesn't currently supported.")
v.Expr.Accept(er)
if er.err != nil {
return inNode, true
}
lexpr := er.ctxStack[len(er.ctxStack)-1]
subq, ok := v.Sel.(*ast.SubqueryExpr)
if !ok {
er.err = errors.Errorf("Unknown compare type %T.", v.Sel)
return inNode, true
}
np, outerSchema := er.buildSubquery(subq)
if er.err != nil {
return inNode, true
}
if getRowLen(lexpr) != len(np.GetSchema()) {
er.err = errors.Errorf("Operand should contain %d column(s)", getRowLen(lexpr))
return inNode, true
}
var rexpr expression.Expression
if len(np.GetSchema()) == 1 {
rexpr = np.GetSchema()[0].DeepCopy()
} else {
args := make([]expression.Expression, 0, len(np.GetSchema()))
for _, col := range np.GetSchema() {
args = append(args, col.DeepCopy())
}
rexpr = expression.NewFunction(ast.RowFunc, types.NewFieldType(types.KindRow), args...)
}
// a in (subq) will be rewrited as a = any(subq).
// a not in (subq) will be rewrited as a != all(subq).
op, all := ast.EQ, false
if v.Not {
op, all = ast.NE, true
}
checkCondition, err := constructBinaryOpFunction(lexpr, rexpr, op)
if err != nil {
er.err = errors.Trace(err)
return inNode, true
}
er.p = er.b.buildApply(er.p, np, outerSchema, &ApplyConditionChecker{Condition: checkCondition, All: all})
// The parent expression only use the last column in schema, which represents whether the condition is matched.
er.ctxStack[len(er.ctxStack)-1] = er.p.GetSchema()[len(er.p.GetSchema())-1]
return inNode, true
}
case *ast.SubqueryExpr:
Expand All @@ -162,7 +256,7 @@ func (er *expressionRewriter) Enter(inNode ast.Node) (ast.Node, bool) {
}
np = er.b.buildMaxOneRow(np)
if np.IsCorrelated() {
er.p = er.b.buildApply(er.p, np, outerSchema)
er.p = er.b.buildApply(er.p, np, outerSchema, nil)
if len(np.GetSchema()) > 1 {
newCols := make([]expression.Expression, 0, len(np.GetSchema()))
for _, col := range np.GetSchema() {
Expand Down Expand Up @@ -230,7 +324,7 @@ func (er *expressionRewriter) Leave(inNode ast.Node) (retNode ast.Node, ok bool)
}
case *ast.ColumnName:
er.toColumn(v)
case *ast.ColumnNameExpr, *ast.ParenthesesExpr, *ast.WhenClause, *ast.SubqueryExpr, *ast.ExistsSubqueryExpr:
case *ast.ColumnNameExpr, *ast.ParenthesesExpr, *ast.WhenClause, *ast.SubqueryExpr, *ast.ExistsSubqueryExpr, *ast.CompareSubqueryExpr:
case *ast.ValueExpr:
value := &expression.Constant{Value: v.Datum, RetType: types.NewFieldType(v.Datum.Kind())}
er.ctxStack = append(er.ctxStack, value)
Expand Down Expand Up @@ -271,7 +365,9 @@ func (er *expressionRewriter) Leave(inNode ast.Node) (retNode ast.Node, ok bool)
case *ast.PatternLikeExpr:
er.likeToScalarFunc(v)
case *ast.PatternInExpr:
er.inToScalarFunc(v)
if v.Sel == nil {
er.inToScalarFunc(v)
}
case *ast.UnaryOperationExpr:
if getRowLen(er.ctxStack[stkLen-1]) != 1 {
er.err = errors.New("Operand should contain 1 column(s)")
Expand Down
21 changes: 21 additions & 0 deletions plan/new_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,27 @@ func (s *testPlanSuite) TestColumnPruning(c *C) {
"*plan.NewTableScan_1": {"a", "b", "c"},
},
},
{
sql: "select a from t where b < any (select c from t)",
ans: map[string][]string{
"*plan.NewTableScan_1": {"a", "b"},
"*plan.NewTableScan_2": {"c"},
},
},
{
sql: "select a from t where (b,a) = all (select c,d from t)",
ans: map[string][]string{
"*plan.NewTableScan_1": {"a", "b"},
"*plan.NewTableScan_2": {"c", "d"},
},
},
{
sql: "select a from t where (b,a) in (select c,d from t)",
ans: map[string][]string{
"*plan.NewTableScan_1": {"a", "b"},
"*plan.NewTableScan_2": {"c", "d"},
},
},
}
for _, ca := range cases {
comment := Commentf("for %s", ca.sql)
Expand Down
1 change: 1 addition & 0 deletions plan/new_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ type Apply struct {

InnerPlan Plan
OuterSchema expression.Schema
Checker *ApplyConditionChecker
}

// Exists checks if a query returns result.
Expand Down
19 changes: 17 additions & 2 deletions plan/newplanbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -792,15 +792,30 @@ func (b *planBuilder) buildNewTableScanPlan(tn *ast.TableName) Plan {
return p
}

func (b *planBuilder) buildApply(p, inner Plan, schema expression.Schema) Plan {
// ApplyConditionChecker checks whether all or any output of apply matches a condition.
type ApplyConditionChecker struct {
Condition expression.Expression
All bool
}

func (b *planBuilder) buildApply(p, inner Plan, schema expression.Schema, checker *ApplyConditionChecker) Plan {
ap := &Apply{
InnerPlan: inner,
OuterSchema: schema,
Checker: checker,
}
ap.id = b.allocID(ap)
addChild(ap, p)
innerSchema := inner.GetSchema().DeepCopy()
ap.SetSchema(append(p.GetSchema().DeepCopy(), innerSchema...))
if checker == nil {
ap.SetSchema(append(p.GetSchema().DeepCopy(), innerSchema...))
} else {
ap.SetSchema(append(p.GetSchema().DeepCopy(), &expression.Column{
FromID: ap.id,
ColName: model.NewCIStr("exists_row"),
RetType: types.NewFieldType(mysql.TypeTiny),
}))
}
ap.correlated = p.IsCorrelated()
return ap
}
Expand Down

0 comments on commit 87bbb12

Please sign in to comment.