Skip to content

Commit

Permalink
case: make CTE case be stable (pingcap#25035)
Browse files Browse the repository at this point in the history
  • Loading branch information
guo-shaoge authored Jun 2, 2021
1 parent 32cf14b commit 9fad132
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 47 deletions.
4 changes: 2 additions & 2 deletions executor/cte.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,6 @@ func (e *CTEExec) Open(ctx context.Context) (err error) {
if err = e.iterOutTbl.OpenAndRef(); err != nil {
return err
}

setupCTEStorageTracker(e.iterOutTbl, e.ctx, e.memTracker, e.diskTracker)
}

if e.isDistinct {
Expand All @@ -137,11 +135,13 @@ func (e *CTEExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
defer e.resTbl.Unlock()
resAction := setupCTEStorageTracker(e.resTbl, e.ctx, e.memTracker, e.diskTracker)
iterInAction := setupCTEStorageTracker(e.iterInTbl, e.ctx, e.memTracker, e.diskTracker)
iterOutAction := setupCTEStorageTracker(e.iterOutTbl, e.ctx, e.memTracker, e.diskTracker)

failpoint.Inject("testCTEStorageSpill", func(val failpoint.Value) {
if val.(bool) && config.GetGlobalConfig().OOMUseTmpStorage {
defer resAction.WaitForTest()
defer iterInAction.WaitForTest()
defer iterOutAction.WaitForTest()
}
})

Expand Down
85 changes: 40 additions & 45 deletions executor/cte_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,32 @@ import (
"github.com/pingcap/tidb/sessionctx"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tidb/util/testkit"
)

var _ = check.Suite(&CTETestSuite{})
var _ = check.Suite(&CTETestSuite{&baseCTETestSuite{}})
var _ = check.SerialSuites(&CTESerialTestSuite{&baseCTETestSuite{}})

type CTETestSuite struct {
type baseCTETestSuite struct {
store kv.Storage
dom *domain.Domain
sessionCtx sessionctx.Context
session session.Session
ctx context.Context
}

func (test *CTETestSuite) SetUpSuite(c *check.C) {
type CTETestSuite struct {
*baseCTETestSuite
}

type CTESerialTestSuite struct {
*baseCTETestSuite
}

func (test *baseCTETestSuite) SetUpSuite(c *check.C) {
var err error
test.store, err = mockstore.NewMockStore()
c.Assert(err, check.IsNil)
Expand All @@ -59,7 +69,7 @@ func (test *CTETestSuite) SetUpSuite(c *check.C) {
test.ctx = context.Background()
}

func (test *CTETestSuite) TearDownSuite(c *check.C) {
func (test *baseCTETestSuite) TearDownSuite(c *check.C) {
test.dom.Close()
test.store.Close()
}
Expand Down Expand Up @@ -107,70 +117,55 @@ func (test *CTETestSuite) TestBasicCTE(c *check.C) {
rows.Check(testkit.Rows("1", "2"))
}

func (test *CTETestSuite) TestSpillToDisk(c *check.C) {
func (test *CTESerialTestSuite) TestSpillToDisk(c *check.C) {
defer config.RestoreFunc()()
config.UpdateGlobal(func(conf *config.Config) {
conf.OOMUseTmpStorage = true
})

tk := testkit.NewTestKit(c, test.store)
tk.MustExec("use test;")

c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/testCTEStorageSpill", "return(true)"), check.IsNil)
defer func() {
c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/testCTEStorageSpill"), check.IsNil)
tk.MustExec("set tidb_mem_quota_query = 1073741824;")
}()
c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/testSortedRowContainerSpill", "return(true)"), check.IsNil)
defer func() {
c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/testSortedRowContainerSpill"), check.IsNil)
}()

insertStr := "insert into t1 values(0, 0)"
for i := 1; i < 5000; i++ {
insertStr += fmt.Sprintf(", (%d, %d)", i, i)
}

tk.MustExec("drop table if exists t1;")
tk.MustExec("create table t1(c1 int, c2 int);")
tk.MustExec(insertStr)
tk.MustExec("set tidb_mem_quota_query = 80000;")
rows := tk.MustQuery("with recursive cte1 as ( " +
"select c1 from t1 " +
"union " +
"select c1 + 1 c1 from cte1 where c1 < 5000) " +
"select c1 from cte1;")

memTracker := tk.Se.GetSessionVars().StmtCtx.MemTracker
diskTracker := tk.Se.GetSessionVars().StmtCtx.DiskTracker
c.Assert(memTracker.MaxConsumed(), check.Greater, int64(0))
c.Assert(diskTracker.MaxConsumed(), check.Greater, int64(0))

rowNum := 5000
var resRows []string
for i := 0; i <= rowNum; i++ {
resRows = append(resRows, fmt.Sprintf("%d", i))
}
rows.Check(testkit.Rows(resRows...))

// Use duplicated rows to test UNION DISTINCT.
tk.MustExec("set tidb_mem_quota_query = 1073741824;")
insertStr = "insert into t1 values(0, 0)"
insertStr := "insert into t1 values(0)"
rowNum := 1000
vals := make([]int, rowNum)
vals[0] = 0
for i := 1; i < rowNum; i++ {
v := rand.Intn(100)
vals[i] = v
insertStr += fmt.Sprintf(", (%d, %d)", v, v)
insertStr += fmt.Sprintf(", (%d)", v)
}
tk.MustExec("drop table if exists t1;")
tk.MustExec("create table t1(c1 int, c2 int);")
tk.MustExec("create table t1(c1 int);")
tk.MustExec(insertStr)
tk.MustExec("set tidb_mem_quota_query = 80000;")
tk.MustExec("set tidb_mem_quota_query = 40000;")
tk.MustExec("set cte_max_recursion_depth = 500000;")
rows = tk.MustQuery("with recursive cte1 as ( " +
"select c1 from t1 " +
"union " +
"select c1 + 1 c1 from cte1 where c1 < 5000) " +
"select c1 from cte1 order by c1;")

memTracker = tk.Se.GetSessionVars().StmtCtx.MemTracker
diskTracker = tk.Se.GetSessionVars().StmtCtx.DiskTracker
sql := fmt.Sprintf("with recursive cte1 as ( "+
"select c1 from t1 "+
"union "+
"select c1 + 1 c1 from cte1 where c1 < %d) "+
"select c1 from cte1 order by c1;", rowNum)
rows := tk.MustQuery(sql)

memTracker := tk.Se.GetSessionVars().StmtCtx.MemTracker
diskTracker := tk.Se.GetSessionVars().StmtCtx.DiskTracker
c.Assert(memTracker.MaxConsumed(), check.Greater, int64(0))
c.Assert(diskTracker.MaxConsumed(), check.Greater, int64(0))

sort.Ints(vals)
resRows = make([]string, 0, rowNum)
resRows := make([]string, 0, rowNum)
for i := vals[0]; i <= rowNum; i++ {
resRows = append(resRows, fmt.Sprintf("%d", i))
}
Expand Down

0 comments on commit 9fad132

Please sign in to comment.