Skip to content

Commit

Permalink
fix: PRT-fix-relay-timeout-issues (#1512)
Browse files Browse the repository at this point in the history
* fix print for grpc

* fix timeout and add longish timeout duration for archive calls

* add test utils

* add debug information for easier debugging relays

* fixing an issue where a relay could terminate early by exhausting the number of retries on a "no pairing available" error

* moving the debug-relays flag into the relay processor

---------

Co-authored-by: Omer <[email protected]>
  • Loading branch information
ranlavanet and omerlavanet authored Jun 26, 2024
1 parent 50c8f05 commit fe02438
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 16 deletions.
4 changes: 2 additions & 2 deletions protocol/chainlib/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,10 +325,10 @@ func (apil *GrpcChainListener) Serve(ctx context.Context, cmdFlags common.Consum

if err != nil {
errMasking := apil.logger.GetUniqueGuidResponseForError(err, msgSeed)
apil.logger.LogRequestAndResponse("http in/out", true, method, string(reqBody), "", errMasking, msgSeed, time.Since(startTime), err)
apil.logger.LogRequestAndResponse("grpc in/out", true, method, string(reqBody), "", errMasking, msgSeed, time.Since(startTime), err)
return nil, nil, utils.LavaFormatError("Failed to SendRelay", fmt.Errorf(errMasking))
}
apil.logger.LogRequestAndResponse("http in/out", false, method, string(reqBody), "", "", msgSeed, time.Since(startTime), nil)
apil.logger.LogRequestAndResponse("grpc in/out", false, method, string(reqBody), "", "", msgSeed, time.Since(startTime), nil)

// try checking for node errors.
nodeError := &GrpcNodeErrorResponse{}
Expand Down
6 changes: 5 additions & 1 deletion protocol/common/timeout.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const (
AverageWorldLatency = 300 * time.Millisecond
CommunicateWithLocalLavaNodeTimeout = (3 * time.Second) + AverageWorldLatency
DefaultTimeout = 30 * time.Second
DefaultTimeoutLongIsh = 1 * time.Minute
DefaultTimeoutLong = 3 * time.Minute
CacheTimeout = 50 * time.Millisecond
)
Expand Down Expand Up @@ -72,7 +73,10 @@ type TimeoutInfo struct {

func GetTimeoutForProcessing(relayTimeout time.Duration, timeoutInfo TimeoutInfo) time.Duration {
ctxTimeout := DefaultTimeout
if timeoutInfo.Hanging || timeoutInfo.CU > 100 || timeoutInfo.Stateful == CONSISTENCY_SELECT_ALL_PROVIDERS {
if timeoutInfo.CU >= 50 { // for heavyish relays we set longish timeout :)
ctxTimeout = DefaultTimeoutLongIsh
}
if timeoutInfo.Hanging || timeoutInfo.CU >= 100 || timeoutInfo.Stateful == CONSISTENCY_SELECT_ALL_PROVIDERS {
ctxTimeout = DefaultTimeoutLong
}
if relayTimeout > ctxTimeout {
Expand Down
19 changes: 18 additions & 1 deletion protocol/rpcconsumer/relay_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ const (
BestResult // get the best result, even if it means waiting
)

func NewRelayProcessor(ctx context.Context, usedProviders *lavasession.UsedProviders, requiredSuccesses int, chainMessage chainlib.ChainMessage, consumerConsistency *ConsumerConsistency, dappID string, consumerIp string) *RelayProcessor {
func NewRelayProcessor(ctx context.Context, usedProviders *lavasession.UsedProviders, requiredSuccesses int, chainMessage chainlib.ChainMessage, consumerConsistency *ConsumerConsistency, dappID string, consumerIp string, debugRelay bool) *RelayProcessor {
guid, _ := utils.GetUniqueIdentifier(ctx)
selection := Quorum // select the majority of node responses
if chainlib.GetStateful(chainMessage) == common.CONSISTENCY_SELECT_ALL_PROVIDERS {
Expand All @@ -49,6 +49,7 @@ func NewRelayProcessor(ctx context.Context, usedProviders *lavasession.UsedProvi
consumerConsistency: consumerConsistency,
dappID: dappID,
consumerIp: consumerIp,
debugRelay: debugRelay,
}
}

Expand All @@ -67,6 +68,7 @@ type RelayProcessor struct {
dappID string
consumerIp string
skipDataReliability bool
debugRelay bool
allowSessionDegradation uint32 // used in the scenario where extension was previously used.
}

Expand Down Expand Up @@ -408,6 +410,21 @@ func (rp *RelayProcessor) ProcessingResult() (returnedResult *common.RelayResult
nodeResults := rp.nodeResultsInner()
// there are not enough successes, let's check if there are enough node errors

if rp.debugRelay {
// adding as much debug info as possible. all successful relays, all node errors and all protocol errors
utils.LavaFormatDebug("[Processing Result] Debug Relay", utils.LogAttr("rp.requiredSuccesses", rp.requiredSuccesses))
utils.LavaFormatDebug("[Processing Debug] number of node results", utils.LogAttr("len(rp.successResults)", len(rp.successResults)), utils.LogAttr("len(rp.nodeResponseErrors.relayErrors)", len(rp.nodeResponseErrors.relayErrors)), utils.LogAttr("len(rp.protocolResponseErrors.relayErrors)", len(rp.protocolResponseErrors.relayErrors)))
for idx, result := range rp.successResults {
utils.LavaFormatDebug("[Processing Debug] success result", utils.LogAttr("idx", idx), utils.LogAttr("result", result))
}
for idx, result := range rp.nodeResponseErrors.relayErrors {
utils.LavaFormatDebug("[Processing Debug] node result", utils.LogAttr("idx", idx), utils.LogAttr("result", result))
}
for idx, result := range rp.protocolResponseErrors.relayErrors {
utils.LavaFormatDebug("[Processing Debug] protocol error", utils.LogAttr("idx", idx), utils.LogAttr("result", result))
}
}

if len(nodeResults) >= rp.requiredSuccesses {
if rp.selection == Quorum {
return rp.responsesQuorum(nodeResults, rp.requiredSuccesses)
Expand Down
14 changes: 7 additions & 7 deletions protocol/rpcconsumer/relay_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func TestRelayProcessorHappyFlow(t *testing.T) {
require.NoError(t, err)
chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0})
require.NoError(t, err)
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "")
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false)

usedProviders := relayProcessor.GetUsedProviders()
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10)
Expand Down Expand Up @@ -127,7 +127,7 @@ func TestRelayProcessorTimeout(t *testing.T) {
require.NoError(t, err)
chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0})
require.NoError(t, err)
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "")
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false)

usedProviders := relayProcessor.GetUsedProviders()
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10)
Expand Down Expand Up @@ -179,7 +179,7 @@ func TestRelayProcessorRetry(t *testing.T) {
require.NoError(t, err)
chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0})
require.NoError(t, err)
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "")
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false)

