Skip to content

Commit

Permalink
feat: Added show-provider-address-in-metrics flag for provider metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
fbjohnny committed Jan 6, 2025
1 parent 1d2b757 commit 97ff6f7
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 25 deletions.
2 changes: 1 addition & 1 deletion protocol/metrics/consumer_metrics_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ func (pme *ConsumerMetricsManager) UpdateHealthCheckStatus(status bool) {
atomic.StoreUint64(&pme.endpointsHealthChecksOk, uint64(value))
}

func (pme *ConsumerMetricsManager) UpdateHealthcheckStatusBreakdown(chainId string, apiInterface string, status bool) {
func (pme *ConsumerMetricsManager) UpdateHealthcheckStatusBreakdown(chainId, apiInterface string, status bool) {
if pme == nil {
return
}
Expand Down
42 changes: 42 additions & 0 deletions protocol/metrics/mapped_labels_counter_vec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package metrics

import "github.com/prometheus/client_golang/prometheus"

// MappedLabelsCounterVec is a wrapper around prometheus.CounterVec that allows for setting labels dynamically.
// We use it for the metrics that have a dynamic number of labels, based on flags given upon startup.
// Label values are supplied as a map keyed by label name: map keys that are not
// configured labels are ignored, and configured labels missing from the map
// are given an empty value.
type MappedLabelsCounterVec struct {
// CounterVec is the underlying Prometheus vector; it is embedded, so its methods are promoted.
*prometheus.CounterVec
// labels holds the configured label names, in the order they were given at construction.
labels []string
}

// MappedLabelsCounterVecOpts holds the settings consumed by NewMappedLabelsCounterVec.
type MappedLabelsCounterVecOpts struct {
// Name is the metric name passed to prometheus.CounterOpts.
Name string
// Help is the help text passed to prometheus.CounterOpts.
Help string
// Labels lists the label names of the vector, in order.
Labels []string
}

// NewMappedLabelsCounterVec builds a counter vector with the name, help text
// and label names given in opts, registers it with the default Prometheus
// registry and returns the wrapper.
//
// prometheus.MustRegister panics if registration fails (for example when a
// collector with the same name is already registered), so each metric name
// should be created only once per process.
func NewMappedLabelsCounterVec(opts MappedLabelsCounterVecOpts) *MappedLabelsCounterVec {
	// Keep a private copy of the label names: the wrapper relies on their order
	// to map values, so it must not change if the caller later edits opts.Labels.
	labels := append([]string(nil), opts.Labels...)

	metric := &MappedLabelsCounterVec{
		labels: labels,
		CounterVec: prometheus.NewCounterVec(prometheus.CounterOpts{
			Name: opts.Name,
			Help: opts.Help,
		}, labels),
	}

	prometheus.MustRegister(metric.CounterVec)

	return metric
}

// getLabelValues turns a name-to-value map into the positional value list the
// underlying vector expects: one entry per configured label, in configured
// order. A label absent from the map gets the empty string, and map keys that
// are not configured labels are dropped.
func (mlcv *MappedLabelsCounterVec) getLabelValues(labelsWithValues map[string]string) []string {
	ordered := make([]string, 0, len(mlcv.labels))
	for _, name := range mlcv.labels {
		ordered = append(ordered, labelsWithValues[name])
	}
	return ordered
}

// WithLabelValues returns the counter for the given label values, which are
// passed by label name rather than by position. It shadows the positional
// WithLabelValues promoted from the embedded *prometheus.CounterVec.
func (mlcv *MappedLabelsCounterVec) WithLabelValues(labelsWithValues map[string]string) prometheus.Counter {
	values := mlcv.getLabelValues(labelsWithValues)
	counter := mlcv.CounterVec.WithLabelValues(values...)
	return counter
}
11 changes: 7 additions & 4 deletions protocol/metrics/provider_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ const (
type ProviderMetrics struct {
specID string
apiInterface string
endpoint string
lock sync.Mutex
totalCUServicedMetric *prometheus.CounterVec
totalCUPaidMetric *prometheus.CounterVec
totalRelaysServicedMetric *prometheus.CounterVec
totalRelaysServicedMetric *MappedLabelsCounterVec
totalErroredMetric *prometheus.CounterVec
consumerQoSMetric *prometheus.GaugeVec
loadRateMetric *prometheus.GaugeVec
Expand All @@ -32,7 +33,8 @@ func (pm *ProviderMetrics) AddRelay(consumerAddress string, cu uint64, qos *pair
pm.lock.Lock()
defer pm.lock.Unlock()
pm.totalCUServicedMetric.WithLabelValues(pm.specID, pm.apiInterface).Add(float64(cu))
pm.totalRelaysServicedMetric.WithLabelValues(pm.specID, pm.apiInterface).Add(1)
labels := map[string]string{"spec": pm.specID, "apiInterface": pm.apiInterface, "provider_endpoint": pm.endpoint}
pm.totalRelaysServicedMetric.WithLabelValues(labels).Add(1)
if qos == nil {
return
}
Expand Down Expand Up @@ -75,16 +77,17 @@ func (pm *ProviderMetrics) AddError() {
pm.totalErroredMetric.WithLabelValues(pm.specID, pm.apiInterface).Add(1)
}

func NewProviderMetrics(specID, apiInterface string, totalCUServicedMetric *prometheus.CounterVec,
func NewProviderMetrics(specID, apiInterface, endpoint string, totalCUServicedMetric *prometheus.CounterVec,
totalCUPaidMetric *prometheus.CounterVec,
totalRelaysServicedMetric *prometheus.CounterVec,
totalRelaysServicedMetric *MappedLabelsCounterVec,
totalErroredMetric *prometheus.CounterVec,
consumerQoSMetric *prometheus.GaugeVec,
loadRateMetric *prometheus.GaugeVec,
) *ProviderMetrics {
pm := &ProviderMetrics{
specID: specID,
apiInterface: apiInterface,
endpoint: endpoint,
lock: sync.Mutex{},
totalCUServicedMetric: totalCUServicedMetric,
totalCUPaidMetric: totalCUPaidMetric,
Expand Down
51 changes: 33 additions & 18 deletions protocol/metrics/provider_metrics_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,17 @@ const (
DisabledFlagOption = "disabled"
)

var ShowProviderEndpointInProviderMetrics = false

type ProviderMetricsManager struct {
providerMetrics map[string]*ProviderMetrics
lock sync.RWMutex
totalCUServicedMetric *prometheus.CounterVec
totalCUPaidMetric *prometheus.CounterVec
totalRelaysServicedMetric *prometheus.CounterVec
totalRelaysServicedMetric *MappedLabelsCounterVec
totalErroredMetric *prometheus.CounterVec
consumerQoSMetric *prometheus.GaugeVec
blockMetric *prometheus.GaugeVec
blockMetric *MappedLabelsGaugeVec
lastServicedBlockTimeMetric *prometheus.GaugeVec
disabledChainsMetric *prometheus.GaugeVec
fetchLatestFailedMetric *prometheus.CounterVec
Expand Down Expand Up @@ -59,10 +61,15 @@ func NewProviderMetricsManager(networkAddress string) *ProviderMetricsManager {
}, []string{"spec", "apiInterface"})

// Create a new CounterVec metric to represent the TotalRelaysServiced over time.
totalRelaysServicedMetric := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "lava_provider_total_relays_serviced",
Help: "The total number of relays serviced by the provider over time.",
}, []string{"spec", "apiInterface"})
totalRelaysServicedLabels := []string{"spec", "apiInterface"}
if ShowProviderEndpointInProviderMetrics {
totalRelaysServicedLabels = append(totalRelaysServicedLabels, "provider_endpoint")
}
totalRelaysServicedMetric := NewMappedLabelsCounterVec(MappedLabelsCounterVecOpts{
Name: "lava_provider_total_relays_serviced",
Help: "The total number of relays serviced by the provider over time.",
Labels: totalRelaysServicedLabels,
})

// Create a new CounterVec metric to represent the TotalErrored over time.
totalErroredMetric := prometheus.NewCounterVec(prometheus.CounterOpts{
Expand All @@ -75,10 +82,16 @@ func NewProviderMetricsManager(networkAddress string) *ProviderMetricsManager {
Help: "The latest QoS score from a consumer",
}, []string{"spec", "consumer_address", "qos_metric"})

blockMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "lava_latest_block",
Help: "The latest block measured",
}, []string{"spec"})
blockMetricLabels := []string{"spec"}
if ShowProviderEndpointInProviderMetrics {
blockMetricLabels = append(blockMetricLabels, "provider_endpoint")
}

blockMetric := NewMappedLabelsGaugeVec(MappedLabelsGaugeVecOpts{
Name: "lava_latest_block",
Help: "The latest block measured",
Labels: blockMetricLabels,
})

// Create a new CounterVec metric to represent the TotalCUPaid over time.
totalCUPaidMetric := prometheus.NewCounterVec(prometheus.CounterOpts{
Expand Down Expand Up @@ -161,10 +174,8 @@ func NewProviderMetricsManager(networkAddress string) *ProviderMetricsManager {
// Register the metrics with the Prometheus registry.
prometheus.MustRegister(totalCUServicedMetric)
prometheus.MustRegister(totalCUPaidMetric)
prometheus.MustRegister(totalRelaysServicedMetric)
prometheus.MustRegister(totalErroredMetric)
prometheus.MustRegister(consumerQoSMetric)
prometheus.MustRegister(blockMetric)
prometheus.MustRegister(lastServicedBlockTimeMetric)
prometheus.MustRegister(disabledChainsMetric)
prometheus.MustRegister(fetchLatestFailedMetric)
Expand Down Expand Up @@ -246,13 +257,13 @@ func (pme *ProviderMetricsManager) setProviderMetric(metrics *ProviderMetrics) {
pme.providerMetrics[specID+apiInterface] = metrics
}

func (pme *ProviderMetricsManager) AddProviderMetrics(specID, apiInterface string) *ProviderMetrics {
func (pme *ProviderMetricsManager) AddProviderMetrics(specID, apiInterface, providerEndpoint string) *ProviderMetrics {
if pme == nil {
return nil
}

if pme.getProviderMetric(specID, apiInterface) == nil {
providerMetric := NewProviderMetrics(specID, apiInterface, pme.totalCUServicedMetric, pme.totalCUPaidMetric, pme.totalRelaysServicedMetric, pme.totalErroredMetric, pme.consumerQoSMetric, pme.loadRateMetric)
providerMetric := NewProviderMetrics(specID, apiInterface, providerEndpoint, pme.totalCUServicedMetric, pme.totalCUPaidMetric, pme.totalRelaysServicedMetric, pme.totalErroredMetric, pme.consumerQoSMetric, pme.loadRateMetric)
pme.setProviderMetric(providerMetric)

endpoint := fmt.Sprintf("/metrics/%s/%s/health", specID, apiInterface)
Expand All @@ -279,13 +290,15 @@ func (pme *ProviderMetricsManager) AddProviderMetrics(specID, apiInterface strin
return pme.getProviderMetric(specID, apiInterface)
}

func (pme *ProviderMetricsManager) SetLatestBlock(specID string, block uint64) {
func (pme *ProviderMetricsManager) SetLatestBlock(specID, providerEndpoint string, block uint64) {
if pme == nil {
return
}
pme.lock.Lock()
defer pme.lock.Unlock()
pme.blockMetric.WithLabelValues(specID).Set(float64(block))

labels := map[string]string{"spec": specID, "provider_endpoint": providerEndpoint}
pme.blockMetric.WithLabelValues(labels).Set(float64(block))
pme.lastServicedBlockTimeMetric.WithLabelValues(specID).Set(float64(time.Now().Unix()))
}

Expand Down Expand Up @@ -321,7 +334,7 @@ func (pme *ProviderMetricsManager) UpdateHealthCheckStatus(status bool) {
atomic.StoreUint64(&pme.endpointsHealthChecksOk, uint64(value))
}

func (pme *ProviderMetricsManager) UpdateHealthcheckStatusBreakdown(chainId string, apiInterface string, status bool) {
func (pme *ProviderMetricsManager) UpdateHealthcheckStatusBreakdown(chainId, apiInterface string, status bool) {
if pme == nil {
return
}
Expand All @@ -337,7 +350,9 @@ func (pme *ProviderMetricsManager) SetBlock(latestLavaBlock int64) {
if pme == nil {
return
}
pme.blockMetric.WithLabelValues("lava").Set(float64(latestLavaBlock))

labels := map[string]string{"spec": "lava", "provider_endpoint": ""}
pme.blockMetric.WithLabelValues(labels).Set(float64(latestLavaBlock))
}

func (pme *ProviderMetricsManager) SetDisabledChain(specID string, apInterface string) {
Expand Down
5 changes: 3 additions & 2 deletions protocol/rpcprovider/rpcprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ func (rpcp *RPCProvider) SetupEndpoint(ctx context.Context, rpcProviderEndpoint
// chainTracker accepts a callback to be called on new blocks, we use this to call metrics update on a new block
recordMetricsOnNewBlock := func(blockFrom int64, blockTo int64, hash string) {
for block := blockFrom + 1; block <= blockTo; block++ {
rpcp.providerMetricsManager.SetLatestBlock(chainID, uint64(block))
rpcp.providerMetricsManager.SetLatestBlock(chainID, rpcProviderEndpoint.NetworkAddress.Address, uint64(block))
}
}
var chainFetcher chainlib.ChainFetcherIf
Expand Down Expand Up @@ -504,7 +504,7 @@ func (rpcp *RPCProvider) SetupEndpoint(ctx context.Context, rpcProviderEndpoint
return utils.LavaFormatError("panic severity critical error, failed validating chain", err, utils.Attribute{Key: "rpcProviderEndpoint", Value: rpcProviderEndpoint})
}

providerMetrics := rpcp.providerMetricsManager.AddProviderMetrics(chainID, apiInterface)
providerMetrics := rpcp.providerMetricsManager.AddProviderMetrics(chainID, apiInterface, rpcProviderEndpoint.NetworkAddress.Address)

reliabilityManager := reliabilitymanager.NewReliabilityManager(chainTracker, rpcp.providerStateTracker, rpcp.addr.String(), chainRouter, chainParser)
rpcp.providerStateTracker.RegisterReliabilityManagerForVoteUpdates(ctx, reliabilityManager, rpcProviderEndpoint)
Expand Down Expand Up @@ -833,6 +833,7 @@ rpcprovider 127.0.0.1:3333 OSMOSIS tendermintrpc "wss://www.node-path.com:80,htt
cmdRPCProvider.Flags().String(common.UseStaticSpecFlag, "", "load offline spec provided path to spec file, used to test specs before they are proposed on chain, example for spec with inheritance: --use-static-spec ./specs/mainnet-1/specs/ibc.json,./specs/mainnet-1/specs/tendermint.json,./specs/mainnet-1/specs/cosmossdk.json,./specs/mainnet-1/specs/ethermint.json,./specs/mainnet-1/specs/ethereum.json,./specs/mainnet-1/specs/evmos.json")
cmdRPCProvider.Flags().Uint64(common.RateLimitRequestPerSecondFlag, 0, "Measuring the load relative to this number for feedback - per second - per chain - default unlimited. Given Y simultaneous relay calls, a value of X and will measure Y/X load rate.")
cmdRPCProvider.Flags().BoolVar(&chainlib.AllowMissingApisByDefault, common.AllowMissingApisByDefaultFlagName, true, "allows missing apis to be proxied to the node by default, set false to block missing apis in the spec, might result in degraded performance if spec is misconfigured")
cmdRPCProvider.Flags().BoolVar(&metrics.ShowProviderEndpointInProviderMetrics, metrics.ShowProviderEndpointInMetricsFlagName, metrics.ShowProviderEndpointInProviderMetrics, "show provider endpoint in provider metrics")
common.AddRollingLogConfig(cmdRPCProvider)
return cmdRPCProvider
}

0 comments on commit 97ff6f7

Please sign in to comment.