Skip to content

Commit

Permalink
ddl: add metrics for ddl syncer (pingcap#5765)
Browse files Browse the repository at this point in the history
  • Loading branch information
zimulala authored Feb 2, 2018
1 parent 96b6029 commit 17ee600
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 27 deletions.
1 change: 0 additions & 1 deletion ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,6 @@ func (d *ddl) addTableColumn(t table.Table, columnInfo *model.ColumnInfo, reorgI
}

d.reorgCtx.setRowCountAndHandle(count, seekHandle)
batchHandleDataHistogram.WithLabelValues(batchAddCol).Observe(sub)
log.Infof("[ddl] added column for %v rows, take time %v", count, sub)
}
}
Expand Down
6 changes: 1 addition & 5 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,11 +437,7 @@ func (d *ddl) doDDLJob(ctx context.Context, job *model.Job) error {
defer func() {
ticker.Stop()
jobsGauge.WithLabelValues(job.Type.String()).Dec()
retLabel := handleJobSucc
if err != nil {
retLabel = handleJobFailed
}
handleJobHistogram.WithLabelValues(job.Type.String(), retLabel).Observe(time.Since(startTime).Seconds())
handleJobHistogram.WithLabelValues(job.Type.String(), retLabel(err)).Observe(time.Since(startTime).Seconds())
}()
for {
select {
Expand Down
2 changes: 1 addition & 1 deletion ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,7 @@ func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, reorgInfo
return errors.Trace(err)
}
d.reorgCtx.setRowCountAndHandle(addedCount, nextHandle)
batchHandleDataHistogram.WithLabelValues(batchAddIdx).Observe(sub)
batchAddIdxHistogram.Observe(sub)
log.Infof("[ddl] total added index for %d rows, this task [%d,%d) added index for %d rows, batch %d, take time %v",
addedCount, logStartHandle, nextHandle, taskAddedCount, currentBatchSize, sub)

Expand Down
62 changes: 51 additions & 11 deletions ddl/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ var (
Help: "Gauge of jobs.",
}, []string{"action"})

// handle job result state.
handleJobSucc = "handle_job_succ"
handleJobFailed = "handle_job_failed"
// operation result state
opSucc = "op_succ"
opFailed = "op_failed"
handleJobHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "tidb",
Expand All @@ -36,22 +36,62 @@ var (
Buckets: prometheus.ExponentialBuckets(0.01, 2, 20),
}, []string{"action", "result_state"})

// handle batch data type.
batchAddCol = "batch_add_col"
batchAddIdx = "batch_add_idx"
batchDelData = "batch_del_data"
batchHandleDataHistogram = prometheus.NewHistogramVec(
batchAddIdxHistogram = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "tidb",
Subsystem: "ddl",
Name: "batch_add_or_del_data_succ",
Name: "batch_add_idx_succ",
Help: "Bucketed histogram of processing time (s) of batch handle data",
Buckets: prometheus.ExponentialBuckets(0.001, 2, 20),
}, []string{"handle_data_type"})
})

// The syncer inits, restarts or clears.
syncerInit = "syncer_init"
syncerRestart = "syncer_restart"
syncerClear = "syncer_clear"
deploySyncerHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "tidb",
Subsystem: "ddl",
Name: "deploy_syncer_duration_seconds",
Help: "Bucketed histogram of processing time (s) of deploy syncer",
Buckets: prometheus.ExponentialBuckets(0.01, 2, 20),
}, []string{"state", "result_state"})
// The syncer updates its own version.
updateSelfVersionHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "tidb",
Subsystem: "ddl",
Name: "update_self_ver_duration_seconds",
Help: "Bucketed histogram of processing time (s) of update self version",
Buckets: prometheus.ExponentialBuckets(0.01, 2, 20),
}, []string{"result_state"})
// The owner handles syncer's version.
ownerUpdateGlobalVersion = "update_global_version"
ownerGetGlobalVersion = "get_global_version"
ownerCheckAllVersions = "check_all_versions"
ownerHandleSyncerHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "tidb",
Subsystem: "ddl",
Name: "owner_handle_syncer_duration_seconds",
Help: "Bucketed histogram of processing time (s) of handle syncer",
Buckets: prometheus.ExponentialBuckets(0.01, 2, 20),
}, []string{"op", "result_state"})
)

