Skip to content

Commit

Permalink
executor: implement ANALYZE TABLE (pingcap#1327)
Browse files Browse the repository at this point in the history
  • Loading branch information
coocood authored Jun 20, 2016
1 parent 1a70685 commit b8bf595
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 9 deletions.
105 changes: 104 additions & 1 deletion executor/executor_simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,20 @@ package executor

import (
"fmt"
"math/rand"
"strings"
"time"

"github.com/juju/errors"
"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/context"
"github.com/pingcap/tidb/evaluator"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/plan/statistics"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/db"
"github.com/pingcap/tidb/sessionctx/variable"
Expand Down Expand Up @@ -310,6 +314,105 @@ func (e *SimpleExec) executeSetPwd(s *ast.SetPwdStmt) error {
}

// executeAnalyzeTable handles an ANALYZE TABLE statement by creating
// statistics for every table listed in s.TableNames, in order. It stops at the
// first table that fails and returns that error; it returns nil when all
// tables were processed.
func (e *SimpleExec) executeAnalyzeTable(s *ast.AnalyzeTableStmt) error {
	for i := range s.TableNames {
		if err := e.createStatisticsForTable(s.TableNames[i]); err != nil {
			return errors.Trace(err)
		}
	}
	return nil
}

// Tuning constants used when building statistics for ANALYZE TABLE.
const (
// maxSampleCount is the maximum number of rows collectSamples keeps in its
// sample set, regardless of how many rows the table has.
maxSampleCount = 10000
// defaultBucketCount is the bucket count passed to statistics.NewTable when
// the table statistics are built from the samples.
defaultBucketCount = 256
)

// createStatisticsForTable reads all rows of the table named by tn, samples
// them, and builds and persists the table statistics from the samples.
//
// The rows are read by running "select * from <table>" through the restricted
// SQL executor of the current context. The schema name is included in the
// query only when tn carries one. The result set is closed before the
// statistics are built, and an error from closing it is reported to the
// caller unless sampling itself already failed.
func (e *SimpleExec) createStatisticsForTable(tn *ast.TableName) error {
	// Quote every identifier with backticks (doubling embedded backticks) so
	// that names which are reserved words or contain special characters still
	// form a valid query.
	quote := func(name string) string {
		return "`" + strings.Replace(name, "`", "``", -1) + "`"
	}
	tableName := quote(tn.Name.L)
	if tn.Schema.L != "" {
		tableName = quote(tn.Schema.L) + "." + tableName
	}
	sql := "select * from " + tableName
	result, err := e.ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(e.ctx, sql)
	if err != nil {
		return errors.Trace(err)
	}
	count, samples, err := e.collectSamples(result)
	// Always close the result set, but let a sampling error take precedence
	// over a close error since it is the root cause.
	closeErr := result.Close()
	if err != nil {
		return errors.Trace(err)
	}
	if closeErr != nil {
		return errors.Trace(closeErr)
	}
	err = e.buildStatisticsAndSaveToKV(tn, count, samples)
	if err != nil {
		return errors.Trace(err)
	}
	return nil
}

// collectSamples drains the result set and returns the total number of rows
// read together with a uniform random sample of at most maxSampleCount rows,
// chosen with the Reservoir Sampling algorithm (Algorithm R).
// See https://en.wikipedia.org/wiki/Reservoir_sampling
//
// The first maxSampleCount rows fill the reservoir. Every later row, being
// the count-th row read, replaces a random reservoir slot with probability
// maxSampleCount/count, which keeps every row equally likely to be in the
// final sample.
//
// If reading a row fails, the error is returned with a zero count and nil
// samples. The caller remains responsible for closing result.
func (e *SimpleExec) collectSamples(result ast.RecordSet) (count int64, samples []*ast.Row, err error) {
	ran := rand.New(rand.NewSource(time.Now().UnixNano()))
	for {
		var row *ast.Row
		row, err = result.Next()
		if err != nil {
			return 0, nil, errors.Trace(err)
		}
		if row == nil {
			// A nil row without an error marks the end of the result set.
			break
		}
		// Count the current row first so that the acceptance probability below
		// is computed over all rows seen so far, including this one.
		count++
		if len(samples) < maxSampleCount {
			samples = append(samples, row)
			continue
		}
		// idx is uniform in [0, count); it lands inside the reservoir with
		// probability maxSampleCount/count, and when it does it is also a
		// uniformly chosen slot to evict.
		if idx := ran.Int63n(count); idx < maxSampleCount {
			samples[idx] = row
		}
	}
	return count, samples, nil
}

// buildStatisticsAndSaveToKV builds the statistics of the table described by
// tn.TableInfo from the sampled rows and stores them through the meta layer.
//
// count is the total row count handed to statistics.NewTable, and sampleRows
// are the sampled rows, which are transposed into per-column samples first.
// The transaction obtained from e.ctx.GetTxn(false) supplies the start
// timestamp recorded in the statistics and is the transaction the statistics
// are written with.
func (e *SimpleExec) buildStatisticsAndSaveToKV(tn *ast.TableName, count int64, sampleRows []*ast.Row) error {
	txn, err := e.ctx.GetTxn(false)
	if err != nil {
		return errors.Trace(err)
	}

	samplesByColumn := rowsToColumnSamples(sampleRows)
	tblStats, err := statistics.NewTable(tn.TableInfo, int64(txn.StartTS()), count, defaultBucketCount, samplesByColumn)
	if err != nil {
		return errors.Trace(err)
	}

	statsPB, err := tblStats.ToPB()
	if err != nil {
		return errors.Trace(err)
	}

	// Persist the protobuf form under the table's ID.
	if err = meta.NewMeta(txn).SetTableStats(tn.TableInfo.ID, statsPB); err != nil {
		return errors.Trace(err)
	}
	return nil
}

// rowsToColumnSamples transposes row-oriented samples into column-oriented
// samples: element [c][r] of the result is the value of column c in rows[r].
// The number of columns is taken from the first row. It returns nil when rows
// is empty.
func rowsToColumnSamples(rows []*ast.Row) [][]types.Datum {
	rowCount := len(rows)
	if rowCount == 0 {
		return nil
	}
	colCount := len(rows[0].Data)
	columns := make([][]types.Datum, colCount)
	for c := 0; c < colCount; c++ {
		columns[c] = make([]types.Datum, rowCount)
	}
	for r := 0; r < rowCount; r++ {
		for c, datum := range rows[r].Data {
			columns[c][r] = datum
		}
	}
	return columns
}
21 changes: 20 additions & 1 deletion executor/executor_simple_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ import (

. "github.com/pingcap/check"
"github.com/pingcap/tidb/context"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/plan/statistics"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/testkit"
Expand Down Expand Up @@ -191,5 +195,20 @@ func (s *testSuite) TestSetPwd(c *C) {
// TestAnalyzeTable runs ANALYZE TABLE and verifies that statistics for the
// analyzed table can be read back from the meta layer and decoded.
func (s *testSuite) TestAnalyzeTable(c *C) {
	defer testleak.AfterTest(c)()
	tk := testkit.NewTestKit(c, s.store)
	tk.MustExec(`ANALYZE TABLE mysql.User`)
	tk.MustExec(`ANALYZE TABLE mysql.GLOBAL_VARIABLES`)
	ctx := tk.Se.(context.Context)
	is := sessionctx.GetDomain(ctx).InfoSchema()
	t, err := is.TableByName(model.NewCIStr("mysql"), model.NewCIStr("GLOBAL_VARIABLES"))
	// Assert (fatal) rather than Check: the values are dereferenced right
	// after, so continuing past a failure would panic instead of reporting it.
	c.Assert(err, IsNil)
	tableID := t.Meta().ID

	txn, err := ctx.GetTxn(true)
	c.Assert(err, IsNil)
	// Named m so the local variable does not shadow the meta package.
	m := meta.NewMeta(txn)
	tpb, err := m.GetTableStats(tableID)
	c.Assert(err, IsNil)
	c.Assert(tpb, NotNil)
	tStats, err := statistics.TableFromPB(t.Meta(), tpb)
	c.Check(err, IsNil)
	c.Check(tStats, NotNil)
}
8 changes: 4 additions & 4 deletions executor/show_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,19 +67,19 @@ func (s *testSuite) TestShow(c *C) {
result = tk.MustQuery(testSQL)
c.Check(result.Rows(), HasLen, 1)

var ss statistics
var ss stats
variable.RegisterStatistics(ss)
testSQL = "show status like 'character_set_results';"
result = tk.MustQuery(testSQL)
c.Check(result.Rows(), NotNil)
}

type statistics struct {
type stats struct {
}

func (s statistics) GetScope(status string) variable.ScopeFlag { return variable.DefaultScopeFlag }
func (s stats) GetScope(status string) variable.ScopeFlag { return variable.DefaultScopeFlag }

func (s statistics) Stats() (map[string]interface{}, error) {
func (s stats) Stats() (map[string]interface{}, error) {
m := make(map[string]interface{})
var a, b interface{}
b = "123"
Expand Down
36 changes: 36 additions & 0 deletions meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ import (
"sync"
"time"

"github.com/golang/protobuf/proto"
"github.com/juju/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/plan/statistics"
"github.com/pingcap/tidb/structure"
"github.com/pingcap/tidb/terror"
)
Expand Down Expand Up @@ -57,6 +59,7 @@ var (
mTablePrefix = "Table"
mTableIDPrefix = "TID"
mBootstrapKey = []byte("BootstrapKey")
mTableStatsPrefix = "TStats"
)

var (
Expand Down Expand Up @@ -654,6 +657,39 @@ func (m *Meta) SetBgJobOwner(o *model.Owner) error {
return m.setJobOwner(mBgJobOwnerKey, o)
}

// tableStatsKey returns the key under which the statistics of the table with
// the given ID are stored, formatted as "<mTableStatsPrefix>:<tableID>".
func (m *Meta) tableStatsKey(tableID int64) []byte {
	key := fmt.Sprintf("%s:%d", mTableStatsPrefix, tableID)
	return []byte(key)
}

// SetTableStats marshals tpb with protobuf and stores the encoded bytes under
// the statistics key of the table identified by tableID. It returns any
// marshalling or storage error.
func (m *Meta) SetTableStats(tableID int64, tpb *statistics.TablePB) error {
	statsKey := m.tableStatsKey(tableID)
	encoded, err := proto.Marshal(tpb)
	if err != nil {
		return errors.Trace(err)
	}
	if err = m.txn.Set(statsKey, encoded); err != nil {
		return errors.Trace(err)
	}
	return nil
}

// GetTableStats reads the bytes stored under the statistics key of the table
// identified by tableID and unmarshals them into a TablePB. It returns any
// read or unmarshalling error.
//
// NOTE(review): the result for a table whose statistics were never stored
// depends on what m.txn.Get returns for a missing key — confirm.
func (m *Meta) GetTableStats(tableID int64) (*statistics.TablePB, error) {
	raw, err := m.txn.Get(m.tableStatsKey(tableID))
	if err != nil {
		return nil, errors.Trace(err)
	}
	decoded := new(statistics.TablePB)
	if err = proto.Unmarshal(raw, decoded); err != nil {
		return nil, errors.Trace(err)
	}
	return decoded, nil
}

// meta error codes.
const (
codeInvalidTableKey terror.ErrCode = 1
Expand Down
5 changes: 3 additions & 2 deletions plan/statistics/statistics.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ func (t *Table) String() string {
return strings.Join(strs, "\n")
}

func (t *Table) toPB() (*TablePB, error) {
// ToPB converts Table to TablePB.
func (t *Table) ToPB() (*TablePB, error) {
tblPB := &TablePB{
Id: proto.Int64(t.info.ID),
Ts: proto.Int64(t.TS),
Expand Down Expand Up @@ -134,7 +135,7 @@ func (t *Table) buildColumn(offset int, samples []types.Datum) error {
// valuesPerBucket.
col.Numbers[bucketIdx] = i * sampleFactor
col.Repeats[bucketIdx] += sampleFactor
} else if i*sampleFactor-lastNumber < valuesPerBucket {
} else if i*sampleFactor-lastNumber <= valuesPerBucket {
// The bucket still have room to store a new item, update the bucket.
col.Numbers[bucketIdx] = i * sampleFactor
col.Values[bucketIdx] = samples[i]
Expand Down
2 changes: 1 addition & 1 deletion plan/statistics/statistics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (s *testStatisticsSuite) TestTable(c *C) {
log.Debug(str)
c.Check(len(str), Greater, 0)

tpb, err := t.toPB()
tpb, err := t.ToPB()
c.Check(err, IsNil)
data, err := proto.Marshal(tpb)
c.Check(err, IsNil)
Expand Down

0 comments on commit b8bf595

Please sign in to comment.