Skip to content

Commit

Permalink
statistics: refactor the statistics package use the RestrictedSQLExec…
Browse files Browse the repository at this point in the history
…utor API (pingcap#22636)
  • Loading branch information
tiancaiamao authored Feb 3, 2021
1 parent 26086b2 commit 17a65ab
Show file tree
Hide file tree
Showing 11 changed files with 299 additions and 255 deletions.
4 changes: 2 additions & 2 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1063,7 +1063,7 @@ func (do *Domain) StatsHandle() *handle.Handle {

// CreateStatsHandle is used only for test.
func (do *Domain) CreateStatsHandle(ctx sessionctx.Context) error {
h, err := handle.NewHandle(ctx, do.statsLease)
h, err := handle.NewHandle(ctx, do.statsLease, do.sysSessionPool)
if err != nil {
return err
}
Expand Down Expand Up @@ -1093,7 +1093,7 @@ var RunAutoAnalyze = true
// It should be called only once in BootstrapSession.
func (do *Domain) UpdateTableStatsLoop(ctx sessionctx.Context) error {
ctx.GetSessionVars().InRestrictedSQL = true
statsHandle, err := handle.NewHandle(ctx, do.statsLease)
statsHandle, err := handle.NewHandle(ctx, do.statsLease, do.sysSessionPool)
if err != nil {
return err
}
Expand Down
5 changes: 1 addition & 4 deletions server/statistics_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/gcutil"
"github.com/pingcap/tidb/util/sqlexec"
)

// StatsHandler is the handler for dumping statistics.
Expand Down Expand Up @@ -122,9 +121,7 @@ func (sh StatsHistoryHandler) ServeHTTP(w http.ResponseWriter, req *http.Request
writeError(w, err)
return
}
se.GetSessionVars().SnapshotInfoschema, se.GetSessionVars().SnapshotTS = is, snapshot
historyStatsExec := se.(sqlexec.RestrictedSQLExecutor)
js, err := h.DumpStatsToJSON(params[pDBName], tbl.Meta(), historyStatsExec)
js, err := h.DumpStatsToJSONBySnapshot(params[pDBName], tbl.Meta(), snapshot)
if err != nil {
writeError(w, err)
} else {
Expand Down
44 changes: 18 additions & 26 deletions statistics/handle/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,16 @@ func (h *Handle) initStatsMeta4Chunk(is infoschema.InfoSchema, cache *statsCache

func (h *Handle) initStatsMeta(is infoschema.InfoSchema) (statsCache, error) {
sql := "select HIGH_PRIORITY version, table_id, modify_count, count from mysql.stats_meta"
rc, err := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
if len(rc) > 0 {
defer terror.Call(rc[0].Close)
}
rc, err := h.mu.ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), sql)
if err != nil {
return statsCache{}, errors.Trace(err)
}
defer terror.Call(rc.Close)
tables := statsCache{tables: make(map[int64]*statistics.Table)}
req := rc[0].NewChunk()
req := rc.NewChunk()
iter := chunk.NewIterator4Chunk(req)
for {
err := rc[0].Next(context.TODO(), req)
err := rc.Next(context.TODO(), req)
if err != nil {
return statsCache{}, errors.Trace(err)
}
Expand Down Expand Up @@ -159,17 +157,15 @@ func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, cache *stat

func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, cache *statsCache) error {
sql := "select HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, correlation, flag, last_analyze_pos from mysql.stats_histograms"
rc, err := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
if len(rc) > 0 {
defer terror.Call(rc[0].Close)
}
rc, err := h.mu.ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), sql)
if err != nil {
return errors.Trace(err)
}
req := rc[0].NewChunk()
defer terror.Call(rc.Close)
req := rc.NewChunk()
iter := chunk.NewIterator4Chunk(req)
for {
err := rc[0].Next(context.TODO(), req)
err := rc.Next(context.TODO(), req)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -207,17 +203,15 @@ func (h *Handle) initStatsTopN4Chunk(cache *statsCache, iter *chunk.Iterator4Chu

func (h *Handle) initStatsTopN(cache *statsCache) error {
sql := "select HIGH_PRIORITY table_id, hist_id, value, count from mysql.stats_top_n where is_index = 1"
rc, err := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
if len(rc) > 0 {
defer terror.Call(rc[0].Close)
}
rc, err := h.mu.ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), sql)
if err != nil {
return errors.Trace(err)
}
req := rc[0].NewChunk()
defer terror.Call(rc.Close)
req := rc.NewChunk()
iter := chunk.NewIterator4Chunk(req)
for {
err := rc[0].Next(context.TODO(), req)
err := rc.Next(context.TODO(), req)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -304,17 +298,15 @@ func (h *Handle) initTopNCountSum(tableID, colID int64) (int64, error) {

func (h *Handle) initStatsBuckets(cache *statsCache) error {
sql := "select HIGH_PRIORITY table_id, is_index, hist_id, count, repeats, lower_bound, upper_bound, ndv from mysql.stats_buckets order by table_id, is_index, hist_id, bucket_id"
rc, err := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
if len(rc) > 0 {
defer terror.Call(rc[0].Close)
}
rc, err := h.mu.ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), sql)
if err != nil {
return errors.Trace(err)
}
req := rc[0].NewChunk()
defer terror.Call(rc.Close)
req := rc.NewChunk()
iter := chunk.NewIterator4Chunk(req)
for {
err := rc[0].Next(context.TODO(), req)
err := rc.Next(context.TODO(), req)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -347,13 +339,13 @@ func (h *Handle) initStatsBuckets(cache *statsCache) error {
func (h *Handle) InitStats(is infoschema.InfoSchema) (err error) {
h.mu.Lock()
defer func() {
_, err1 := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), "commit")
_, err1 := h.mu.ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), "commit")
if err == nil && err1 != nil {
err = err1
}
h.mu.Unlock()
}()
_, err = h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), "begin")
_, err = h.mu.ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), "begin")
if err != nil {
return err
}
Expand Down
69 changes: 33 additions & 36 deletions statistics/handle/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package handle

