Skip to content

Commit

Permalink
expression: vectorized builtinSleepSig (pingcap#15193)
Browse files Browse the repository at this point in the history
  • Loading branch information
ziyi-yan authored Mar 11, 2020
1 parent 188abc8 commit 6f6c415
Show file tree
Hide file tree
Showing 5 changed files with 199 additions and 31 deletions.
17 changes: 17 additions & 0 deletions expression/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,23 @@ func (g *selectStringGener) gen() interface{} {
return g.candidates[g.randGen.Intn(len(g.candidates))]
}

// selectRealGener select one real number randomly from the candidates array
type selectRealGener struct {
candidates []float64
randGen *defaultRandGen
}

func newSelectRealGener(candidates []float64) *selectRealGener {
return &selectRealGener{candidates, newDefaultRandGen()}
}

func (g *selectRealGener) gen() interface{} {
if len(g.candidates) == 0 {
return nil
}
return g.candidates[g.randGen.Intn(len(g.candidates))]
}

type constJSONGener struct {
jsonStr string
}
Expand Down
30 changes: 5 additions & 25 deletions expression/builtin_miscellaneous.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"math"
"net"
"strings"
"sync/atomic"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -117,40 +116,21 @@ func (b *builtinSleepSig) evalInt(row chunk.Row) (int64, bool, error) {
}

sessVars := b.ctx.GetSessionVars()
if isNull {
if sessVars.StrictSQLMode {
return 0, true, errIncorrectArgs.GenWithStackByArgs("sleep")
}
return 0, true, nil
}
// processing argument is negative
if val < 0 {
if isNull || val < 0 {
if sessVars.StrictSQLMode {
return 0, false, errIncorrectArgs.GenWithStackByArgs("sleep")
}
err := errIncorrectArgs.GenWithStackByArgs("sleep")
sessVars.StmtCtx.AppendWarning(err)
return 0, false, nil
}

if val > math.MaxFloat64/float64(time.Second.Nanoseconds()) {
return 0, false, errIncorrectArgs.GenWithStackByArgs("sleep")
}

dur := time.Duration(val * float64(time.Second.Nanoseconds()))
ticker := time.NewTicker(10 * time.Millisecond)
defer ticker.Stop()
start := time.Now()
finish := false
for !finish {
select {
case now := <-ticker.C:
if now.Sub(start) > dur {
finish = true
}
default:
if atomic.CompareAndSwapUint32(&sessVars.Killed, 1, 0) {
return 1, false, nil
}
}
if isKilled := doSleep(val, sessVars); isKilled {
return 1, false, nil
}

return 0, false, nil
Expand Down
70 changes: 67 additions & 3 deletions expression/builtin_miscellaneous_vec.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ import (
"math"
"net"
"strings"
"sync/atomic"
"time"

"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
)
Expand Down Expand Up @@ -281,11 +283,73 @@ func (b *builtinNameConstTimeSig) vecEvalTime(input *chunk.Chunk, result *chunk.
}

func (b *builtinSleepSig) vectorized() bool {
return false
return true
}

// vecEvalInt evals a builtinSleepSig in a vectorized manner.
// See https://dev.mysql.com/doc/refman/5.7/en/miscellaneous-functions.html#function_sleep
func (b *builtinSleepSig) vecEvalInt(input *chunk.Chunk, result *chunk.Column) error {
return errors.Errorf("not implemented")
n := input.NumRows()
buf, err := b.bufAllocator.get(types.ETReal, n)
if err != nil {
return err
}
defer b.bufAllocator.put(buf)

err = b.args[0].VecEvalReal(b.ctx, input, buf)
if err != nil {
return err
}

result.ResizeInt64(n, false)
i64s := result.Int64s()

for i := 0; i < n; i++ {
isNull := buf.IsNull(i)
val := buf.GetFloat64(i)

sessVars := b.ctx.GetSessionVars()
if isNull || val < 0 {
if sessVars.StrictSQLMode {
return errIncorrectArgs.GenWithStackByArgs("sleep")
}
err := errIncorrectArgs.GenWithStackByArgs("sleep")
sessVars.StmtCtx.AppendWarning(err)
continue
}

if val > math.MaxFloat64/float64(time.Second.Nanoseconds()) {
return errIncorrectArgs.GenWithStackByArgs("sleep")
}

if isKilled := doSleep(val, sessVars); isKilled {
for j := i; j < n; j++ {
i64s[j] = 1
}
return nil
}
}

return nil
}

func doSleep(secs float64, sessVars *variable.SessionVars) (isKilled bool) {
dur := time.Duration(secs * float64(time.Second.Nanoseconds()))
ticker := time.NewTicker(10 * time.Millisecond)
defer ticker.Stop()
start := time.Now()
for {
select {
case now := <-ticker.C:
if atomic.CompareAndSwapUint32(&sessVars.Killed, 1, 0) {
return true
}

if now.Sub(start) > dur {
return false
}
}
}
}

func (b *builtinIsIPv4MappedSig) vectorized() bool {
Expand Down
111 changes: 109 additions & 2 deletions expression/builtin_miscellaneous_vec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,15 @@
package expression

import (
"sync/atomic"
"testing"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/parser/ast"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/mock"
)

var vecBuiltinMiscellaneousCases = map[string][]vecExprBenchCase{
Expand All @@ -28,8 +32,12 @@ var vecBuiltinMiscellaneousCases = map[string][]vecExprBenchCase{
ast.IsIPv6: {
{retEvalType: types.ETInt, childrenTypes: []types.EvalType{types.ETString}},
},
ast.Sleep: {},
ast.UUID: {},
ast.Sleep: {
{retEvalType: types.ETInt, childrenTypes: []types.EvalType{types.ETReal}, geners: []dataGenerator{
newSelectRealGener([]float64{0, 0.000001}),
}},
},
ast.UUID: {},
ast.Inet6Ntoa: {
{retEvalType: types.ETString, childrenTypes: []types.EvalType{types.ETString}, geners: []dataGenerator{
newSelectStringGener(
Expand Down Expand Up @@ -108,3 +116,102 @@ func BenchmarkVectorizedBuiltinMiscellaneousEvalOneVec(b *testing.B) {
func BenchmarkVectorizedBuiltinMiscellaneousFunc(b *testing.B) {
benchmarkVectorizedBuiltinFunc(b, vecBuiltinMiscellaneousCases)
}

type counter struct {
count int
}

func (c *counter) add(diff int) int {
c.count += diff
return c.count
}

func (s *testEvaluatorSuite) TestSleepVectorized(c *C) {
ctx := mock.NewContext()
sessVars := ctx.GetSessionVars()

fc := funcs[ast.Sleep]
ft := eType2FieldType(types.ETReal)
col0 := &Column{RetType: ft, Index: 0}
f, err := fc.getFunction(ctx, []Expression{col0})
c.Assert(err, IsNil)
input := chunk.NewChunkWithCapacity([]*types.FieldType{ft}, 1024)
result := chunk.NewColumn(ft, 1024)
warnCnt := counter{}

// non-strict model
sessVars.StrictSQLMode = false
input.AppendFloat64(0, 1)
err = f.vecEvalInt(input, result)
c.Assert(err, IsNil)
c.Assert(result.GetInt64(0), Equals, int64(0))
c.Assert(sessVars.StmtCtx.WarningCount(), Equals, uint16(warnCnt.add(0)))

input.Reset()
input.AppendFloat64(0, -1)
err = f.vecEvalInt(input, result)
c.Assert(err, IsNil)
c.Assert(result.GetInt64(0), Equals, int64(0))
c.Assert(sessVars.StmtCtx.WarningCount(), Equals, uint16(warnCnt.add(1)))

input.Reset()
input.AppendNull(0)
err = f.vecEvalInt(input, result)
c.Assert(err, IsNil)
c.Assert(result.GetInt64(0), Equals, int64(0))
c.Assert(sessVars.StmtCtx.WarningCount(), Equals, uint16(warnCnt.add(1)))

input.Reset()
input.AppendNull(0)
input.AppendFloat64(0, 1)
input.AppendFloat64(0, -1)
err = f.vecEvalInt(input, result)
c.Assert(err, IsNil)
c.Assert(result.GetInt64(0), Equals, int64(0))
c.Assert(result.GetInt64(1), Equals, int64(0))
c.Assert(result.GetInt64(2), Equals, int64(0))
c.Assert(sessVars.StmtCtx.WarningCount(), Equals, uint16(warnCnt.add(2)))

// for error case under the strict model
sessVars.StrictSQLMode = true
input.Reset()
input.AppendNull(0)
err = f.vecEvalInt(input, result)
c.Assert(err, NotNil)
c.Assert(result.GetInt64(0), Equals, int64(0))

sessVars.StmtCtx.SetWarnings(nil)
input.Reset()
input.AppendFloat64(0, -2.5)
err = f.vecEvalInt(input, result)
c.Assert(err, NotNil)
c.Assert(result.GetInt64(0), Equals, int64(0))

//// strict model
input.Reset()
input.AppendFloat64(0, 0.5)
start := time.Now()
err = f.vecEvalInt(input, result)
c.Assert(err, IsNil)
c.Assert(result.GetInt64(0), Equals, int64(0))
sub := time.Since(start)
c.Assert(sub.Nanoseconds(), GreaterEqual, int64(0.5*1e9))

input.Reset()
input.AppendFloat64(0, 0.01)
input.AppendFloat64(0, 1)
input.AppendFloat64(0, 2)
start = time.Now()
go func() {
time.Sleep(1 * time.Second)
atomic.CompareAndSwapUint32(&ctx.GetSessionVars().Killed, 0, 1)
}()
err = f.vecEvalInt(input, result)
sub = time.Since(start)
c.Assert(err, IsNil)
c.Assert(result.GetInt64(0), Equals, int64(0))
c.Assert(result.GetInt64(1), Equals, int64(1))
c.Assert(result.GetInt64(2), Equals, int64(1))
c.Assert(sub.Nanoseconds(), LessEqual, int64(2*1e9))
c.Assert(sub.Nanoseconds(), GreaterEqual, int64(1*1e9))
}
2 changes: 1 addition & 1 deletion expression/evaluator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (s *testEvaluatorSuite) TestSleep(c *C) {
c.Assert(err, IsNil)
ret, isNull, err := f.evalInt(chunk.Row{})
c.Assert(err, IsNil)
c.Assert(isNull, IsTrue)
c.Assert(isNull, IsFalse)
c.Assert(ret, Equals, int64(0))
d[0].SetInt64(-1)
f, err = fc.getFunction(ctx, s.datumsToConstants(d))
Expand Down

0 comments on commit 6f6c415

Please sign in to comment.