Skip to content

Commit

Permalink
fix: PRT - solving caching errors forever issue (#1388)
Browse files Browse the repository at this point in the history
* changing protobuf

* compiling protobuf

* adjusting chain fetcher

* fixing chain router

* chainlib interface adjustment

* creating relayWrapper

* fixing grpc test

* fixing jsonrpc test

* fixing rest tests

* fixing grpc

* fixing jsonrpc

* fixing rest

* fixing tendermint tests

* fixing tendermint

* propagating http requests status code

* commenting todos

* adding status code validation to rpc provider

* adding node error validation to rpc consumer

* fixing reliability manager

* adding command

* setting with limited ttl

* adding server flags

* space fix

* fix lint

* fix lint

* fix pointer deref

* checking nil

* checking nil

* checking nil 2

* checking nil 3

* checking nil 4

* checking nil 5

* checking nil 6
  • Loading branch information
ranlavanet authored Apr 30, 2024
1 parent 6a97f43 commit 501d7da
Show file tree
Hide file tree
Showing 21 changed files with 241 additions and 112 deletions.
1 change: 1 addition & 0 deletions ecosystem/cache/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ longer DefaultExpirationForNonFinalized will reduce sync QoS for "latest" reques
cacheCmd.Flags().String(FlagLogLevel, zerolog.InfoLevel.String(), "The logging level (trace|debug|info|warn|error|fatal|panic)")
cacheCmd.Flags().Duration(ExpirationFlagName, DefaultExpirationTimeFinalized, "how long does a cache entry lasts in the cache for a finalized entry")
cacheCmd.Flags().Duration(ExpirationNonFinalizedFlagName, DefaultExpirationForNonFinalized, "how long does a cache entry lasts in the cache for a non finalized entry")
cacheCmd.Flags().Duration(ExpirationNodeErrorsOnFinalizedFlagName, DefaultExpirationNodeErrors, "how long does a cache entry lasts in the cache for a finalized node error entry")
cacheCmd.Flags().String(FlagMetricsAddress, DisabledFlagOption, "address to listen to prometheus metrics 127.0.0.1:5555, later you can curl http://127.0.0.1:5555/metrics")
cacheCmd.Flags().Int64(FlagCacheSizeName, 2*1024*1024*1024, "the maximal amount of entries to save")
return cacheCmd
Expand Down
10 changes: 8 additions & 2 deletions ecosystem/cache/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,11 +263,17 @@ func (s *RelayerCacheServer) SetRelay(ctx context.Context, relayCacheSet *pairin
utils.Attribute{Key: "requested_block", Value: relayCacheSet.RequestedBlock},
utils.Attribute{Key: "response_data", Value: parser.CapStringLen(string(relayCacheSet.Response.Data))},
utils.Attribute{Key: "requestHash", Value: string(relayCacheSet.BlockHash)},
utils.Attribute{Key: "latestKnownBlock", Value: string(relayCacheSet.BlockHash)})
utils.Attribute{Key: "latestKnownBlock", Value: string(relayCacheSet.BlockHash)},
utils.Attribute{Key: "IsNodeError", Value: relayCacheSet.IsNodeError},
)
// finalized entries can stay there
if relayCacheSet.Finalized {
cache := s.CacheServer.finalizedCache
cache.SetWithTTL(cacheKey, cacheValue, cacheValue.Cost(), s.CacheServer.ExpirationFinalized)
if relayCacheSet.IsNodeError {
cache.SetWithTTL(cacheKey, cacheValue, cacheValue.Cost(), s.CacheServer.ExpirationNodeErrors)
} else {
cache.SetWithTTL(cacheKey, cacheValue, cacheValue.Cost(), s.CacheServer.ExpirationFinalized)
}
} else {
cache := s.CacheServer.tempCache
cache.SetWithTTL(cacheKey, cacheValue, cacheValue.Cost(), s.getExpirationForChain(time.Duration(relayCacheSet.AverageBlockTime), relayCacheSet.BlockHash))
Expand Down
15 changes: 9 additions & 6 deletions ecosystem/cache/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,22 @@ import (
)

const (
ExpirationFlagName = "expiration"
ExpirationNonFinalizedFlagName = "expiration-non-finalized"
FlagCacheSizeName = "max-items"
DefaultExpirationForNonFinalized = 500 * time.Millisecond
DefaultExpirationTimeFinalized = time.Hour
CacheNumCounters = 100000000 // expect 10M items
ExpirationFlagName = "expiration"
ExpirationNonFinalizedFlagName = "expiration-non-finalized"
ExpirationNodeErrorsOnFinalizedFlagName = "expiration-finalized-node-errors"
FlagCacheSizeName = "max-items"
DefaultExpirationForNonFinalized = 500 * time.Millisecond
DefaultExpirationTimeFinalized = time.Hour
DefaultExpirationNodeErrors = 5 * time.Second
CacheNumCounters = 100000000 // expect 10M items
)

type CacheServer struct {
finalizedCache *ristretto.Cache
tempCache *ristretto.Cache
ExpirationFinalized time.Duration
ExpirationNonFinalized time.Duration
ExpirationNodeErrors time.Duration
CacheMetrics *CacheMetrics
CacheMaxCost int64
}
Expand Down
1 change: 1 addition & 0 deletions proto/lavanet/lava/pairing/relayCache.proto
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,5 @@ message RelayCacheSet {
string chain_id = 9; // used to set latest block per chain.
int64 seen_block = 10;
int64 average_block_time = 11;
bool is_node_error = 12; // node errors wont be cached for long even if they are finalized in cases where it returns a valid response later on
}
23 changes: 13 additions & 10 deletions protocol/chainlib/chain_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,11 @@ func (cf *ChainFetcher) Verify(ctx context.Context, verification VerificationCon
if err != nil {
return utils.LavaFormatWarning("[-] verify failed sending chainMessage", err, []utils.Attribute{{Key: "chainID", Value: cf.endpoint.ChainID}, {Key: "APIInterface", Value: cf.endpoint.ApiInterface}}...)
}
if reply == nil || reply.RelayReply == nil {
return utils.LavaFormatWarning("[-] verify failed sending chainMessage, reply or reply.RelayReply are nil", nil, []utils.Attribute{{Key: "chainID", Value: cf.endpoint.ChainID}, {Key: "APIInterface", Value: cf.endpoint.ApiInterface}}...)
}

parserInput, err := FormatResponseForParsing(reply, chainMessage)
parserInput, err := FormatResponseForParsing(reply.RelayReply, chainMessage)
if err != nil {
return utils.LavaFormatWarning("[-] verify failed to parse result", err,
utils.LogAttr("chain_id", chainId),
Expand All @@ -184,7 +187,7 @@ func (cf *ChainFetcher) Verify(ctx context.Context, verification VerificationCon
{Key: "chainId", Value: chainId},
{Key: "nodeUrl", Value: proxyUrl.Url},
{Key: "Method", Value: parsing.GetApiName()},
{Key: "Response", Value: string(reply.Data)},
{Key: "Response", Value: string(reply.RelayReply.Data)},
}...)
}
if verification.LatestDistance != 0 && latestBlock != 0 && verification.ParseDirective.FunctionTag != spectypes.FUNCTION_TAG_GET_BLOCK_BY_NUM {
Expand All @@ -194,7 +197,7 @@ func (cf *ChainFetcher) Verify(ctx context.Context, verification VerificationCon
{Key: "chainId", Value: chainId},
{Key: "nodeUrl", Value: proxyUrl.Url},
{Key: "Method", Value: parsing.GetApiName()},
{Key: "Response", Value: string(reply.Data)},
{Key: "Response", Value: string(reply.RelayReply.Data)},
{Key: "parsedResult", Value: parsedResult},
}...)
}
Expand Down Expand Up @@ -271,13 +274,13 @@ func (cf *ChainFetcher) FetchLatestBlockNum(ctx context.Context) (int64, error)
if err != nil {
return spectypes.NOT_APPLICABLE, utils.LavaFormatDebug(tagName+" failed sending chainMessage", []utils.Attribute{{Key: "chainID", Value: cf.endpoint.ChainID}, {Key: "APIInterface", Value: cf.endpoint.ApiInterface}, {Key: "error", Value: err}}...)
}
parserInput, err := FormatResponseForParsing(reply, chainMessage)
parserInput, err := FormatResponseForParsing(reply.RelayReply, chainMessage)
if err != nil {
return spectypes.NOT_APPLICABLE, utils.LavaFormatDebug(tagName+" Failed formatResponseForParsing", []utils.Attribute{
{Key: "chainId", Value: chainId},
{Key: "nodeUrl", Value: proxyUrl.Url},
{Key: "Method", Value: parsing.ApiName},
{Key: "Response", Value: string(reply.Data)},
{Key: "Response", Value: string(reply.RelayReply.Data)},
{Key: "error", Value: err},
}...)
}
Expand All @@ -287,7 +290,7 @@ func (cf *ChainFetcher) FetchLatestBlockNum(ctx context.Context) (int64, error)
{Key: "chainId", Value: chainId},
{Key: "nodeUrl", Value: proxyUrl.Url},
{Key: "Method", Value: parsing.ApiName},
{Key: "Response", Value: string(reply.Data)},
{Key: "Response", Value: string(reply.RelayReply.Data)},
{Key: "error", Value: err},
}...)
}
Expand Down Expand Up @@ -331,14 +334,14 @@ func (cf *ChainFetcher) FetchBlockHashByNum(ctx context.Context, blockNum int64)
timeTaken := time.Since(start)
return "", utils.LavaFormatDebug(tagName+" failed sending chainMessage", []utils.Attribute{{Key: "sendTime", Value: timeTaken}, {Key: "error", Value: err}, {Key: "chainID", Value: cf.endpoint.ChainID}, {Key: "APIInterface", Value: cf.endpoint.ApiInterface}}...)
}
parserInput, err := FormatResponseForParsing(reply, chainMessage)
parserInput, err := FormatResponseForParsing(reply.RelayReply, chainMessage)
if err != nil {
return "", utils.LavaFormatDebug(tagName+" Failed formatResponseForParsing", []utils.Attribute{
{Key: "error", Value: err},
{Key: "chainId", Value: chainId},
{Key: "nodeUrl", Value: proxyUrl.Url},
{Key: "Method", Value: parsing.ApiName},
{Key: "Response", Value: string(reply.Data)},
{Key: "Response", Value: string(reply.RelayReply.Data)},
}...)
}

