Skip to content

Commit

Permalink
expression: support push bit column down to TiKV (pingcap#32497)
Browse files Browse the repository at this point in the history
  • Loading branch information
Yisaer authored Mar 1, 2022
1 parent d1bf078 commit fbfe36c
Show file tree
Hide file tree
Showing 10 changed files with 135 additions and 56 deletions.
10 changes: 10 additions & 0 deletions expression/builtin_cast.go
Original file line number Diff line number Diff line change
Expand Up @@ -1870,6 +1870,9 @@ func BuildCastFunction(ctx sessionctx.Context, expr Expression, tp *types.FieldT
fc = &castAsJSONFunctionClass{baseFunctionClass{ast.Cast, 1, 1}, tp}
case types.ETString:
fc = &castAsStringFunctionClass{baseFunctionClass{ast.Cast, 1, 1}, tp}
if expr.GetType().Tp == mysql.TypeBit {
tp.Flen = (expr.GetType().Flen + 7) / 8
}
}
f, err := fc.getFunction(ctx, []Expression{expr})
terror.Log(err)
Expand Down Expand Up @@ -1972,9 +1975,16 @@ func WrapWithCastAsString(ctx sessionctx.Context, expr Expression) Expression {
if exprTp.Tp == mysql.TypeNewDecimal && argLen != types.UnspecifiedFsp {
argLen += 3
}

if exprTp.EvalType() == types.ETInt {
argLen = mysql.MaxIntWidth
// For TypeBit, castAsString will make length as int(( bit_len + 7 ) / 8) bytes due to
// TiKV needs the bit's real len during calculating, eg: ascii(bit).
if exprTp.Tp == mysql.TypeBit {
argLen = (exprTp.Flen + 7) / 8
}
}

// Because we can't control the length of cast(float as char) for now, we can't determine the argLen.
if exprTp.Tp == mysql.TypeFloat || exprTp.Tp == mysql.TypeDouble {
argLen = -1
Expand Down
3 changes: 3 additions & 0 deletions expression/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,9 @@ func (c *Constant) EvalInt(ctx sessionctx.Context, row chunk.Row) (int64, bool,
} else if c.GetType().Hybrid() || dt.Kind() == types.KindString {
res, err := dt.ToInt64(ctx.GetSessionVars().StmtCtx)
return res, false, err
} else if dt.Kind() == types.KindMysqlBit {
uintVal, err := dt.GetBinaryLiteral().ToInt(ctx.GetSessionVars().StmtCtx)
return int64(uintVal), false, err
}
return dt.GetInt64(), false, nil
}
Expand Down
2 changes: 2 additions & 0 deletions expression/distsql_builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -1132,6 +1132,8 @@ func PBToExpr(expr *tipb.Expr, tps []*types.FieldType, sc *stmtctx.StatementCont
return convertString(expr.Val, expr.FieldType)
case tipb.ExprType_Bytes:
return &Constant{Value: types.NewBytesDatum(expr.Val), RetType: types.NewFieldType(mysql.TypeString)}, nil
case tipb.ExprType_MysqlBit:
return &Constant{Value: types.NewMysqlBitDatum(expr.Val), RetType: types.NewFieldType(mysql.TypeString)}, nil
case tipb.ExprType_Float32:
return convertFloat(expr.Val, true)
case tipb.ExprType_Float64:
Expand Down
5 changes: 4 additions & 1 deletion expression/expr_to_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ func (pc *PbConverter) encodeDatum(ft *types.FieldType, d types.Datum) (tipb.Exp
case types.KindString, types.KindBinaryLiteral:
tp = tipb.ExprType_String
val = d.GetBytes()
case types.KindMysqlBit:
tp = tipb.ExprType_MysqlBit
val = d.GetBytes()
case types.KindBytes:
tp = tipb.ExprType_Bytes
val = d.GetBytes()
Expand Down Expand Up @@ -179,7 +182,7 @@ func (pc PbConverter) columnToPBExpr(column *Column) *tipb.Expr {
return nil
}
switch column.GetType().Tp {
case mysql.TypeBit, mysql.TypeSet, mysql.TypeGeometry, mysql.TypeUnspecified:
case mysql.TypeSet, mysql.TypeGeometry, mysql.TypeUnspecified:
return nil
case mysql.TypeEnum:
if !IsPushDownEnabled("enum", kv.UnSpecified) {
Expand Down
9 changes: 5 additions & 4 deletions expression/expr_to_pb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,9 @@ func TestColumn2Pb(t *testing.T) {
sc := new(stmtctx.StatementContext)
client := new(mock.Client)

colExprs = append(colExprs, genColumn(mysql.TypeBit, 1))
colExprs = append(colExprs, genColumn(mysql.TypeSet, 2))
colExprs = append(colExprs, genColumn(mysql.TypeGeometry, 4))
colExprs = append(colExprs, genColumn(mysql.TypeUnspecified, 5))
colExprs = append(colExprs, genColumn(mysql.TypeSet, 1))
colExprs = append(colExprs, genColumn(mysql.TypeGeometry, 2))
colExprs = append(colExprs, genColumn(mysql.TypeUnspecified, 3))

pushed, remained := PushDownExprs(sc, colExprs, client, kv.UnSpecified)
require.Len(t, pushed, 0)
Expand Down Expand Up @@ -168,6 +167,7 @@ func TestColumn2Pb(t *testing.T) {
colExprs = append(colExprs, genColumn(mysql.TypeVarString, 22))
colExprs = append(colExprs, genColumn(mysql.TypeString, 23))
colExprs = append(colExprs, genColumn(mysql.TypeEnum, 24))
colExprs = append(colExprs, genColumn(mysql.TypeBit, 25))
pushed, remained = PushDownExprs(sc, colExprs, client, kv.UnSpecified)
require.Len(t, pushed, len(colExprs))
require.Len(t, remained, 0)
Expand Down Expand Up @@ -198,6 +198,7 @@ func TestColumn2Pb(t *testing.T) {
"{\"tp\":201,\"val\":\"gAAAAAAAABY=\",\"sig\":0,\"field_type\":{\"tp\":253,\"flag\":0,\"flen\":-1,\"decimal\":-1,\"collate\":-46,\"charset\":\"utf8mb4\"},\"has_distinct\":false}",
"{\"tp\":201,\"val\":\"gAAAAAAAABc=\",\"sig\":0,\"field_type\":{\"tp\":254,\"flag\":0,\"flen\":-1,\"decimal\":-1,\"collate\":-46,\"charset\":\"utf8mb4\"},\"has_distinct\":false}",
"{\"tp\":201,\"val\":\"gAAAAAAAABg=\",\"sig\":0,\"field_type\":{\"tp\":247,\"flag\":0,\"flen\":-1,\"decimal\":-1,\"collate\":-63,\"charset\":\"binary\"},\"has_distinct\":false}",
"{\"tp\":201,\"val\":\"gAAAAAAAABk=\",\"sig\":0,\"field_type\":{\"tp\":16,\"flag\":0,\"flen\":-1,\"decimal\":-1,\"collate\":-63,\"charset\":\"binary\"},\"has_distinct\":false}",
}
for i, pbExpr := range pbExprs {
require.NotNil(t, pbExprs)
Expand Down
2 changes: 1 addition & 1 deletion kv/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (d RequestTypeSupportedChecker) supportExpr(exprType tipb.ExprType) bool {
switch exprType {
case tipb.ExprType_Null, tipb.ExprType_Int64, tipb.ExprType_Uint64, tipb.ExprType_String, tipb.ExprType_Bytes,
tipb.ExprType_MysqlDuration, tipb.ExprType_MysqlTime, tipb.ExprType_MysqlDecimal,
tipb.ExprType_Float32, tipb.ExprType_Float64, tipb.ExprType_ColumnRef, tipb.ExprType_MysqlEnum:
tipb.ExprType_Float32, tipb.ExprType_Float64, tipb.ExprType_ColumnRef, tipb.ExprType_MysqlEnum, tipb.ExprType_MysqlBit:
return true
// aggregate functions.
// NOTE: tipb.ExprType_GroupConcat is only supported by TiFlash, So checking it for TiKV case outside.
Expand Down
61 changes: 59 additions & 2 deletions planner/core/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2623,6 +2623,7 @@ func TestIndexJoinOnClusteredIndex(t *testing.T) {
tk.MustQuery(tt).Check(testkit.Rows(output[i].Res...))
}
}

func TestIssue18984(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
Expand All @@ -2645,13 +2646,65 @@ func TestIssue18984(t *testing.T) {
"3 3 3 2 4 3 5"))
}

func TestBitColumnPushDown(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t1(a bit(8), b int)")
tk.MustExec("create table t2(a bit(8), b int)")
tk.MustExec("insert into t1 values ('1', 1), ('2', 2), ('3', 3), ('4', 4), ('1', 1), ('2', 2), ('3', 3), ('4', 4)")
tk.MustExec("insert into t2 values ('1', 1), ('2', 2), ('3', 3), ('4', 4), ('1', 1), ('2', 2), ('3', 3), ('4', 4)")
sql := "select b from t1 where t1.b > (select min(t2.b) from t2 where t2.a < t1.a)"
tk.MustQuery(sql).Sort().Check(testkit.Rows("2", "2", "3", "3", "4", "4"))
rows := [][]interface{}{
{"Projection_15", "root", "test.t1.b"},
{"└─Apply_17", "root", "CARTESIAN inner join, other cond:gt(test.t1.b, Column#7)"},
{" ├─TableReader_20(Build)", "root", "data:Selection_19"},
{" │ └─Selection_19", "cop[tikv]", "not(isnull(test.t1.b))"},
{" │ └─TableFullScan_18", "cop[tikv]", "keep order:false, stats:pseudo"},
{" └─Selection_21(Probe)", "root", "not(isnull(Column#7))"},
{" └─StreamAgg_23", "root", "funcs:min(test.t2.b)->Column#7"},
{" └─TopN_24", "root", "test.t2.b, offset:0, count:1"},
{" └─TableReader_32", "root", "data:TopN_31"},
{" └─TopN_31", "cop[tikv]", "test.t2.b, offset:0, count:1"},
{" └─Selection_30", "cop[tikv]", "lt(test.t2.a, test.t1.a), not(isnull(test.t2.b))"},
{" └─TableFullScan_29", "cop[tikv]", "keep order:false, stats:pseudo"},
}
tk.MustQuery(fmt.Sprintf("explain analyze %s", sql)).CheckAt([]int{0, 3, 6}, rows)
tk.MustExec("insert t1 values ('A', 1);")
sql = "select a from t1 where ascii(a)=65"
tk.MustQuery(sql).Check(testkit.Rows("A"))
rows = [][]interface{}{
{"TableReader_7", "root", "data:Selection_6"},
{"└─Selection_6", "cop[tikv]", "eq(ascii(cast(test.t1.a, var_string(1))), 65)"},
{" └─TableFullScan_5", "cop[tikv]", "keep order:false, stats:pseudo"},
}
tk.MustQuery(fmt.Sprintf("explain analyze %s", sql)).CheckAt([]int{0, 3, 6}, rows)

rows[1][2] = `eq(concat(cast(test.t1.a, var_string(1)), "A"), "AA")`
sql = "select a from t1 where concat(a, 'A')='AA'"
tk.MustQuery(sql).Check(testkit.Rows("A"))
tk.MustQuery(fmt.Sprintf("explain analyze %s", sql)).CheckAt([]int{0, 3, 6}, rows)

rows[1][2] = `eq(cast(test.t1.a, binary(1)), "A")`
sql = "select a from t1 where binary a='A'"
tk.MustQuery(sql).Check(testkit.Rows("A"))
tk.MustQuery(fmt.Sprintf("explain analyze %s", sql)).CheckAt([]int{0, 3, 6}, rows)

rows[1][2] = `eq(cast(test.t1.a, var_string(1)), "A")`
sql = "select a from t1 where cast(a as char)='A'"
tk.MustQuery(sql).Check(testkit.Rows("A"))
tk.MustQuery(fmt.Sprintf("explain analyze %s", sql)).CheckAt([]int{0, 3, 6}, rows)
}

func TestScalarFunctionPushDown(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t(id int signed, id2 int unsigned ,c varchar(11), d datetime, b double)")
tk.MustExec("insert into t(id,c,d) values (1,'abc','2021-12-12')")
tk.MustExec("create table t(id int signed, id2 int unsigned, c varchar(11), d datetime, b double, e bit(10))")
tk.MustExec("insert into t(id, id2, c, d) values (-1, 1, 'abc', '2021-12-12')")
rows := [][]interface{}{
{"TableReader_7", "root", "data:Selection_6"},
{"└─Selection_6", "cop[tikv]", "right(test.t.c, 1)"},
Expand Down Expand Up @@ -2853,6 +2906,10 @@ func TestScalarFunctionPushDown(t *testing.T) {
tk.MustQuery("explain analyze select /*+read_from_storage(tikv[t])*/ * from t where d > sysdate()").
CheckAt([]int{0, 3, 6}, rows)

rows[1][2] = "ascii(cast(test.t.e, var_string(2)))"
tk.MustQuery("explain analyze select /*+read_from_storage(tikv[t])*/ * from t where ascii(e);").
CheckAt([]int{0, 3, 6}, rows)

rows[1][2] = "lower(test.t.c)"
tk.MustQuery("explain analyze select /*+read_from_storage(tikv[t])*/ * from t where lower(c)").
CheckAt([]int{0, 3, 6}, rows)
Expand Down
13 changes: 8 additions & 5 deletions planner/core/testdata/enforce_mpp_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -352,13 +352,16 @@
{
"SQL": "EXPLAIN format = 'brief' SELECT count(a) from t where d=1; -- 11.1. type not supported",
"Plan": [
"HashAgg 1.00 root funcs:count(test.t.a)->Column#6",
"└─Selection 8000.00 root eq(test.t.d, 1)",
" └─TableReader 10000.00 root data:ExchangeSender",
" └─ExchangeSender 10000.00 cop[tiflash] ExchangeType: PassThrough",
" └─TableFullScan 10000.00 cop[tiflash] table:t keep order:false, stats:pseudo"
"StreamAgg 1.00 root funcs:count(test.t.a)->Column#6",
"└─Selection 10.00 root eq(test.t.d, 1)",
" └─TableReader 10000.00 root data:TableFullScan",
" └─TableFullScan 10000.00 cop[tiflash] table:t keep order:false, stats:pseudo"
],
"Warn": [
"Expression about 'test.t.d' can not be pushed to TiFlash because it contains unsupported calculation of type 'bit'.",
"Expression about 'test.t.d' can not be pushed to TiFlash because it contains unsupported calculation of type 'bit'.",
"Expression about 'test.t.d' can not be pushed to TiFlash because it contains unsupported calculation of type 'bit'.",
"Expression about 'test.t.d' can not be pushed to TiFlash because it contains unsupported calculation of type 'bit'.",
"Expression about 'test.t.d' can not be pushed to TiFlash because it contains unsupported calculation of type 'bit'."
]
}
Expand Down
4 changes: 1 addition & 3 deletions planner/core/testdata/point_get_plan_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,7 @@
{
"SQL": "select * from t5 where id in ('0')",
"Plan": [
"Selection 8000.00 root eq(test.t5.id, 0)",
"└─TableReader 10000.00 root data:TableFullScan",
" └─TableFullScan 10000.00 cop[tikv] table:t5 keep order:false, stats:pseudo"
"Point_Get 1.00 root table:t5, clustered index:PRIMARY(id) "
],
"Res": null
}
Expand Down
82 changes: 42 additions & 40 deletions util/ranger/testdata/ranger_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -408,9 +408,9 @@
{
"SQL": "select * from t where a = 0;",
"Plan": [
"Selection_7 0.80 root eq(test.t.a, 0)",
"└─TableReader_9 1.00 root data:TableFullScan_8",
" └─TableFullScan_8 1.00 cop[tikv] table:t, partition:p0 keep order:false"
"TableReader_9 1.00 root data:Selection_8",
"└─Selection_8 1.00 cop[tikv] eq(test.t.a, 0)",
" └─TableFullScan_7 1.00 cop[tikv] table:t, partition:p0 keep order:false"
],
"Result": [
"\u0000 0"
Expand All @@ -419,9 +419,9 @@
{
"SQL": "select * from t where a = 0 or a = 4;",
"Plan": [
"Selection_7 0.80 root or(eq(test.t.a, 0), eq(test.t.a, 4))",
"└─TableReader_9 1.00 root data:TableFullScan_8",
" └─TableFullScan_8 1.00 cop[tikv] table:t, partition:p0 keep order:false"
"TableReader_9 1.00 root data:Selection_8",
"└─Selection_8 1.00 cop[tikv] or(eq(test.t.a, 0), eq(test.t.a, 4))",
" └─TableFullScan_7 1.00 cop[tikv] table:t, partition:p0 keep order:false"
],
"Result": [
"\u0000 0"
Expand All @@ -430,9 +430,9 @@
{
"SQL": "select * from t where a = 1;",
"Plan": [
"Selection_7 2.40 root eq(test.t.a, 1)",
"└─TableReader_9 3.00 root data:TableFullScan_8",
" └─TableFullScan_8 3.00 cop[tikv] table:t, partition:p1 keep order:false"
"TableReader_9 3.00 root data:Selection_8",
"└─Selection_8 3.00 cop[tikv] eq(test.t.a, 1)",
" └─TableFullScan_7 3.00 cop[tikv] table:t, partition:p1 keep order:false"
],
"Result": [
"\u0001 -1",
Expand All @@ -443,27 +443,26 @@
{
"SQL": "select * from t where a = -1;",
"Plan": [
"Selection_7 0.00 root eq(test.t.a, -1)",
"└─TableDual_8 0.00 root rows:0"
"TableDual_7 0.00 root rows:0"
],
"Result": null
},
{
"SQL": "select * from t where a = 3;",
"Plan": [
"Selection_7 0.00 root eq(test.t.a, 3)",
"└─TableDual_8 0.00 root rows:0"
"TableDual_7 0.00 root rows:0"
],
"Result": null
},
{
"SQL": "select * from t where a < 1;",
"Plan": [
"Selection_9 3.20 root lt(test.t.a, 1)",
"└─PartitionUnion_10 4.00 root ",
" ├─TableReader_12 1.00 root data:TableFullScan_11",
" │ └─TableFullScan_11 1.00 cop[tikv] table:t, partition:p0 keep order:false",
" └─TableReader_14 3.00 root data:TableFullScan_13",
"PartitionUnion_9 1.00 root ",
"├─TableReader_12 1.00 root data:Selection_11",
"│ └─Selection_11 1.00 cop[tikv] lt(test.t.a, 1)",
"│ └─TableFullScan_10 1.00 cop[tikv] table:t, partition:p0 keep order:false",
"└─TableReader_15 0.00 root data:Selection_14",
" └─Selection_14 0.00 cop[tikv] lt(test.t.a, 1)",
" └─TableFullScan_13 3.00 cop[tikv] table:t, partition:p1 keep order:false"
],
"Result": [
Expand All @@ -473,11 +472,12 @@
{
"SQL": "select * from t where a < 3;",
"Plan": [
"Selection_9 3.20 root lt(test.t.a, 3)",
"└─PartitionUnion_10 4.00 root ",
" ├─TableReader_12 1.00 root data:TableFullScan_11",
" │ └─TableFullScan_11 1.00 cop[tikv] table:t, partition:p0 keep order:false",
" └─TableReader_14 3.00 root data:TableFullScan_13",
"PartitionUnion_9 4.00 root ",
"├─TableReader_12 1.00 root data:Selection_11",
"│ └─Selection_11 1.00 cop[tikv] lt(test.t.a, 3)",
"│ └─TableFullScan_10 1.00 cop[tikv] table:t, partition:p0 keep order:false",
"└─TableReader_15 3.00 root data:Selection_14",
" └─Selection_14 3.00 cop[tikv] lt(test.t.a, 3)",
" └─TableFullScan_13 3.00 cop[tikv] table:t, partition:p1 keep order:false"
],
"Result": [
Expand All @@ -490,19 +490,19 @@
{
"SQL": "select * from t where a < -1;",
"Plan": [
"Selection_7 0.00 root lt(test.t.a, -1)",
"└─TableDual_8 0.00 root rows:0"
"TableDual_7 0.00 root rows:0"
],
"Result": null
},
{
"SQL": "select * from t where a > 0;",
"Plan": [
"Selection_9 3.20 root gt(test.t.a, 0)",
"└─PartitionUnion_10 4.00 root ",
" ├─TableReader_12 1.00 root data:TableFullScan_11",
" │ └─TableFullScan_11 1.00 cop[tikv] table:t, partition:p0 keep order:false",
" └─TableReader_14 3.00 root data:TableFullScan_13",
"PartitionUnion_9 3.00 root ",
"├─TableReader_12 0.00 root data:Selection_11",
"│ └─Selection_11 0.00 cop[tikv] gt(test.t.a, 0)",
"│ └─TableFullScan_10 1.00 cop[tikv] table:t, partition:p0 keep order:false",
"└─TableReader_15 3.00 root data:Selection_14",
" └─Selection_14 3.00 cop[tikv] gt(test.t.a, 0)",
" └─TableFullScan_13 3.00 cop[tikv] table:t, partition:p1 keep order:false"
],
"Result": [
Expand All @@ -514,11 +514,12 @@
{
"SQL": "select * from t where a > -1;",
"Plan": [
"Selection_9 3.20 root gt(test.t.a, -1)",
"└─PartitionUnion_10 4.00 root ",
" ├─TableReader_12 1.00 root data:TableFullScan_11",
" │ └─TableFullScan_11 1.00 cop[tikv] table:t, partition:p0 keep order:false",
" └─TableReader_14 3.00 root data:TableFullScan_13",
"PartitionUnion_9 4.00 root ",
"├─TableReader_12 1.00 root data:Selection_11",
"│ └─Selection_11 1.00 cop[tikv] gt(test.t.a, -1)",
"│ └─TableFullScan_10 1.00 cop[tikv] table:t, partition:p0 keep order:false",
"└─TableReader_15 3.00 root data:Selection_14",
" └─Selection_14 3.00 cop[tikv] gt(test.t.a, -1)",
" └─TableFullScan_13 3.00 cop[tikv] table:t, partition:p1 keep order:false"
],
"Result": [
Expand All @@ -531,11 +532,12 @@
{
"SQL": "select * from t where a > 3;",
"Plan": [
"Selection_9 3.20 root gt(test.t.a, 3)",
"└─PartitionUnion_10 4.00 root ",
" ├─TableReader_12 1.00 root data:TableFullScan_11",
" │ └─TableFullScan_11 1.00 cop[tikv] table:t, partition:p0 keep order:false",
" └─TableReader_14 3.00 root data:TableFullScan_13",
"PartitionUnion_9 0.00 root ",
"├─TableReader_12 0.00 root data:Selection_11",
"│ └─Selection_11 0.00 cop[tikv] gt(test.t.a, 3)",
"│ └─TableFullScan_10 1.00 cop[tikv] table:t, partition:p0 keep order:false",
"└─TableReader_15 0.00 root data:Selection_14",
" └─Selection_14 0.00 cop[tikv] gt(test.t.a, 3)",
" └─TableFullScan_13 3.00 cop[tikv] table:t, partition:p1 keep order:false"
],
"Result": null
Expand Down

0 comments on commit fbfe36c

Please sign in to comment.