Skip to content

Commit

Permalink
store/tikv: Refine metrics of RangeTaskRunner (pingcap#10559)
Browse files Browse the repository at this point in the history
  • Loading branch information
MyonKeminta authored and zz-jason committed Jun 3, 2019
1 parent c729dd5 commit 060e325
Showing 1 changed file with 5 additions and 6 deletions.
11 changes: 5 additions & 6 deletions store/tikv/range_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ import (

const (
rangeTaskDefaultStatLogInterval = time.Minute * 10
rangeTaskMetricsUpdateInterval = time.Second * 10

lblCompletedRegions = "completed-regions"
)

// RangeTaskRunner splits a range into many ranges to process concurrently, and convenient to send requests to all
Expand Down Expand Up @@ -78,6 +79,7 @@ func (s *RangeTaskRunner) SetStatLogInterval(interval time.Duration) {
// Empty startKey or endKey means unbounded.
func (s *RangeTaskRunner) RunOnRange(ctx context.Context, startKey []byte, endKey []byte) error {
s.completedRegions = 0
metrics.TiKVRangeTaskStats.WithLabelValues(s.name, lblCompletedRegions).Set(0)

if len(endKey) != 0 && bytes.Compare(startKey, endKey) >= 0 {
logutil.Logger(ctx).Info("empty range task executed. ignored",
Expand All @@ -95,8 +97,6 @@ func (s *RangeTaskRunner) RunOnRange(ctx context.Context, startKey []byte, endKe

// Periodically log the progress
statLogTicker := time.NewTicker(s.statLogInterval)
// Periodically update metrics
metricsTicker := time.NewTicker(rangeTaskMetricsUpdateInterval)

ctx, cancel := context.WithCancel(ctx)
taskCh := make(chan *kv.KeyRange, s.concurrency)
Expand All @@ -121,8 +121,8 @@ func (s *RangeTaskRunner) RunOnRange(ctx context.Context, startKey []byte, endKe
wg.Wait()
}
statLogTicker.Stop()
metricsTicker.Stop()
cancel()
metrics.TiKVRangeTaskStats.WithLabelValues(s.name, lblCompletedRegions).Set(0)
}()

// Iterate all regions and send each region's range as a task to the workers.
Expand All @@ -137,8 +137,6 @@ func (s *RangeTaskRunner) RunOnRange(ctx context.Context, startKey []byte, endKe
zap.Int("concurrency", s.concurrency),
zap.Duration("cost time", time.Since(startTime)),
zap.Int32("completed regions", s.CompletedRegions()))
case <-metricsTicker.C:
metrics.TiKVRangeTaskStats.WithLabelValues(s.name, "completed").Set(float64(s.CompletedRegions()))
default:
}

Expand Down Expand Up @@ -261,5 +259,6 @@ func (w *rangeTaskWorker) run(ctx context.Context, cancel context.CancelFunc) {
break
}
atomic.AddInt32(w.completedRegions, int32(completedRegions))
metrics.TiKVRangeTaskStats.WithLabelValues(w.name, lblCompletedRegions).Add(float64(completedRegions))
}
}

0 comments on commit 060e325

Please sign in to comment.