Skip to content

Commit

Permalink
Skip first system poll and avoid potential time skew (ava-labs#1492)
Browse files Browse the repository at this point in the history
  • Loading branch information
StephenButtolph authored May 19, 2022
1 parent 806d725 commit d0b0e9c
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 48 deletions.
2 changes: 1 addition & 1 deletion snow/networking/tracker/resource_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ func newCPUTrackerMetrics(namespace string, reg prometheus.Registerer) (*tracker
processingTimeMetric: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Name: "processing_time",
Help: "Tracked processing time over all nodes. Value would excepted to be in [0, number of CPU cores], but can go higher IO bound processes and thread multiplexing",
Help: "Tracked processing time over all nodes. Value expected to be in [0, number of CPU cores], but can go higher due to IO bound processes and thread multiplexing",
}),
cpuMetric: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Expand Down
112 changes: 65 additions & 47 deletions utils/resource/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,19 +56,6 @@ type Manager interface {
Shutdown()
}

// proc holds a tracked process together with the timestamp and disk counters
// recorded at its most recent measurement.
type proc struct {
// [p] is the handle of the process that measurements are taken from.
p *process.Process

// [lastUpdateTime] is the time that CPU/IO measurements were most recently
// taken.
lastUpdateTime time.Time
// [lastReadBytes] is the most recent measurement of total disk bytes read.
lastReadBytes uint64
// [lastWriteBytes] is the most recent measurement of total disk bytes
// written.
lastWriteBytes uint64
}

type manager struct {
processesLock sync.Mutex
processes map[int]*proc
Expand Down Expand Up @@ -113,10 +100,7 @@ func (m *manager) TrackProcess(pid int) {
return
}

process := &proc{
p: p,
lastUpdateTime: time.Now(),
}
process := &proc{p: p}

m.processesLock.Lock()
m.processes[pid] = process
Expand All @@ -141,8 +125,9 @@ func (m *manager) update(frequency, halflife time.Duration) {

newWeight, oldWeight := getSampleWeights(frequency, halflife)

frequencyInSeconds := frequency.Seconds()
for {
currentCPUUsage, currentReadUsage, currentWriteUsage := m.getActiveUsage()
currentCPUUsage, currentReadUsage, currentWriteUsage := m.getActiveUsage(frequencyInSeconds)
currentScaledCPUUsage := newWeight * currentCPUUsage
currentScaledReadUsage := newWeight * currentReadUsage
currentScaledWriteUsage := newWeight * currentWriteUsage
Expand All @@ -165,47 +150,80 @@ func (m *manager) update(frequency, halflife time.Duration) {
// 1. Current CPU usage by all processes.
// 2. Current bytes/sec read from disk by all processes.
// 3. Current bytes/sec written to disk by all processes.
func (m *manager) getActiveUsage() (float64, float64, float64) {
now := time.Now()

func (m *manager) getActiveUsage(secondsSinceLastUpdate float64) (float64, float64, float64) {
// Hold the lock for the whole scan; [m.processes] is also written under
// this lock when a process starts being tracked.
m.processesLock.Lock()
defer m.processesLock.Unlock()

var (
// NOTE(review): [usage] is never read in this function as shown; it looks
// like a removed line left over from the diff rendering — confirm.
usage float64
totalCPU float64
totalRead float64
totalWrite float64
)
// Sum the per-process measurements into totals over all tracked processes.
// The same [secondsSinceLastUpdate] is handed to every process.
for _, p := range m.processes {
cpu, read, write := p.getActiveUsage(secondsSinceLastUpdate)
totalCPU += cpu
totalRead += read
totalWrite += write
}

return totalCPU, totalRead, totalWrite
}

// proc pairs a tracked process with the cumulative CPU and disk counters from
// its previous sample, so that the next sample can be turned into a rate by
// diffing against them.
type proc struct {
// [p] is the handle that CPU times and IO counters are read from.
p *process.Process

// [initialized] is false until a first sample has been recorded. While it is
// false no usage is reported, because there is no earlier measurement to
// diff against.
initialized bool

// [lastTotalCPU] is the most recent measurement of total CPU usage.
lastTotalCPU float64

// [lastReadBytes] is the most recent measurement of total disk bytes read.
lastReadBytes uint64
// [lastWriteBytes] is the most recent measurement of total disk bytes
// written.
lastWriteBytes uint64
}

func (p *proc) getActiveUsage(secondsSinceLastUpdate float64) (float64, float64, float64) {
// If there is an error tracking the CPU/disk utilization of a process,
// assume that the utilization is 0.
times, err := p.p.Times()
if err != nil {
return 0, 0, 0
}

io, err := p.p.IOCounters()
if err != nil {
return 0, 0, 0
}

var (
cpu float64
read float64
write float64
)
for _, p := range m.processes {
// If there is an error tracking the CPU/disk utilization of a process,
// assume that the utilization is 0.
cpu, err := p.p.Percent(0)
if err == nil {
usage += cpu
totalCPU := times.Total()
if p.initialized {
if totalCPU > p.lastTotalCPU {
newCPU := totalCPU - p.lastTotalCPU
cpu = newCPU / secondsSinceLastUpdate
}
io, err := p.p.IOCounters()
if err != nil {
continue
if io.ReadBytes > p.lastReadBytes {
newRead := io.ReadBytes - p.lastReadBytes
read = float64(newRead) / secondsSinceLastUpdate
}

secondsSinceLastUpdate := now.Sub(p.lastUpdateTime).Seconds()
if secondsSinceLastUpdate > 0 {
if io.ReadBytes > p.lastReadBytes {
newRead := io.ReadBytes - p.lastReadBytes
read += float64(newRead) / secondsSinceLastUpdate
}
if io.WriteBytes > p.lastWriteBytes {
newWrite := io.WriteBytes - p.lastWriteBytes
write += float64(newWrite) / secondsSinceLastUpdate
}
if io.WriteBytes > p.lastWriteBytes {
newWrite := io.WriteBytes - p.lastWriteBytes
write = float64(newWrite) / secondsSinceLastUpdate
}

p.lastUpdateTime = now
p.lastReadBytes = io.ReadBytes
p.lastWriteBytes = io.WriteBytes
}

return usage / 100, read, write
p.initialized = true
p.lastTotalCPU = totalCPU
p.lastReadBytes = io.ReadBytes
p.lastWriteBytes = io.WriteBytes

return cpu, read, write
}

// getSampleWeights converts the frequency of CPU sampling and the halflife of
Expand Down

0 comments on commit d0b0e9c

Please sign in to comment.