usedProviders := relayProcessor.GetUsedProviders()
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10)
Expand Down Expand Up @@ -223,7 +223,7 @@ func TestRelayProcessorRetryNodeError(t *testing.T) {
require.NoError(t, err)
chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0})
require.NoError(t, err)
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "")
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false)

usedProviders := relayProcessor.GetUsedProviders()
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10)
Expand Down Expand Up @@ -268,7 +268,7 @@ func TestRelayProcessorStatefulApi(t *testing.T) {
require.NoError(t, err)
chainMsg, err := chainParser.ParseMsg("/cosmos/tx/v1beta1/txs", []byte("data"), http.MethodPost, nil, extensionslib.ExtensionInfo{LatestBlock: 0})
require.NoError(t, err)
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "")
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false)
usedProviders := relayProcessor.GetUsedProviders()
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10)
defer cancel()
Expand Down Expand Up @@ -313,7 +313,7 @@ func TestRelayProcessorStatefulApiErr(t *testing.T) {
require.NoError(t, err)
chainMsg, err := chainParser.ParseMsg("/cosmos/tx/v1beta1/txs", []byte("data"), http.MethodPost, nil, extensionslib.ExtensionInfo{LatestBlock: 0})
require.NoError(t, err)
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "")
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false)
usedProviders := relayProcessor.GetUsedProviders()
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10)
defer cancel()
Expand Down Expand Up @@ -357,7 +357,7 @@ func TestRelayProcessorLatest(t *testing.T) {
require.NoError(t, err)
chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/latest", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0})
require.NoError(t, err)
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "")
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false)
usedProviders := relayProcessor.GetUsedProviders()
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10)
defer cancel()
Expand Down
35 changes: 30 additions & 5 deletions protocol/rpcconsumer/rpcconsumer_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func (rpccs *RPCConsumerServer) craftRelay(ctx context.Context) (ok bool, relay
func (rpccs *RPCConsumerServer) sendRelayWithRetries(ctx context.Context, retries int, initialRelays bool, relay *pairingtypes.RelayPrivateData, chainMessage chainlib.ChainMessage) (bool, error) {
success := false
var err error
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMessage, rpccs.consumerConsistency, "-init-", "")
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMessage, rpccs.consumerConsistency, "-init-", "", rpccs.debugRelays)
for i := 0; i < retries; i++ {
err = rpccs.sendRelayToProvider(ctx, chainMessage, relay, "-init-", "", relayProcessor)
if lavasession.PairingListEmptyError.Is(err) {
Expand Down Expand Up @@ -340,7 +340,7 @@ func (rpccs *RPCConsumerServer) ProcessRelaySend(ctx context.Context, directiveH
// make sure all of the child contexts are cancelled when we exit
ctx, cancel := context.WithCancel(ctx)
defer cancel()
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(directiveHeaders), rpccs.requiredResponses, chainMessage, rpccs.consumerConsistency, dappID, consumerIp)
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(directiveHeaders), rpccs.requiredResponses, chainMessage, rpccs.consumerConsistency, dappID, consumerIp, rpccs.debugRelays)
var err error
// try sending a relay 3 times. if failed return the error
for retryFirstRelayAttempt := 0; retryFirstRelayAttempt < SendRelayAttempts; retryFirstRelayAttempt++ {
Expand All @@ -359,6 +359,9 @@ func (rpccs *RPCConsumerServer) ProcessRelaySend(ctx context.Context, directiveH
// a channel to be notified processing was done, true means we have results and can return
gotResults := make(chan bool)
processingTimeout, relayTimeout := rpccs.getProcessingTimeout(chainMessage)
if rpccs.debugRelays {
utils.LavaFormatDebug("Relay initiated with the following timeout schedule", utils.LogAttr("processingTimeout", processingTimeout), utils.LogAttr("newRelayTimeout", relayTimeout))
}
// create the processing timeout prior to entering the method so it wont reset every time
processingCtx, processingCtxCancel := context.WithTimeout(ctx, processingTimeout)
defer processingCtxCancel()
Expand Down Expand Up @@ -410,7 +413,11 @@ func (rpccs *RPCConsumerServer) ProcessRelaySend(ctx context.Context, directiveH
err := rpccs.sendRelayToProvider(processingCtx, chainMessage, relayRequestData, dappID, consumerIp, relayProcessor)
go validateReturnCondition(err)
go readResultsFromProcessor()
numberOfRetriesLaunched++
// increase number of retries launched only if we still have pairing available, if we exhausted the list we don't want to break early
// so it will just wait for the entire duration of the relay
if !lavasession.PairingListEmptyError.Is(err) {
numberOfRetriesLaunched++
}
case <-startNewBatchTicker.C:
// only trigger another batch for non BestResult relays or if we didn't pass the retry limit.
if relayProcessor.ShouldRetry(numberOfRetriesLaunched) {
Expand All @@ -420,7 +427,11 @@ func (rpccs *RPCConsumerServer) ProcessRelaySend(ctx context.Context, directiveH
go validateReturnCondition(err)
// add ticker launch metrics
go rpccs.rpcConsumerLogs.SetRelaySentByNewBatchTickerMetric(rpccs.getChainIdAndApiInterface())
numberOfRetriesLaunched++
// increase number of retries launched only if we still have pairing available, if we exhausted the list we don't want to break early
// so it will just wait for the entire duration of the relay
if !lavasession.PairingListEmptyError.Is(err) {
numberOfRetriesLaunched++
}
}
case returnErr := <-returnCondition:
// we use this channel because there could be a race condition between us releasing the provider and about to send the return
Expand Down Expand Up @@ -576,6 +587,17 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider(
relayProcessor.SetDisallowDegradation()
}

if rpccs.debugRelays {
utils.LavaFormatDebug("[Before Send] returned the following sessions",
utils.LogAttr("sessions", sessions),
utils.LogAttr("usedProviders.GetUnwantedProvidersToSend", usedProviders.GetUnwantedProvidersToSend()),
utils.LogAttr("usedProviders.GetErroredProviders", usedProviders.GetErroredProviders()),
utils.LogAttr("addons", addon),
utils.LogAttr("extensions", extensions),
utils.LogAttr("AllowSessionDegradation", relayProcessor.GetAllowSessionDegradation()),
)
}

// Iterate over the sessions map
for providerPublicAddress, sessionInfo := range sessions {
// Launch a separate goroutine for each session
Expand Down Expand Up @@ -762,6 +784,9 @@ func (rpccs *RPCConsumerServer) relayInner(ctx context.Context, singleConsumerSe
endpointClient := *singleConsumerSession.Endpoint.Client
providerPublicAddress := relayResult.ProviderInfo.ProviderAddress
relayRequest := relayResult.Request
if rpccs.debugRelays {
utils.LavaFormatDebug("Sending relay", utils.LogAttr("timeout", relayTimeout), utils.LogAttr("GUID", ctx), utils.LogAttr("provider", relayRequest.RelaySession.Provider))
}
callRelay := func() (reply *pairingtypes.RelayReply, relayLatency time.Duration, err error, backoff bool) {
relaySentTime := time.Now()
connectCtx, connectCtxCancel := context.WithTimeout(ctx, relayTimeout)
Expand Down Expand Up @@ -911,7 +936,7 @@ func (rpccs *RPCConsumerServer) sendDataReliabilityRelayIfApplicable(ctx context
relayResult := results[0]
if len(results) < 2 {
relayRequestData := lavaprotocol.NewRelayData(ctx, relayResult.Request.RelayData.ConnectionType, relayResult.Request.RelayData.ApiUrl, relayResult.Request.RelayData.Data, relayResult.Request.RelayData.SeenBlock, reqBlock, relayResult.Request.RelayData.ApiInterface, chainMessage.GetRPCMessage().GetHeaders(), relayResult.Request.RelayData.Addon, relayResult.Request.RelayData.Extensions)
relayProcessorDataReliability := NewRelayProcessor(ctx, relayProcessor.usedProviders, 1, chainMessage, rpccs.consumerConsistency, dappID, consumerIp)
relayProcessorDataReliability := NewRelayProcessor(ctx, relayProcessor.usedProviders, 1, chainMessage, rpccs.consumerConsistency, dappID, consumerIp, rpccs.debugRelays)
err := rpccs.sendRelayToProvider(ctx, chainMessage, relayRequestData, dappID, consumerIp, relayProcessorDataReliability)
if err != nil {
return utils.LavaFormatWarning("failed data reliability relay to provider", err, utils.LogAttr("relayProcessorDataReliability", relayProcessorDataReliability))
Expand Down

0 comments on commit fe02438

Please sign in to comment.