Skip to content

Commit

Permalink
Hanfei/rewrite plan (pingcap#1272)
Browse files Browse the repository at this point in the history
  • Loading branch information
hanfei1991 authored and coocood committed Jun 1, 2016
1 parent a58a8fc commit f9c31d9
Show file tree
Hide file tree
Showing 17 changed files with 1,148 additions and 225 deletions.
22 changes: 11 additions & 11 deletions ast/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func (n *AggregateFuncExpr) GetContext() *AggEvaluateContext {
if _, ok := n.contextPerGroupMap[n.CurrentGroup]; !ok {
c := &AggEvaluateContext{}
if n.Distinct {
c.distinctChecker = distinct.CreateDistinctChecker()
c.DistinctChecker = distinct.CreateDistinctChecker()
}
n.contextPerGroupMap[n.CurrentGroup] = c
}
Expand All @@ -235,7 +235,7 @@ func (n *AggregateFuncExpr) updateCount() error {
vals = append(vals, value)
}
if n.Distinct {
d, err := ctx.distinctChecker.Check(vals)
d, err := ctx.DistinctChecker.Check(vals)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -249,14 +249,14 @@ func (n *AggregateFuncExpr) updateCount() error {

func (n *AggregateFuncExpr) updateFirstRow() error {
ctx := n.GetContext()
if ctx.evaluated {
if ctx.Evaluated {
return nil
}
if len(n.Args) != 1 {
return errors.New("Wrong number of args for AggFuncFirstRow")
}
ctx.Value = n.Args[0].GetValue()
ctx.evaluated = true
ctx.Evaluated = true
return nil
}

Expand All @@ -266,9 +266,9 @@ func (n *AggregateFuncExpr) updateMaxMin(max bool) error {
return errors.New("Wrong number of args for AggFuncFirstRow")
}
v := n.Args[0].GetValue()
if !ctx.evaluated {
if !ctx.Evaluated {
ctx.Value = v
ctx.evaluated = true
ctx.Evaluated = true
return nil
}
c, err := types.Compare(ctx.Value, v)
Expand Down Expand Up @@ -296,7 +296,7 @@ func (n *AggregateFuncExpr) updateSum() error {
return nil
}
if n.Distinct {
d, err := ctx.distinctChecker.Check([]interface{}{value})
d, err := ctx.DistinctChecker.Check([]interface{}{value})
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -324,7 +324,7 @@ func (n *AggregateFuncExpr) updateGroupConcat() error {
vals = append(vals, value)
}
if n.Distinct {
d, err := ctx.distinctChecker.Check(vals)
d, err := ctx.DistinctChecker.Check(vals)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -394,11 +394,11 @@ func (a *AggregateFuncExtractor) Leave(n Node) (node Node, ok bool) {
return n, true
}

// AggEvaluateContext is used to store intermediate result when caculation aggregate functions.
// AggEvaluateContext is used to store intermediate result when calculating aggregate functions.
type AggEvaluateContext struct {
distinctChecker *distinct.Checker
DistinctChecker *distinct.Checker
Count int64
Value interface{}
Buffer *bytes.Buffer // Buffer is used for group_concat.
evaluated bool
Evaluated bool
}
33 changes: 33 additions & 0 deletions evaluator/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package evaluator
import (
"strings"

"github.com/juju/errors"
"github.com/pingcap/tidb/context"
"github.com/pingcap/tidb/util/types"
)
Expand Down Expand Up @@ -113,6 +114,38 @@ var Funcs = map[string]Func{
"if": {builtinIf, 3, 3},
"ifnull": {builtinIfNull, 2, 2},
"nullif": {builtinNullIf, 2, 2},

// only used by new plan
"&&": {builtinEmpty, 2, 2},
"<<": {builtinEmpty, 2, 2},
">>": {builtinEmpty, 2, 2},
"||": {builtinEmpty, 2, 2},
">=": {builtinEmpty, 2, 2},
"<=": {builtinEmpty, 2, 2},
"=": {builtinEmpty, 2, 2},
"!=": {builtinEmpty, 2, 2},
"<": {builtinEmpty, 2, 2},
">": {builtinEmpty, 2, 2},
"+": {builtinEmpty, 2, 2},
"-": {builtinEmpty, 2, 2},
"&": {builtinEmpty, 2, 2},
"|": {builtinEmpty, 2, 2},
"%": {builtinEmpty, 2, 2},
"^": {builtinEmpty, 2, 2},
"/": {builtinEmpty, 2, 2},
"*": {builtinEmpty, 2, 2},
"DIV": {builtinEmpty, 2, 2},
"XOR": {builtinEmpty, 2, 2},
"<=>": {builtinEmpty, 2, 2},
"not": {builtinEmpty, 1, 1},
"bitneg": {builtinEmpty, 1, 1},
"unaryplus": {builtinEmpty, 1, 1},
"unaryminus": {builtinEmpty, 1, 1},
}

// TODO: remove this when implementing executor.
func builtinEmpty(args []types.Datum, ctx context.Context) (d types.Datum, err error) {
return d, errors.New("Not implemented yet.")
}

// See: http://dev.mysql.com/doc/refman/5.7/en/comparison-operators.html#function_coalesce
Expand Down
15 changes: 8 additions & 7 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/ngaut/log"
"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/context"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/model"
Expand Down Expand Up @@ -112,14 +113,15 @@ func (b *executorBuilder) build(p plan.Plan) Executor {
}

// compose CNF items into a balance deep CNF tree, which benefits a lot for pb decoder/encoder.
func composeCondition(conditions []ast.ExprNode) ast.ExprNode {
func composeCondition(conditions []expression.Expression) expression.Expression {
length := len(conditions)
if length == 0 {
return nil
} else if length == 1 {
return conditions[0]
} else {
return &ast.BinaryOperationExpr{Op: opcode.AndAnd, L: composeCondition(conditions[:length/2]), R: composeCondition(conditions[length/2:])}
eqStr, _ := opcode.Ops[opcode.EQ]
return expression.NewFunction(model.NewCIStr(eqStr), []expression.Expression{composeCondition(conditions[0 : length/2]), composeCondition(conditions[length/2:])})
}
}

Expand All @@ -131,12 +133,11 @@ func (b *executorBuilder) buildJoin(v *plan.Join) Executor {
fields: v.Fields(),
ctx: b.ctx,
}
var leftHashKey, rightHashKey []ast.ExprNode
var leftHashKey, rightHashKey []*expression.Column
for _, eqCond := range v.EqualConditions {
binop, ok := eqCond.(*ast.BinaryOperationExpr)
if ok && binop.Op == opcode.EQ {
ln, lOK := binop.L.(*ast.ColumnNameExpr)
rn, rOK := binop.R.(*ast.ColumnNameExpr)
if eqCond.FuncName.L == "eq" {
ln, lOK := eqCond.Args[0].(*expression.Column)
rn, rOK := eqCond.Args[1].(*expression.Column)
if lOK && rOK {
leftHashKey = append(leftHashKey, ln)
rightHashKey = append(rightHashKey, rn)
Expand Down
2 changes: 1 addition & 1 deletion executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1101,7 +1101,7 @@ func (s *testSuite) TestJoin(c *C) {
}

func (s *testSuite) TestNewJoin(c *C) {
plan.UseNewPlanner = true
plan.UseNewPlanner = false
defer testleak.AfterTest(c)()
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
37 changes: 19 additions & 18 deletions executor/new_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,29 @@ import (
"github.com/juju/errors"
"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/context"
"github.com/pingcap/tidb/evaluator"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/types"
)

// HashJoinExec implements the hash join algorithm.
type HashJoinExec struct {
hashTable map[string][]*Row
smallHashKey []ast.ExprNode
bigHashKey []ast.ExprNode
smallHashKey []*expression.Column
bigHashKey []*expression.Column
smallExec Executor
bigExec Executor
prepared bool
fields []*ast.ResultField
ctx context.Context
smallFilter ast.ExprNode
bigFilter ast.ExprNode
otherFilter ast.ExprNode
outter bool
leftSmall bool
matchedRows []*Row
cursor int
smallFilter expression.Expression
bigFilter expression.Expression
otherFilter expression.Expression
//TODO: remove fields when abandon old plan.
fields []*ast.ResultField
outter bool
leftSmall bool
matchedRows []*Row
cursor int
}

func joinTwoRow(a *Row, b *Row) *Row {
Expand All @@ -53,10 +54,10 @@ func joinTwoRow(a *Row, b *Row) *Row {
return ret
}

func (e *HashJoinExec) getHashKey(exprs []ast.ExprNode) ([]byte, error) {
func (e *HashJoinExec) getHashKey(exprs []*expression.Column, row *Row) ([]byte, error) {
vals := make([]types.Datum, 0, len(exprs))
for _, expr := range exprs {
v, err := evaluator.Eval(e.ctx, expr)
v, err := expr.Eval(row.Data, e.ctx)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -96,15 +97,15 @@ func (e *HashJoinExec) prepare() error {

matched := true
if e.smallFilter != nil {
matched, err = evaluator.EvalBool(e.ctx, e.smallFilter)
matched, err = expression.EvalBool(e.smallFilter, row.Data, e.ctx)
if err != nil {
return errors.Trace(err)
}
if !matched {
continue
}
}
hashcode, err := e.getHashKey(e.smallHashKey)
hashcode, err := e.getHashKey(e.smallHashKey, row)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -120,7 +121,7 @@ func (e *HashJoinExec) prepare() error {
}

func (e *HashJoinExec) constructMatchedRows(bigRow *Row) (matchedRows []*Row, err error) {
hashcode, err := e.getHashKey(e.bigHashKey)
hashcode, err := e.getHashKey(e.bigHashKey, bigRow)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -141,7 +142,7 @@ func (e *HashJoinExec) constructMatchedRows(bigRow *Row) (matchedRows []*Row, er
for i, data := range smallRow.Data {
e.fields[i+startKey].Expr.SetValue(data.GetValue())
}
otherMatched, err = evaluator.EvalBool(e.ctx, e.otherFilter)
otherMatched, err = expression.EvalBool(e.otherFilter, bigRow.Data, e.ctx)
}
if err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -215,7 +216,7 @@ func (e *HashJoinExec) Next() (*Row, error) {
var matchedRows []*Row
bigMatched := true
if e.bigFilter != nil {
bigMatched, err = evaluator.EvalBool(e.ctx, e.bigFilter)
bigMatched, err = expression.EvalBool(e.bigFilter, row.Data, e.ctx)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
82 changes: 82 additions & 0 deletions expression/aggregation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package expression

import (
"github.com/pingcap/tidb/ast"
)

// AggregationFunction stands for aggregate functions.
type AggregationFunction interface {
// GetArgs stands for getting all arguments.
GetArgs() []Expression
}

// NewAggrFunction creates a new AggregationFunction.
func NewAggrFunction(funcType string, funcArgs []Expression) AggregationFunction {
switch funcType {
case ast.AggFuncSum:
return &sumFunction{aggrFunction: aggrFunction{Args: funcArgs, resultMapper: make(aggrCtxMapper, 0)}}
case ast.AggFuncCount:
return &countFunction{aggrFunction: aggrFunction{Args: funcArgs, resultMapper: make(aggrCtxMapper, 0)}}
case ast.AggFuncAvg:
return &avgFunction{aggrFunction: aggrFunction{Args: funcArgs, resultMapper: make(aggrCtxMapper, 0)}}
case ast.AggFuncGroupConcat:
return &concatFunction{aggrFunction: aggrFunction{Args: funcArgs, resultMapper: make(aggrCtxMapper, 0)}}
case ast.AggFuncMax:
return &maxMinFunction{aggrFunction: aggrFunction{Args: funcArgs, resultMapper: make(aggrCtxMapper, 0)}, isMax: true}
case ast.AggFuncMin:
return &maxMinFunction{aggrFunction: aggrFunction{Args: funcArgs, resultMapper: make(aggrCtxMapper, 0)}, isMax: false}
case ast.AggFuncFirstRow:
return &firstRowFunction{aggrFunction: aggrFunction{Args: funcArgs, resultMapper: make(aggrCtxMapper, 0)}}
}
return nil
}

type aggrCtxMapper map[string]*ast.AggEvaluateContext

type aggrFunction struct {
Args []Expression
resultMapper aggrCtxMapper
}

// GetArgs implements AggregationFunction interface.
func (af *aggrFunction) GetArgs() []Expression {
return af.Args
}

type sumFunction struct {
aggrFunction
}

type countFunction struct {
aggrFunction
}

type avgFunction struct {
aggrFunction
}

type concatFunction struct {
aggrFunction
}

type maxMinFunction struct {
aggrFunction
isMax bool
}

type firstRowFunction struct {
aggrFunction
}
Loading

0 comments on commit f9c31d9

Please sign in to comment.