Skip to content

Commit

Permalink
planner: introduce historical meta stats auto-dump (pingcap#32041)
Browse files Browse the repository at this point in the history
  • Loading branch information
An-DJ authored Mar 3, 2022
1 parent fb1066e commit fc6b939
Show file tree
Hide file tree
Showing 7 changed files with 205 additions and 4 deletions.
41 changes: 41 additions & 0 deletions executor/analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2612,3 +2612,44 @@ func TestRecordHistoryStatsAfterAnalyze(t *testing.T) {
// 5. historical stats must be equal to the current stats
require.JSONEq(t, string(jsOrigin), string(jsCur))
}

func TestRecordHistoryStatsMetaAfterAnalyze(t *testing.T) {
store, dom, clean := testkit.CreateMockStoreAndDomain(t)
defer clean()

tk := testkit.NewTestKit(t, store)
tk.MustExec("set @@tidb_analyze_version = 2")
tk.MustExec("set global tidb_enable_historical_stats = 0")
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int, b int)")
tk.MustExec("analyze table test.t")

h := dom.StatsHandle()
is := dom.InfoSchema()
tableInfo, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
require.NoError(t, err)

// 1. switch off the tidb_enable_historical_stats, and there is no record in table `mysql.stats_meta_history`
tk.MustQuery(fmt.Sprintf("select count(*) from mysql.stats_meta_history where table_id = '%d'", tableInfo.Meta().ID)).Check(testkit.Rows("0"))
// insert demo tuples, and there is no record either.
insertNums := 5
for i := 0; i < insertNums; i++ {
tk.MustExec("insert into test.t (a,b) values (1,1), (2,2), (3,3)")
err := h.DumpStatsDeltaToKV(handle.DumpDelta)
require.NoError(t, err)
}
tk.MustQuery(fmt.Sprintf("select count(*) from mysql.stats_meta_history where table_id = '%d'", tableInfo.Meta().ID)).Check(testkit.Rows("0"))

// 2. switch on the tidb_enable_historical_stats and insert tuples to produce count/modifyCount delta change.
tk.MustExec("set global tidb_enable_historical_stats = 1")
defer tk.MustExec("set global tidb_enable_historical_stats = 0")

for i := 0; i < insertNums; i++ {
tk.MustExec("insert into test.t (a,b) values (1,1), (2,2), (3,3)")
err := h.DumpStatsDeltaToKV(handle.DumpDelta)
require.NoError(t, err)
}
tk.MustQuery(fmt.Sprintf("select modify_count, count from mysql.stats_meta_history where table_id = '%d' order by create_time", tableInfo.Meta().ID)).Sort().Check(
testkit.Rows("18 18", "21 21", "24 24", "27 27", "30 30"))
}
6 changes: 3 additions & 3 deletions executor/infoschema_cluster_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ func (s *infosSchemaClusterTableSuite) TestTableStorageStats() {
"test 2",
))
rows := tk.MustQuery("select TABLE_NAME from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql';").Rows()
s.Require().Len(rows, 29)
s.Require().Len(rows, 30)

// More tests about the privileges.
tk.MustExec("create user 'testuser'@'localhost'")
Expand All @@ -345,12 +345,12 @@ func (s *infosSchemaClusterTableSuite) TestTableStorageStats() {
Hostname: "localhost",
}, nil, nil))

tk.MustQuery("select count(1) from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql'").Check(testkit.Rows("29"))
tk.MustQuery("select count(1) from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql'").Check(testkit.Rows("30"))

s.Require().True(tk.Session().Auth(&auth.UserIdentity{
Username: "testuser3",
Hostname: "localhost",
}, nil, nil))

