Skip to content

Commit

Permalink
*: support simple statistics
Browse files Browse the repository at this point in the history
we will use show status to show DDL statistics later.
  • Loading branch information
siddontang committed Nov 5, 2015
1 parent 293ba7f commit fbed93c
Show file tree
Hide file tree
Showing 6 changed files with 199 additions and 21 deletions.
2 changes: 2 additions & 0 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ type DDL interface {
// SetLease will reset the lease time for online DDL change, it is a very dangerous function and you must guarantee that
// all servers have the same lease time.
SetLease(lease time.Duration)
// Stat returns the DDL statistics.
Stat() (map[string]interface{}, error)
}

type ddl struct {
Expand Down
79 changes: 79 additions & 0 deletions ddl/stat.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright 2015 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package ddl

import (
"github.com/juju/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/model"
)

func (d *ddl) Stat() (map[string]interface{}, error) {
var (
owner *model.Owner
job *model.Job
schemaVer int64
)
err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
var err error
owner, err = t.GetDDLOwner()
if err != nil {
return errors.Trace(err)
}

job, err = t.GetDDLJob(0)
if err != nil {
return errors.Trace(err)
}

schemaVer, err = t.GetSchemaVersion()
if err != nil {
return errors.Trace(err)
}

return nil
})
if err != nil {
return nil, errors.Trace(err)
}

m := make(map[string]interface{})
m["DDL_server_id"] = d.uuid

m["DDL_schema_version"] = schemaVer

if owner != nil {
m["DDL_owner_id"] = owner.OwnerID
// LastUpdateTs uses nanosecond.
m["DDL_owner_last_update_ts"] = owner.LastUpdateTS / 1e9
}

if job != nil {
m["DDL_job_id"] = job.ID
m["DDL_job_action"] = job.Type.String()
m["DDL_job_last_update_ts"] = job.LastUpdateTS
m["DDL_job_state"] = job.State.String()
m["DDL_job_error"] = job.Error
m["DDL_job_schema_state"] = job.SchemaState.String()
m["DDL_job_schema_id"] = job.SchemaID
m["DDL_job_table_id"] = job.TableID
m["DDL_job_snapshot_ver"] = job.SnapshotVer
m["DDL_job_reorg_handle"] = job.ReorgHandle
m["DDL_job_args"] = job.Args
}

return m, nil
}
81 changes: 81 additions & 0 deletions ddl/stat_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright 2015 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package ddl

import (
"time"

. "github.com/pingcap/check"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/util/mock"
)

var _ = Suite(&testStatSuite{})

type testStatSuite struct {
}

func (s *testStatSuite) getSchemaVer(c *C, d *ddl) int64 {
m, err := d.Stat()
c.Assert(err, IsNil)
v := m["DDL_schema_version"]
return v.(int64)
}

func (s *testStatSuite) TestStat(c *C) {
store := testCreateStore(c, "test_stat")
defer store.Close()

lease := 50 * time.Millisecond

d := newDDL(store, nil, nil, lease)
defer d.close()

time.Sleep(lease)

dbInfo := testSchemaInfo(c, d, "test")

m, err := d.Stat()
c.Assert(err, IsNil)
c.Assert(m["DDL_owner_id"], Equals, d.uuid)

job := &model.Job{
SchemaID: dbInfo.ID,
Type: model.ActionCreateSchema,
Args: []interface{}{dbInfo.Name},
}

ctx := mock.NewContext()
done := make(chan error, 1)
go func() {
done <- d.startJob(ctx, job)
}()

ticker := time.NewTicker(d.lease * 1)
defer ticker.Stop()

ver := s.getSchemaVer(c, d)
LOOP:
for {
select {
case <-ticker.C:
d.close()
c.Assert(s.getSchemaVer(c, d), GreaterEqual, ver)
d.start()
case err := <-done:
c.Assert(err, IsNil)
break LOOP
}
}
}
19 changes: 9 additions & 10 deletions ddl/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,10 +191,9 @@ func (d *ddl) handleJobQueue() error {
}