Expand All @@ -349,14 +352,14 @@ func (cf *ChainFetcher) FetchBlockHashByNum(ctx context.Context, blockNum int64)
{Key: "chainId", Value: chainId},
{Key: "nodeUrl", Value: proxyUrl.Url},
{Key: "Method", Value: parsing.ApiName},
{Key: "Response", Value: string(reply.Data)},
{Key: "Response", Value: string(reply.RelayReply.Data)},
}...)
}
_, _, blockDistanceToFinalization, _ := cf.chainParser.ChainBlockStats()
latestBlock := atomic.LoadInt64(&cf.latestBlock) // assuming FetchLatestBlockNum is called before this one it's always true
if latestBlock > 0 {
finalized := spectypes.IsFinalizedBlock(blockNum, latestBlock, blockDistanceToFinalization)
cf.populateCache(cf.constructRelayData(collectionData.Type, path, data, blockNum, "", nil, latestBlock), reply, []byte(res), finalized)
cf.populateCache(cf.constructRelayData(collectionData.Type, path, data, blockNum, "", nil, latestBlock), reply.RelayReply, []byte(res), finalized)
}
return res, nil
}
Expand Down
3 changes: 1 addition & 2 deletions protocol/chainlib/chain_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/lavanet/lava/protocol/common"
"github.com/lavanet/lava/protocol/lavasession"
"github.com/lavanet/lava/utils"
pairingtypes "github.com/lavanet/lava/x/pairing/types"
)

