Skip to content

Commit

Permalink
stats: speed up loading full stats info (pingcap#5266)
Browse files Browse the repository at this point in the history
  • Loading branch information
alivxxx authored and coocood committed Nov 30, 2017
1 parent db270fc commit 2cbeb54
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 25 deletions.
3 changes: 3 additions & 0 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -609,9 +609,12 @@ func (do *Domain) updateStatsWorker(ctx context.Context, lease time.Duration) {
loadHistogramTicker := time.NewTicker(lease)
defer loadHistogramTicker.Stop()
statsHandle := do.StatsHandle()
t := time.Now()
err := statsHandle.InitStats(do.InfoSchema())
if err != nil {
log.Error("[stats] init stats info failed: ", errors.ErrorStack(err))
} else {
log.Info("[stats] init stats info takes ", time.Now().Sub(t))
}
for {
select {
Expand Down
100 changes: 76 additions & 24 deletions statistics/boostrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,19 @@ package statistics
import (
log "github.com/Sirupsen/logrus"
"github.com/juju/errors"
"github.com/pingcap/tidb/context"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/sqlexec"
goctx "golang.org/x/net/context"
)

func (h *Handle) initStatsMeta(is infoschema.InfoSchema) (statsCache, error) {
sql := "select version, table_id, modify_count, count from mysql.stats_meta"
rows, _, err := h.ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(h.ctx, sql)
if err != nil {
return nil, errors.Trace(err)
}
tables := make(statsCache, len(rows))
for _, row := range rows {
func initStatsMeta4Chunk(is infoschema.InfoSchema, tables statsCache, chk *chunk.Chunk) {
for row := chk.Begin(); row != chk.End(); row = row.Next() {
tableID := row.GetInt64(1)
table, ok := is.TableByID(tableID)
if !ok {
Expand All @@ -49,16 +46,31 @@ func (h *Handle) initStatsMeta(is infoschema.InfoSchema) (statsCache, error) {
}
tables[tableID] = tbl
}
return tables, nil
}

func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, tables statsCache) error {
sql := "select table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch from mysql.stats_histograms"
rows, _, err := h.ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(h.ctx, sql)
func (h *Handle) initStatsMeta(is infoschema.InfoSchema) (statsCache, error) {
sql := "select version, table_id, modify_count, count from mysql.stats_meta"
rc, err := h.ctx.(sqlexec.SQLExecutor).Execute(goctx.TODO(), sql)
if err != nil {
return errors.Trace(err)
return nil, errors.Trace(err)
}
for _, row := range rows {
tables := statsCache{}
chk := rc[0].NewChunk()
for {
err := rc[0].NextChunk(chk)
if err != nil {
return nil, errors.Trace(err)
}
if chk.NumRows() == 0 {
break
}
initStatsMeta4Chunk(is, tables, chk)
}
return tables, nil
}

func initStatsHistograms4Chunk(is infoschema.InfoSchema, tables statsCache, chk *chunk.Chunk) {
for row := chk.Begin(); row != chk.End(); row = row.Next() {
table, ok := tables[row.GetInt64(0)]
if !ok {
continue
Expand Down Expand Up @@ -101,16 +113,36 @@ func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, tables statsCache
table.Columns[hist.ID] = &Column{Histogram: hist, Info: colInfo}
}
}
return nil
}

func (h *Handle) initStatsBuckets(tables statsCache) error {
sql := "select table_id, is_index, hist_id, bucket_id, count, repeats, lower_bound, upper_bound from mysql.stats_buckets"
rows, fields, err := h.ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(h.ctx, sql)
func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, tables statsCache) error {
sql := "select table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch from mysql.stats_histograms"
rc, err := h.ctx.(sqlexec.SQLExecutor).Execute(goctx.TODO(), sql)
if err != nil {
return errors.Trace(err)
}
for _, row := range rows {
chk := rc[0].NewChunk()
for {
err := rc[0].NextChunk(chk)
if err != nil {
return errors.Trace(err)
}
if chk.NumRows() == 0 {
break
}
initStatsHistograms4Chunk(is, tables, chk)
}
return nil
}

func newBytesDatum(src []byte) types.Datum {
dst := make([]byte, len(src))
copy(dst, src)
return types.NewBytesDatum(dst)
}

func initStatsBuckets4Chunk(ctx context.Context, tables statsCache, chk *chunk.Chunk) {
for row := chk.Begin(); row != chk.End(); row = row.Next() {
tableID, isIndex, histID, bucketID := row.GetInt64(0), row.GetInt64(1), row.GetInt64(2), row.GetInt64(3)
table, ok := tables[tableID]
if !ok {
Expand All @@ -124,7 +156,7 @@ func (h *Handle) initStatsBuckets(tables statsCache) error {
continue
}
hist = &index.Histogram
lower, upper = row.GetDatum(6, &fields[6].Column.FieldType), row.GetDatum(7, &fields[7].Column.FieldType)
lower, upper = newBytesDatum(row.GetBytes(6)), newBytesDatum(row.GetBytes(7))
} else {
column, ok := table.Columns[histID]
if !ok {
Expand All @@ -135,15 +167,16 @@ func (h *Handle) initStatsBuckets(tables statsCache) error {
continue
}
hist = &column.Histogram
d := row.GetDatum(6, &fields[6].Column.FieldType)
lower, err = d.ConvertTo(h.ctx.GetSessionVars().StmtCtx, &column.Info.FieldType)
d := newBytesDatum(row.GetBytes(6))
var err error
lower, err = d.ConvertTo(ctx.GetSessionVars().StmtCtx, &column.Info.FieldType)
if err != nil {
log.Debugf("decode bucket lower bound failed: %s", errors.ErrorStack(err))
delete(table.Columns, histID)
continue
}
d = row.GetDatum(7, &fields[7].Column.FieldType)
upper, err = d.ConvertTo(h.ctx.GetSessionVars().StmtCtx, &column.Info.FieldType)
d = newBytesDatum(row.GetBytes(7))
upper, err = d.ConvertTo(ctx.GetSessionVars().StmtCtx, &column.Info.FieldType)
if err != nil {
log.Debugf("decode bucket upper bound failed: %s", errors.ErrorStack(err))
delete(table.Columns, histID)
Expand All @@ -164,6 +197,25 @@ func (h *Handle) initStatsBuckets(tables statsCache) error {
commonPfxLen: commonLength,
}
}
}

func (h *Handle) initStatsBuckets(tables statsCache) error {
sql := "select table_id, is_index, hist_id, bucket_id, count, repeats, lower_bound, upper_bound from mysql.stats_buckets"
rc, err := h.ctx.(sqlexec.SQLExecutor).Execute(goctx.TODO(), sql)
if err != nil {
return errors.Trace(err)
}
chk := rc[0].NewChunk()
for {
err := rc[0].NextChunk(chk)
if err != nil {
return errors.Trace(err)
}
if chk.NumRows() == 0 {
break
}
initStatsBuckets4Chunk(h.ctx, tables, chk)
}
for _, table := range tables {
if h.LastVersion < table.Version {
h.LastVersion = table.Version
Expand Down
1 change: 1 addition & 0 deletions statistics/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func (h *Handle) Clear() {
for len(h.analyzeResultCh) > 0 {
<-h.analyzeResultCh
}
h.ctx.GetSessionVars().MaxChunkSize = 1
h.listHead = &SessionStatsCollector{mapper: make(tableDeltaMap)}
h.globalMap = make(tableDeltaMap)
}
Expand Down
2 changes: 1 addition & 1 deletion statistics/handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ func (s *testStatsCacheSuite) TestInitStats(c *C) {
testKit := testkit.NewTestKit(c, s.store)
testKit.MustExec("use test")
testKit.MustExec("create table t(a int, b int, c int, primary key(a), key idx(b))")
testKit.MustExec("insert into t values (1,1,1),(2,2,2),(3,3,3)")
testKit.MustExec("insert into t values (1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5),(6,6,6)")
testKit.MustExec("analyze table t")
h := s.do.StatsHandle()
is := s.do.InfoSchema()
Expand Down

0 comments on commit 2cbeb54

Please sign in to comment.