log.Warnf("run DDL job %v", job)
err = d.runJob(t, job)
if err != nil {
return errors.Trace(err)
}
// if run job meets error, we will save this error in job Error
// and retry later if the job is not cancelled.
d.runJob(t, job)

if job.State == model.JobDone || job.State == model.JobCancelled {
err = d.finishJob(t, job)
Expand Down Expand Up @@ -270,9 +269,9 @@ func (d *ddl) onWorker() {
}
}

func (d *ddl) runJob(t *meta.Meta, job *model.Job) error {
func (d *ddl) runJob(t *meta.Meta, job *model.Job) {
if job.State == model.JobDone || job.State == model.JobCancelled {
return nil
return
}

job.State = model.JobRunning
Expand Down Expand Up @@ -305,16 +304,16 @@ func (d *ddl) runJob(t *meta.Meta, job *model.Job) error {
err = errors.Errorf("invalid job %v", job)
}

// if err and inner doesn't cancel job, return err.
// saves error in job, so that others can know error happens.
if err != nil {
// if job is not cancelled, we should log this error.
if job.State != model.JobCancelled {
return errors.Trace(err)
log.Errorf("run job err %v", errors.ErrorStack(err))
}

job.Error = err.Error()
job.ErrorCount++
}

return nil
}

// for every lease seconds, we will re-update the whole schema, so we will wait 2 * lease time
Expand Down
23 changes: 19 additions & 4 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package domain

import (
"sync/atomic"
"time"

"github.com/juju/errors"
Expand All @@ -28,10 +29,11 @@ import (
// Domain represents a storage space. Different domains can use the same database name.
// Multiple domains can be used in parallel without synchronization.
type Domain struct {
store kv.Storage
infoHandle *infoschema.Handle
ddl ddl.DDL
leaseCh chan time.Duration
store kv.Storage
infoHandle *infoschema.Handle
ddl ddl.DDL
leaseCh chan time.Duration
lastLeaseTS int64
}

func (do *Domain) loadInfoSchema(txn kv.Transaction) (err error) {
Expand Down Expand Up @@ -101,6 +103,18 @@ func (do *Domain) SetLease(lease time.Duration) {
do.ddl.SetLease(lease)
}

// Stat returns the DDL statistic.
func (do *Domain) Stat() (map[string]interface{}, error) {
m, err := do.ddl.Stat()
if err != nil {
return nil, errors.Trace(err)
}

m["DDL_last_reload_schema_ts"] = atomic.LoadInt64(&do.lastLeaseTS)

return m, nil
}

func (do *Domain) onDDLChange(err error) error {
if err != nil {
return err
Expand Down Expand Up @@ -132,6 +146,7 @@ func (do *Domain) loadSchemaInLoop(lease time.Duration) {
if err := do.reload(); err != nil {
log.Fatalf("reload schema err %v", err)
}
atomic.StoreInt64(&do.lastLeaseTS, time.Now().Unix())
case newLease := <-do.leaseCh:
if newLease <= 0 {
newLease = defaultLoadTime
Expand Down
16 changes: 9 additions & 7 deletions model/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,15 @@ func (action ActionType) String() string {

// Job is for a DDL operation.
type Job struct {
ID int64 `json:"id"`
Type ActionType `json:"type"`
SchemaID int64 `json:"schema_id"`
TableID int64 `json:"table_id"`
State JobState `json:"state"`
Error string `json:"err"`
Args []interface{} `json:"-"`
ID int64 `json:"id"`
Type ActionType `json:"type"`
SchemaID int64 `json:"schema_id"`
TableID int64 `json:"table_id"`
State JobState `json:"state"`
Error string `json:"err"`
// every time we meet an error when running job, we will increase it
ErrorCount int64 `json:"err_count"`
Args []interface{} `json:"-"`
// we must use json raw message for delay parsing special args.
RawArgs json.RawMessage `json:"raw_args"`
SchemaState SchemaState `json:"schema_state"`
Expand Down

0 comments on commit fbed93c

Please sign in to comment.