type chainRouterEntry struct {
Expand Down Expand Up @@ -57,7 +56,7 @@ func (cri chainRouterImpl) ExtensionsSupported(extensions []string) bool {
return ok
}

func (cri chainRouterImpl) SendNodeMsg(ctx context.Context, ch chan interface{}, chainMessage ChainMessageForSend, extensions []string) (relayReply *pairingtypes.RelayReply, subscriptionID string, relayReplyServer *rpcclient.ClientSubscription, proxyUrl common.NodeUrl, chainId string, err error) {
func (cri chainRouterImpl) SendNodeMsg(ctx context.Context, ch chan interface{}, chainMessage ChainMessageForSend, extensions []string) (relayReply *RelayReplyWrapper, subscriptionID string, relayReplyServer *rpcclient.ClientSubscription, proxyUrl common.NodeUrl, chainId string, err error) {
// add the parsed addon from the apiCollection
addon := chainMessage.GetApiCollection().CollectionData.AddOn
selectedChainProxy, err := cri.getChainProxySupporting(addon, extensions)
Expand Down
4 changes: 2 additions & 2 deletions protocol/chainlib/chainlib.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,13 @@ type ChainListener interface {
}

type ChainRouter interface {
SendNodeMsg(ctx context.Context, ch chan interface{}, chainMessage ChainMessageForSend, extensions []string) (relayReply *pairingtypes.RelayReply, subscriptionID string, relayReplyServer *rpcclient.ClientSubscription, proxyUrl common.NodeUrl, chainId string, err error) // has to be thread safe, reuse code within ParseMsg as common functionality
SendNodeMsg(ctx context.Context, ch chan interface{}, chainMessage ChainMessageForSend, extensions []string) (relayReply *RelayReplyWrapper, subscriptionID string, relayReplyServer *rpcclient.ClientSubscription, proxyUrl common.NodeUrl, chainId string, err error) // has to be thread safe, reuse code within ParseMsg as common functionality
ExtensionsSupported([]string) bool
}

type ChainProxy interface {
GetChainProxyInformation() (common.NodeUrl, string)
SendNodeMsg(ctx context.Context, ch chan interface{}, chainMessage ChainMessageForSend) (relayReply *pairingtypes.RelayReply, subscriptionID string, relayReplyServer *rpcclient.ClientSubscription, err error) // has to be thread safe, reuse code within ParseMsg as common functionality
SendNodeMsg(ctx context.Context, ch chan interface{}, chainMessage ChainMessageForSend) (relayReply *RelayReplyWrapper, subscriptionID string, relayReplyServer *rpcclient.ClientSubscription, err error) // has to be thread safe, reuse code within ParseMsg as common functionality
}

func GetChainRouter(ctx context.Context, nConns uint, rpcProviderEndpoint *lavasession.RPCProviderEndpoint, chainParser ChainParser) (ChainRouter, error) {
Expand Down
1 change: 1 addition & 0 deletions protocol/chainlib/chainproxy/rpcclient/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ func (hc *httpConn) doRequest(ctx context.Context, msg interface{}, isJsonRPC bo
resp, err := hc.client.Do(req)
if resp != nil {
// resp can be non nil on error
metadata.AppendToOutgoingContext(ctx, common.StatusCodeMetadataKey, strconv.Itoa(resp.StatusCode))
trailer := metadata.Pairs(common.StatusCodeMetadataKey, strconv.Itoa(resp.StatusCode))
grpc.SetTrailer(ctx, trailer) // we ignore this error here since this code can be triggered not from grpc
}
Expand Down
5 changes: 5 additions & 0 deletions protocol/chainlib/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ const (

var InvalidResponses = []string{"null", "", "nil", "undefined"}

type RelayReplyWrapper struct {
StatusCode int
RelayReply *pairingtypes.RelayReply
}

type VerificationKey struct {
Extension string
Addon string
Expand Down
55 changes: 40 additions & 15 deletions protocol/chainlib/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ import (
"google.golang.org/grpc/status"
)

const GRPCStatusCodeOnFailedMessages = 32

type GrpcNodeErrorResponse struct {
ErrorMessage string `json:"error_message"`
ErrorCode uint32 `json:"error_code"`
Expand Down Expand Up @@ -427,7 +429,7 @@ func newGrpcChainProxy(ctx context.Context, averageBlockTime time.Duration, pars
return cp, nil
}

func (cp *GrpcChainProxy) SendNodeMsg(ctx context.Context, ch chan interface{}, chainMessage ChainMessageForSend) (relayReply *pairingtypes.RelayReply, subscriptionID string, relayReplyServer *rpcclient.ClientSubscription, err error) {
func (cp *GrpcChainProxy) SendNodeMsg(ctx context.Context, ch chan interface{}, chainMessage ChainMessageForSend) (relayReply *RelayReplyWrapper, subscriptionID string, relayReplyServer *rpcclient.ClientSubscription, err error) {
if ch != nil {
return nil, "", nil, utils.LavaFormatError("Subscribe is not allowed on grpc", nil, utils.Attribute{Key: "GUID", Value: ctx})
}
Expand Down Expand Up @@ -531,20 +533,38 @@ func (cp *GrpcChainProxy) SendNodeMsg(ctx context.Context, ch chan interface{},
connectCtx, cancel := cp.CapTimeoutForSend(ctx, chainMessage)
defer cancel()
err = conn.Invoke(connectCtx, "/"+nodeMessage.Path, msg, response, grpc.Header(&respHeaders))
// Extract status code from response headers
statusCodeHeader := respHeaders.Get("grpc-status")
if len(statusCodeHeader) > 0 {
statusCodeTest, err := strconv.Atoi(statusCodeHeader[0])
if err != nil {
// Handle error
utils.LavaFormatError("Error:", err, utils.LogAttr("statusCode", statusCodeTest))
} else {
// Use the status code
utils.LavaFormatDebug("Status Code:", utils.LogAttr("statusCode", statusCodeTest))
}
} else {
utils.LavaFormatDebug("NO Status Code:")
// No status code found in response headers
}
if err != nil {
// Validate if the error is related to the provider connection to the node or it is a valid error
// in case the error is valid (e.g. bad input parameters) the error will return in the form of a valid error reply
if parsedError := cp.HandleNodeError(ctx, err); parsedError != nil {
return nil, "", nil, parsedError
}
// return the node's error back to the client as the error type is a invalid request which is cu deductible
respBytes, handlingError := parseGrpcNodeErrorToReply(ctx, err)
respBytes, statusCode, handlingError := parseGrpcNodeErrorToReply(ctx, err)
if handlingError != nil {
return nil, "", nil, handlingError
}
reply := &pairingtypes.RelayReply{
Data: respBytes,
Metadata: convertToMetadataMapOfSlices(respHeaders),
reply := &RelayReplyWrapper{
StatusCode: int(statusCode),
RelayReply: &pairingtypes.RelayReply{
Data: respBytes,
Metadata: convertToMetadataMapOfSlices(respHeaders),
},
}
return reply, "", nil, nil
}
Expand All @@ -554,32 +574,37 @@ func (cp *GrpcChainProxy) SendNodeMsg(ctx context.Context, ch chan interface{},
if err != nil {
return nil, "", nil, utils.LavaFormatError("proto.Marshal(response) Failed", err, utils.Attribute{Key: "GUID", Value: ctx})
}

reply := &pairingtypes.RelayReply{
Data: respBytes,
Metadata: convertToMetadataMapOfSlices(respHeaders),
reply := &RelayReplyWrapper{
StatusCode: http.StatusOK, // status code is used only for rest at the moment
RelayReply: &pairingtypes.RelayReply{
Data: respBytes,
Metadata: convertToMetadataMapOfSlices(respHeaders),
},
}
return reply, "", nil, nil
}

// This method assumes that the error is due to misuse of the request arguments, meaning the user would like to get
// the response from the server to fix the request arguments. this method will make sure the user will get the response
// from the node in the same format as expected.
func parseGrpcNodeErrorToReply(ctx context.Context, err error) ([]byte, error) {
func parseGrpcNodeErrorToReply(ctx context.Context, err error) ([]byte, uint32, error) {
var respBytes []byte
var marshalingError error
var errorCode uint32 = GRPCStatusCodeOnFailedMessages
// try fetching status code from error or otherwise use the GRPCStatusCodeOnFailedMessages
if statusError, ok := status.FromError(err); ok {
respBytes, marshalingError = json.Marshal(&GrpcNodeErrorResponse{ErrorMessage: statusError.Message(), ErrorCode: uint32(statusError.Code())})
errorCode = uint32(statusError.Code())
respBytes, marshalingError = json.Marshal(&GrpcNodeErrorResponse{ErrorMessage: statusError.Message(), ErrorCode: errorCode})
if marshalingError != nil {
return nil, utils.LavaFormatError("json.Marshal(&GrpcNodeErrorResponse Failed 1", err, utils.Attribute{Key: "GUID", Value: ctx})
return nil, errorCode, utils.LavaFormatError("json.Marshal(&GrpcNodeErrorResponse Failed 1", err, utils.Attribute{Key: "GUID", Value: ctx})
}
} else {
respBytes, marshalingError = json.Marshal(&GrpcNodeErrorResponse{ErrorMessage: err.Error(), ErrorCode: uint32(32)})
respBytes, marshalingError = json.Marshal(&GrpcNodeErrorResponse{ErrorMessage: err.Error(), ErrorCode: errorCode})
if marshalingError != nil {
return nil, utils.LavaFormatError("json.Marshal(&GrpcNodeErrorResponse Failed 2", err, utils.Attribute{Key: "GUID", Value: ctx})
return nil, errorCode, utils.LavaFormatError("json.Marshal(&GrpcNodeErrorResponse Failed 2", err, utils.Attribute{Key: "GUID", Value: ctx})
}
}
return respBytes, nil
return respBytes, errorCode, nil
}

func marshalJSON(msg proto.Message) ([]byte, error) {
Expand Down
4 changes: 2 additions & 2 deletions protocol/chainlib/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func TestParsingRequestedBlocksHeadersGrpc(t *testing.T) {
require.Equal(t, test.requestedBlock, requestedBlock)
reply, _, _, _, _, err := chainRouter.SendNodeMsg(ctx, nil, chainMessage, nil)
require.NoError(t, err)
parserInput, err := FormatResponseForParsing(reply, chainMessage)
parserInput, err := FormatResponseForParsing(reply.RelayReply, chainMessage)
require.NoError(t, err)
blockNum, err := parser.ParseBlockFromReply(parserInput, parsingForCrafting.ResultParsing)
require.NoError(t, err)
Expand Down Expand Up @@ -283,7 +283,7 @@ func TestSettingBlocksHeadersGrpc(t *testing.T) {
require.Equal(t, test.block, requestedBlock)
reply, _, _, _, _, err := chainRouter.SendNodeMsg(ctx, nil, chainMessage, nil)
require.NoError(t, err)
parserInput, err := FormatResponseForParsing(reply, chainMessage)
parserInput, err := FormatResponseForParsing(reply.RelayReply, chainMessage)
require.NoError(t, err)
blockNum, err := parser.ParseBlockFromReply(parserInput, parsingForCrafting.ResultParsing)
require.NoError(t, err)
Expand Down
Loading

0 comments on commit 501d7da

Please sign in to comment.