From 9faee81587e7a737228d4281ecf05f48732fd8ac Mon Sep 17 00:00:00 2001 From: Omer <100387053+omerlavanet@users.noreply.github.com> Date: Sun, 18 Feb 2024 13:05:43 +0200 Subject: [PATCH] PRT-add-flag-to-export-jail-reports (#1226) * wip * add reports for conflicts * added tests and nil safety * added aggregation, and added referrer client aggregation * lint * send referer only when a relay succeeds * fix dup ports in full_consumer_example * add nil reference support * check for nil just in case * added debug prints * added logging for health check --- .../full_consumer_example.yml | 8 +- config/health_examples/health_template.yml | 2 - protocol/chainlib/common.go | 28 +--- protocol/chainlib/jsonRPC.go | 10 +- protocol/chainlib/rest.go | 10 +- protocol/chainlib/tendermintRPC.go | 13 +- .../lavaprotocol/response_builder_test.go | 38 ++--- protocol/lavaprotocol/reuqest_builder_test.go | 19 +-- .../lavasession/consumer_session_manager.go | 117 ++++--------- .../consumer_session_manager_test.go | 2 +- protocol/lavasession/consumer_types.go | 24 +-- protocol/lavasession/reported_providers.go | 14 +- .../lavasession/reported_providers_test.go | 6 +- protocol/metrics/consumer_referrer_client.go | 86 ++++++++++ .../metrics/consumer_referrer_client_test.go | 66 ++++++++ .../metrics/consumer_relayserver_client.go | 2 +- protocol/metrics/consumer_reports_client.go | 113 +++++++++++++ .../metrics/consumer_reports_client_test.go | 89 ++++++++++ protocol/metrics/queue_sender.go | 156 ++++++++++++++++++ protocol/metrics/relays_monitor.go | 7 + protocol/rpcconsumer/rpcconsumer.go | 12 +- protocol/rpcconsumer/rpcconsumer_server.go | 10 +- .../reliability_manager_test.go | 76 ++++----- scripts/init_chain_commands.sh | 3 +- 24 files changed, 691 insertions(+), 220 deletions(-) create mode 100644 protocol/metrics/consumer_referrer_client.go create mode 100644 protocol/metrics/consumer_referrer_client_test.go create mode 100644 protocol/metrics/consumer_reports_client.go create mode 100644 protocol/metrics/consumer_reports_client_test.go create mode 100644 protocol/metrics/queue_sender.go diff --git a/config/consumer_examples/full_consumer_example.yml b/config/consumer_examples/full_consumer_example.yml index ececa4ff5f..56cdda6f29 100644 --- a/config/consumer_examples/full_consumer_example.yml +++ b/config/consumer_examples/full_consumer_example.yml @@ -134,9 +134,6 @@ endpoints: - chain-id: AGR api-interface: grpc network-address: 127.0.0.1:3387 - - chain-id: KOIIT - api-interface: jsonrpc - network-address: 127.0.0.1:3388 - chain-id: AGR api-interface: tendermintrpc network-address: 127.0.0.1:3388 @@ -149,4 +146,9 @@ endpoints: - chain-id: AGRT api-interface: tendermintrpc network-address: 127.0.0.1:3391 + - chain-id: KOIIT + api-interface: jsonrpc + network-address: 127.0.0.1:3392 metrics-listen-address: ":7779" +# referer-be-address: "http://127.0.0.1:6500" +# reports-be-address: "http://127.0.0.1:6501" \ No newline at end of file diff --git a/config/health_examples/health_template.yml b/config/health_examples/health_template.yml index 24d8a4437c..7a02719119 100644 --- a/config/health_examples/health_template.yml +++ b/config/health_examples/health_template.yml @@ -15,8 +15,6 @@ consumer_endpoints: - chain-id: ETH1 api-interface: jsonrpc network-address: http://127.0.0.1:3333 - addons: - - debug - chain-id: LAV1 api-interface: rest network-address: http://127.0.0.1:3360 diff --git a/protocol/chainlib/common.go b/protocol/chainlib/common.go index cb15740a13..9fa6f2d98c 100644 --- 
a/protocol/chainlib/common.go +++ b/protocol/chainlib/common.go @@ -1,7 +1,6 @@ package chainlib import ( - "bytes" "encoding/json" "fmt" "net" @@ -374,33 +373,20 @@ func ValidateNilResponse(responseString string) error { } type RefererData struct { - Address string - Marker string + Address string + Marker string + ReferrerClient *metrics.ConsumerReferrerClient } func (rd *RefererData) SendReferer(refererMatchString string) error { if rd == nil || rd.Address == "" { return nil } - utils.LavaFormatDebug("referer detected", utils.LogAttr("referer", refererMatchString)) - payload := map[string]interface{}{} - payload["referer-id"] = refererMatchString - payloadBytes, err := json.Marshal(payload) - if err != nil { - return utils.LavaFormatError("failed marshaling payload for Referer", err) - } - req, err := http.NewRequest("POST", rd.Address, bytes.NewBuffer(payloadBytes)) - if err != nil { - return utils.LavaFormatError("failed building a request for referer", err) + if rd.ReferrerClient == nil { + return nil } - req.Header.Set("Content-Type", "application/json") - // Make the HTTP request - client := &http.Client{} - resp, err := client.Do(req) - if err != nil { - return utils.LavaFormatDebug("failed sending http request", utils.LogAttr("error", err)) - } - defer resp.Body.Close() + utils.LavaFormatDebug("referer detected", utils.LogAttr("referer", refererMatchString)) + rd.ReferrerClient.AppendReferrer(metrics.NewReferrerRequest(refererMatchString)) return nil } diff --git a/protocol/chainlib/jsonRPC.go b/protocol/chainlib/jsonRPC.go index 5e9d77077f..4a76d9233d 100644 --- a/protocol/chainlib/jsonRPC.go +++ b/protocol/chainlib/jsonRPC.go @@ -339,9 +339,6 @@ func (apil *JsonRPCChainListener) Serve(ctx context.Context, cmdFlags common.Con apil.logger.AnalyzeWebSocketErrorAndWriteMessage(websockConn, messageType, nil, msgSeed, []byte("Unable to extract dappID"), spectypes.APIInterfaceJsonRPC, time.Since(startTime)) } refererMatch, ok := websockConn.Locals(refererMatchString).(string) - if ok && refererMatch != "" && apil.refererData != nil { - go apil.refererData.SendReferer(refererMatch) - } ctx, cancel := context.WithCancel(context.Background()) guid := utils.GenerateUniqueIdentifier() ctx = utils.WithUniqueIdentifier(ctx, guid) @@ -350,6 +347,9 @@ func (apil *JsonRPCChainListener) Serve(ctx context.Context, cmdFlags common.Con utils.LavaFormatDebug("ws in <<<", utils.Attribute{Key: "seed", Value: msgSeed}, utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: "msg", Value: msg}, utils.Attribute{Key: "dappID", Value: dappID}) metricsData := metrics.NewRelayAnalytics(dappID, chainID, apiInterface) relayResult, err := apil.relaySender.SendRelay(ctx, "", string(msg), http.MethodPost, dappID, websockConn.RemoteAddr().String(), metricsData, nil) + if ok && refererMatch != "" && apil.refererData != nil && err == nil { + go apil.refererData.SendReferer(refererMatch) + } reply := relayResult.GetReply() replyServer := relayResult.GetReplyServer() go apil.logger.AddMetricForWebSocket(metricsData, err, websockConn) @@ -428,10 +428,10 @@ func (apil *JsonRPCChainListener) Serve(ctx context.Context, cmdFlags common.Con utils.LogAttr("headers", headers), ) refererMatch := fiberCtx.Params(refererMatchString, "") - if refererMatch != "" && apil.refererData != nil { + relayResult, err := apil.relaySender.SendRelay(ctx, "", string(fiberCtx.Body()), http.MethodPost, dappID, consumerIp, metricsData, headers) + if refererMatch != "" && apil.refererData != nil && err == nil { go 
apil.refererData.SendReferer(refererMatch) } - relayResult, err := apil.relaySender.SendRelay(ctx, "", string(fiberCtx.Body()), http.MethodPost, dappID, consumerIp, metricsData, headers) reply := relayResult.GetReply() go apil.logger.AddMetricForHttp(metricsData, err, fiberCtx.GetReqHeaders()) if err != nil { diff --git a/protocol/chainlib/rest.go b/protocol/chainlib/rest.go index b9af4a6aa2..011f2ddba6 100644 --- a/protocol/chainlib/rest.go +++ b/protocol/chainlib/rest.go @@ -301,11 +301,11 @@ func (apil *RestChainListener) Serve(ctx context.Context, cmdFlags common.Consum utils.LogAttr("headers", restHeaders), ) refererMatch := fiberCtx.Params(refererMatchString, "") - if refererMatch != "" && apil.refererData != nil { - go apil.refererData.SendReferer(refererMatch) - } requestBody := string(fiberCtx.Body()) relayResult, err := apil.relaySender.SendRelay(ctx, path+query, requestBody, http.MethodPost, dappID, fiberCtx.Get(common.IP_FORWARDING_HEADER_NAME, fiberCtx.IP()), analytics, restHeaders) + if refererMatch != "" && apil.refererData != nil && err == nil { + go apil.refererData.SendReferer(refererMatch) + } reply := relayResult.GetReply() go apil.logger.AddMetricForHttp(analytics, err, fiberCtx.GetReqHeaders()) if err != nil { @@ -367,10 +367,10 @@ func (apil *RestChainListener) Serve(ctx context.Context, cmdFlags common.Consum utils.LogAttr("headers", restHeaders), ) refererMatch := fiberCtx.Params(refererMatchString, "") - if refererMatch != "" && apil.refererData != nil { + relayResult, err := apil.relaySender.SendRelay(ctx, path+query, "", fiberCtx.Method(), dappID, fiberCtx.Get(common.IP_FORWARDING_HEADER_NAME, fiberCtx.IP()), analytics, restHeaders) + if refererMatch != "" && apil.refererData != nil && err == nil { go apil.refererData.SendReferer(refererMatch) } - relayResult, err := apil.relaySender.SendRelay(ctx, path+query, "", fiberCtx.Method(), dappID, fiberCtx.Get(common.IP_FORWARDING_HEADER_NAME, fiberCtx.IP()), analytics, restHeaders) reply := relayResult.GetReply() go apil.logger.AddMetricForHttp(analytics, err, fiberCtx.GetReqHeaders()) if err != nil { diff --git a/protocol/chainlib/tendermintRPC.go b/protocol/chainlib/tendermintRPC.go index efded4faea..e543c1fd85 100644 --- a/protocol/chainlib/tendermintRPC.go +++ b/protocol/chainlib/tendermintRPC.go @@ -374,11 +374,11 @@ func (apil *TendermintRpcChainListener) Serve(ctx context.Context, cmdFlags comm utils.LavaFormatInfo("ws in <<<", utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: "seed", Value: msgSeed}, utils.Attribute{Key: "msg", Value: msg}, utils.Attribute{Key: "dappID", Value: dappID}) msgSeed = strconv.FormatUint(guid, 10) refererMatch, ok := websocketConn.Locals(refererMatchString).(string) - if ok && refererMatch != "" && apil.refererData != nil { - go apil.refererData.SendReferer(refererMatch) - } metricsData := metrics.NewRelayAnalytics(dappID, chainID, apiInterface) relayResult, err := apil.relaySender.SendRelay(ctx, "", string(msg), "", dappID, websocketConn.RemoteAddr().String(), metricsData, nil) + if ok && refererMatch != "" && apil.refererData != nil && err == nil { + go apil.refererData.SendReferer(refererMatch) + } reply := relayResult.GetReply() replyServer := relayResult.GetReplyServer() go apil.logger.AddMetricForWebSocket(metricsData, err, websocketConn) @@ -451,10 +451,10 @@ func (apil *TendermintRpcChainListener) Serve(ctx context.Context, cmdFlags comm utils.LogAttr("headers", headers), ) refererMatch := fiberCtx.Params(refererMatchString, "") - if refererMatch != "" && 
apil.refererData != nil { + relayResult, err := apil.relaySender.SendRelay(ctx, "", string(fiberCtx.Body()), "", dappID, fiberCtx.Get(common.IP_FORWARDING_HEADER_NAME, fiberCtx.IP()), metricsData, headers) + if refererMatch != "" && apil.refererData != nil && err == nil { go apil.refererData.SendReferer(refererMatch) } - relayResult, err := apil.relaySender.SendRelay(ctx, "", string(fiberCtx.Body()), "", dappID, fiberCtx.Get(common.IP_FORWARDING_HEADER_NAME, fiberCtx.IP()), metricsData, headers) reply := relayResult.GetReply() go apil.logger.AddMetricForHttp(metricsData, err, fiberCtx.GetReqHeaders()) @@ -515,6 +515,9 @@ func (apil *TendermintRpcChainListener) Serve(ctx context.Context, cmdFlags comm go apil.refererData.SendReferer(refererMatch) } relayResult, err := apil.relaySender.SendRelay(ctx, path+query, "", "", dappID, fiberCtx.Get(common.IP_FORWARDING_HEADER_NAME, fiberCtx.IP()), metricsData, headers) + if refererMatch != "" && apil.refererData != nil && err == nil { + go apil.refererData.SendReferer(refererMatch) + } msgSeed := strconv.FormatUint(guid, 10) reply := relayResult.GetReply() go apil.logger.AddMetricForHttp(metricsData, err, fiberCtx.GetReqHeaders()) diff --git a/protocol/lavaprotocol/response_builder_test.go b/protocol/lavaprotocol/response_builder_test.go index 251e0dbd56..7b3ceb09ed 100644 --- a/protocol/lavaprotocol/response_builder_test.go +++ b/protocol/lavaprotocol/response_builder_test.go @@ -26,16 +26,15 @@ func TestSignAndExtractResponse(t *testing.T) { specId := "LAV1" epoch := int64(100) singleConsumerSession := &lavasession.SingleConsumerSession{ - CuSum: 20, - LatestRelayCu: 10, // set by GetSessions cuNeededForSession - QoSInfo: lavasession.QoSReport{LastQoSReport: &pairingtypes.QualityOfServiceReport{}}, - SessionId: 123, - Parent: nil, - RelayNum: 1, - LatestBlock: epoch, - Endpoint: nil, - BlockListed: false, // if session lost sync we blacklist it. - ConsecutiveNumberOfFailures: 0, // number of times this session has failed + CuSum: 20, + LatestRelayCu: 10, // set by GetSessions cuNeededForSession + QoSInfo: lavasession.QoSReport{LastQoSReport: &pairingtypes.QualityOfServiceReport{}}, + SessionId: 123, + Parent: nil, + RelayNum: 1, + LatestBlock: epoch, + Endpoint: nil, + BlockListed: false, // if session lost sync we blacklist it. } metadataValue := make([]pairingtypes.Metadata, 1) metadataValue[0] = pairingtypes.Metadata{ @@ -75,16 +74,15 @@ func TestSignAndExtractResponseLatest(t *testing.T) { testSpecId := "BLAV1" epoch := int64(100) singleConsumerSession := &lavasession.SingleConsumerSession{ - CuSum: 20, - LatestRelayCu: 10, // set by GetSessions cuNeededForSession - QoSInfo: lavasession.QoSReport{LastQoSReport: &pairingtypes.QualityOfServiceReport{}}, - SessionId: 123, - Parent: nil, - RelayNum: 1, - LatestBlock: epoch, - Endpoint: nil, - BlockListed: false, // if session lost sync we blacklist it. - ConsecutiveNumberOfFailures: 0, // number of times this session has failed + CuSum: 20, + LatestRelayCu: 10, // set by GetSessions cuNeededForSession + QoSInfo: lavasession.QoSReport{LastQoSReport: &pairingtypes.QualityOfServiceReport{}}, + SessionId: 123, + Parent: nil, + RelayNum: 1, + LatestBlock: epoch, + Endpoint: nil, + BlockListed: false, // if session lost sync we blacklist it. 
} metadataValue := make([]pairingtypes.Metadata, 1) metadataValue[0] = pairingtypes.Metadata{ diff --git a/protocol/lavaprotocol/reuqest_builder_test.go b/protocol/lavaprotocol/reuqest_builder_test.go index 87cd86d936..fc967648e3 100644 --- a/protocol/lavaprotocol/reuqest_builder_test.go +++ b/protocol/lavaprotocol/reuqest_builder_test.go @@ -16,16 +16,15 @@ func TestSignAndExtract(t *testing.T) { specId := "LAV1" epoch := int64(100) singleConsumerSession := &lavasession.SingleConsumerSession{ - CuSum: 20, - LatestRelayCu: 10, // set by GetSessions cuNeededForSession - QoSInfo: lavasession.QoSReport{LastQoSReport: &pairingtypes.QualityOfServiceReport{}}, - SessionId: 123, - Parent: nil, - RelayNum: 1, - LatestBlock: epoch, - Endpoint: nil, - BlockListed: false, // if session lost sync we blacklist it. - ConsecutiveNumberOfFailures: 0, // number of times this session has failed + CuSum: 20, + LatestRelayCu: 10, // set by GetSessions cuNeededForSession + QoSInfo: lavasession.QoSReport{LastQoSReport: &pairingtypes.QualityOfServiceReport{}}, + SessionId: 123, + Parent: nil, + RelayNum: 1, + LatestBlock: epoch, + Endpoint: nil, + BlockListed: false, // if session lost sync we blacklist it. } metadataValue := make([]pairingtypes.Metadata, 1) metadataValue[0] = pairingtypes.Metadata{ diff --git a/protocol/lavasession/consumer_session_manager.go b/protocol/lavasession/consumer_session_manager.go index b6784b4441..cc26c6b70c 100644 --- a/protocol/lavasession/consumer_session_manager.go +++ b/protocol/lavasession/consumer_session_manager.go @@ -684,25 +684,40 @@ func (csm *ConsumerSessionManager) OnSessionFailure(consumerSession *SingleConsu return sdkerrors.Wrapf(SessionIsAlreadyBlockListedError, "trying to report a session failure of a blocklisted consumer session") } + // check if need to block & report + var blockProvider, reportProvider bool + if ReportAndBlockProviderError.Is(errorReceived) { + blockProvider = true + reportProvider = true + } else if BlockProviderError.Is(errorReceived) { + blockProvider = true + } + consumerSession.QoSInfo.TotalRelays++ - consumerSession.ConsecutiveNumberOfFailures += 1 // increase number of failures for this session - consumerSession.errosCount += 1 + consumerSession.ConsecutiveErrors = append(consumerSession.ConsecutiveErrors, errorReceived) + consumerSession.errorsCount += 1 // if this session failed more than MaximumNumberOfFailuresAllowedPerConsumerSession times or session went out of sync we block it. - var consumerSessionBlockListed bool - if consumerSession.ConsecutiveNumberOfFailures > MaximumNumberOfFailuresAllowedPerConsumerSession || IsSessionSyncLoss(errorReceived) { - utils.LavaFormatDebug("Blocking consumer session", utils.LogAttr("ConsecutiveNumberOfFailures", consumerSession.ConsecutiveNumberOfFailures), utils.LogAttr("errosCount", consumerSession.errosCount), utils.Attribute{Key: "id", Value: consumerSession.SessionId}) + if len(consumerSession.ConsecutiveErrors) > MaximumNumberOfFailuresAllowedPerConsumerSession || IsSessionSyncLoss(errorReceived) { + utils.LavaFormatDebug("Blocking consumer session", utils.LogAttr("ConsecutiveErrors", consumerSession.ConsecutiveErrors), utils.LogAttr("errorsCount", consumerSession.errorsCount), utils.Attribute{Key: "id", Value: consumerSession.SessionId}) consumerSession.BlockListed = true // block this session from future usages - consumerSessionBlockListed = true + // we will check the total number of cu for this provider and decide if we need to report it. 
+ if consumerSession.Parent.atomicReadUsedComputeUnits() <= consumerSession.LatestRelayCu { // if we had 0 successful relays and we reached block session we need to report this provider + blockProvider = true + reportProvider = true + } + if reportProvider { + providerAddr := consumerSession.Parent.PublicLavaAddress + go csm.reportedProviders.AppendReport(metrics.NewReportsRequest(providerAddr, consumerSession.ConsecutiveErrors, csm.rpcEndpoint.ChainID)) + } } cuToDecrease := consumerSession.LatestRelayCu // latency, isHangingApi, syncScore arent updated when there is a failure go csm.providerOptimizer.AppendRelayFailure(consumerSession.Parent.PublicLavaAddress) consumerSession.LatestRelayCu = 0 // making sure no one uses it in a wrong way - + consecutiveErrors := uint64(len(consumerSession.ConsecutiveErrors)) parentConsumerSessionsWithProvider := consumerSession.Parent // must read this pointer before unlocking - reportErrors := consumerSession.ConsecutiveNumberOfFailures - // finished with consumerSession here can unlock. csm.updateMetricsManager(consumerSession) + // finished with consumerSession here can unlock. consumerSession.lock.Unlock() // we unlock before we change anything in the parent ConsumerSessionsWithProvider err := parentConsumerSessionsWithProvider.decreaseUsedComputeUnits(cuToDecrease) // change the cu in parent @@ -710,27 +725,9 @@ func (csm *ConsumerSessionManager) OnSessionFailure(consumerSession *SingleConsu return err } - // check if need to block & report - var blockProvider, reportProvider bool - if ReportAndBlockProviderError.Is(errorReceived) { - blockProvider = true - reportProvider = true - } else if BlockProviderError.Is(errorReceived) { - blockProvider = true - } - - // if BlockListed is true here meaning we had a ConsecutiveNumberOfFailures > MaximumNumberOfFailuresAllowedPerConsumerSession or out of sync - // we will check the total number of cu for this provider and decide if we need to report it. - if consumerSessionBlockListed { - if parentConsumerSessionsWithProvider.atomicReadUsedComputeUnits() == 0 { // if we had 0 successful relays and we reached block session we need to report this provider - blockProvider = true - reportProvider = true - } - } - if blockProvider { publicProviderAddress, pairingEpoch := parentConsumerSessionsWithProvider.getPublicLavaAddressAndPairingEpoch() - err = csm.blockProvider(publicProviderAddress, reportProvider, pairingEpoch, 0, reportErrors, nil) + err = csm.blockProvider(publicProviderAddress, reportProvider, pairingEpoch, 0, consecutiveErrors, nil) if err != nil { if EpochMismatchError.Is(err) { return nil // no effects this epoch has been changed @@ -755,8 +752,8 @@ func (csm *ConsumerSessionManager) OnDataReliabilitySessionDone(consumerSession return sdkerrors.Wrapf(err, "OnDataReliabilitySessionDone, consumerSession.lock must be locked before accessing this method") } - defer consumerSession.lock.Unlock() // we need to be locked here, if we didn't get it locked we try lock anyway - consumerSession.ConsecutiveNumberOfFailures = 0 // reset failures. 
+ defer consumerSession.lock.Unlock() // we need to be locked here, if we didn't get it locked we try lock anyway + consumerSession.ConsecutiveErrors = []error{} consumerSession.LatestBlock = latestServicedBlock // update latest serviced block if expectedBH-latestServicedBlock > 1000 { utils.LavaFormatWarning("identified block gap", nil, @@ -790,8 +787,8 @@ func (csm *ConsumerSessionManager) OnSessionDone( defer consumerSession.lock.Unlock() // we need to be locked here, if we didn't get it locked we try lock anyway consumerSession.CuSum += consumerSession.LatestRelayCu // add CuSum to current cu usage. consumerSession.LatestRelayCu = 0 // reset cu just in case - consumerSession.ConsecutiveNumberOfFailures = 0 // reset failures. - consumerSession.LatestBlock = latestServicedBlock // update latest serviced block + consumerSession.ConsecutiveErrors = []error{} + consumerSession.LatestBlock = latestServicedBlock // update latest serviced block // calculate QoS consumerSession.CalculateQoS(currentLatency, expectedLatency, expectedBH-latestServicedBlock, numOfProviders, int64(providersCount)) go csm.providerOptimizer.AppendRelayData(consumerSession.Parent.PublicLavaAddress, currentLatency, isHangingApi, specComputeUnits, uint64(latestServicedBlock)) @@ -799,8 +796,7 @@ func (csm *ConsumerSessionManager) OnSessionDone( return nil } -// func () - +// updates QoS metrics for a provider // consumerSession should still be locked when accessing this method as it fetches information from the session it self func (csm *ConsumerSessionManager) updateMetricsManager(consumerSession *SingleConsumerSession) { if csm.consumerMetricsManager == nil { @@ -945,54 +941,7 @@ func (csm *ConsumerSessionManager) OnSessionDoneIncreaseCUOnly(consumerSession * defer consumerSession.lock.Unlock() // we need to be locked here, if we didn't get it locked we try lock anyway consumerSession.CuSum += consumerSession.LatestRelayCu // add CuSum to current cu usage. consumerSession.LatestRelayCu = 0 // reset cu just in case - consumerSession.ConsecutiveNumberOfFailures = 0 // reset failures. - return nil -} - -// On a failed DataReliability session we don't decrease the cu unlike a normal session, we just unlock and verify if we need to block this session or provider. -func (csm *ConsumerSessionManager) OnDataReliabilitySessionFailure(consumerSession *SingleConsumerSession, errorReceived error) error { - // consumerSession must be locked when getting here. - if err := csm.verifyLock(consumerSession); err != nil { - return sdkerrors.Wrapf(err, "OnDataReliabilitySessionFailure consumerSession.lock must be locked before accessing this method") - } - // consumer Session should be locked here. so we can just apply the session failure here. - if consumerSession.BlockListed { - // if consumer session is already blocklisted return an error. - return sdkerrors.Wrapf(SessionIsAlreadyBlockListedError, "trying to report a session failure of a blocklisted client session") - } - consumerSession.QoSInfo.TotalRelays++ - consumerSession.ConsecutiveNumberOfFailures += 1 // increase number of failures for this session - consumerSession.RelayNum -= 1 // upon data reliability failure, decrease the relay number so we can try again. - - // if this session failed more than MaximumNumberOfFailuresAllowedPerConsumerSession times we block list it. 
- if consumerSession.ConsecutiveNumberOfFailures > MaximumNumberOfFailuresAllowedPerConsumerSession { - consumerSession.BlockListed = true // block this session from future usages - } else if SessionOutOfSyncError.Is(errorReceived) { // this is an error that we must block the session due to. - consumerSession.BlockListed = true - } - - var blockProvider, reportProvider bool - if ReportAndBlockProviderError.Is(errorReceived) { - blockProvider = true - reportProvider = true - } else if BlockProviderError.Is(errorReceived) { - blockProvider = true - } - - parentConsumerSessionsWithProvider := consumerSession.Parent - consumerSession.lock.Unlock() - - if blockProvider { - publicProviderAddress, pairingEpoch := parentConsumerSessionsWithProvider.getPublicLavaAddressAndPairingEpoch() - err := csm.blockProvider(publicProviderAddress, reportProvider, pairingEpoch, 0, 0, nil) - if err != nil { - if EpochMismatchError.Is(err) { - return nil // no effects this epoch has been changed - } - return err - } - } - + consumerSession.ConsecutiveErrors = []error{} return nil } @@ -1003,9 +952,9 @@ func (csm *ConsumerSessionManager) GenerateReconnectCallback(consumerSessionsWit } } -func NewConsumerSessionManager(rpcEndpoint *RPCEndpoint, providerOptimizer ProviderOptimizer, consumerMetricsManager *metrics.ConsumerMetricsManager) *ConsumerSessionManager { +func NewConsumerSessionManager(rpcEndpoint *RPCEndpoint, providerOptimizer ProviderOptimizer, consumerMetricsManager *metrics.ConsumerMetricsManager, reporter metrics.Reporter) *ConsumerSessionManager { csm := &ConsumerSessionManager{ - reportedProviders: *NewReportedProviders(), + reportedProviders: *NewReportedProviders(reporter), consumerMetricsManager: consumerMetricsManager, } csm.rpcEndpoint = rpcEndpoint diff --git a/protocol/lavasession/consumer_session_manager_test.go b/protocol/lavasession/consumer_session_manager_test.go index e35c893bef..3b58e84e18 100644 --- a/protocol/lavasession/consumer_session_manager_test.go +++ b/protocol/lavasession/consumer_session_manager_test.go @@ -45,7 +45,7 @@ func CreateConsumerSessionManager() *ConsumerSessionManager { AllowInsecureConnectionToProviders = true // set to allow insecure for tests purposes rand.InitRandomSeed() baseLatency := common.AverageWorldLatency / 2 // we want performance to be half our timeout or better - return NewConsumerSessionManager(&RPCEndpoint{"stub", "stub", "stub", false, "/", 0}, provideroptimizer.NewProviderOptimizer(provideroptimizer.STRATEGY_BALANCED, 0, baseLatency, 1), nil) + return NewConsumerSessionManager(&RPCEndpoint{"stub", "stub", "stub", false, "/", 0}, provideroptimizer.NewProviderOptimizer(provideroptimizer.STRATEGY_BALANCED, 0, baseLatency, 1), nil, nil) } var grpcServer *grpc.Server diff --git a/protocol/lavasession/consumer_types.go b/protocol/lavasession/consumer_types.go index 4060d51efd..3de0bafa62 100644 --- a/protocol/lavasession/consumer_types.go +++ b/protocol/lavasession/consumer_types.go @@ -58,18 +58,18 @@ type QoSReport struct { } type SingleConsumerSession struct { - CuSum uint64 - LatestRelayCu uint64 // set by GetSessions cuNeededForSession - QoSInfo QoSReport - SessionId int64 - Parent *ConsumerSessionsWithProvider - lock utils.LavaMutex - RelayNum uint64 - LatestBlock int64 - Endpoint *Endpoint - BlockListed bool // if session lost sync we blacklist it. 
- ConsecutiveNumberOfFailures uint64 // number of times this session has failed - errosCount uint64 + CuSum uint64 + LatestRelayCu uint64 // set by GetSessions cuNeededForSession + QoSInfo QoSReport + SessionId int64 + Parent *ConsumerSessionsWithProvider + lock utils.LavaMutex + RelayNum uint64 + LatestBlock int64 + Endpoint *Endpoint + BlockListed bool // if session lost sync we blacklist it. + ConsecutiveErrors []error + errorsCount uint64 } type DataReliabilitySession struct { diff --git a/protocol/lavasession/reported_providers.go b/protocol/lavasession/reported_providers.go index b7578ac7bd..419dbfaf80 100644 --- a/protocol/lavasession/reported_providers.go +++ b/protocol/lavasession/reported_providers.go @@ -4,6 +4,7 @@ import ( "sync" "time" + metrics "github.com/lavanet/lava/protocol/metrics" "github.com/lavanet/lava/utils" pairingtypes "github.com/lavanet/lava/x/pairing/types" ) @@ -15,6 +16,7 @@ const ( type ReportedProviders struct { addedToPurgeAndReport map[string]*ReportedProviderEntry // list of purged providers to report for QoS unavailability. (easier to search maps.) lock sync.RWMutex + reporter metrics.Reporter } type ReportedProviderEntry struct { @@ -112,8 +114,16 @@ func (rp *ReportedProviders) ReconnectProviders() { } } -func NewReportedProviders() *ReportedProviders { - rp := &ReportedProviders{addedToPurgeAndReport: map[string]*ReportedProviderEntry{}} +func (rp *ReportedProviders) AppendReport(report metrics.ReportsRequest) { + if rp == nil || rp.reporter == nil { + return + } + utils.LavaFormatDebug("sending report on provider", utils.LogAttr("provider", report.Provider)) + rp.reporter.AppendReport(report) +} + +func NewReportedProviders(reporter metrics.Reporter) *ReportedProviders { + rp := &ReportedProviders{addedToPurgeAndReport: map[string]*ReportedProviderEntry{}, reporter: reporter} go func() { ticker := time.NewTicker(ReconnectCandidateTime) defer ticker.Stop() diff --git a/protocol/lavasession/reported_providers_test.go b/protocol/lavasession/reported_providers_test.go index 3ea829b3d2..7155c24d9f 100644 --- a/protocol/lavasession/reported_providers_test.go +++ b/protocol/lavasession/reported_providers_test.go @@ -9,7 +9,7 @@ import ( ) func TestReportedProvider(t *testing.T) { - reportedProviders := NewReportedProviders() + reportedProviders := NewReportedProviders(nil) providers := []string{"p1", "p2", "p3"} reportedProviders.ReportProvider(providers[0], 0, 0, nil) require.True(t, reportedProviders.IsReported(providers[0])) @@ -31,7 +31,7 @@ func TestReportedProvider(t *testing.T) { } func TestReportedErrors(t *testing.T) { - reportedProviders := NewReportedProviders() + reportedProviders := NewReportedProviders(nil) providers := []string{"p1", "p2", "p3"} reportedProviders.ReportProvider(providers[0], 5, 0, nil) require.True(t, reportedProviders.IsReported(providers[0])) @@ -63,7 +63,7 @@ func TestReportedReconnect(t *testing.T) { reconnectAttempt++ return fmt.Errorf("nope") } - reportedProviders := NewReportedProviders() + reportedProviders := NewReportedProviders(nil) providers := []string{"p1", "p2", "p3", "p4"} reportedProviders.ReportProvider(providers[0], 0, 5, reconnected) require.True(t, reportedProviders.IsReported(providers[0])) diff --git a/protocol/metrics/consumer_referrer_client.go b/protocol/metrics/consumer_referrer_client.go new file mode 100644 index 0000000000..7f855f3d92 --- /dev/null +++ b/protocol/metrics/consumer_referrer_client.go @@ -0,0 +1,86 @@ +package metrics + +import ( + "encoding/json" + "fmt" + "time" + + 
"github.com/lavanet/lava/utils" +) + +const ( + referrerName = "referrer" +) + +type ReferrerSender interface { + AppendReferrer(referrer ReferrerRequest) +} + +type ConsumerReferrerClient struct { + *QueueSender +} + +func NewReferrerRequest(referrerId string) ReferrerRequest { + return ReferrerRequest{ + Name: referrerName, + ReferrerId: referrerId, + Count: 1, + } +} + +type ReferrerRequest struct { + ReferrerId string `json:"referer-id"` + Name string `json:"name"` + Count uint64 `json:"count"` +} + +func (rr ReferrerRequest) String() string { + rr.Name = reportName + bytes, err := json.Marshal(rr) + if err != nil { + return "" + } + return string(bytes) +} + +func NewConsumerReferrerClient(endpointAddress string, interval ...time.Duration) *ConsumerReferrerClient { + if endpointAddress == "" { + utils.LavaFormatInfo("Running with referrer Server Disabled") + return nil + } + + cuc := &ConsumerReferrerClient{ + QueueSender: NewQueueSender(endpointAddress, "ConsumerReferrer", ConsumerReferrerClient{}.aggregation, interval...), + } + return cuc +} + +func (cuc *ConsumerReferrerClient) AppendReferrer(referrer ReferrerRequest) { + if cuc == nil { + return + } + cuc.appendQueue(referrer) +} + +func (cuc ConsumerReferrerClient) aggregation(aggregate []fmt.Stringer) []fmt.Stringer { + referrers := map[string]ReferrerRequest{} + aggregated := []fmt.Stringer{} + for _, valueToAggregate := range aggregate { + referrerRequest, ok := valueToAggregate.(ReferrerRequest) + if !ok { + // it's something else in the queue + aggregated = append(aggregated, valueToAggregate) + continue + } + if referrerReq, ok := referrers[referrerRequest.ReferrerId]; ok { + referrerReq.Count += 1 + referrers[referrerRequest.ReferrerId] = referrerReq + } else { + referrers[referrerRequest.ReferrerId] = referrerRequest + } + } + for _, referrerReq := range referrers { + aggregated = append(aggregated, referrerReq) + } + return aggregated +} diff --git a/protocol/metrics/consumer_referrer_client_test.go b/protocol/metrics/consumer_referrer_client_test.go new file mode 100644 index 0000000000..4a9cde0a8b --- /dev/null +++ b/protocol/metrics/consumer_referrer_client_test.go @@ -0,0 +1,66 @@ +package metrics + +import ( + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestReferrerClientFlows(t *testing.T) { + t.Run("one-shot", func(t *testing.T) { + messages := []map[string]interface{}{} + reqMap := []map[string]interface{}{} + serverHandle := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Handle the incoming request and provide the desired response + data := make([]byte, r.ContentLength) + r.Body.Read(data) + err := json.Unmarshal(data, &reqMap) + require.NoError(t, err) + messages = append(messages, reqMap...) 
+ reqMap = []map[string]interface{}{} + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, `{"jsonrpc":"2.0","id":1,"result":"0x10a7a08"}`) + }) + + mockServer := httptest.NewServer(serverHandle) + defer mockServer.Close() + endpoint := mockServer.URL + serverClient := NewConsumerReferrerClient(endpoint, 100*time.Millisecond) + serverClient.AppendReferrer(NewReferrerRequest("banana")) + serverClient.AppendReferrer(NewReferrerRequest("banana")) + serverClient.AppendReferrer(NewReferrerRequest("papaya")) + time.Sleep(110 * time.Millisecond) + require.Len(t, messages, 2) + bananas := 0 + papayas := 0 + for _, message := range messages { + if message["referer-id"] == "banana" { + bananas++ + require.Equal(t, message["count"], 2.0) + } else if message["referer-id"] == "papaya" { + papayas++ + } + } + require.Equal(t, bananas, 1) + require.Equal(t, papayas, 1) + }) +} + +func TestReferrerClientNull(t *testing.T) { + t.Run("null", func(t *testing.T) { + serverClient := NewConsumerReferrerClient("") + require.Nil(t, serverClient) + serverClient.AppendReferrer(NewReferrerRequest("banana")) + time.Sleep(110 * time.Millisecond) + getSender := func() ReferrerSender { + return serverClient + } + reporter := getSender() + reporter.AppendReferrer(NewReferrerRequest("banana")) + }) +} diff --git a/protocol/metrics/consumer_relayserver_client.go b/protocol/metrics/consumer_relayserver_client.go index c79183c571..60fbc5a6f5 100644 --- a/protocol/metrics/consumer_relayserver_client.go +++ b/protocol/metrics/consumer_relayserver_client.go @@ -84,7 +84,7 @@ func (cuc *ConsumerRelayServerClient) relayDataSendQueueTick() { cuc.lock.Unlock() }() } else { - utils.LavaFormatDebug("[CUC] server is busy skipping send", utils.LogAttr("id", cuc.sendID)) + utils.LavaFormatDebug("[CUC] server is busy/empty skipping send", utils.LogAttr("id", cuc.sendID)) } } diff --git a/protocol/metrics/consumer_reports_client.go b/protocol/metrics/consumer_reports_client.go new file mode 100644 index 0000000000..9f9e3adaef --- /dev/null +++ b/protocol/metrics/consumer_reports_client.go @@ -0,0 +1,113 @@ +package metrics + +import ( + "encoding/json" + "strings" + "time" + + "github.com/lavanet/lava/utils" + pairingtypes "github.com/lavanet/lava/x/pairing/types" +) + +const ( + reportName = "report" + conflictName = "conflict" +) + +type ConsumerReportsClient struct { + *QueueSender +} + +func NewReportsRequest(provider string, errors []error, specId string) ReportsRequest { + errorsStrings := []string{} + for _, err := range errors { + if err == nil { + continue + } + errorsStrings = append(errorsStrings, err.Error()) + } + return ReportsRequest{ + Name: reportName, + Errors: strings.Join(errorsStrings, ","), + Provider: provider, + SpecId: specId, + } +} + +type ReportsRequest struct { + Name string `json:"name"` + Errors string `json:"errors"` + Provider string `json:"provider"` + SpecId string `json:"spec_id"` +} + +func (rr ReportsRequest) String() string { + rr.Name = reportName + bytes, err := json.Marshal(rr) + if err != nil { + return "" + } + return string(bytes) +} + +type Reporter interface { + AppendReport(report ReportsRequest) + AppendConflict(report ConflictRequest) +} + +func NewConflictRequest(request1 *pairingtypes.RelayRequest, result1 *pairingtypes.RelayReply, request2 *pairingtypes.RelayRequest, result2 *pairingtypes.RelayReply) ConflictRequest { + return ConflictRequest{ + Name: conflictName, + Conflicts: []ConflictContainer{{ + Request: *request1, + Reply: *result1, + }, { + Request: *request2, + Reply: *result2, + }}, 
+ } +} + +type ConflictContainer struct { + Request pairingtypes.RelayRequest `json:"request"` + Reply pairingtypes.RelayReply `json:"reply"` +} +type ConflictRequest struct { + Name string `json:"name"` + Conflicts []ConflictContainer `json:"conflicts"` +} + +func (rr ConflictRequest) String() string { + rr.Name = conflictName + bytes, err := json.Marshal(rr) + if err != nil { + return "" + } + return string(bytes) +} + +func NewConsumerReportsClient(endpointAddress string, interval ...time.Duration) *ConsumerReportsClient { + if endpointAddress == "" { + utils.LavaFormatInfo("Running with Consumer Relay Server Disabled") + return nil + } + + cuc := &ConsumerReportsClient{ + QueueSender: NewQueueSender(endpointAddress, "ConsumerReports", nil, interval...), + } + return cuc +} + +func (cuc *ConsumerReportsClient) AppendReport(report ReportsRequest) { + if cuc == nil { + return + } + cuc.appendQueue(report) +} + +func (cuc *ConsumerReportsClient) AppendConflict(report ConflictRequest) { + if cuc == nil { + return + } + cuc.appendQueue(report) +} diff --git a/protocol/metrics/consumer_reports_client_test.go b/protocol/metrics/consumer_reports_client_test.go new file mode 100644 index 0000000000..8ca7393159 --- /dev/null +++ b/protocol/metrics/consumer_reports_client_test.go @@ -0,0 +1,89 @@ +package metrics + +import ( + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "testing" + "time" + + pairingtypes "github.com/lavanet/lava/x/pairing/types" + "github.com/stretchr/testify/require" +) + +func TestReportsClientFlows(t *testing.T) { + t.Run("one-shot", func(t *testing.T) { + messages := []map[string]interface{}{} + reqMap := []map[string]interface{}{} + serverHandle := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Handle the incoming request and provide the desired response + data := make([]byte, r.ContentLength) + r.Body.Read(data) + err := json.Unmarshal(data, &reqMap) + require.NoError(t, err) + messages = append(messages, reqMap...) 
+ reqMap = []map[string]interface{}{} + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, `{"jsonrpc":"2.0","id":1,"result":"0x10a7a08"}`) + }) + + mockServer := httptest.NewServer(serverHandle) + defer mockServer.Close() + endpoint := mockServer.URL + serverClient := NewConsumerReportsClient(endpoint, 100*time.Millisecond) + serverClient.AppendReport(NewReportsRequest("lava@test", []error{fmt.Errorf("bad"), fmt.Errorf("very-bad")}, "LAV1")) + serverClient.AppendReport(NewReportsRequest("lava@test", []error{fmt.Errorf("bad"), fmt.Errorf("very-bad")}, "LAV1")) + serverClient.AppendConflict(NewConflictRequest(&pairingtypes.RelayRequest{ + RelaySession: &pairingtypes.RelaySession{Provider: "lava@conflict0"}, + RelayData: &pairingtypes.RelayPrivateData{}, + }, &pairingtypes.RelayReply{ + Data: []byte{1, 2, 3}, + Sig: []byte{}, + LatestBlock: 0, + FinalizedBlocksHashes: []byte{}, + SigBlocks: []byte{}, + Metadata: []pairingtypes.Metadata{}, + }, &pairingtypes.RelayRequest{}, &pairingtypes.RelayReply{})) + time.Sleep(110 * time.Millisecond) + require.Len(t, messages, 3) + reports := 0 + conflicts := 0 + for _, message := range messages { + if message["name"] == "report" { + reports++ + } else if message["name"] == "conflict" { + conflicts++ + } + } + require.Equal(t, reports, 2) + require.Equal(t, conflicts, 1) + }) +} + +func TestReportsClientNull(t *testing.T) { + t.Run("null", func(t *testing.T) { + serverClient := NewConsumerReportsClient("") + require.Nil(t, serverClient) + serverClient.AppendReport(NewReportsRequest("lava@test", []error{fmt.Errorf("bad"), fmt.Errorf("very-bad")}, "LAV1")) + conflictData := NewConflictRequest(&pairingtypes.RelayRequest{ + RelaySession: &pairingtypes.RelaySession{Provider: "lava@conflict0"}, + RelayData: &pairingtypes.RelayPrivateData{}, + }, &pairingtypes.RelayReply{ + Data: []byte{1, 2, 3}, + Sig: []byte{}, + LatestBlock: 0, + FinalizedBlocksHashes: []byte{}, + SigBlocks: []byte{}, + Metadata: []pairingtypes.Metadata{}, + }, &pairingtypes.RelayRequest{}, &pairingtypes.RelayReply{}) + serverClient.AppendConflict(conflictData) + time.Sleep(110 * time.Millisecond) + getReporter := func() Reporter { + return serverClient + } + reporter := getReporter() + reporter.AppendConflict(conflictData) + reporter.AppendReport(NewReportsRequest("lava@test", []error{fmt.Errorf("bad"), fmt.Errorf("very-bad")}, "LAV1")) + }) +} diff --git a/protocol/metrics/queue_sender.go b/protocol/metrics/queue_sender.go new file mode 100644 index 0000000000..80519e8349 --- /dev/null +++ b/protocol/metrics/queue_sender.go @@ -0,0 +1,156 @@ +package metrics + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "sync" + "time" + + "github.com/lavanet/lava/utils" +) + +type QueueSender struct { + name string + endpointAddress string + addQueue []fmt.Stringer + ticker *time.Ticker + lock sync.RWMutex + sendID int + isSendQueueRunning bool + aggregationFunction func([]fmt.Stringer) []fmt.Stringer +} + +func NewQueueSender(endpointAddress string, name string, aggregationFunction func([]fmt.Stringer) []fmt.Stringer, interval ...time.Duration) *QueueSender { + if endpointAddress == "" { + return nil + } + tickerTime := 30 * time.Second + if len(interval) > 0 { + tickerTime = interval[0] + } + cuc := &QueueSender{ + name: name, + endpointAddress: endpointAddress, + ticker: time.NewTicker(tickerTime), + addQueue: make([]fmt.Stringer, 0), + aggregationFunction: aggregationFunction, + } + + go cuc.sendQueueStart() + + return cuc +} + +func (cuc *QueueSender) sendQueueStart() { + 
if cuc == nil { + return + } + utils.LavaFormatDebug(fmt.Sprintf("[QueueSender:%s] Starting sendQueueStart loop", cuc.name)) + for range cuc.ticker.C { + cuc.sendQueueTick() + } +} + +func (crc *QueueSender) sendQueueTick() { + if crc == nil { + return + } + + crc.lock.Lock() + defer crc.lock.Unlock() + + if !crc.isSendQueueRunning && len(crc.addQueue) > 0 { + sendQueue := crc.addQueue + crc.addQueue = make([]fmt.Stringer, 0) + crc.isSendQueueRunning = true + crc.sendID++ + utils.LavaFormatDebug(fmt.Sprintf("[QueueSender:%s] Swapped queues", crc.name), utils.LogAttr("sendQueue_length", len((sendQueue))), utils.LogAttr("send_id", crc.sendID)) + + sendID := crc.sendID + cucEndpointAddress := crc.endpointAddress + + go func() { + crc.sendData(sendQueue, sendID, cucEndpointAddress) + + crc.lock.Lock() + crc.isSendQueueRunning = false + crc.lock.Unlock() + }() + } else { + utils.LavaFormatDebug(fmt.Sprintf("[QueueSender:%s] server is busy skipping send", crc.name), utils.LogAttr("id", crc.sendID)) + } +} + +func (cuc *QueueSender) appendQueue(request fmt.Stringer) { + if cuc == nil { + return + } + cuc.lock.Lock() + defer cuc.lock.Unlock() + cuc.addQueue = append(cuc.addQueue, request) +} + +func (crc *QueueSender) send(sendQueue []fmt.Stringer, sendID int, endpointAddress string) (*http.Response, error) { + if crc == nil { + return nil, utils.LavaFormatError("QueueSender is nil. misuse detected", nil) + } + + if len(sendQueue) == 0 { + return nil, errors.New("sendQueue is empty") + } + client := &http.Client{ + Timeout: 10 * time.Second, + } + + jsonData, err := json.Marshal(sendQueue) + if err != nil { + return nil, utils.LavaFormatError("Failed marshaling aggregated requests", err) + } + + var resp *http.Response + for i := 0; i < 3; i++ { + resp, err = client.Post(endpointAddress, "application/json", bytes.NewBuffer(jsonData)) + if err != nil { + utils.LavaFormatDebug(fmt.Sprintf("[QueueSender:%s] Failed to post request", crc.name), utils.LogAttr("Attempt", i+1), utils.LogAttr("err", err)) + time.Sleep(2 * time.Second) + } else { + return resp, nil + } + } + + return nil, utils.LavaFormatWarning(fmt.Sprintf("[QueueSender:%s] Failed to send requests after 3 attempts", crc.name), err) +} + +func (crc *QueueSender) handleSendResponse(resp *http.Response, sendID int) { + if crc == nil { + return + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + bodyBytes, err := io.ReadAll(resp.Body) + if err != nil { + utils.LavaFormatWarning(fmt.Sprintf("[QueueSender:%s] failed reading response body", crc.name), err) + } else { + utils.LavaFormatWarning(fmt.Sprintf("[QueueSender:%s] Received non-200 status code", crc.name), nil, utils.LogAttr("status_code", resp.StatusCode), utils.LogAttr("body", string(bodyBytes))) + } + } +} + +func (cuc *QueueSender) sendData(sendQueue []fmt.Stringer, sendID int, cucEndpointAddress string) { + if cuc == nil { + return + } + if cuc.aggregationFunction != nil { + sendQueue = cuc.aggregationFunction(sendQueue) + } + resp, err := cuc.send(sendQueue, sendID, cucEndpointAddress) + if err != nil { + utils.LavaFormatWarning("[QueueSender] failed sendRelay data", err) + return + } + cuc.handleSendResponse(resp, sendID) +} diff --git a/protocol/metrics/relays_monitor.go b/protocol/metrics/relays_monitor.go index 30606fce73..aafdaa47c9 100644 --- a/protocol/metrics/relays_monitor.go +++ b/protocol/metrics/relays_monitor.go @@ -5,6 +5,8 @@ import ( "sync" "sync/atomic" "time" + + "github.com/lavanet/lava/utils" ) type RelaysMonitor struct { @@ -61,6 +63,11 @@ 
func (sem *RelaysMonitor) startInner(ctx context.Context) { select { case <-sem.ticker.C: success, _ := sem.relaySender() + utils.LavaFormatInfo("Health Check Interval Check", + utils.LogAttr("chain", sem.chainID), + utils.LogAttr("apiInterface", sem.apiInterface), + utils.LogAttr("health result", success), + ) sem.storeHealthStatus(success) case <-ctx.Done(): sem.ticker.Stop() diff --git a/protocol/rpcconsumer/rpcconsumer.go b/protocol/rpcconsumer/rpcconsumer.go index 22c5da94ee..7246e48b12 100644 --- a/protocol/rpcconsumer/rpcconsumer.go +++ b/protocol/rpcconsumer/rpcconsumer.go @@ -42,6 +42,7 @@ const ( DebugProbesFlagName = "debug-probes" refererBackendAddressFlagName = "referer-be-address" refererMarkerFlagName = "referer-marker" + reportsSendBEAddress = "reports-be-address" ) var ( @@ -100,6 +101,7 @@ type ConsumerStateTrackerInf interface { type AnalyticsServerAddressess struct { MetricsListenAddress string RelayServerAddress string + ReportsAddressFlag string } type RPCConsumer struct { consumerStateTracker ConsumerStateTrackerInf @@ -124,10 +126,10 @@ func (rpcc *RPCConsumer) Start(ctx context.Context, options *rpcConsumerStartOpt if common.IsTestMode(ctx) { testModeWarn("RPCConsumer running tests") } - + options.refererData.ReferrerClient = metrics.NewConsumerReferrerClient(options.refererData.Address) + consumerReportsManager := metrics.NewConsumerReportsClient(options.analyticsServerAddressess.ReportsAddressFlag) consumerMetricsManager := metrics.NewConsumerMetricsManager(options.analyticsServerAddressess.MetricsListenAddress) // start up prometheus metrics consumerUsageserveManager := metrics.NewConsumerRelayServerClient(options.analyticsServerAddressess.RelayServerAddress) // start up relay server reporting - rpcConsumerMetrics, err := metrics.NewRPCConsumerLogs(consumerMetricsManager, consumerUsageserveManager) if err != nil { utils.LavaFormatFatal("failed creating RPCConsumer logs", err) @@ -280,7 +282,7 @@ func (rpcc *RPCConsumer) Start(ctx context.Context, options *rpcConsumerStartOpt } // Register For Updates - consumerSessionManager := lavasession.NewConsumerSessionManager(rpcEndpoint, optimizer, consumerMetricsManager) + consumerSessionManager := lavasession.NewConsumerSessionManager(rpcEndpoint, optimizer, consumerMetricsManager, consumerReportsManager) rpcc.consumerStateTracker.RegisterConsumerSessionManagerForPairingUpdates(ctx, consumerSessionManager) var relaysMonitor *metrics.RelaysMonitor @@ -290,7 +292,7 @@ func (rpcc *RPCConsumer) Start(ctx context.Context, options *rpcConsumerStartOpt } rpcConsumerServer := &RPCConsumerServer{} utils.LavaFormatInfo("RPCConsumer Listening", utils.Attribute{Key: "endpoints", Value: rpcEndpoint.String()}) - err = rpcConsumerServer.ServeRPCRequests(ctx, rpcEndpoint, rpcc.consumerStateTracker, chainParser, finalizationConsensus, consumerSessionManager, options.requiredResponses, privKey, lavaChainID, options.cache, rpcConsumerMetrics, consumerAddr, consumerConsistency, relaysMonitor, options.cmdFlags, options.stateShare, options.refererData) + err = rpcConsumerServer.ServeRPCRequests(ctx, rpcEndpoint, rpcc.consumerStateTracker, chainParser, finalizationConsensus, consumerSessionManager, options.requiredResponses, privKey, lavaChainID, options.cache, rpcConsumerMetrics, consumerAddr, consumerConsistency, relaysMonitor, options.cmdFlags, options.stateShare, options.refererData, consumerReportsManager) if err != nil { err = utils.LavaFormatError("failed serving rpc requests", err, utils.Attribute{Key: "endpoint", Value: rpcEndpoint}) 
errCh <- err @@ -501,6 +503,7 @@ rpcconsumer consumer_examples/full_consumer_example.yml --cache-be "127.0.0.1:77 analyticsServerAddressess := AnalyticsServerAddressess{ MetricsListenAddress: viper.GetString(metrics.MetricsListenFlagName), RelayServerAddress: viper.GetString(metrics.RelayServerFlagName), + ReportsAddressFlag: viper.GetString(reportsSendBEAddress), } var refererData *chainlib.RefererData @@ -556,6 +559,7 @@ rpcconsumer consumer_examples/full_consumer_example.yml --cache-be "127.0.0.1:77 cmdRPCConsumer.Flags().Duration(common.RelayHealthIntervalFlag, RelayHealthIntervalFlagDefault, "interval between relay health checks") cmdRPCConsumer.Flags().String(refererBackendAddressFlagName, "", "address to send referer to") cmdRPCConsumer.Flags().String(refererMarkerFlagName, "lava-referer-", "the string marker to identify referer") + cmdRPCConsumer.Flags().String(reportsSendBEAddress, "", "address to send reports to") cmdRPCConsumer.Flags().BoolVar(&lavasession.DebugProbes, DebugProbesFlagName, false, "adding information to probes") common.AddRollingLogConfig(cmdRPCConsumer) return cmdRPCConsumer diff --git a/protocol/rpcconsumer/rpcconsumer_server.go b/protocol/rpcconsumer/rpcconsumer_server.go index 1e3db7a3c2..5ee1181926 100644 --- a/protocol/rpcconsumer/rpcconsumer_server.go +++ b/protocol/rpcconsumer/rpcconsumer_server.go @@ -51,6 +51,7 @@ type RPCConsumerServer struct { consumerConsistency *ConsumerConsistency sharedState bool // using the cache backend to sync the latest seen block with other consumers relaysMonitor *metrics.RelaysMonitor + reporter metrics.Reporter } type relayResponse struct { @@ -80,6 +81,7 @@ func (rpccs *RPCConsumerServer) ServeRPCRequests(ctx context.Context, listenEndp cmdFlags common.ConsumerCmdFlags, sharedState bool, refererData *chainlib.RefererData, + reporter metrics.Reporter, ) (err error) { rpccs.consumerSessionManager = consumerSessionManager rpccs.listenEndpoint = listenEndpoint @@ -94,7 +96,7 @@ func (rpccs *RPCConsumerServer) ServeRPCRequests(ctx context.Context, listenEndp rpccs.consumerAddress = consumerAddress rpccs.consumerConsistency = consumerConsistency rpccs.sharedState = sharedState - + rpccs.reporter = reporter chainListener, err := chainlib.NewChainListener(ctx, listenEndpoint, rpccs, rpccs, rpcConsumerLogs, chainParser, refererData) if err != nil { return err @@ -865,7 +867,13 @@ func (rpccs *RPCConsumerServer) sendDataReliabilityRelayIfApplicable(ctx context if err != nil { utils.LavaFormatError("could not send detection Transaction", err, utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: "conflict", Value: conflict}) } + if rpccs.reporter != nil { + utils.LavaFormatDebug("sending conflict report to BE", utils.LogAttr("conflicting api", chainMessage.GetApi().Name)) + rpccs.reporter.AppendConflict(metrics.NewConflictRequest(relayResult.Request, relayResult.Reply, relayResultDataReliability.Request, relayResultDataReliability.Reply)) + } } + } else { + utils.LavaFormatDebug("[+] verified relay successfully with data reliability", utils.LogAttr("api", chainMessage.GetApi().Name)) } return nil } diff --git a/protocol/rpcprovider/reliabilitymanager/reliability_manager_test.go b/protocol/rpcprovider/reliabilitymanager/reliability_manager_test.go index ffa623e6de..a8f9977f9f 100644 --- a/protocol/rpcprovider/reliabilitymanager/reliability_manager_test.go +++ b/protocol/rpcprovider/reliabilitymanager/reliability_manager_test.go @@ -44,28 +44,26 @@ func TestFullFlowReliabilityCompare(t *testing.T) { specId := "LAV1" epoch := 
int64(100) singleConsumerSession := &lavasession.SingleConsumerSession{ - CuSum: 20, - LatestRelayCu: 10, // set by GetSessions cuNeededForSession - QoSInfo: lavasession.QoSReport{LastQoSReport: &pairingtypes.QualityOfServiceReport{}}, - SessionId: 123, - Parent: nil, - RelayNum: 1, - LatestBlock: epoch, - Endpoint: nil, - BlockListed: false, // if session lost sync we blacklist it. - ConsecutiveNumberOfFailures: 0, // number of times this session has failed + CuSum: 20, + LatestRelayCu: 10, // set by GetSessions cuNeededForSession + QoSInfo: lavasession.QoSReport{LastQoSReport: &pairingtypes.QualityOfServiceReport{}}, + SessionId: 123, + Parent: nil, + RelayNum: 1, + LatestBlock: epoch, + Endpoint: nil, + BlockListed: false, // if session lost sync we blacklist it. } singleConsumerSession2 := &lavasession.SingleConsumerSession{ - CuSum: 200, - LatestRelayCu: 100, // set by GetSessions cuNeededForSession - QoSInfo: lavasession.QoSReport{LastQoSReport: &pairingtypes.QualityOfServiceReport{}}, - SessionId: 456, - Parent: nil, - RelayNum: 5, - LatestBlock: epoch, - Endpoint: nil, - BlockListed: false, // if session lost sync we blacklist it. - ConsecutiveNumberOfFailures: 0, // number of times this session has failed + CuSum: 200, + LatestRelayCu: 100, // set by GetSessions cuNeededForSession + QoSInfo: lavasession.QoSReport{LastQoSReport: &pairingtypes.QualityOfServiceReport{}}, + SessionId: 456, + Parent: nil, + RelayNum: 5, + LatestBlock: epoch, + Endpoint: nil, + BlockListed: false, // if session lost sync we blacklist it. } metadataValue := make([]pairingtypes.Metadata, 1) metadataValue[0] = pairingtypes.Metadata{ @@ -198,28 +196,26 @@ func TestFullFlowReliabilityConflict(t *testing.T) { consumerSesssionWithProvider := &lavasession.ConsumerSessionsWithProvider{} singleConsumerSession := &lavasession.SingleConsumerSession{ - CuSum: 20, - LatestRelayCu: 10, // set by GetSessions cuNeededForSession - QoSInfo: lavasession.QoSReport{LastQoSReport: &pairingtypes.QualityOfServiceReport{}}, - SessionId: 123, - Parent: nil, - RelayNum: 1, - LatestBlock: epoch, - Endpoint: nil, - BlockListed: false, // if session lost sync we blacklist it. - ConsecutiveNumberOfFailures: 0, // number of times this session has failed + CuSum: 20, + LatestRelayCu: 10, // set by GetSessions cuNeededForSession + QoSInfo: lavasession.QoSReport{LastQoSReport: &pairingtypes.QualityOfServiceReport{}}, + SessionId: 123, + Parent: nil, + RelayNum: 1, + LatestBlock: epoch, + Endpoint: nil, + BlockListed: false, // if session lost sync we blacklist it. } singleConsumerSession2 := &lavasession.SingleConsumerSession{ - CuSum: 200, - LatestRelayCu: 100, // set by GetSessions cuNeededForSession - QoSInfo: lavasession.QoSReport{LastQoSReport: &pairingtypes.QualityOfServiceReport{}}, - SessionId: 456, - Parent: consumerSesssionWithProvider, - RelayNum: 5, - LatestBlock: epoch, - Endpoint: nil, - BlockListed: false, // if session lost sync we blacklist it. - ConsecutiveNumberOfFailures: 0, // number of times this session has failed + CuSum: 200, + LatestRelayCu: 100, // set by GetSessions cuNeededForSession + QoSInfo: lavasession.QoSReport{LastQoSReport: &pairingtypes.QualityOfServiceReport{}}, + SessionId: 456, + Parent: consumerSesssionWithProvider, + RelayNum: 5, + LatestBlock: epoch, + Endpoint: nil, + BlockListed: false, // if session lost sync we blacklist it. 
} metadataValue := make([]pairingtypes.Metadata, 1) metadataValue[0] = pairingtypes.Metadata{ diff --git a/scripts/init_chain_commands.sh b/scripts/init_chain_commands.sh index d37bd609e2..40f4ce8762 100755 --- a/scripts/init_chain_commands.sh +++ b/scripts/init_chain_commands.sh @@ -66,7 +66,8 @@ lavad tx gov vote $(latest_vote) yes -y --from alice --gas-adjustment "1.5" --ga echo; echo "#### Sending proposal for plans del ####" lavad tx subscription buy DefaultPlan $(lavad keys show user1 -a) --enable-auto-renewal -y --from user1 --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE -lavad tx project set-policy $(lavad keys show user1 -a)-admin ./cookbook/projects/policy_all_chains_with_addon.yml -y --from user1 --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE +# wait_count_blocks 2 +# lavad tx project set-policy $(lavad keys show user1 -a)-admin ./cookbook/projects/policy_all_chains_with_addon.yml -y --from user1 --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE # MANTLE CHAINS="ETH1,GTH1,SEP1,COS3,FTM250,CELO,LAV1,COS4,ALFAJORES,ARB1,ARBN,APT1,STRK,JUN1,COS5,POLYGON1,EVMOS,OPTM,BASET,CANTO,SUIT,SOLANA,BSC,AXELAR,AVAX,FVM,NEAR,SQDSUBGRAPH,AGR,AGRT,KOIIT"
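
The report trigger in the reworked OnSessionFailure keys off the accumulated ConsecutiveErrors slice rather than the old bare failure counter, and a provider is only jail-reported when it is being blocked without ever having served a successful relay. Below is a standalone sketch of the ceiling path of that decision, with maxFailures standing in for MaximumNumberOfFailuresAllowedPerConsumerSession and syncLoss for IsSessionSyncLoss (explicit ReportAndBlockProviderError errors force a report independently of this path):

package main

import "fmt"

// blockAndReport mirrors the blocklisting branch of OnSessionFailure:
// block the session once it exceeds the consecutive-failure ceiling (or on
// sync loss), and report the provider only if its used compute units never
// grew past the CU of the in-flight relay, i.e. nothing ever succeeded.
func blockAndReport(consecutiveErrors []error, usedComputeUnits, latestRelayCu uint64, maxFailures int, syncLoss bool) (blockSession, reportProvider bool) {
	if len(consecutiveErrors) > maxFailures || syncLoss {
		blockSession = true
		if usedComputeUnits <= latestRelayCu {
			reportProvider = true // zero successful relays on this provider
		}
	}
	return blockSession, reportProvider
}

func main() {
	errs := []error{fmt.Errorf("EOF"), fmt.Errorf("EOF"), fmt.Errorf("EOF"), fmt.Errorf("EOF")}
	fmt.Println(blockAndReport(errs, 10, 10, 3, false)) // true true
}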
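
What a jailed provider actually puts on the queue is a ReportsRequest; the session errors are flattened into a single comma-joined string, as consumer_reports_client.go defines:

package main

import (
	"fmt"

	metrics "github.com/lavanet/lava/protocol/metrics"
)

func main() {
	req := metrics.NewReportsRequest(
		"lava@provider",
		[]error{fmt.Errorf("context deadline exceeded"), fmt.Errorf("sync loss")},
		"LAV1",
	)
	fmt.Println(req.String())
	// {"name":"report","errors":"context deadline exceeded,sync loss","provider":"lava@provider","spec_id":"LAV1"}
}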
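
The referrer path gets the same queue treatment plus per-referrer aggregation before each flush, so repeated hits collapse into one counted entry. A standalone sketch of that merge follows (one quirk worth noting: ReferrerRequest.String stamps rr.Name = reportName, which looks like a copy-paste from ReportsRequest, but it only affects the String() form; QueueSender marshals the structs directly, so the wire keeps the "referrer" name set by NewReferrerRequest):

package main

import "fmt"

// ReferrerRequest mirrors the payload in consumer_referrer_client.go.
type ReferrerRequest struct {
	ReferrerId string `json:"referer-id"`
	Name       string `json:"name"`
	Count      uint64 `json:"count"`
}

// aggregate merges queued requests by referrer id, the way
// ConsumerReferrerClient.aggregation collapses duplicates before a send.
func aggregate(queue []ReferrerRequest) []ReferrerRequest {
	merged := map[string]ReferrerRequest{}
	for _, req := range queue {
		if existing, ok := merged[req.ReferrerId]; ok {
			existing.Count += req.Count
			merged[req.ReferrerId] = existing
		} else {
			merged[req.ReferrerId] = req
		}
	}
	out := make([]ReferrerRequest, 0, len(merged))
	for _, req := range merged {
		out = append(out, req)
	}
	return out
}

func main() {
	queue := []ReferrerRequest{
		{ReferrerId: "banana", Name: "referrer", Count: 1},
		{ReferrerId: "banana", Name: "referrer", Count: 1},
		{ReferrerId: "papaya", Name: "referrer", Count: 1},
	}
	fmt.Println(aggregate(queue)) // banana carries Count 2, papaya Count 1
}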
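
Both backends stay disabled unless an address is configured: the commented-out referer-be-address / reports-be-address keys in full_consumer_example.yml, or the matching --referer-be-address / --reports-be-address flags registered in rpcconsumer.go. Nothing in this patch prescribes the receiving server, so the following is only a minimal sketch of an endpoint compatible with what QueueSender.send actually does (a POST whose body is the JSON-marshaled queue, retried up to 3 times, success meaning HTTP 200); the listen address matches the commented example:

package main

import (
	"encoding/json"
	"log"
	"net/http"
)

func main() {
	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		// Each flush is one JSON array; reports and conflicts share the
		// ConsumerReportsClient queue and are told apart by their "name".
		var batch []map[string]interface{}
		if err := json.NewDecoder(r.Body).Decode(&batch); err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		for _, entry := range batch {
			log.Printf("%v entry: %v", entry["name"], entry)
		}
		w.WriteHeader(http.StatusOK) // non-200 responses get logged by handleSendResponse
	})
	log.Fatal(http.ListenAndServe("127.0.0.1:6501", nil))
}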