Skip to content

Commit

Permalink
*: pass column index to kv store. (pingcap#3280)
Browse files Browse the repository at this point in the history
  • Loading branch information
hanfei1991 authored May 17, 2017
1 parent ffee89b commit 917b65c
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 44 deletions.
4 changes: 2 additions & 2 deletions expression/aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,10 @@ func NewAggFunction(funcType string, funcArgs []Expression, distinct bool) Aggre
}

// NewDistAggFunc creates new Aggregate function for mock tikv.
func NewDistAggFunc(expr *tipb.Expr, colsID map[int64]int, fieldTps []*types.FieldType, sc *variable.StatementContext) (AggregationFunction, error) {
func NewDistAggFunc(expr *tipb.Expr, fieldTps []*types.FieldType, sc *variable.StatementContext) (AggregationFunction, error) {
args := make([]Expression, 0, len(expr.Children))
for _, child := range expr.Children {
arg, err := PBToExpr(child, colsID, fieldTps, sc)
arg, err := PBToExpr(child, fieldTps, sc)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
12 changes: 4 additions & 8 deletions expression/distsql_builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,18 +123,14 @@ func reinferFuncType(sc *variable.StatementContext, funcName string, args []Expr
}

// PBToExpr converts pb structure to expression.
func PBToExpr(expr *tipb.Expr, colIDs map[int64]int, tps []*types.FieldType, sc *variable.StatementContext) (Expression, error) {
func PBToExpr(expr *tipb.Expr, tps []*types.FieldType, sc *variable.StatementContext) (Expression, error) {
switch expr.Tp {
case tipb.ExprType_ColumnRef:
_, id, err := codec.DecodeInt(expr.Val)
_, offset, err := codec.DecodeInt(expr.Val)
if err != nil {
return nil, errors.Trace(err)
}
offset, ok := colIDs[id]
if !ok {
return nil, errors.Errorf("Can't find column id %d", id)
}
return &Column{Index: offset, RetType: tps[offset]}, nil
return &Column{Index: int(offset), RetType: tps[offset]}, nil
case tipb.ExprType_Null:
return &Constant{Value: types.Datum{}, RetType: types.NewFieldType(mysql.TypeNull)}, nil
case tipb.ExprType_Int64:
Expand Down Expand Up @@ -168,7 +164,7 @@ func PBToExpr(expr *tipb.Expr, colIDs map[int64]int, tps []*types.FieldType, sc
args = append(args, results...)
continue
}
arg, err := PBToExpr(child, colIDs, tps, sc)
arg, err := PBToExpr(child, tps, sc)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
12 changes: 5 additions & 7 deletions expression/distsql_builtin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ type testEvalSuite struct{}
// TODO: add more tests.
func (s *testEvalSuite) TestEval(c *C) {
row := []types.Datum{types.NewDatum(100)}
colIDs := make(map[int64]int)
colIDs[int64(1)] = 0
fieldTps := make([]*types.FieldType, 1)
fieldTps[0] = types.NewFieldType(mysql.TypeDouble)
tests := []struct {
Expand Down Expand Up @@ -78,7 +76,7 @@ func (s *testEvalSuite) TestEval(c *C) {
types.NewDecimalDatum(types.NewDecFromFloatForTest(1.1)),
},
{
columnExpr(1),
columnExpr(0),
types.NewIntDatum(100),
},
// Comparison operations.
Expand Down Expand Up @@ -277,7 +275,7 @@ func (s *testEvalSuite) TestEval(c *C) {
}
sc := new(variable.StatementContext)
for _, tt := range tests {
expr, err := PBToExpr(tt.expr, colIDs, fieldTps, sc)
expr, err := PBToExpr(tt.expr, fieldTps, sc)
c.Assert(err, IsNil)
result, err := expr.Eval(row)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -413,7 +411,7 @@ func (s *testEvalSuite) TestLike(c *C) {
}
sc := new(variable.StatementContext)
for _, tt := range tests {
expr, err := PBToExpr(tt.expr, nil, nil, sc)
expr, err := PBToExpr(tt.expr, nil, sc)
c.Check(err, IsNil)
res, err := expr.Eval(nil)
c.Check(err, IsNil)
Expand Down Expand Up @@ -461,7 +459,7 @@ func (s *testEvalSuite) TestWhereIn(c *C) {
}
sc := new(variable.StatementContext)
for _, tt := range tests {
expr, err := PBToExpr(tt.expr, nil, nil, sc)
expr, err := PBToExpr(tt.expr, nil, sc)
c.Check(err, IsNil)
res, err := expr.Eval(nil)
c.Check(err, IsNil)
Expand Down Expand Up @@ -499,7 +497,7 @@ func (s *testEvalSuite) TestEvalIsNull(c *C) {
}
sc := new(variable.StatementContext)
for _, tt := range tests {
expr, err := PBToExpr(tt.expr, nil, nil, sc)
expr, err := PBToExpr(tt.expr, nil, sc)
c.Assert(err, IsNil)
result, err := expr.Eval(nil)
c.Assert(err, IsNil)
Expand Down
6 changes: 6 additions & 0 deletions expression/expr_to_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,12 @@ func (pc pbConverter) columnToPBExpr(column *Column) *tipb.Expr {
return nil
}

if pc.client.IsRequestTypeSupported(kv.ReqTypeDAG, kv.ReqSubTypeBasic) {
return &tipb.Expr{
Tp: tipb.ExprType_ColumnRef,
Val: codec.EncodeInt(nil, int64(column.Index)),
}
}
id := column.ID
// Zero Column ID is not a column from table, can not support for now.
if id == 0 || id == -1 {
Expand Down
11 changes: 11 additions & 0 deletions plan/resolve_indices.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ func (p *PhysicalUnionScan) ResolveIndices() {
}
}

// ResolveIndices implements Plan interface.
func (p *PhysicalTableReader) ResolveIndices() {
p.tablePlan.ResolveIndices()
}

// ResolveIndices implements Plan interface.
func (p *PhysicalIndexReader) ResolveIndices() {
p.indexPlan.ResolveIndices()
Expand All @@ -120,6 +125,12 @@ func (p *PhysicalIndexReader) ResolveIndices() {
}
}

// ResolveIndices implements Plan interface.
func (p *PhysicalIndexLookUpReader) ResolveIndices() {
p.tablePlan.ResolveIndices()
p.indexPlan.ResolveIndices()
}

// ResolveIndices implements Plan interface.
func (p *Selection) ResolveIndices() {
p.basePlan.ResolveIndices()
Expand Down
8 changes: 4 additions & 4 deletions store/tikv/mock-tikv/cop_handler_dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (h *rpcHandler) buildSelection(ctx *dagContext, executor *tipb.Executor) (*
return nil, errors.Trace(err)
}
}
conds, err := convertToExprs(ctx.evalCtx.sc, ctx.evalCtx.colIDs, ctx.evalCtx.fieldTps, pbConds)
conds, err := convertToExprs(ctx.evalCtx.sc, ctx.evalCtx.fieldTps, pbConds)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -183,7 +183,7 @@ func (h *rpcHandler) buildAggregation(ctx *dagContext, executor *tipb.Executor)
var err error
var relatedColOffsets []int
for _, expr := range executor.Aggregation.AggFunc {
aggExpr, err := expression.NewDistAggFunc(expr, ctx.evalCtx.colIDs, ctx.evalCtx.fieldTps, ctx.evalCtx.sc)
aggExpr, err := expression.NewDistAggFunc(expr, ctx.evalCtx.fieldTps, ctx.evalCtx.sc)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -199,7 +199,7 @@ func (h *rpcHandler) buildAggregation(ctx *dagContext, executor *tipb.Executor)
return nil, errors.Trace(err)
}
}
groupBys, err := convertToExprs(ctx.evalCtx.sc, ctx.evalCtx.colIDs, ctx.evalCtx.fieldTps, executor.Aggregation.GetGroupBy())
groupBys, err := convertToExprs(ctx.evalCtx.sc, ctx.evalCtx.fieldTps, executor.Aggregation.GetGroupBy())
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -235,7 +235,7 @@ func (h *rpcHandler) buildTopN(ctx *dagContext, executor *tipb.Executor) (*topNE
},
}

conds, err := convertToExprs(ctx.evalCtx.sc, ctx.evalCtx.colIDs, ctx.evalCtx.fieldTps, pbConds)
conds, err := convertToExprs(ctx.evalCtx.sc, ctx.evalCtx.fieldTps, pbConds)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
29 changes: 6 additions & 23 deletions store/tikv/mock-tikv/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,11 @@ import (
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/types"
"github.com/pingcap/tipb/go-tipb"
)

// Error instances.
var (
errInvalid = terror.ClassMockTikv.New(codeInvalid, "invalid operation")
)

// Error codes.
const (
codeInvalid = 1
)

type executor interface {
SetSrcExec(executor)
Next() (int64, [][]byte, error)
Expand Down Expand Up @@ -607,20 +596,14 @@ func extractOffsetsInExpr(expr *tipb.Expr, columns []*tipb.ColumnInfo, collector
return nil, nil
}
if expr.GetTp() == tipb.ExprType_ColumnRef {
_, i, err := codec.DecodeInt(expr.Val)
_, idx, err := codec.DecodeInt(expr.Val)
if err != nil {
return nil, errors.Trace(err)
}
for idx, c := range columns {
if c.GetColumnId() != i {
continue
}
if !isDuplicated(collector, idx) {
collector = append(collector, idx)
}
return collector, nil
if !isDuplicated(collector, int(idx)) {
collector = append(collector, int(idx))
}
return nil, errInvalid.Gen("column %d not found", i)
return collector, nil
}
var err error
for _, child := range expr.Children {
Expand All @@ -632,10 +615,10 @@ func extractOffsetsInExpr(expr *tipb.Expr, columns []*tipb.ColumnInfo, collector
return collector, nil
}

func convertToExprs(sc *variable.StatementContext, colIDs map[int64]int, fieldTps []*types.FieldType, pbExprs []*tipb.Expr) ([]expression.Expression, error) {
func convertToExprs(sc *variable.StatementContext, fieldTps []*types.FieldType, pbExprs []*tipb.Expr) ([]expression.Expression, error) {
exprs := make([]expression.Expression, 0, len(pbExprs))
for _, expr := range pbExprs {
e, err := expression.PBToExpr(expr, colIDs, fieldTps, sc)
e, err := expression.PBToExpr(expr, fieldTps, sc)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down

0 comments on commit 917b65c

Please sign in to comment.