// init registers the DDL metric collectors listed below with the default
// Prometheus registry via prometheus.MustRegister.
// NOTE(review): MustRegister panics when a collector cannot be registered
// (e.g. a duplicate name) — confirm that failing at start-up is intended.
func init() {
prometheus.MustRegister(jobsGauge)
prometheus.MustRegister(handleJobHistogram)
prometheus.MustRegister(batchHandleDataHistogram)
prometheus.MustRegister(batchAddIdxHistogram)
prometheus.MustRegister(deploySyncerHistogram)
prometheus.MustRegister(updateSelfVersionHistogram)
prometheus.MustRegister(ownerHandleSyncerHistogram)
}

// retLabel converts the outcome of an operation into the value used for a
// metric's "result_state" label: opSucc when err is nil, opFailed otherwise.
func retLabel(err error) string {
	label := opSucc
	if err != nil {
		label = opFailed
	}
	return label
}
58 changes: 49 additions & 9 deletions ddl/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,13 @@ func (s *schemaVersionSyncer) putKV(ctx goctx.Context, retryCnt int, key, val st

// Init implements SchemaSyncer.Init interface.
func (s *schemaVersionSyncer) Init(ctx goctx.Context) error {
_, err := s.etcdCli.Txn(ctx).
startTime := time.Now()
var err error
defer func() {
deploySyncerHistogram.WithLabelValues(syncerInit, retLabel(err)).Observe(time.Since(startTime).Seconds())
}()

_, err = s.etcdCli.Txn(ctx).
If(clientv3.Compare(clientv3.CreateRevision(DDLGlobalSchemaVersion), "=", 0)).
Then(clientv3.OpPut(DDLGlobalSchemaVersion, InitialVersion)).
Commit()
Expand All @@ -129,8 +135,9 @@ func (s *schemaVersionSyncer) Init(ctx goctx.Context) error {
return errors.Trace(err)
}
s.globalVerCh = s.etcdCli.Watch(ctx, DDLGlobalSchemaVersion)
return s.putKV(ctx, keyOpDefaultRetryCnt, s.selfSchemaVerPath, InitialVersion,
err = s.putKV(ctx, keyOpDefaultRetryCnt, s.selfSchemaVerPath, InitialVersion,
clientv3.WithLease(s.session.Lease()))
return errors.Trace(err)
}

// Done implements SchemaSyncer.Done interface.
Expand All @@ -140,8 +147,13 @@ func (s *schemaVersionSyncer) Done() <-chan struct{} {

// Restart implements SchemaSyncer.Restart interface.
func (s *schemaVersionSyncer) Restart(ctx goctx.Context) error {
logPrefix := fmt.Sprintf("[%s] %s", ddlPrompt, s.selfSchemaVerPath)
startTime := time.Now()
var err error
defer func() {
deploySyncerHistogram.WithLabelValues(syncerRestart, retLabel(err)).Observe(time.Since(startTime).Seconds())
}()

logPrefix := fmt.Sprintf("[%s] %s", ddlPrompt, s.selfSchemaVerPath)
// NewSession's context will affect the exit of the session.
session, err := owner.NewSession(ctx, logPrefix, s.etcdCli, owner.NewSessionRetryUnlimited, SyncerSessionTTL)
if err != nil {
Expand All @@ -151,8 +163,10 @@ func (s *schemaVersionSyncer) Restart(ctx goctx.Context) error {

childCtx, cancel := goctx.WithTimeout(ctx, keyOpDefaultTimeout)
defer cancel()
return s.putKV(childCtx, putKeyRetryUnlimited, s.selfSchemaVerPath, InitialVersion,
err = s.putKV(childCtx, putKeyRetryUnlimited, s.selfSchemaVerPath, InitialVersion,
clientv3.WithLease(s.session.Lease()))

return errors.Trace(err)
}

// GlobalVersionCh implements SchemaSyncer.GlobalVersionCh interface.
Expand All @@ -162,21 +176,34 @@ func (s *schemaVersionSyncer) GlobalVersionCh() clientv3.WatchChan {

// UpdateSelfVersion implements SchemaSyncer.UpdateSelfVersion interface.
// It formats version as a base-10 string and writes it to s.selfSchemaVerPath
// through putKV (putKeyNoRetry, with the session lease attached), then records
// the elapsed time in updateSelfVersionHistogram labelled by retLabel(err).
// The error from putKV is returned wrapped with errors.Trace.
func (s *schemaVersionSyncer) UpdateSelfVersion(ctx goctx.Context, version int64) error {
startTime := time.Now()
ver := strconv.FormatInt(version, 10)
return s.putKV(ctx, putKeyNoRetry, s.selfSchemaVerPath, ver,
err := s.putKV(ctx, putKeyNoRetry, s.selfSchemaVerPath, ver,
clientv3.WithLease(s.session.Lease()))

// Observe the duration for both outcomes; retLabel picks the result label.
updateSelfVersionHistogram.WithLabelValues(retLabel(err)).Observe(time.Since(startTime).Seconds())
return errors.Trace(err)
}

// OwnerUpdateGlobalVersion implements SchemaSyncer.OwnerUpdateGlobalVersion interface.
// It formats version as a base-10 string and writes it under
// DDLGlobalSchemaVersion through putKV with putKeyRetryUnlimited, then records
// the elapsed time in ownerHandleSyncerHistogram under the
// ownerUpdateGlobalVersion op label and the retLabel(err) result label.
// The error from putKV is returned wrapped with errors.Trace.
func (s *schemaVersionSyncer) OwnerUpdateGlobalVersion(ctx goctx.Context, version int64) error {
startTime := time.Now()
ver := strconv.FormatInt(version, 10)
return s.putKV(ctx, putKeyRetryUnlimited, DDLGlobalSchemaVersion, ver)
err := s.putKV(ctx, putKeyRetryUnlimited, DDLGlobalSchemaVersion, ver)

// Observe the duration for both outcomes; retLabel picks the result label.
ownerHandleSyncerHistogram.WithLabelValues(ownerUpdateGlobalVersion, retLabel(err)).Observe(time.Since(startTime).Seconds())
return errors.Trace(err)
}

// RemoveSelfVersionPath implements SchemaSyncer.RemoveSelfVersionPath interface.
func (s *schemaVersionSyncer) RemoveSelfVersionPath() error {
ctx := goctx.Background()
startTime := time.Now()
var err error
defer func() {
deploySyncerHistogram.WithLabelValues(syncerClear, retLabel(err)).Observe(time.Since(startTime).Seconds())
}()

ctx := goctx.Background()
for i := 0; i < keyOpDefaultRetryCnt; i++ {
childCtx, cancel := goctx.WithTimeout(ctx, keyOpDefaultTimeout)
_, err = s.etcdCli.Delete(childCtx, s.selfSchemaVerPath)
Expand All @@ -191,10 +218,15 @@ func (s *schemaVersionSyncer) RemoveSelfVersionPath() error {

// MustGetGlobalVersion implements SchemaSyncer.MustGetGlobalVersion interface.
func (s *schemaVersionSyncer) MustGetGlobalVersion(ctx goctx.Context) (int64, error) {
startTime := time.Now()
var err error
var resp *clientv3.GetResponse
failedCnt := 0
intervalCnt := int(time.Second / keyOpRetryInterval)

defer func() {
ownerHandleSyncerHistogram.WithLabelValues(ownerGetGlobalVersion, retLabel(err)).Observe(time.Since(startTime).Seconds())
}()
for {
if err != nil {
if failedCnt%intervalCnt == 0 {
Expand All @@ -205,7 +237,8 @@ func (s *schemaVersionSyncer) MustGetGlobalVersion(ctx goctx.Context) (int64, er
}

if isContextDone(ctx) {
return 0, errors.Trace(ctx.Err())
err = errors.Trace(ctx.Err())
return 0, err
}

resp, err = s.etcdCli.Get(ctx, DDLGlobalSchemaVersion)
Expand Down Expand Up @@ -233,13 +266,20 @@ func isContextDone(ctx goctx.Context) bool {

// OwnerCheckAllVersions implements SchemaSyncer.OwnerCheckAllVersions interface.
func (s *schemaVersionSyncer) OwnerCheckAllVersions(ctx goctx.Context, latestVer int64) error {
startTime := time.Now()
time.Sleep(CheckVersFirstWaitTime)
notMatchVerCnt := 0
intervalCnt := int(time.Second / checkVersInterval)
updatedMap := make(map[string]struct{})

var err error
defer func() {
ownerHandleSyncerHistogram.WithLabelValues(ownerGetGlobalVersion, retLabel(err)).Observe(time.Since(startTime).Seconds())
}()
for {
if isContextDone(ctx) {
return errors.Trace(ctx.Err())
err = errors.Trace(ctx.Err())
return err
}

resp, err := s.etcdCli.Get(ctx, DDLAllSchemaVersions, clientv3.WithPrefix())
Expand Down

0 comments on commit 17ee600

Please sign in to comment.