Commit 86a2762: addressing PR comments

otherview committed Mar 12, 2021
1 parent 19b89df commit 86a2762
Showing 8 changed files with 44 additions and 82 deletions.
health/metrics.go (2 changes: 1 addition & 1 deletion)
@@ -38,7 +38,7 @@ func newMetrics(log logging.Logger, namespace string, registerer prometheus.Regi

// healthy handles the metrics for the healthy cases
func (m *metrics) healthy() {
m.failingChecks.Set(0)
m.failingChecks.Desc()
}

// unHealthy handles the metrics for the unhealthy cases
health/service.go (2 changes: 0 additions & 2 deletions)
@@ -111,10 +111,8 @@ func (c *checkListener) OnCheckCompleted(name string, result health.Result) {
if !exists || isHealthy == previouslyHealthy {
if isHealthy {
c.log.Debug("%q returned healthy with: %s", name, string(resultJSON))
c.metrics.healthy()
} else {
c.log.Debug("%q returned unhealthy with: %s", name, string(resultJSON))
c.metrics.unHealthy()
}
return
}
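
The two health hunks above are coupled: the check listener drops its direct calls into the metrics helper, and healthy() in health/metrics.go changes what it does with failingChecks. For reference, a minimal sketch of how a failing-checks gauge is conventionally wired with Prometheus; the metric name and registration details are assumptions for illustration and this is not the state of the file after this commit.

package health

import "github.com/prometheus/client_golang/prometheus"

// metricsSketch is a hypothetical, stripped-down version of the health
// metrics type touched above.
type metricsSketch struct {
	failingChecks prometheus.Gauge
}

func newMetricsSketch(namespace string, registerer prometheus.Registerer) (*metricsSketch, error) {
	m := &metricsSketch{
		failingChecks: prometheus.NewGauge(prometheus.GaugeOpts{
			Namespace: namespace,
			Name:      "checks_failing", // assumed name, for illustration only
			Help:      "number of currently failing health checks",
		}),
	}
	return m, registerer.Register(m.failingChecks)
}

// healthy clears the gauge once every check passes.
func (m *metricsSketch) healthy() { m.failingChecks.Set(0) }

// unHealthy records one more failing check.
func (m *metricsSketch) unHealthy() { m.failingChecks.Inc() }
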
network/metrics.go (6 changes: 3 additions & 3 deletions)
@@ -63,7 +63,7 @@ type metrics struct {
sendQueuePortionFull prometheus.Gauge
sendFailRate prometheus.Gauge

getVersion, versionMetric,
getVersion, version,
getPeerlist, peerlist,
ping, pong,
getAcceptedFrontier, acceptedFrontier,
@@ -109,7 +109,7 @@ func (m *metrics) initialize(registerer prometheus.Registerer) error {
registerer.Register(m.sendFailRate),

m.getVersion.initialize(GetVersion, registerer),
m.versionMetric.initialize(Version, registerer),
m.version.initialize(Version, registerer),
m.getPeerlist.initialize(GetPeerList, registerer),
m.peerlist.initialize(PeerList, registerer),
m.ping.initialize(Ping, registerer),
@@ -134,7 +134,7 @@ func (m *metrics) message(msgType Op) *messageMetrics {
case GetVersion:
return &m.getVersion
case Version:
return &m.versionMetric
return &m.version
case GetPeerList:
return &m.getPeerlist
case PeerList:
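
The three pairs above appear to be a field rename from versionMetric to version so the Version message metric matches the naming of its siblings (getVersion, getPeerlist, peerlist, ping, pong, ...). The messageMetrics type itself is not part of this diff; the sketch below is a guess at its shape, inferred only from the counters referenced in the peer.go hunks further down (numSent, numFailed, numReceived, sentBytes, receivedBytes), and it takes a plain message name where the real initialize receives the message Op.

package network

import "github.com/prometheus/client_golang/prometheus"

// messageMetricsSketch is a hypothetical stand-in for the per-message
// metrics bundle initialized above.
type messageMetricsSketch struct {
	numSent, numFailed, numReceived prometheus.Counter
	sentBytes, receivedBytes        prometheus.Counter
}

// initialize creates and registers one counter per outcome for a message
// type; the metric names are illustrative only.
func (mm *messageMetricsSketch) initialize(msgName string, registerer prometheus.Registerer) error {
	mm.numSent = prometheus.NewCounter(prometheus.CounterOpts{Name: msgName + "_sent", Help: "messages sent"})
	mm.numFailed = prometheus.NewCounter(prometheus.CounterOpts{Name: msgName + "_failed", Help: "messages that failed to send"})
	mm.numReceived = prometheus.NewCounter(prometheus.CounterOpts{Name: msgName + "_received", Help: "messages received"})
	mm.sentBytes = prometheus.NewCounter(prometheus.CounterOpts{Name: msgName + "_sent_bytes", Help: "bytes sent"})
	mm.receivedBytes = prometheus.NewCounter(prometheus.CounterOpts{Name: msgName + "_received_bytes", Help: "bytes received"})

	for _, c := range []prometheus.Collector{mm.numSent, mm.numFailed, mm.numReceived, mm.sentBytes, mm.receivedBytes} {
		if err := registerer.Register(c); err != nil {
			return err
		}
	}
	return nil
}
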
network/peer.go (45 changes: 16 additions & 29 deletions)
@@ -294,16 +294,25 @@ func (p *peer) Send(msg Msg) bool {
p.senderLock.Lock()
defer p.senderLock.Unlock()

op := msg.Op()
msgMetrics := p.net.message(op)
if msgMetrics == nil {
p.net.log.Debug("dropping an unknown send message from %s with op %s", p.id, op.String())
return false
}

// If the peer was closed then the sender channel was closed and we are
// unable to send this message without panicking. So drop the message.
if p.closed.GetValue() {
p.net.log.Debug("dropping message to %s due to a closed connection", p.id)
msgMetrics.numFailed.Inc()
return false
}

// is it possible to send?
if dropMsg := p.dropMessagePeer(); dropMsg {
p.net.log.Debug("dropping message to %s due to a send queue with too many bytes", p.id)
msgMetrics.numFailed.Inc()
return false
}

@@ -319,17 +328,21 @@ func (p *peer) Send(msg Msg) bool {
// we never sent the message, remove from pending totals
atomic.AddInt64(&p.net.pendingBytes, -msgBytesLen)
p.net.log.Debug("dropping message to %s due to a send queue with too many bytes", p.id)
msgMetrics.numFailed.Inc()
return false
}

select {
case p.sender <- msgBytes:
atomic.AddInt64(&p.pendingBytes, msgBytesLen)
msgMetrics.numSent.Inc()
msgMetrics.sentBytes.Add(float64(len(msg.Bytes())))
return true
default:
// we never sent the message, remove from pending totals
atomic.AddInt64(&p.net.pendingBytes, -msgBytesLen)
p.net.log.Debug("dropping message to %s due to a full send queue", p.id)
msgMetrics.numFailed.Inc()
return false
}
}
@@ -461,8 +474,6 @@ func (p *peer) close() {
func (p *peer) GetVersion() {
msg, err := p.net.b.GetVersion()
p.net.log.AssertNoError(err)
p.net.getVersion.numSent.Inc()
p.net.getVersion.sentBytes.Add(float64(len(msg.Bytes())))
p.Send(msg)
}

@@ -486,8 +497,6 @@ func (p *peer) GetPeerList() {
msg, err := p.net.b.GetPeerList()
p.net.log.AssertNoError(err)
p.Send(msg)
p.net.getPeerlist.numReceived.Inc()
p.net.getPeerlist.receivedBytes.Add(float64(len(msg.Bytes())))
}

// assumes the stateLock is not held
@@ -510,30 +519,18 @@ func (p *peer) PeerList(peers []utils.IPDesc) {
func (p *peer) Ping() {
msg, err := p.net.b.Ping()
p.net.log.AssertNoError(err)
if p.Send(msg) {
p.net.ping.numSent.Inc()
p.net.ping.sentBytes.Add(float64(len(msg.Bytes())))
} else {
p.net.ping.numFailed.Inc()
}
p.Send(msg)
}

// assumes the [stateLock] is not held
func (p *peer) Pong() {
msg, err := p.net.b.Pong()
p.net.log.AssertNoError(err)
if p.Send(msg) {
p.net.pong.numSent.Inc()
p.net.pong.sentBytes.Add(float64(len(msg.Bytes())))
} else {
p.net.pong.numFailed.Inc()
}
p.Send(msg)
}

// assumes the [stateLock] is not held
func (p *peer) getVersion(msg Msg) {
p.net.getVersion.numReceived.Inc()
p.net.getVersion.receivedBytes.Add(float64(len(msg.Bytes())))
p.Version()
}

@@ -635,17 +632,12 @@ func (p *peer) version(msg Msg) {
p.gotVersion.SetValue(true)

p.tryMarkConnected()

p.net.versionMetric.numReceived.Inc()
p.net.versionMetric.receivedBytes.Add(float64(len(msg.Bytes())))
}

// assumes the [stateLock] is not held
func (p *peer) getPeerList(msg Msg) {
if p.gotVersion.GetValue() {
p.SendPeerList()
p.net.getPeerlist.numReceived.Inc()
p.net.getPeerlist.receivedBytes.Add(float64(len(msg.Bytes())))
}
}

@@ -666,15 +658,10 @@ func (p *peer) peerList(msg Msg) {
}
p.net.stateLock.Unlock()
}

p.net.peerlist.numReceived.Inc()
p.net.peerlist.receivedBytes.Add(float64(len(msg.Bytes())))
}

// assumes the [stateLock] is not held
func (p *peer) ping(msg Msg) {
p.net.ping.numReceived.Inc()
p.net.ping.receivedBytes.Add(float64(len(msg.Bytes())))
func (p *peer) ping(_ Msg) {
p.Pong()
}

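
Taken together, the peer.go hunks read as a consolidation: the per-call-site counter updates in GetVersion, GetPeerList, Ping, Pong, getVersion, version, getPeerList, peerList and ping are removed, and Send looks up the metrics bundle for the outgoing message's Op and records the outcome in one place. A condensed, hypothetical sketch of that control flow (the pending-byte accounting and queue sizing from the real Send are omitted):

// sendSketch compresses the Send path shown above: resolve the metrics for
// this op once, then record numSent/sentBytes or numFailed exactly where
// the outcome is decided.
func (p *peer) sendSketch(msg Msg) bool {
	op := msg.Op()
	msgMetrics := p.net.message(op)
	if msgMetrics == nil {
		p.net.log.Debug("dropping an unknown send message from %s with op %s", p.id, op.String())
		return false
	}

	// A closed connection or an over-full send queue both count as failures.
	if p.closed.GetValue() || p.dropMessagePeer() {
		msgMetrics.numFailed.Inc()
		return false
	}

	select {
	case p.sender <- msg.Bytes():
		msgMetrics.numSent.Inc()
		msgMetrics.sentBytes.Add(float64(len(msg.Bytes())))
		return true
	default: // send queue full
		msgMetrics.numFailed.Inc()
		return false
	}
}
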
snow/consensus/avalanche/topological.go (12 changes: 2 additions & 10 deletions)
@@ -5,7 +5,6 @@ package avalanche

import (
"errors"
"time"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/snow"
@@ -204,24 +203,17 @@ func (ta *Topological) Finalized() bool { return ta.cg.Finalized() }

// HealthCheck returns information about the consensus health.
func (ta *Topological) HealthCheck() (interface{}, error) {
numOutstandingVtx := ta.Metrics.ProcessingEntries.Len()
numOutstandingVtx := ta.Metrics.ContainersLen()
healthy := numOutstandingVtx <= ta.params.MaxOutstandingItems
details := map[string]interface{}{
"outstandingVertices": numOutstandingVtx,
}
ta.Metrics.OutstandingContainers(numOutstandingVtx)

// check for long running vertices
now := ta.Metrics.Clock.Time()
oldestStartTime := now
if startTime, exists := ta.Metrics.ProcessingEntries.Oldest(); exists {
oldestStartTime = startTime.(time.Time)
}

timeReqRunning := now.Sub(oldestStartTime)
timeReqRunning := now.Sub(ta.Metrics.MeasureAndGetOldest())
healthy = healthy && timeReqRunning <= ta.params.MaxItemProcessingTime
details["longestRunningVertex"] = timeReqRunning.String()
ta.Metrics.LongestRunningContainer(timeReqRunning.Milliseconds())

snowstormReport, err := ta.cg.HealthCheck()
healthy = healthy && err == nil
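
The same HealthCheck reshaping recurs in snow/consensus/snowman/topological.go and snow/consensus/snowstorm/common.go below: instead of walking ProcessingEntries and pushing gauge and histogram samples inline, each instance asks the shared Metrics helper for the outstanding count and the oldest issue time. The self-contained sketch below isolates that health test; the package name, detail keys and parameter names are illustrative stand-ins for the params used above.

package consensushealth

import (
	"errors"
	"time"
)

var errUnhealthy = errors.New("consensus is not healthy")

// checkHealth mirrors the pattern of the three HealthCheck methods in this
// commit: too many outstanding items, or an item processing for too long,
// makes the instance unhealthy.
func checkHealth(outstanding int, oldestIssued, now time.Time, maxOutstanding int, maxProcessing time.Duration) (map[string]interface{}, error) {
	longestRunning := now.Sub(oldestIssued)
	details := map[string]interface{}{
		"outstandingItems":   outstanding,
		"longestRunningItem": longestRunning.String(),
	}
	if outstanding > maxOutstanding || longestRunning > maxProcessing {
		return details, errUnhealthy
	}
	return details, nil
}
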
snow/consensus/metrics/metrics.go (35 changes: 18 additions & 17 deletions)
@@ -24,7 +24,7 @@ type Metrics struct {
// ProcessingEntries keeps track of the time that each item was issued into
// the consensus instance. This is used to calculate the amount of time to
// accept or reject the item.
ProcessingEntries linkedhashmap.LinkedHashmap
processingEntries linkedhashmap.LinkedHashmap

// log reports anomalous events.
log logging.Logger
@@ -39,13 +39,12 @@ type Metrics struct {
// rejected tracks the number of milliseconds that an item was processing
// before being rejected
latRejected prometheus.Histogram
outstandingContainers prometheus.Gauge
longestRunningContainer prometheus.Histogram
}

// Initialize the metrics with the provided names.
func (m *Metrics) Initialize(metricName, descriptionName string, log logging.Logger, namespace string, registerer prometheus.Registerer) error {
m.ProcessingEntries = linkedhashmap.New()
m.processingEntries = linkedhashmap.New()
m.log = log

m.numProcessing = prometheus.NewGauge(prometheus.GaugeOpts{
@@ -65,11 +64,6 @@ func (m *Metrics) Initialize(metricName, descriptionName string, log logging.Log
Help: fmt.Sprintf("Latency of rejecting from the time the %s was issued in milliseconds", descriptionName),
Buckets: timer.MillisecondsBuckets,
})
m.outstandingContainers = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Name: fmt.Sprintf("%s_outstanding", metricName),
Help: fmt.Sprintf("Number of %s waiting to be processed", descriptionName),
})
m.longestRunningContainer = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: namespace,
Name: fmt.Sprintf("%s_longest_running", metricName),
@@ -88,18 +82,18 @@ func (m *Metrics) Initialize(metricName, descriptionName string, log logging.Log

// Issued marks the item as having been issued.
func (m *Metrics) Issued(id ids.ID) {
m.ProcessingEntries.Put(id, m.Clock.Time())
m.processingEntries.Put(id, m.Clock.Time())
m.numProcessing.Inc()
}

// Accepted marks the item as having been accepted.
func (m *Metrics) Accepted(id ids.ID) {
startTime, ok := m.ProcessingEntries.Get(id)
startTime, ok := m.processingEntries.Get(id)
if !ok {
m.log.Debug("unable to measure Accepted transaction %v", id.String())
return
}
m.ProcessingEntries.Delete(id)
m.processingEntries.Delete(id)

endTime := m.Clock.Time()
duration := endTime.Sub(startTime.(time.Time))
@@ -109,23 +103,30 @@ func (m *Metrics) Accepted(id ids.ID) {

// Rejected marks the item as having been rejected.
func (m *Metrics) Rejected(id ids.ID) {
startTime, ok := m.ProcessingEntries.Get(id)
startTime, ok := m.processingEntries.Get(id)
if !ok {
m.log.Debug("unable to measure Rejected transaction %v", id.String())
return
}
m.ProcessingEntries.Delete(id)
m.processingEntries.Delete(id)

endTime := m.Clock.Time()
duration := endTime.Sub(startTime.(time.Time))
m.latRejected.Observe(float64(duration.Milliseconds()))
m.numProcessing.Dec()
}

func (m *Metrics) OutstandingContainers(txs int) {
m.outstandingContainers.Set(float64(txs))
func (m *Metrics) MeasureAndGetOldest() time.Time {
now := m.Clock.Time()
if startTime, exists := m.processingEntries.Oldest(); exists {
oldestTime := startTime.(time.Time)
m.longestRunningContainer.Observe(float64(m.Clock.Time().Sub(oldestTime)))
return oldestTime
}

return now
}

func (m *Metrics) LongestRunningContainer(milliseconds int64) {
m.longestRunningContainer.Observe(float64(milliseconds))
func (m *Metrics) ContainersLen() int {
return m.processingEntries.Len()
}
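
A short usage sketch of the Metrics API as it stands after this change: Issued/Accepted/Rejected manage processingEntries internally, while ContainersLen and MeasureAndGetOldest form the read side consumed by the HealthCheck methods elsewhere in this commit. The function below assumes the same imports as the file (ids, logging, prometheus); the caller name and the metric and namespace strings are placeholders.

// exampleUsage is a hypothetical caller of the Metrics type above.
func exampleUsage(log logging.Logger, registerer prometheus.Registerer, txID ids.ID) error {
	m := &Metrics{}
	if err := m.Initialize("txs", "transactions", log, "avalanche", registerer); err != nil {
		return err
	}

	m.Issued(txID) // start the processing clock for txID

	outstanding := m.ContainersLen()  // 1 while txID is still processing
	oldest := m.MeasureAndGetOldest() // txID's issue time; also observed by longestRunningContainer
	_, _ = outstanding, oldest

	m.Accepted(txID) // records the accept latency and stops tracking txID
	return nil
}
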
snow/consensus/snowman/topological.go (12 changes: 2 additions & 10 deletions)
@@ -5,7 +5,6 @@ package snowman

import (
"errors"
"time"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/snow"
@@ -263,24 +262,17 @@ func (ts *Topological) Finalized() bool { return len(ts.blocks) == 1 }

// HealthCheck returns information about the consensus health.
func (ts *Topological) HealthCheck() (interface{}, error) {
numOutstandingBlks := ts.Metrics.ProcessingEntries.Len()
numOutstandingBlks := ts.Metrics.ContainersLen()
healthy := numOutstandingBlks <= ts.params.MaxOutstandingItems
details := map[string]interface{}{
"outstandingBlocks": numOutstandingBlks,
}
ts.Metrics.OutstandingContainers(numOutstandingBlks)

// check for long running blocks
now := ts.Metrics.Clock.Time()
oldestStartTime := now
if startTime, exists := ts.Metrics.ProcessingEntries.Oldest(); exists {
oldestStartTime = startTime.(time.Time)
}

timeReqRunning := now.Sub(oldestStartTime)
timeReqRunning := now.Sub(ts.Metrics.MeasureAndGetOldest())
healthy = healthy && timeReqRunning <= ts.params.MaxItemProcessingTime
details["longestRunningBlock"] = timeReqRunning.String()
ts.Metrics.LongestRunningContainer(timeReqRunning.Milliseconds())

if !healthy {
return details, errUnhealthy
snow/consensus/snowstorm/common.go (12 changes: 2 additions & 10 deletions)
@@ -9,7 +9,6 @@ import (
"fmt"
"sort"
"strings"
"time"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/snow"
@@ -94,24 +93,17 @@ func (c *common) Finalized() bool {

// HealthCheck returns information about the consensus health.
func (c *common) HealthCheck() (interface{}, error) {
numOutstandingTxs := c.Metrics.ProcessingEntries.Len()
numOutstandingTxs := c.Metrics.ContainersLen()
healthy := numOutstandingTxs <= c.params.MaxOutstandingItems
details := map[string]interface{}{
"outstandingTransactions": numOutstandingTxs,
}
c.Metrics.OutstandingContainers(numOutstandingTxs)

// check for long running transactions
now := c.Metrics.Clock.Time()
oldestStartTime := now
if startTime, exists := c.Metrics.ProcessingEntries.Oldest(); exists {
oldestStartTime = startTime.(time.Time)
}

timeReqRunning := now.Sub(oldestStartTime)
timeReqRunning := now.Sub(c.Metrics.MeasureAndGetOldest())
healthy = healthy && timeReqRunning <= c.params.MaxItemProcessingTime
details["longestRunningTx"] = timeReqRunning.String()
c.Metrics.LongestRunningContainer(timeReqRunning.Milliseconds())

if !healthy {
return details, errUnhealthy