tk.MustQuery("select count(1) from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql'").Check(testkit.Rows("29"))
tk.MustQuery("select count(1) from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql'").Check(testkit.Rows("30"))
}
24 changes: 23 additions & 1 deletion session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,16 @@ const (
UNIQUE KEY table_version_seq (table_id, version, seq_no),
KEY table_create_time (table_id, create_time, seq_no)
);`
// CreateStatsMetaHistory stores the historical meta stats.
CreateStatsMetaHistory = `CREATE TABLE IF NOT EXISTS mysql.stats_meta_history (
table_id bigint(64) NOT NULL,
modify_count bigint(64) NOT NULL,
count bigint(64) NOT NULL,
version bigint(64) NOT NULL comment 'stats version which corresponding to stats:version in EXPLAIN',
create_time datetime(6) NOT NULL,
UNIQUE KEY table_version (table_id, version),
KEY table_create_time (table_id, create_time)
);`
)

// bootstrap initiates system DB for a store.
Expand Down Expand Up @@ -569,11 +579,13 @@ const (
version82 = 82
// version83 adds the tables mysql.stats_history
version83 = 83
// version84 adds the tables mysql.stats_meta_history
version84 = 84
)

// currentBootstrapVersion is defined as a variable, so we can modify its value for testing.
// please make sure this is the largest version
var currentBootstrapVersion int64 = version83
var currentBootstrapVersion int64 = version84

var (
bootstrapVersion = []func(Session, int64){
Expand Down Expand Up @@ -660,6 +672,7 @@ var (
upgradeToVer81,
upgradeToVer82,
upgradeToVer83,
upgradeToVer84,
}
)

Expand Down Expand Up @@ -1722,6 +1735,13 @@ func upgradeToVer83(s Session, ver int64) {
doReentrantDDL(s, CreateStatsHistory)
}

func upgradeToVer84(s Session, ver int64) {
if ver >= version84 {
return
}
doReentrantDDL(s, CreateStatsMetaHistory)
}

func writeOOMAction(s Session) {
comment := "oom-action is `log` by default in v3.0.x, `cancel` by default in v4.0.11+"
mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES (%?, %?, %?) ON DUPLICATE KEY UPDATE VARIABLE_VALUE= %?`,
Expand Down Expand Up @@ -1810,6 +1830,8 @@ func doDDLWorks(s Session) {
mustExecute(s, CreateAnalyzeOptionsTable)
// Create stats_history table.
mustExecute(s, CreateStatsHistory)
// Create stats_meta_history table.
mustExecute(s, CreateStatsMetaHistory)
}

// doDMLWorks executes DML statements in bootstrap stage.
Expand Down
32 changes: 32 additions & 0 deletions session/bootstrap_upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,35 @@ func TestUpgradeVersion83(t *testing.T) {
require.Equal(t, statsHistoryTblFields[i].tp, strings.ToLower(row.GetString(1)))
}
}