import (
"context"
"fmt"

"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
Expand Down Expand Up @@ -93,28 +92,34 @@ func (h *Handle) DDLEventCh() chan *util.Event {
func (h *Handle) insertTableStats2KV(info *model.TableInfo, physicalID int64) (err error) {
h.mu.Lock()
defer h.mu.Unlock()
ctx := context.Background()
exec := h.mu.ctx.(sqlexec.SQLExecutor)
_, err = exec.Execute(context.Background(), "begin")
_, err = exec.ExecuteInternal(ctx, "begin")
if err != nil {
return errors.Trace(err)
}
defer func() {
err = finishTransaction(context.Background(), exec, err)
err = finishTransaction(ctx, exec, err)
}()
txn, err := h.mu.ctx.Txn(true)
if err != nil {
return errors.Trace(err)
}
startTS := txn.StartTS()
sqls := make([]string, 0, 1+len(info.Columns)+len(info.Indices))
sqls = append(sqls, fmt.Sprintf("insert into mysql.stats_meta (version, table_id) values(%d, %d)", startTS, physicalID))
if _, err := exec.ExecuteInternal(ctx, "insert into mysql.stats_meta (version, table_id) values(%?, %?)", startTS, physicalID); err != nil {
return err
}
for _, col := range info.Columns {
sqls = append(sqls, fmt.Sprintf("insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version) values(%d, 0, %d, 0, %d)", physicalID, col.ID, startTS))
if _, err := exec.ExecuteInternal(ctx, "insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version) values(%?, 0, %?, 0, %?)", physicalID, col.ID, startTS); err != nil {
return err
}
}
for _, idx := range info.Indices {
sqls = append(sqls, fmt.Sprintf("insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version) values(%d, 1, %d, 0, %d)", physicalID, idx.ID, startTS))
if _, err := exec.ExecuteInternal(ctx, "insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version) values(%?, 1, %?, 0, %?)", physicalID, idx.ID, startTS); err != nil {
return err
}
}
return execSQLs(context.Background(), exec, sqls)
return nil
}

// insertColStats2KV insert a record to stats_histograms with distinct_count 1 and insert a bucket to stats_buckets with default value.
Expand All @@ -123,43 +128,40 @@ func (h *Handle) insertColStats2KV(physicalID int64, colInfos []*model.ColumnInf
h.mu.Lock()
defer h.mu.Unlock()

ctx := context.TODO()
exec := h.mu.ctx.(sqlexec.SQLExecutor)
_, err = exec.Execute(context.Background(), "begin")
_, err = exec.ExecuteInternal(ctx, "begin")
if err != nil {
return errors.Trace(err)
}
defer func() {
err = finishTransaction(context.Background(), exec, err)
err = finishTransaction(ctx, exec, err)
}()
txn, err := h.mu.ctx.Txn(true)
if err != nil {
return errors.Trace(err)
}
startTS := txn.StartTS()
// First of all, we update the version.
_, err = exec.Execute(context.Background(), fmt.Sprintf("update mysql.stats_meta set version = %d where table_id = %d ", startTS, physicalID))
_, err = exec.ExecuteInternal(ctx, "update mysql.stats_meta set version = %? where table_id = %?", startTS, physicalID)
if err != nil {
return
}
ctx := context.TODO()
// If we didn't update anything by last SQL, it means the stats of this table does not exist.
if h.mu.ctx.GetSessionVars().StmtCtx.AffectedRows() > 0 {
// By this step we can get the count of this table, then we can sure the count and repeats of bucket.
var rs []sqlexec.RecordSet
rs, err = exec.Execute(ctx, fmt.Sprintf("select count from mysql.stats_meta where table_id = %d", physicalID))
if len(rs) > 0 {
defer terror.Call(rs[0].Close)
}
var rs sqlexec.RecordSet
rs, err = exec.ExecuteInternal(ctx, "select count from mysql.stats_meta where table_id = %?", physicalID)
if err != nil {
return
}
req := rs[0].NewChunk()
err = rs[0].Next(ctx, req)
defer terror.Call(rs.Close)
req := rs.NewChunk()
err = rs.Next(ctx, req)
if err != nil {
return
}
count := req.GetRow(0).GetInt64(0)
sqls := make([]string, 0, len(colInfos))
for _, colInfo := range colInfos {
value := types.NewDatum(colInfo.GetOriginDefaultValue())
value, err = value.ConvertTo(h.mu.ctx.GetSessionVars().StmtCtx, &colInfo.FieldType)
Expand All @@ -168,40 +170,35 @@ func (h *Handle) insertColStats2KV(physicalID int64, colInfos []*model.ColumnInf
}
if value.IsNull() {
// If the adding column has default value null, all the existing rows have null value on the newly added column.
sqls = append(sqls, fmt.Sprintf("insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, null_count) values (%d, %d, 0, %d, 0, %d)", startTS, physicalID, colInfo.ID, count))
if _, err := exec.ExecuteInternal(ctx, "insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, null_count) values (%?, %?, 0, %?, 0, %?)", startTS, physicalID, colInfo.ID, count); err != nil {
return err
}
} else {
// If this stats exists, we insert histogram meta first, the distinct_count will always be one.
sqls = append(sqls, fmt.Sprintf("insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, tot_col_size) values (%d, %d, 0, %d, 1, %d)", startTS, physicalID, colInfo.ID, int64(len(value.GetBytes()))*count))
if _, err := exec.ExecuteInternal(ctx, "insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, tot_col_size) values (%?, %?, 0, %?, 1, %?)", startTS, physicalID, colInfo.ID, int64(len(value.GetBytes()))*count); err != nil {
return err
}
value, err = value.ConvertTo(h.mu.ctx.GetSessionVars().StmtCtx, types.NewFieldType(mysql.TypeBlob))
if err != nil {
return
}
// There must be only one bucket for this new column and the value is the default value.
sqls = append(sqls, fmt.Sprintf("insert into mysql.stats_buckets (table_id, is_index, hist_id, bucket_id, repeats, count, lower_bound, upper_bound) values (%d, 0, %d, 0, %d, %d, X'%X', X'%X')", physicalID, colInfo.ID, count, count, value.GetBytes(), value.GetBytes()))
if _, err := exec.ExecuteInternal(ctx, "insert into mysql.stats_buckets (table_id, is_index, hist_id, bucket_id, repeats, count, lower_bound, upper_bound) values (%?, 0, %?, 0, %?, %?, %?, %?)", physicalID, colInfo.ID, count, count, value.GetBytes(), value.GetBytes()); err != nil {
return err
}
}
}
return execSQLs(context.Background(), exec, sqls)
}
return
}

// finishTransaction will execute `commit` when error is nil, otherwise `rollback`.
func finishTransaction(ctx context.Context, exec sqlexec.SQLExecutor, err error) error {
if err == nil {
_, err = exec.Execute(ctx, "commit")
_, err = exec.ExecuteInternal(ctx, "commit")
} else {
_, err1 := exec.Execute(ctx, "rollback")
_, err1 := exec.ExecuteInternal(ctx, "rollback")
terror.Log(errors.Trace(err1))
}
return errors.Trace(err)
}

func execSQLs(ctx context.Context, exec sqlexec.SQLExecutor, sqls []string) error {
for _, sql := range sqls {
_, err := exec.Execute(ctx, sql)
if err != nil {
return err
}
}
return nil
}
21 changes: 16 additions & 5 deletions statistics/handle/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
Expand Down Expand Up @@ -111,17 +112,27 @@ func dumpJSONCol(hist *statistics.Histogram, CMSketch *statistics.CMSketch, topn

// DumpStatsToJSON dumps statistic to json.
func (h *Handle) DumpStatsToJSON(dbName string, tableInfo *model.TableInfo, historyStatsExec sqlexec.RestrictedSQLExecutor) (*JSONTable, error) {
var snapshot uint64
if historyStatsExec != nil {
sctx := historyStatsExec.(sessionctx.Context)
snapshot = sctx.GetSessionVars().SnapshotTS
}
return h.DumpStatsToJSONBySnapshot(dbName, tableInfo, snapshot)
}

// DumpStatsToJSONBySnapshot dumps statistic to json.
func (h *Handle) DumpStatsToJSONBySnapshot(dbName string, tableInfo *model.TableInfo, snapshot uint64) (*JSONTable, error) {
pi := tableInfo.GetPartitionInfo()
if pi == nil || h.CurrentPruneMode() == variable.DynamicOnly {
return h.tableStatsToJSON(dbName, tableInfo, tableInfo.ID, historyStatsExec)
return h.tableStatsToJSON(dbName, tableInfo, tableInfo.ID, snapshot)
}
jsonTbl := &JSONTable{
DatabaseName: dbName,
TableName: tableInfo.Name.L,
Partitions: make(map[string]*JSONTable, len(pi.Definitions)),
}
for _, def := range pi.Definitions {
tbl, err := h.tableStatsToJSON(dbName, tableInfo, def.ID, historyStatsExec)
tbl, err := h.tableStatsToJSON(dbName, tableInfo, def.ID, snapshot)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -133,12 +144,12 @@ func (h *Handle) DumpStatsToJSON(dbName string, tableInfo *model.TableInfo, hist
return jsonTbl, nil
}

func (h *Handle) tableStatsToJSON(dbName string, tableInfo *model.TableInfo, physicalID int64, historyStatsExec sqlexec.RestrictedSQLExecutor) (*JSONTable, error) {
tbl, err := h.TableStatsFromStorage(tableInfo, physicalID, true, historyStatsExec)
func (h *Handle) tableStatsToJSON(dbName string, tableInfo *model.TableInfo, physicalID int64, snapshot uint64) (*JSONTable, error) {
tbl, err := h.TableStatsFromStorage(tableInfo, physicalID, true, snapshot)
if err != nil || tbl == nil {
return nil, err
}
tbl.Version, tbl.ModifyCount, tbl.Count, err = h.statsMetaByTableIDFromStorage(physicalID, historyStatsExec)
tbl.Version, tbl.ModifyCount, tbl.Count, err = h.statsMetaByTableIDFromStorage(physicalID, snapshot)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion statistics/handle/dump_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func (s *testStatsSuite) TestDumpVer2Stats(c *C) {
tableInfo, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
c.Assert(err, IsNil)

storageTbl, err := h.TableStatsFromStorage(tableInfo.Meta(), tableInfo.Meta().ID, false, nil)
storageTbl, err := h.TableStatsFromStorage(tableInfo.Meta(), tableInfo.Meta().ID, false, 0)
c.Assert(err, IsNil)

dumpJSONTable, err := h.DumpStatsToJSON("test", tableInfo.Meta(), nil)
Expand Down
Loading

0 comments on commit 17a65ab

Please sign in to comment.