Skip to content

Commit

Permalink
executor: refine fast-analyze sample step in old partition (pingcap#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
lysu authored Sep 1, 2020
1 parent f82e532 commit 64a5e4f
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 55 deletions.
74 changes: 19 additions & 55 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -648,15 +648,15 @@ type AnalyzeFastExec struct {
}

func (e *AnalyzeFastExec) calculateEstimateSampleStep() (uint32, error) {
sql := fmt.Sprintf("select flag from mysql.stats_histograms where table_id = %d;", e.tblInfo.ID)
sql := fmt.Sprintf("select flag from mysql.stats_histograms where table_id = %d;", e.physicalTableID)
rows, _, err := e.ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(sql)
if err != nil {
return 0, errors.Trace(err)
}
var historyRowCount uint64
hasBeenAnalyzed := len(rows) != 0 && rows[0].GetInt64(0) == statistics.AnalyzeFlag
if hasBeenAnalyzed {
historyRowCount = uint64(domain.GetDomain(e.ctx).StatsHandle().GetTableStats(e.tblInfo).Count)
historyRowCount = uint64(domain.GetDomain(e.ctx).StatsHandle().GetPartitionStats(e.tblInfo, e.physicalTableID).Count)
} else {
dbInfo, ok := domain.GetDomain(e.ctx).InfoSchema().SchemaByTable(e.tblInfo)
if !ok {
Expand All @@ -666,7 +666,19 @@ func (e *AnalyzeFastExec) calculateEstimateSampleStep() (uint32, error) {
if err != nil {
return 0, err
}
sql := fmt.Sprintf("select count(*) from %s.%s;", dbInfo.Name.L, e.tblInfo.Name.L)
var partition string
if e.tblInfo.ID != e.physicalTableID {
for _, definition := range e.tblInfo.Partition.Definitions {
if definition.ID == e.physicalTableID {
partition = fmt.Sprintf(" partition(%s)", definition.Name.L)
break
}
}
}
sql := fmt.Sprintf("select count(*) from %s.%s", dbInfo.Name.L, e.tblInfo.Name.L)
if len(partition) > 0 {
sql += partition
}
recordSets, err := e.ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), sql)
if err != nil || len(recordSets) == 0 {
return 0, errors.Trace(err)
Expand Down Expand Up @@ -717,61 +729,13 @@ func (e *AnalyzeFastExec) activateTxnForRowCount() (rollbackFn func() error, err
return nil, nil
}

// getNextSampleKey gets the next sample key after last failed request. It only retries the needed region.
// Different from other requests, each request range must be the whole region because the region row count
// is only for a whole region. So we need to first find the longest successive prefix ranges of previous request,
// then the next sample key should be the last range that could align with the region bound.
func (e *AnalyzeFastExec) getNextSampleKey(bo *tikv.Backoffer, startKey kv.Key) (kv.Key, error) {
if len(e.sampTasks) == 0 {
e.scanTasks = e.scanTasks[:0]
return startKey, nil
}
sort.Slice(e.sampTasks, func(i, j int) bool {
return bytes.Compare(e.sampTasks[i].StartKey, e.sampTasks[j].StartKey) < 0
})
// The sample task should be consecutive with scan task.
if len(e.scanTasks) > 0 && bytes.Equal(e.scanTasks[0].StartKey, startKey) && !bytes.Equal(e.scanTasks[0].EndKey, e.sampTasks[0].StartKey) {
e.scanTasks = e.scanTasks[:0]
e.sampTasks = e.sampTasks[:0]
return startKey, nil
}
prefixLen := 0
for ; prefixLen < len(e.sampTasks)-1; prefixLen++ {
if !bytes.Equal(e.sampTasks[prefixLen].EndKey, e.sampTasks[prefixLen+1].StartKey) {
break
}
}
// Find the last one that could align with region bound.
for ; prefixLen >= 0; prefixLen-- {
loc, err := e.cache.LocateKey(bo, e.sampTasks[prefixLen].EndKey)
if err != nil {
return nil, err
}
if bytes.Equal(loc.StartKey, e.sampTasks[prefixLen].EndKey) {
startKey = loc.StartKey
break
}
}
e.sampTasks = e.sampTasks[:prefixLen+1]
for i := len(e.scanTasks) - 1; i >= 0; i-- {
if bytes.Compare(startKey, e.scanTasks[i].EndKey) < 0 {
e.scanTasks = e.scanTasks[:i]
}
}
return startKey, nil
}

// buildSampTask returns two variables, the first bool is whether the task meets region error
// and need to rebuild.
// buildSampTask build sample tasks.
func (e *AnalyzeFastExec) buildSampTask() (err error) {
bo := tikv.NewBackofferWithVars(context.Background(), 500, nil)
store, _ := e.ctx.GetStore().(tikv.Storage)
e.cache = store.GetRegionCache()
startKey, endKey := tablecodec.GetTableHandleKeyRange(e.physicalTableID)
targetKey, err := e.getNextSampleKey(bo, startKey)
if err != nil {
return err
}
targetKey := startKey
accessRegionsCounter := 0
for {
// Search for the region which contains the targetKey.
Expand Down Expand Up @@ -1107,8 +1071,8 @@ func (e *AnalyzeFastExec) runTasks() ([]*statistics.Histogram, []*statistics.CMS
stats := domain.GetDomain(e.ctx).StatsHandle()
var rowCount int64 = 0
if stats.Lease() > 0 {
if t := stats.GetTableStats(e.tblInfo); !t.Pseudo {
rowCount = stats.GetTableStats(e.tblInfo).Count
if t := stats.GetPartitionStats(e.tblInfo, e.physicalTableID); !t.Pseudo {
rowCount = t.Count
}
}
hists, cms := make([]*statistics.Histogram, length), make([]*statistics.CMSketch, length)
Expand Down
10 changes: 10 additions & 0 deletions executor/analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,16 @@ func (s *testFastAnalyze) TestFastAnalyze(c *C) {
tk.MustQuery("show stats_buckets where table_name = 't2'").Check(testkit.Rows(
"test t2 a 0 0 1 1 0 0",
"test t2 a 0 1 2 1 18446744073709551615 18446744073709551615"))

tk.MustExec(`set @try_old_partition_implementation=1`)
tk.MustExec(`create table t3 (id int, v int, primary key(id), index k(v)) partition by hash (id) partitions 4`)
tk.MustExec(`insert into t3 values(1, 1), (2, 2), (5, 1), (9, 3), (13, 3), (17, 5), (3, 0)`)
tk.MustExec(`analyze table t3`)
tk.MustQuery(`explain select v from t3 partition(p1) where v = 3`).Check(testkit.Rows(
"IndexReader_7 2.00 root index:IndexRangeScan_6",
"└─IndexRangeScan_6 2.00 cop[tikv] table:t3, partition:p1, index:k(v) range:[3,3], keep order:false",
))
tk.MustExec(`set @try_old_partition_implementation=0`)
}

func (s *testSuite1) TestIssue15993(c *C) {
Expand Down

0 comments on commit 64a5e4f

Please sign in to comment.