Skip to content

Commit

Permalink
PRT-add-health-checker (lavanet#1036)
Browse files Browse the repository at this point in the history
* WIP health command

* finished health data gathering

* added alerts to health

* revert upgrading go version

* revert

* adapt to go 1.20

* added suppression system

* lint

* WIP alert attributes for monitoring and suppression

* added suppression support, WIP testing and configurability

* fix spec raw call to spec expanded

* added suppression count configuration

* added health example

* added latest block metric on all checks

* added unhealthy as 0

* fix missing logs issues

* fix panic in state tracker when the node is unavailable

* added fixes and apiInterface data

* added the autogenerating file to ignore

* prettify errors, improve suppression by attribute

* verified working alert mechanism

* more sleep to health test

* better error handling

* fix resource exhaustion on context done in grpc connector

* lint
  • Loading branch information
omerlavanet authored Dec 14, 2023
1 parent 4d71b87 commit f6c9d5f
Show file tree
Hide file tree
Showing 21 changed files with 1,775 additions and 72 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,6 @@ ecosystem/lavajs/proto/tendermint

testutil/e2e/sdk/tests/package-lock.json
testutil/e2e/sdk/tests/node_modules.json

config/health_examples/health_template_gen.yml
# Yarn
.yarn/
2 changes: 2 additions & 0 deletions cmd/lavap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/lavanet/lava/cmd/lavad/cmd"
"github.com/lavanet/lava/ecosystem/cache"
"github.com/lavanet/lava/protocol/badgegenerator"
"github.com/lavanet/lava/protocol/monitoring"
"github.com/lavanet/lava/protocol/rpcconsumer"
"github.com/lavanet/lava/protocol/rpcprovider"
"github.com/lavanet/lava/protocol/statetracker"
Expand Down Expand Up @@ -52,6 +53,7 @@ func main() {
testCmd.AddCommand(rpcconsumer.CreateTestRPCConsumerCobraCommand())
testCmd.AddCommand(rpcprovider.CreateTestRPCProviderCobraCommand())
testCmd.AddCommand(statetracker.CreateEventsCobraCommand())
testCmd.AddCommand(monitoring.CreateHealthCobraCommand())
rootCmd.AddCommand(cache.CreateCacheCobraCommand())
if err := svrcmd.Execute(rootCmd, "", app.DefaultNodeHome); err != nil {
switch e := err.(type) {
Expand Down
31 changes: 31 additions & 0 deletions config/health_examples/health_example.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
max-provider-latency: 150ms
subscription-days-left-alert: 10
interval: 5m
allowed_time_lag: 30s
query-retries: 5
alert-webhook-url: <alert-hook>
identifier: health_example
cu-percent-threshold: 0.2
alert-suppression-interval: 6h
disable-alert-suppression: false
suppression-alert-count-threshold: 3
metrics-listen-address: ":7776"
disable-alert-logging: false
subscription_addresses:
- lava@...
- lava@...
provider_addresses:
- lava@...
- lava@...
- lava@...
consumer_endpoints:
- chain-id: ETH1
api-interface: jsonrpc
network-address: 127.0.0.1:3333
reference_endpoints:
- chain-id: ETH1
api-interface: jsonrpc
network-address: public-rpc-1
- chain-id: ETH1
api-interface: jsonrpc
network-address: public-rpc-2
29 changes: 29 additions & 0 deletions config/health_examples/health_template.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
max-provider-latency: 150ms
subscription-days-left-alert: 10
interval: 5s
allowed_time_lag: 30s
query-retries: 5
identifier: health_example
cu-percent-threshold: 0.2
alert-suppression-interval: 60s
disable-alert-suppression: false
suppression-alert-count-threshold: 2
metrics-listen-address: ":7776"
disable-alert-logging: false
allow-insecure-provider-dialing: true
consumer_endpoints:
- chain-id: ETH1
api-interface: jsonrpc
network-address: http://127.0.0.1:3333
- chain-id: LAV1
api-interface: rest
network-address: http://127.0.0.1:3360
- chain-id: LAV1
api-interface: tendermintrpc
network-address: http://127.0.0.1:3361
- chain-id: LAV1
api-interface: grpc
network-address: 127.0.0.1:3362
#REPLACED
subscription_addresses:
provider_addresses:
16 changes: 9 additions & 7 deletions protocol/chainlib/chainproxy/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,16 +132,17 @@ func (connector *Connector) connectorLoop(ctx context.Context) {
}

func (connector *Connector) Close() {
for {
for i := 0; ; i++ {
connector.lock.Lock()
log.Println("Connector closing", len(connector.freeClients))
for i := 0; i < len(connector.freeClients); i++ {
connector.freeClients[i].Close()
}
connector.freeClients = []*rpcclient.Client{}

if connector.usedClients > 0 {
log.Println("Connector closing, waiting for in use clients", connector.usedClients)
if i > 10 {
utils.LavaFormatError("stuck while closing connector", nil, utils.LogAttr("freeClients", connector.freeClients), utils.LogAttr("usedClients", connector.usedClients))
}
connector.lock.Unlock()
time.Sleep(100 * time.Millisecond)
} else {
Expand Down Expand Up @@ -335,8 +336,8 @@ func (connector *GRPCConnector) GetRpc(ctx context.Context, block bool) (*grpc.C
}

ret := connector.freeClients[0]
connector.freeClients = connector.freeClients[1:]
connector.usedClients++
connector.freeClients = connector.freeClients[1:]

return ret, nil
}
Expand All @@ -360,16 +361,17 @@ func (connector *GRPCConnector) connectorLoop(ctx context.Context) {
}

func (connector *GRPCConnector) Close() {
for {
for i := 0; ; i++ {
connector.lock.Lock()
log.Println("Connector closing", len(connector.freeClients))
for i := 0; i < len(connector.freeClients); i++ {
connector.freeClients[i].Close()
}
connector.freeClients = []*grpc.ClientConn{}

if connector.usedClients > 0 {
log.Println("Connector closing, waiting for in use clients", connector.usedClients)
if i > 10 {
utils.LavaFormatError("stuck while closing grpc connector", nil, utils.LogAttr("freeClients", connector.freeClients), utils.LogAttr("usedClients", connector.usedClients))
}
connector.lock.Unlock()
time.Sleep(100 * time.Millisecond)
} else {
Expand Down
6 changes: 6 additions & 0 deletions protocol/chainlib/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,12 @@ func newGrpcChainProxy(ctx context.Context, nodeUrl string, averageBlockTime tim
if err != nil {
return nil, utils.LavaFormatError("reflectionConnection Error", err)
}
// this connection is kept open so it needs to be closed on teardown
go func() {
<-ctx.Done()
utils.LavaFormatInfo("tearing down reflection connection, context done")
conn.ReturnRpc(reflectionConnection)
}()

err = parser.(*GrpcChainParser).setupForProvider(reflectionConnection)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions protocol/lavaprotocol/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,6 @@ var (
ProviderFinzalizationDataAccountabilityError = sdkerrors.New("ProviderFinzalizationDataAccountability Error", 3366, "provider returned invalid finalization data, with accountability")
HashesConsunsusError = sdkerrors.New("HashesConsunsus Error", 3367, "identified finalized responses with conflicting hashes, from two providers")
ConsistencyError = sdkerrors.New("Consistency Error", 3368, "does not meet consistency requirements")
UnhandledRelayReceiverError = sdkerrors.New("UnhandledRelayReceiver Error", 3369, "provider does not handle requested api interface and spec")
DisabledRelayReceiverError = sdkerrors.New("DisabledRelayReceiverError Error", 3370, "provider does not pass verification and disabled this interface and spec")
)
107 changes: 107 additions & 0 deletions protocol/metrics/health_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package metrics

import (
"net/http"

"github.com/lavanet/lava/utils"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

// HealthMetrics bundles the Prometheus collectors published by the health
// command. Every vector carries an "identifier" label; latestBlocks also
// carries an "entity" label.
type HealthMetrics struct {
	// Monotonic counters of health runs, split by outcome.
	failedRuns, successfulRuns *prometheus.CounterVec

	// Gauges overwritten with the results of the most recent run.
	failureAlerts, healthyChecks, unhealthyChecks *prometheus.GaugeVec

	// Latest block observed per checked entity.
	latestBlocks *prometheus.GaugeVec
}

// NewHealthMetrics creates the Prometheus collectors used by the health
// command, registers them with the default Prometheus registry, and starts an
// HTTP server that exposes them under /metrics on networkAddress.
//
// When networkAddress equals DisabledFlagOption nothing is created and nil is
// returned. All HealthMetrics methods treat a nil receiver as a no-op, so the
// result can be used without a nil check.
//
// The collectors are registered on the process-wide default registry and the
// handler on the default HTTP mux, so this should be called at most once per
// process with metrics enabled.
func NewHealthMetrics(networkAddress string) *HealthMetrics {
	if networkAddress == DisabledFlagOption {
		utils.LavaFormatWarning("prometheus endpoint inactive, option is disabled", nil)
		return nil
	}

	latestBlocks := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "lava_health_latest_blocks",
		Help: "The latest blocks queried on all checks",
	}, []string{"identifier", "entity"})

	failureAlerts := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "lava_health_failure_alerts",
		Help: "The current amount of active alerts",
	}, []string{"identifier"})

	healthyChecks := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "lava_healthy_entities",
		Help: "The current amount of healthy checks",
	}, []string{"identifier"})

	// The help text previously duplicated the healthy gauge's description.
	unhealthyChecks := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "lava_unhealthy_entities",
		Help: "The current amount of unhealthy checks",
	}, []string{"identifier"})

	failedRuns := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "lava_health_failed_runs",
		Help: "The total of runs failed",
	}, []string{"identifier"})

	successfulRuns := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "lava_health_successful_runs",
		Help: "The total of runs succeeded",
	}, []string{"identifier"})

	// Register the metrics with the Prometheus registry.
	prometheus.MustRegister(
		failedRuns,
		successfulRuns,
		failureAlerts,
		healthyChecks,
		unhealthyChecks,
		latestBlocks,
	)
	http.Handle("/metrics", promhttp.Handler())

	go func() {
		utils.LavaFormatInfo("prometheus endpoint listening", utils.Attribute{Key: "Listen Address", Value: networkAddress})
		// ListenAndServe only returns on failure (for example when the address
		// is already in use); report it instead of silently serving nothing.
		if err := http.ListenAndServe(networkAddress, nil); err != nil {
			utils.LavaFormatError("prometheus endpoint stopped serving", err, utils.Attribute{Key: "Listen Address", Value: networkAddress})
		}
	}()

	return &HealthMetrics{
		failedRuns:      failedRuns,
		successfulRuns:  successfulRuns,
		failureAlerts:   failureAlerts,
		healthyChecks:   healthyChecks,
		unhealthyChecks: unhealthyChecks,
		latestBlocks:    latestBlocks,
	}
}

// SetFailedRun adds one to the failed-runs counter for the given identifier
// label. It does nothing when the receiver is nil.
func (pme *HealthMetrics) SetFailedRun(label string) {
	if pme != nil {
		failed := pme.failedRuns.WithLabelValues(label)
		failed.Add(1)
	}
}

// SetSuccess adds one to the successful-runs counter for the given identifier
// label. It does nothing when the receiver is nil.
func (pme *HealthMetrics) SetSuccess(label string) {
	if pme != nil {
		succeeded := pme.successfulRuns.WithLabelValues(label)
		succeeded.Add(1)
	}
}

// SetLatestBlockData publishes one latest-block gauge sample per entry of
// data, labelled with the given identifier label and the entry's key as the
// entity. It does nothing when the receiver is nil.
func (pme *HealthMetrics) SetLatestBlockData(label string, data map[string]uint64) {
	if pme != nil {
		for name, block := range data {
			gauge := pme.latestBlocks.WithLabelValues(label, name)
			gauge.Set(float64(block))
		}
	}
}

// SetAlertResults overwrites the alert, unhealthy and healthy gauges for the
// given identifier label with the supplied counts. It does nothing when the
// receiver is nil.
func (pme *HealthMetrics) SetAlertResults(label string, fails uint64, unhealthy uint64, healthy uint64) {
	if pme != nil {
		alertsGauge := pme.failureAlerts.WithLabelValues(label)
		alertsGauge.Set(float64(fails))

		unhealthyGauge := pme.unhealthyChecks.WithLabelValues(label)
		unhealthyGauge.Set(float64(unhealthy))

		healthyGauge := pme.healthyChecks.WithLabelValues(label)
		healthyGauge.Set(float64(healthy))
	}
}
Loading

0 comments on commit f6c9d5f

Please sign in to comment.