func TestUpgradeVersion84(t *testing.T) {
ctx := context.Background()
store, _, clean := testkit.CreateMockStoreAndDomain(t)
defer clean()

tk := testkit.NewTestKit(t, store)
ver, err := session.GetBootstrapVersion(tk.Session())
require.NoError(t, err)
require.Equal(t, session.CurrentBootstrapVersion, ver)

statsHistoryTblFields := []struct {
field string
tp string
}{
{"table_id", "bigint(64)"},
{"modify_count", "bigint(64)"},
{"count", "bigint(64)"},
{"version", "bigint(64)"},
{"create_time", "datetime(6)"},
}
rStatsHistoryTbl, err := tk.Exec(`desc mysql.stats_meta_history`)
require.NoError(t, err)
req := rStatsHistoryTbl.NewChunk(nil)
require.NoError(t, rStatsHistoryTbl.Next(ctx, req))
require.Equal(t, 5, req.NumRows())
for i := 0; i < 5; i++ {
row := req.GetRow(i)
require.Equal(t, statsHistoryTblFields[i].field, strings.ToLower(row.GetString(0)))
require.Equal(t, statsHistoryTblFields[i].tp, strings.ToLower(row.GetString(1)))
}
}
14 changes: 14 additions & 0 deletions statistics/handle/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,12 @@ func (h *Handle) DDLEventCh() chan *util.Event {
// insertTableStats2KV inserts a record standing for a new table to stats_meta and inserts some records standing for the
// new columns and indices which belong to this table.
func (h *Handle) insertTableStats2KV(info *model.TableInfo, physicalID int64) (err error) {
statsVer := uint64(0)
defer func() {
if err == nil && statsVer != 0 {
err = h.recordHistoricalStatsMeta(physicalID, statsVer)
}
}()
h.mu.Lock()
defer h.mu.Unlock()
ctx := context.Background()
Expand All @@ -195,6 +201,7 @@ func (h *Handle) insertTableStats2KV(info *model.TableInfo, physicalID int64) (e
if _, err := exec.ExecuteInternal(ctx, "insert into mysql.stats_meta (version, table_id) values(%?, %?)", startTS, physicalID); err != nil {
return err
}
statsVer = startTS
for _, col := range info.Columns {
if _, err := exec.ExecuteInternal(ctx, "insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version) values(%?, 0, %?, 0, %?)", physicalID, col.ID, startTS); err != nil {
return err
Expand All @@ -211,6 +218,12 @@ func (h *Handle) insertTableStats2KV(info *model.TableInfo, physicalID int64) (e
// insertColStats2KV insert a record to stats_histograms with distinct_count 1 and insert a bucket to stats_buckets with default value.
// This operation also updates version.
func (h *Handle) insertColStats2KV(physicalID int64, colInfos []*model.ColumnInfo) (err error) {
statsVer := uint64(0)
defer func() {
if err == nil && statsVer != 0 {
err = h.recordHistoricalStatsMeta(physicalID, statsVer)
}
}()
h.mu.Lock()
defer h.mu.Unlock()

Expand All @@ -233,6 +246,7 @@ func (h *Handle) insertColStats2KV(physicalID int64, colInfos []*model.ColumnInf
if err != nil {
return
}
statsVer = startTS
// If we didn't update anything by last SQL, it means the stats of this table does not exist.
if h.mu.ctx.GetSessionVars().StmtCtx.AffectedRows() > 0 {
// By this step we can get the count of this table, then we can sure the count and repeats of bucket.
Expand Down
85 changes: 85 additions & 0 deletions statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -984,6 +984,12 @@ func (h *Handle) StatsMetaCountAndModifyCount(tableID int64) (int64, int64, erro
// SaveTableStatsToStorage saves the stats of a table to storage.
func (h *Handle) SaveTableStatsToStorage(results *statistics.AnalyzeResults, needDumpFMS bool) (err error) {
tableID := results.TableID.GetStatisticsID()
statsVer := uint64(0)
defer func() {
if err == nil && statsVer != 0 {
err = h.recordHistoricalStatsMeta(tableID, statsVer)
}
}()
h.mu.Lock()
defer h.mu.Unlock()
ctx := context.TODO()
Expand Down Expand Up @@ -1026,6 +1032,7 @@ func (h *Handle) SaveTableStatsToStorage(results *statistics.AnalyzeResults, nee
if _, err = exec.ExecuteInternal(ctx, "replace into mysql.stats_meta (version, table_id, count, snapshot) values (%?, %?, %?, %?)", version, tableID, results.Count, results.Snapshot); err != nil {
return err
}
statsVer = version
} else {
modifyCnt := curModifyCnt - results.BaseModifyCnt
if modifyCnt < 0 {
Expand All @@ -1038,6 +1045,7 @@ func (h *Handle) SaveTableStatsToStorage(results *statistics.AnalyzeResults, nee
if _, err = exec.ExecuteInternal(ctx, "update mysql.stats_meta set version=%?, modify_count=%?, count=%?, snapshot=%? where table_id=%?", version, modifyCnt, cnt, results.Snapshot, tableID); err != nil {
return err
}
statsVer = version
}
// 2. Save histograms.
for _, result := range results.Ars {
Expand Down Expand Up @@ -1149,6 +1157,12 @@ func (h *Handle) SaveTableStatsToStorage(results *statistics.AnalyzeResults, nee
// SaveStatsToStorage saves the stats to storage.
// TODO: refactor to reduce the number of parameters
func (h *Handle) SaveStatsToStorage(tableID int64, count int64, isIndex int, hg *statistics.Histogram, cms *statistics.CMSketch, topN *statistics.TopN, fms *statistics.FMSketch, statsVersion int, isAnalyzed int64, needDumpFMS bool, updateAnalyzeTime bool) (err error) {
statsVer := uint64(0)
defer func() {
if err == nil && statsVer != 0 {
err = h.recordHistoricalStatsMeta(tableID, statsVer)
}
}()
h.mu.Lock()
defer h.mu.Unlock()
ctx := context.TODO()
Expand All @@ -1175,6 +1189,7 @@ func (h *Handle) SaveStatsToStorage(tableID int64, count int64, isIndex int, hg
if err != nil {
return err
}
statsVer = version
cmSketch, err := statistics.EncodeCMSketchWithoutTopN(cms)
if err != nil {
return err
Expand Down Expand Up @@ -1252,6 +1267,12 @@ func (h *Handle) SaveStatsToStorage(tableID int64, count int64, isIndex int, hg

// SaveMetaToStorage will save stats_meta to storage.
func (h *Handle) SaveMetaToStorage(tableID, count, modifyCount int64) (err error) {
statsVer := uint64(0)
defer func() {
if err == nil && statsVer != 0 {
err = h.recordHistoricalStatsMeta(tableID, statsVer)
}
}()
h.mu.Lock()
defer h.mu.Unlock()
ctx := context.TODO()
Expand All @@ -1269,6 +1290,7 @@ func (h *Handle) SaveMetaToStorage(tableID, count, modifyCount int64) (err error
}
version := txn.StartTS()
_, err = exec.ExecuteInternal(ctx, "replace into mysql.stats_meta (version, table_id, count, modify_count) values (%?, %?, %?, %?)", version, tableID, count, modifyCount)
statsVer = version
return err
}

Expand Down Expand Up @@ -1445,6 +1467,12 @@ const (

// InsertExtendedStats inserts a record into mysql.stats_extended and update version in mysql.stats_meta.
func (h *Handle) InsertExtendedStats(statsName string, colIDs []int64, tp int, tableID int64, ifNotExists bool) (err error) {
statsVer := uint64(0)
defer func() {
if err == nil && statsVer != 0 {
err = h.recordHistoricalStatsMeta(tableID, statsVer)
}
}()
sort.Slice(colIDs, func(i, j int) bool { return colIDs[i] < colIDs[j] })
bytes, err := json.Marshal(colIDs)
if err != nil {
Expand Down Expand Up @@ -1490,6 +1518,7 @@ func (h *Handle) InsertExtendedStats(statsName string, colIDs []int64, tp int, t
if _, err = exec.ExecuteInternal(ctx, "UPDATE mysql.stats_meta SET version = %? WHERE table_id = %?", version, tableID); err != nil {
return err
}
statsVer = version
// Remove the existing 'deleted' records.
if _, err = exec.ExecuteInternal(ctx, "DELETE FROM mysql.stats_extended WHERE name = %? and table_id = %?", statsName, tableID); err != nil {
return err
Expand All @@ -1509,6 +1538,12 @@ func (h *Handle) InsertExtendedStats(statsName string, colIDs []int64, tp int, t

// MarkExtendedStatsDeleted update the status of mysql.stats_extended to be `deleted` and the version of mysql.stats_meta.
func (h *Handle) MarkExtendedStatsDeleted(statsName string, tableID int64, ifExists bool) (err error) {
statsVer := uint64(0)
defer func() {
if err == nil && statsVer != 0 {
err = h.recordHistoricalStatsMeta(tableID, statsVer)
}
}()
ctx := context.Background()
rows, _, err := h.execRestrictedSQL(ctx, "SELECT name FROM mysql.stats_extended WHERE name = %? and table_id = %? and status in (%?, %?)", statsName, tableID, StatsStatusInited, StatsStatusAnalyzed)
if err != nil {
Expand Down Expand Up @@ -1546,6 +1581,7 @@ func (h *Handle) MarkExtendedStatsDeleted(statsName string, tableID int64, ifExi
if _, err = exec.ExecuteInternal(ctx, "UPDATE mysql.stats_meta SET version = %? WHERE table_id = %?", version, tableID); err != nil {
return err
}
statsVer = version
if _, err = exec.ExecuteInternal(ctx, "UPDATE mysql.stats_extended SET version = %?, status = %? WHERE name = %? and table_id = %?", version, StatsStatusDeleted, statsName, tableID); err != nil {
return err
}
Expand Down Expand Up @@ -1715,6 +1751,12 @@ func (h *Handle) fillExtStatsCorrVals(item *statistics.ExtendedStatsItem, cols [

// SaveExtendedStatsToStorage writes extended stats of a table into mysql.stats_extended.
func (h *Handle) SaveExtendedStatsToStorage(tableID int64, extStats *statistics.ExtendedStatsColl, isLoad bool) (err error) {
statsVer := uint64(0)
defer func() {
if err == nil && statsVer != 0 {
err = h.recordHistoricalStatsMeta(tableID, statsVer)
}
}()
if extStats == nil || len(extStats.Stats) == 0 {
return nil
}
Expand Down Expand Up @@ -1756,6 +1798,7 @@ func (h *Handle) SaveExtendedStatsToStorage(tableID int64, extStats *statistics.
if _, err := exec.ExecuteInternal(ctx, "UPDATE mysql.stats_meta SET version = %? WHERE table_id = %?", version, tableID); err != nil {
return err
}
statsVer = version
}
return nil
}
Expand Down Expand Up @@ -1956,9 +1999,51 @@ func (h *Handle) RecordHistoricalStatsToStorage(dbName string, tableInfo *model.

// CheckHistoricalStatsEnable is used to check whether TiDBEnableHistoricalStats is enabled.
func (h *Handle) CheckHistoricalStatsEnable() (enable bool, err error) {
h.mu.Lock()
defer h.mu.Unlock()
val, err := h.mu.ctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBEnableHistoricalStats)
if err != nil {
return false, errors.Trace(err)
}
return variable.TiDBOptOn(val), nil
}

func (h *Handle) recordHistoricalStatsMeta(tableID int64, version uint64) error {
if tableID == 0 || version == 0 {
return errors.Errorf("tableID %d, version %d are invalid", tableID, version)
}
historicalStatsEnabled, err := h.CheckHistoricalStatsEnable()
if err != nil {
return errors.Errorf("check tidb_enable_historical_stats failed: %v", err)
}
if !historicalStatsEnabled {
return nil
}

ctx := context.Background()
h.mu.Lock()
defer h.mu.Unlock()
rows, _, err := h.execRestrictedSQL(ctx, "select modify_count, count from mysql.stats_meta where table_id = %? and version = %?", tableID, version)
if err != nil {
return errors.Trace(err)
}
if len(rows) == 0 {
return errors.New("no historical meta stats can be recorded")
}
modifyCount, count := rows[0].GetInt64(0), rows[0].GetInt64(1)

exec := h.mu.ctx.(sqlexec.SQLExecutor)
_, err = exec.ExecuteInternal(ctx, "begin pessimistic")
if err != nil {
return errors.Trace(err)
}
defer func() {
err = finishTransaction(ctx, exec, err)
}()

const sql = "REPLACE INTO mysql.stats_meta_history(table_id, modify_count, count, version, create_time) VALUES (%?, %?, %?, %?, NOW())"
if _, err := exec.ExecuteInternal(ctx, sql, tableID, modifyCount, count, version); err != nil {
return errors.Trace(err)
}
return nil
}
Loading

0 comments on commit fc6b939

Please sign in to comment.