Skip to content

Commit

Permalink
statistics: handle the prune mode correctly in the refresher (pingcap…
Browse files Browse the repository at this point in the history
  • Loading branch information
Rustin170506 authored Nov 5, 2024
1 parent f238540 commit b71f0c0
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 35 deletions.
2 changes: 1 addition & 1 deletion pkg/statistics/handle/autoanalyze/autoanalyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ func (sa *statsAnalyze) handleAutoAnalyze(sctx sessionctx.Context) bool {
sa.refresher.ProcessDMLChangesForTest()
sa.refresher.RequeueFailedJobsForTest()
}
analyzed := sa.refresher.AnalyzeHighestPriorityTables()
analyzed := sa.refresher.AnalyzeHighestPriorityTables(sctx)
// During the test, we need to wait for the auto analyze job to be finished.
if intest.InTest {
sa.refresher.WaitAutoAnalyzeFinishedForTest()
Expand Down
44 changes: 34 additions & 10 deletions pkg/statistics/handle/autoanalyze/priorityqueue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,21 +167,37 @@ func TestRefreshLastAnalysisDuration(t *testing.T) {
require.Len(t, runningJobs, 2)
}

func TestProcessDMLChanges(t *testing.T) {
func testProcessDMLChanges(t *testing.T, partitioned bool) {
store, dom := testkit.CreateMockStoreAndDomain(t)
handle := dom.StatsHandle()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t1 (a int)")
tk.MustExec("create table t2 (a int)")
ctx := context.Background()
if partitioned {
tk.MustExec("use test")
tk.MustExec("create table t1 (a int) partition by range (a) (partition p0 values less than (10), partition p1 values less than (20))")
tk.MustExec("create table t2 (a int) partition by range (a) (partition p0 values less than (10), partition p1 values less than (20))")
// Because we don't handle the DDL events in unit tests by default,
// we need to use this way to make sure the stats record for the global table is created.
// Insert some rows into the tables.
tk.MustExec("insert into t1 values (11)")
tk.MustExec("insert into t2 values (12)")
require.NoError(t, handle.DumpStatsDeltaToKV(true))
// Analyze the tables.
tk.MustExec("analyze table t1")
tk.MustExec("analyze table t2")
require.NoError(t, handle.Update(ctx, dom.InfoSchema()))
} else {
tk.MustExec("use test")
tk.MustExec("create table t1 (a int)")
tk.MustExec("create table t2 (a int)")
}
tk.MustExec("insert into t1 values (1)")
tk.MustExec("insert into t2 values (1)")
tk.MustExec("insert into t2 values (1), (2)")
statistics.AutoAnalyzeMinCnt = 0
defer func() {
statistics.AutoAnalyzeMinCnt = 1000
}()

ctx := context.Background()
require.NoError(t, handle.DumpStatsDeltaToKV(true))
require.NoError(t, handle.Update(ctx, dom.InfoSchema()))
schema := pmodel.NewCIStr("test")
Expand All @@ -205,10 +221,10 @@ func TestProcessDMLChanges(t *testing.T) {
require.NoError(t, job2.Analyze(handle, dom.SysProcTracker()))
require.NoError(t, handle.Update(ctx, dom.InfoSchema()))

// Insert 10 rows into t1.
tk.MustExec("insert into t1 values (2), (3), (4), (5), (6), (7), (8), (9), (10), (11)")
// Insert 2 rows into t2.
tk.MustExec("insert into t2 values (2), (3)")
// Insert 9 rows into t1.
tk.MustExec("insert into t1 values (3), (4), (5), (6), (7), (8), (9), (10), (11)")
// Insert 1 row into t2.
tk.MustExec("insert into t2 values (3)")

// Dump the stats to kv.
require.NoError(t, handle.DumpStatsDeltaToKV(true))
Expand Down Expand Up @@ -242,6 +258,14 @@ func TestProcessDMLChanges(t *testing.T) {
require.Equal(t, tbl2.Meta().ID, updatedJob2.GetTableID(), "t2 should have higher weight due to smaller table size")
}

func TestProcessDMLChanges(t *testing.T) {
testProcessDMLChanges(t, false)
}

func TestProcessDMLChangesPartitioned(t *testing.T) {
testProcessDMLChanges(t, true)
}

func TestProcessDMLChangesWithRunningJobs(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
handle := dom.StatsHandle()
Expand Down
3 changes: 2 additions & 1 deletion pkg/statistics/handle/autoanalyze/refresher/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ go_test(
"worker_test.go",
],
flaky = True,
shard_count = 8,
shard_count = 9,
deps = [
":refresher",
"//pkg/parser/model",
Expand All @@ -42,6 +42,7 @@ go_test(
"//pkg/statistics",
"//pkg/statistics/handle/autoanalyze/priorityqueue",
"//pkg/statistics/handle/types",
"//pkg/statistics/handle/util",
"//pkg/testkit",
"//pkg/testkit/testsetup",
"@com_github_stretchr_testify//require",
Expand Down
20 changes: 8 additions & 12 deletions pkg/statistics/handle/autoanalyze/refresher/refresher.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,33 +82,29 @@ func (r *Refresher) UpdateConcurrency() {
}

// AnalyzeHighestPriorityTables picks tables with the highest priority and analyzes them.
func (r *Refresher) AnalyzeHighestPriorityTables() bool {
se, err := r.statsHandle.SPool().Get()
if err != nil {
statslogutil.StatsLogger().Error("Failed to get session context", zap.Error(err))
return false
}
defer r.statsHandle.SPool().Put(se)

sctx := se.(sessionctx.Context)
// Note: Make sure the session has the latest variable values.
// Usually, this is done by the caller through `util.CallWithSCtx`.
func (r *Refresher) AnalyzeHighestPriorityTables(sctx sessionctx.Context) bool {
parameters := exec.GetAutoAnalyzeParameters(sctx)
err = r.setAutoAnalysisTimeWindow(parameters)
err := r.setAutoAnalysisTimeWindow(parameters)
if err != nil {
statslogutil.StatsLogger().Error("Set auto analyze time window failed", zap.Error(err))
return false
}
if !r.isWithinTimeWindow() {
return false
}
currentAutoAnalyzeRatio := exec.ParseAutoAnalyzeRatio(parameters[variable.TiDBAutoAnalyzeRatio])
currentPruneMode := variable.PartitionPruneMode(sctx.GetSessionVars().PartitionPruneMode.Load())
if !r.jobs.IsInitialized() {
if err := r.jobs.Initialize(); err != nil {
statslogutil.StatsLogger().Error("Failed to initialize the queue", zap.Error(err))
return false
}
r.lastSeenAutoAnalyzeRatio = currentAutoAnalyzeRatio
r.lastSeenPruneMode = currentPruneMode
} else {
// Only do this if the queue is already initialized.
currentAutoAnalyzeRatio := exec.ParseAutoAnalyzeRatio(parameters[variable.TiDBAutoAnalyzeRatio])
currentPruneMode := variable.PartitionPruneMode(sctx.GetSessionVars().PartitionPruneMode.Load())
if currentAutoAnalyzeRatio != r.lastSeenAutoAnalyzeRatio || currentPruneMode != r.lastSeenPruneMode {
r.lastSeenAutoAnalyzeRatio = currentAutoAnalyzeRatio
r.lastSeenPruneMode = currentPruneMode
Expand Down
105 changes: 95 additions & 10 deletions pkg/statistics/handle/autoanalyze/refresher/refresher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,66 @@ import (
"testing"

pmodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/refresher"
"github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/stretchr/testify/require"
)

func TestChangePruneMode(t *testing.T) {
statistics.AutoAnalyzeMinCnt = 0
defer func() {
statistics.AutoAnalyzeMinCnt = 1000
}()

store, dom := testkit.CreateMockStoreAndDomain(t)
handle := dom.StatsHandle()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t1 (a int, b int, index idx(a)) partition by range (a) (partition p0 values less than (10), partition p1 values less than (140))")
tk.MustExec("insert into t1 values (0, 0)")
require.NoError(t, handle.DumpStatsDeltaToKV(true))
require.NoError(t, handle.Update(context.Background(), dom.InfoSchema()))
tk.MustExec("analyze table t1")
r := refresher.NewRefresher(handle, dom.SysProcTracker(), dom.DDLNotifier())
defer r.Close()

// Insert more data to each partition.
tk.MustExec("insert into t1 values (1, 1), (11, 11)")
require.NoError(t, handle.DumpStatsDeltaToKV(true))
require.NoError(t, handle.Update(context.Background(), dom.InfoSchema()))

// Two jobs are added because the prune mode is static.
tk.MustExec("set global tidb_partition_prune_mode = 'static'")
require.NoError(t, util.CallWithSCtx(handle.SPool(), func(sctx sessionctx.Context) error {
require.True(t, handle.HandleAutoAnalyze())
return nil
}))
r.WaitAutoAnalyzeFinishedForTest()
require.NoError(t, util.CallWithSCtx(handle.SPool(), func(sctx sessionctx.Context) error {
require.True(t, r.AnalyzeHighestPriorityTables(sctx))
return nil
}))
r.WaitAutoAnalyzeFinishedForTest()
require.Equal(t, 0, r.Len())

// Insert more data to each partition.
tk.MustExec("insert into t1 values (2, 2), (3, 3), (4, 4), (12, 12), (13, 13), (14, 14)")
require.NoError(t, handle.DumpStatsDeltaToKV(true))
require.NoError(t, handle.Update(context.Background(), dom.InfoSchema()))

// One job is added because the prune mode is dynamic.
tk.MustExec("set global tidb_partition_prune_mode = 'dynamic'")
require.NoError(t, util.CallWithSCtx(handle.SPool(), func(sctx sessionctx.Context) error {
require.True(t, r.AnalyzeHighestPriorityTables(sctx))
return nil
}))
r.WaitAutoAnalyzeFinishedForTest()
require.Equal(t, 0, r.Len())
}

func TestSkipAnalyzeTableWhenAutoAnalyzeRatioIsZero(t *testing.T) {
statistics.AutoAnalyzeMinCnt = 0
defer func() {
Expand Down Expand Up @@ -67,12 +121,19 @@ func TestSkipAnalyzeTableWhenAutoAnalyzeRatioIsZero(t *testing.T) {
r := refresher.NewRefresher(handle, sysProcTracker, dom.DDLNotifier())
defer r.Close()
// No jobs are added.
require.False(t, r.AnalyzeHighestPriorityTables())
require.NoError(t, util.CallWithSCtx(handle.SPool(), func(sctx sessionctx.Context) error {
require.False(t, r.AnalyzeHighestPriorityTables(sctx))
return nil
}))
require.Equal(t, 0, r.Len())
// Enable the auto analyze.
tk.MustExec("set global tidb_auto_analyze_ratio = 0.2")
// Jobs are added.
require.True(t, r.AnalyzeHighestPriorityTables())
require.NoError(t, util.CallWithSCtx(handle.SPool(), func(sctx sessionctx.Context) error {
require.True(t, r.AnalyzeHighestPriorityTables(sctx))
return nil
}))
require.Equal(t, 0, r.Len())
}

func TestIgnoreNilOrPseudoStatsOfPartitionedTable(t *testing.T) {
Expand All @@ -92,7 +153,10 @@ func TestIgnoreNilOrPseudoStatsOfPartitionedTable(t *testing.T) {
sysProcTracker := dom.SysProcTracker()
r := refresher.NewRefresher(handle, sysProcTracker, dom.DDLNotifier())
defer r.Close()
require.False(t, r.AnalyzeHighestPriorityTables())
require.NoError(t, util.CallWithSCtx(handle.SPool(), func(sctx sessionctx.Context) error {
require.False(t, r.AnalyzeHighestPriorityTables(sctx))
return nil
}))
require.Equal(t, 0, r.Len(), "No jobs are added because table stats are nil")
}

Expand All @@ -113,7 +177,10 @@ func TestIgnoreNilOrPseudoStatsOfNonPartitionedTable(t *testing.T) {
sysProcTracker := dom.SysProcTracker()
r := refresher.NewRefresher(handle, sysProcTracker, dom.DDLNotifier())
defer r.Close()
require.False(t, r.AnalyzeHighestPriorityTables())
require.NoError(t, util.CallWithSCtx(handle.SPool(), func(sctx sessionctx.Context) error {
require.False(t, r.AnalyzeHighestPriorityTables(sctx))
return nil
}))
require.Equal(t, 0, r.Len(), "No jobs are added because table stats are nil")
}

Expand Down Expand Up @@ -158,7 +225,10 @@ func TestIgnoreTinyTable(t *testing.T) {
sysProcTracker := dom.SysProcTracker()
r := refresher.NewRefresher(handle, sysProcTracker, dom.DDLNotifier())
defer r.Close()
require.True(t, r.AnalyzeHighestPriorityTables())
require.NoError(t, util.CallWithSCtx(handle.SPool(), func(sctx sessionctx.Context) error {
require.True(t, r.AnalyzeHighestPriorityTables(sctx))
return nil
}))
require.Equal(t, 0, r.Len(), "Only t1 is added to the job queue, because t2 is a tiny table(not enough data)")
}

Expand Down Expand Up @@ -194,7 +264,10 @@ func TestAnalyzeHighestPriorityTables(t *testing.T) {
r := refresher.NewRefresher(handle, sysProcTracker, dom.DDLNotifier())
defer r.Close()
// Analyze t1 first.
require.True(t, r.AnalyzeHighestPriorityTables())
require.NoError(t, util.CallWithSCtx(handle.SPool(), func(sctx sessionctx.Context) error {
require.True(t, r.AnalyzeHighestPriorityTables(sctx))
return nil
}))
r.WaitAutoAnalyzeFinishedForTest()
require.NoError(t, handle.DumpStatsDeltaToKV(true))
require.NoError(t, handle.Update(context.Background(), dom.InfoSchema()))
Expand All @@ -212,7 +285,10 @@ func TestAnalyzeHighestPriorityTables(t *testing.T) {
tblStats2 := handle.GetPartitionStats(tbl2.Meta(), pid2)
require.Equal(t, int64(6), tblStats2.ModifyCount)
// Do one more round.
require.True(t, r.AnalyzeHighestPriorityTables())
require.NoError(t, util.CallWithSCtx(handle.SPool(), func(sctx sessionctx.Context) error {
require.True(t, r.AnalyzeHighestPriorityTables(sctx))
return nil
}))
r.WaitAutoAnalyzeFinishedForTest()
// t2 is analyzed.
pid2 = tbl2.Meta().GetPartitionInfo().Definitions[1].ID
Expand Down Expand Up @@ -257,7 +333,10 @@ func TestAnalyzeHighestPriorityTablesConcurrently(t *testing.T) {
r := refresher.NewRefresher(handle, sysProcTracker, dom.DDLNotifier())
defer r.Close()
// Analyze tables concurrently.
require.True(t, r.AnalyzeHighestPriorityTables())
require.NoError(t, util.CallWithSCtx(handle.SPool(), func(sctx sessionctx.Context) error {
require.True(t, r.AnalyzeHighestPriorityTables(sctx))
return nil
}))
r.WaitAutoAnalyzeFinishedForTest()
require.NoError(t, handle.DumpStatsDeltaToKV(true))
require.NoError(t, handle.Update(context.Background(), dom.InfoSchema()))
Expand All @@ -284,7 +363,10 @@ func TestAnalyzeHighestPriorityTablesConcurrently(t *testing.T) {
require.Equal(t, int64(4), tblStats3.ModifyCount)

// Do one more round to analyze t3.
require.True(t, r.AnalyzeHighestPriorityTables())
require.NoError(t, util.CallWithSCtx(handle.SPool(), func(sctx sessionctx.Context) error {
require.True(t, r.AnalyzeHighestPriorityTables(sctx))
return nil
}))
r.WaitAutoAnalyzeFinishedForTest()
require.NoError(t, handle.DumpStatsDeltaToKV(true))
require.NoError(t, handle.Update(context.Background(), dom.InfoSchema()))
Expand Down Expand Up @@ -322,7 +404,10 @@ func TestAnalyzeHighestPriorityTablesWithFailedAnalysis(t *testing.T) {
r := refresher.NewRefresher(handle, sysProcTracker, dom.DDLNotifier())
defer r.Close()

r.AnalyzeHighestPriorityTables()
require.NoError(t, util.CallWithSCtx(handle.SPool(), func(sctx sessionctx.Context) error {
require.True(t, r.AnalyzeHighestPriorityTables(sctx))
return nil
}))
r.WaitAutoAnalyzeFinishedForTest()

is := dom.InfoSchema()
Expand Down
4 changes: 3 additions & 1 deletion pkg/statistics/handle/updatetest/update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1284,6 +1284,8 @@ func TestAutoAnalyzePartitionTableAfterAddingIndex(t *testing.T) {
tblInfo := tbl.Meta()
idxInfo := tblInfo.Indices[0]
require.Nil(t, h.GetTableStats(tblInfo).GetIdx(idxInfo.ID))
require.True(t, h.HandleAutoAnalyze())
require.Eventually(t, func() bool {
return h.HandleAutoAnalyze()
}, 3*time.Second, time.Millisecond*100)
require.NotNil(t, h.GetTableStats(tblInfo).GetIdx(idxInfo.ID))
}

0 comments on commit b71f0c0

Please sign in to comment.