Skip to content

Commit

Permalink
PRT-1082 stateless rpc consumer with cache (#1144)
Browse files Browse the repository at this point in the history
* WIP

* WIP - changed the cache fetching to prior to provider picks.

* adding cache share state store and read

* fix nil pointer deref

* work with existing latest block cache set and get instead of a new one

* adding rewrite + validation mechanism for setting the latest known block with race protection. it's not ideal but currently it's a good enough solution.

* fix cache issue for expected block.

* solved batch id parsing.

* adding cache to local test evmos

* removed int64 unused price var

* shared state comments fixed

* fixed cache unit test

* removed updated block when cache hits. we don't need to change the block

* fixed cache reply on shared state to not run data reliability

* fixing comments.

* lint fix

* change cb name

* fix nil deref
  • Loading branch information
ranlavanet authored Jan 21, 2024
1 parent 61944aa commit b093d06
Show file tree
Hide file tree
Showing 17 changed files with 457 additions and 147 deletions.
25 changes: 25 additions & 0 deletions cookbook/param_changes/protocol_version_update_expedited.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"title": "Protocol Version Upgrade 0.33.2",
"description": "Protocol Version Upgrade",
"summary": "Protocol Version Upgrade",
"messages": [
{
"@type": "/cosmos.gov.v1.MsgExecLegacyContent",
"authority": "lava@10d07y265gmmuvt4z0w9aw880jnsr700jz6yq37",
"content": {
"@type": "/cosmos.params.v1beta1.ParameterChangeProposal",
"title": "Protocol Version Upgrade",
"description": "Protocol Version Upgrade",
"changes": [
{
"subspace": "protocol",
"key": "Version",
"value": "{\"provider_target\":\"0.33.2\",\"consumer_target\":\"0.33.2\",\"provider_min\":\"0.32.1\",\"consumer_min\":\"0.32.1\"}"
}
]
}
}
],
"deposit": "10002000ulava",
"expedited": true
}
2 changes: 1 addition & 1 deletion ecosystem/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ func TestCacheSetGetLatestWhenAdvancingLatest(t *testing.T) {
request2 := shallowCopy(request)
request2.RequestBlock = latestBlockForRelay + 1 // make latest block advance
request2.Data = []byte(StubData + "nonRelevantData")

response.LatestBlock = latestBlockForRelay + 1
messageSet2 := pairingtypes.RelayCacheSet{
Request: shallowCopy(request2),
BlockHash: tt.hash,
Expand Down
6 changes: 3 additions & 3 deletions ecosystem/cache/format/jsonrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func FormatterForRelayRequestAndResponseJsonRPC() (inputFormatter func([]byte) [
}
batch := []json.RawMessage{}
err := json.Unmarshal(inpData, &batch)
if err == nil && len(batch) > 1 {
if err == nil && len(batch) >= 1 {
modifiedInpArray := []json.RawMessage{}
for _, batchData := range batch {
var extractedIDForBatch interface{}
Expand Down Expand Up @@ -56,7 +56,7 @@ func FormatterForRelayRequestAndResponseJsonRPC() (inputFormatter func([]byte) [
}
batch := []json.RawMessage{}
err := json.Unmarshal(inpData, &batch)
if err == nil && len(batch) > 1 && len(extractedIDArray) == len(batch) {
if err == nil && len(batch) >= 1 && len(extractedIDArray) == len(batch) {
modifiedInpArray := []json.RawMessage{}
for i, batchData := range batch {
modifiedInp, err := sjson.SetBytes(batchData, IDFieldName, extractedIDArray[i])
Expand Down Expand Up @@ -95,7 +95,7 @@ func getExtractedIdAndModifyInputForJson(inpData []byte) (modifiedInp []byte, ex
}
modifiedInp, err = sjson.SetBytes(inpData, IDFieldName, DefaultIDValue)
if err != nil {
return inpData, extractedID, utils.LavaFormatWarning("failed to set id in json", nil, utils.Attribute{Key: "jsonData", Value: inpData}, utils.LogAttr("extractedID", extractedID))
return inpData, extractedID, utils.LavaFormatWarning("failed to set id in json", err, utils.Attribute{Key: "jsonData", Value: inpData}, utils.LogAttr("extractedID", extractedID))
}
return modifiedInp, extractedID, nil
}
140 changes: 120 additions & 20 deletions ecosystem/cache/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"context"
"encoding/json"
"fmt"
"math"
"strconv"
"sync"
"sync/atomic"
"time"

Expand All @@ -28,7 +30,8 @@ var (
)

const (
SEP = ";"
DbValueConfirmationAttempts = 5
SEP = ";"
)

type RelayerCacheServer struct {
Expand Down Expand Up @@ -64,10 +67,54 @@ func (cv *LastestCacheStore) Cost() int64 {
return 8 + 16
}

func (s *RelayerCacheServer) GetRelay(ctx context.Context, relayCacheGet *pairingtypes.RelayCacheGet) (cacheReply *pairingtypes.CacheRelayReply, err error) {
// getSeenBlockForSharedStateMode looks up the seen block stored in the finalized
// cache under latestBlockKey(chainId, sharedStateId).
//
// It returns 0 when sharedStateId is empty, when no non-expired entry is found,
// or when the stored value is not an int64; a returned 0 is not used as a seen
// block by the consumer side.
func (s *RelayerCacheServer) getSeenBlockForSharedStateMode(chainId string, sharedStateId string) int64 {
	// an empty id means there is nothing to look up.
	if sharedStateId == "" {
		return 0
	}

	key := latestBlockKey(chainId, sharedStateId)
	cached, ok := getNonExpiredFromCache(s.CacheServer.finalizedCache, key)
	if !ok {
		utils.LavaFormatInfo("Failed fetching state from cache for this user id", utils.LogAttr("id", key))
		return 0 // we can't set the seen block in this case; 0 is returned and won't be used on the consumer side.
	}
	utils.LavaFormatInfo("getting seen block cache", utils.LogAttr("id", key), utils.LogAttr("value", cached))

	seenBlock, isInt64 := cached.(int64)
	if !isInt64 {
		// the entry stored under this key is expected to always be an int64.
		utils.LavaFormatFatal("Failed converting cache result to int64", nil, utils.LogAttr("value", cached))
		return 0
	}
	return seenBlock
}

func (s *RelayerCacheServer) GetRelay(ctx context.Context, relayCacheGet *pairingtypes.RelayCacheGet) (*pairingtypes.CacheRelayReply, error) {
cacheReply := &pairingtypes.CacheRelayReply{}
var err error
var seenBlock int64

waitGroup := sync.WaitGroup{}
waitGroup.Add(2) // currently we have two groups getRelayInner and getSeenBlock
requestedBlock := relayCacheGet.Request.RequestBlock // save requested block

cacheReply, err = s.getRelayInner(ctx, relayCacheGet)
// fetch all reads at the same time.
go func() {
defer waitGroup.Done()
var cacheReplyTmp *pairingtypes.CacheRelayReply
cacheReplyTmp, err = s.getRelayInner(ctx, relayCacheGet)
if cacheReplyTmp != nil {
cacheReply = cacheReplyTmp // set cache reply only if its not nil, as we need to store seen block in it.
}
}()
go func() {
defer waitGroup.Done()
// set seen block if required
seenBlock = s.getSeenBlockForSharedStateMode(relayCacheGet.ChainID, relayCacheGet.SharedStateId)
}()

// wait for all reads to complete before moving forward
waitGroup.Wait()
// set seen block.
if seenBlock > cacheReply.SeenBlock {
cacheReply.SeenBlock = seenBlock
}

var hit bool
if err != nil {
s.cacheMiss(ctx, err)
Expand All @@ -77,14 +124,14 @@ func (s *RelayerCacheServer) GetRelay(ctx context.Context, relayCacheGet *pairin
}
// add prometheus metrics
s.CacheServer.CacheMetrics.AddApiSpecific(requestedBlock, relayCacheGet.ChainID, getMethodFromRequest(relayCacheGet), relayCacheGet.Request.ApiInterface, hit)
return
return cacheReply, err
}

func (s *RelayerCacheServer) getRelayInner(ctx context.Context, relayCacheGet *pairingtypes.RelayCacheGet) (*pairingtypes.CacheRelayReply, error) {
inputFormatter, outputFormatter := format.FormatterForRelayRequestAndResponse(relayCacheGet.Request.ApiInterface)
relayCacheGet.Request.Data = inputFormatter(relayCacheGet.Request.Data)
requestedBlock := relayCacheGet.Request.RequestBlock
getLatestBlock := s.getLatestBlock(relayCacheGet.ChainID, relayCacheGet.Provider)
getLatestBlock := s.getLatestBlock(latestBlockKey(relayCacheGet.ChainID, relayCacheGet.Provider))
relayCacheGet.Request.RequestBlock = lavaprotocol.ReplaceRequestedBlock(requestedBlock, getLatestBlock)
cacheKey := formatCacheKey(relayCacheGet.Request.ApiInterface, relayCacheGet.ChainID, relayCacheGet.Request, relayCacheGet.Provider)
utils.LavaFormatDebug("Got Cache Get", utils.Attribute{Key: "cacheKey", Value: parser.CapStringLen(cacheKey)},
Expand Down Expand Up @@ -120,6 +167,51 @@ func (s *RelayerCacheServer) getRelayInner(ctx context.Context, relayCacheGet *p
return nil, HashMismatchError
}

// performInt64WriteWithValidationAndRetry writes newInfo through setBlockCallback
// unless getBlockCallback already reports a strictly newer value, and then
// re-validates the write in the background.
//
//   - getBlockCallback returns the value currently stored.
//   - setBlockCallback stores newInfo (the caller binds the value in the closure).
//   - newInfo is the value the caller wants stored.
//
// Writes are concurrent, so another writer may overwrite our value with an older
// one. A background goroutine re-reads the stored value up to
// DbValueConfirmationAttempts times, one millisecond apart, and rewrites it
// whenever it finds an older value. It stops early once a newer value is stored.
// This is a stop-gap until a db with a write queue is implemented.
func (s *RelayerCacheServer) performInt64WriteWithValidationAndRetry(
	getBlockCallback func() int64,
	setBlockCallback func(),
	newInfo int64,
) {
	// only write when our value is at least as new as the stored one; an equal
	// value is still written so the stored state gets refreshed.
	if getBlockCallback() > newInfo {
		return
	}
	setBlockCallback()

	go func() {
		for attempt := 0; attempt < DbValueConfirmationAttempts; attempt++ {
			time.Sleep(time.Millisecond) // add some delay between read attempts
			switch stored := getBlockCallback(); {
			case stored > newInfo:
				// a newer block is stored, we are no longer relevant and can stop validating.
				return
			case stored < newInfo:
				// another cache set raced us with an older value, write ours again.
				setBlockCallback()
			}
		}
	}()
}

// setSeenBlockOnSharedStateMode stores seenBlock in the finalized cache under
// latestBlockKey(chainId, sharedStateId), with a TTL of ExpirationFinalized.
// It does nothing when sharedStateId is empty.
//
// The write goes through performInt64WriteWithValidationAndRetry, so when a few
// writes with older values happen at the same time, our value is eventually set
// if it is the newest seen block.
func (s *RelayerCacheServer) setSeenBlockOnSharedStateMode(chainId, sharedStateId string, seenBlock int64) {
	if sharedStateId == "" {
		return
	}
	cacheKey := latestBlockKey(chainId, sharedStateId)
	s.performInt64WriteWithValidationAndRetry(
		// reader: the seen block currently stored for this id
		func() int64 { return s.getSeenBlockForSharedStateMode(chainId, sharedStateId) },
		// writer: store our seen block with the finalized expiration
		func() {
			s.CacheServer.finalizedCache.SetWithTTL(cacheKey, seenBlock, 0, s.CacheServer.ExpirationFinalized)
		},
		seenBlock,
	)
}

func (s *RelayerCacheServer) SetRelay(ctx context.Context, relayCacheSet *pairingtypes.RelayCacheSet) (*emptypb.Empty, error) {
if relayCacheSet.Request.RequestBlock < 0 {
return nil, utils.LavaFormatError("invalid relay cache set data, request block is negative", nil, utils.Attribute{Key: "requestBlock", Value: relayCacheSet.Request.RequestBlock})
Expand All @@ -142,7 +234,11 @@ func (s *RelayerCacheServer) SetRelay(ctx context.Context, relayCacheSet *pairin
cache := s.CacheServer.tempCache
cache.SetWithTTL(cacheKey, cacheValue, cacheValue.Cost(), s.getExpirationForChain(relayCacheSet.ChainID, relayCacheSet.BlockHash))
}
s.setLatestBlock(relayCacheSet.ChainID, relayCacheSet.Provider, relayCacheSet.Request.RequestBlock)
// Setting the seen block for shared state.
// Getting the max block number between the seen block on the consumer side vs the latest block on the response of the provider
latestKnownBlock := int64(math.Max(float64(relayCacheSet.Response.LatestBlock), float64(relayCacheSet.Request.SeenBlock)))
s.setSeenBlockOnSharedStateMode(relayCacheSet.ChainID, relayCacheSet.SharedStateId, latestKnownBlock)
s.setLatestBlock(latestBlockKey(relayCacheSet.ChainID, relayCacheSet.Provider), latestKnownBlock)
return &emptypb.Empty{}, nil
}

Expand Down Expand Up @@ -173,8 +269,8 @@ func (s *RelayerCacheServer) PrintCacheStats(ctx context.Context, desc string) {
)
}

func (s *RelayerCacheServer) getLatestBlockInner(chainID string, providerAddr string) (latestBlock int64, expirationTime time.Time) {
value, found := getNonExpiredFromCache(s.CacheServer.finalizedCache, latestBlockKey(chainID, providerAddr))
func (s *RelayerCacheServer) getLatestBlockInner(key string) (latestBlock int64, expirationTime time.Time) {
value, found := getNonExpiredFromCache(s.CacheServer.finalizedCache, key)
if !found {
return spectypes.NOT_APPLICABLE, time.Time{}
}
Expand All @@ -185,23 +281,25 @@ func (s *RelayerCacheServer) getLatestBlockInner(chainID string, providerAddr st
return spectypes.NOT_APPLICABLE, time.Time{}
}

func (s *RelayerCacheServer) getLatestBlock(chainID string, providerAddr string) int64 {
latestBlock, expirationTime := s.getLatestBlockInner(chainID, providerAddr)
func (s *RelayerCacheServer) getLatestBlock(key string) int64 {
latestBlock, expirationTime := s.getLatestBlockInner(key)
if latestBlock != spectypes.NOT_APPLICABLE && expirationTime.After(time.Now()) {
return latestBlock
}
return spectypes.NOT_APPLICABLE
}

func (s *RelayerCacheServer) setLatestBlock(chainID string, providerAddr string, latestBlock int64) {
existingLatest, _ := s.getLatestBlockInner(chainID, providerAddr) // we need to bypass the expirationTimeCheck

if existingLatest <= latestBlock { // equal refreshes latest if it expired
// we are setting this with a futuristic invalidation time, we still want the entry in cache to protect us from putting a lower last block
cacheStore := LastestCacheStore{latestBlock: latestBlock, latestExpirationTime: time.Now().Add(DefaultExpirationForNonFinalized)}
utils.LavaFormatDebug("setting latest block", utils.Attribute{Key: "providerAddr", Value: providerAddr}, utils.Attribute{Key: "chainID", Value: chainID}, utils.Attribute{Key: "latestBlock", Value: latestBlock})
s.CacheServer.finalizedCache.Set(latestBlockKey(chainID, providerAddr), cacheStore, cacheStore.Cost()) // no expiration time
func (s *RelayerCacheServer) setLatestBlock(key string, latestBlock int64) {
cacheStore := LastestCacheStore{latestBlock: latestBlock, latestExpirationTime: time.Now().Add(DefaultExpirationForNonFinalized)}
utils.LavaFormatDebug("setting latest block", utils.Attribute{Key: "key", Value: key}, utils.Attribute{Key: "latestBlock", Value: latestBlock})
set := func() {
s.CacheServer.finalizedCache.Set(key, cacheStore, cacheStore.Cost()) // no expiration time
}
get := func() int64 {
existingLatest, _ := s.getLatestBlockInner(key) // we need to bypass the expirationTimeCheck
return existingLatest
}
s.performInt64WriteWithValidationAndRetry(get, set, latestBlock)
}

func (s *RelayerCacheServer) getExpirationForChain(chainID string, blockHash []byte) time.Duration {
Expand Down Expand Up @@ -295,9 +393,11 @@ func formatCacheValue(response *pairingtypes.RelayReply, hash []byte, finalized
}
}

func latestBlockKey(chainID string, providerAddr string) string {
// used for both the shared-state id and the provider address, so the 2nd arg is simply called uniqueId:
// it can be either a provider address or the unique user id (ip + dapp id)
func latestBlockKey(chainID string, uniqueId string) string {
// because we want to support coherence in providers
return chainID + providerAddr
return chainID + "_" + uniqueId
}

func getMethodFromRequest(relayCacheGet *pairingtypes.RelayCacheGet) string {
Expand Down
27 changes: 15 additions & 12 deletions proto/lavanet/lava/pairing/relayCache.proto
Original file line number Diff line number Diff line change
Expand Up @@ -13,29 +13,32 @@ service RelayerCache {
}

message CacheRelayReply {
RelayReply reply =1;
RelayReply reply = 1;
repeated Metadata optional_metadata = 2 [(gogoproto.nullable) = false];
int64 seen_block = 3;
}

message CacheUsage {
uint64 CacheHits =1;
uint64 CacheMisses =2;
uint64 CacheHits = 1;
uint64 CacheMisses = 2;
}

message RelayCacheGet {
RelayPrivateData request =1;
bytes blockHash =2;
RelayPrivateData request = 1;
bytes blockHash = 2;
string chainID = 3; //Used to differentiate between different chains so each has its own bucket
bool finalized =4;
string provider =5;
bool finalized = 4;
string provider = 5;
string shared_state_id = 6; // empty id for no shared state
}

message RelayCacheSet {
RelayPrivateData request =1;
bytes blockHash =2;
RelayPrivateData request = 1;
bytes blockHash = 2;
string chainID = 3; //Used to differentiate between different chains so each has its own bucket
RelayReply response =4;
bool finalized =5;
string provider =6;
RelayReply response = 4;
bool finalized = 5;
string provider = 6;
repeated Metadata optional_metadata = 7 [(gogoproto.nullable) = false];
string shared_state_id = 8; // empty id for no shared state
}
3 changes: 0 additions & 3 deletions protocol/chainlib/base_chain_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ func (bcp *BaseChainParser) BuildMapFromPolicyQuery(policy PolicyInf, chainId st
func (bcp *BaseChainParser) SetPolicyFromAddonAndExtensionMap(policyInformation map[string]struct{}) {
bcp.rwLock.Lock()
defer bcp.rwLock.Unlock()
utils.LavaFormatDebug("info on policyInformation", utils.LogAttr("policyInformation", policyInformation))
// reset the current one in case we configured it previously
configuredExtensions := make(map[extensionslib.ExtensionKey]*spectypes.Extension)
for collectionKey, apiCollection := range bcp.apiCollections {
Expand All @@ -155,9 +154,7 @@ func (bcp *BaseChainParser) SetPolicyFromAddonAndExtensionMap(policyInformation
bcp.extensionParser.SetConfiguredExtensions(configuredExtensions)
// manage allowed addons
for addon := range bcp.allowedAddons {
utils.LavaFormatDebug("info on addons", utils.LogAttr("addon", addon))
if _, ok := policyInformation[addon]; ok {
utils.LavaFormatDebug("found addon", utils.LogAttr("addon", addon))
bcp.allowedAddons[addon] = true
}
}
Expand Down
3 changes: 2 additions & 1 deletion protocol/chainlib/chain_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ func (cf *ChainFetcher) populateCache(relayData *pairingtypes.RelayPrivateData,
new_ctx := context.Background()
new_ctx, cancel := context.WithTimeout(new_ctx, common.DataReliabilityTimeoutIncrease)
defer cancel()
err := cf.cache.SetEntry(new_ctx, relayData, requestedBlockHash, cf.endpoint.ChainID, reply, finalized, "", nil)
// provider side doesn't use SharedStateId, so we default it to empty so it wont have effect.
err := cf.cache.SetEntry(new_ctx, &pairingtypes.RelayCacheSet{Request: relayData, BlockHash: requestedBlockHash, ChainID: cf.endpoint.ChainID, Response: reply, Finalized: finalized, OptionalMetadata: nil, SharedStateId: ""})
if err != nil {
utils.LavaFormatWarning("chain fetcher error updating cache with new entry", err)
}
Expand Down
1 change: 1 addition & 0 deletions protocol/common/cobra_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const (
CDNCacheDurationFlag = "cdn-cache-duration" // how long to cache the preflight response default 24 hours (in seconds) "86400"
RelaysHealthEnableFlag = "relays-health-enable" // enable relays health check, default true
RelayHealthIntervalFlag = "relays-health-interval" // interval between each relay health check, default 5m
SharedStateFlag = "shared-state"
)

const (
Expand Down
5 changes: 5 additions & 0 deletions protocol/lavaprotocol/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ func UpdateRequestedBlock(request *pairingtypes.RelayPrivateData, response *pair
request.RequestBlock = ReplaceRequestedBlock(request.RequestBlock, response.LatestBlock)
}

// SetRequestedBlockNotApplicable overwrites the request's RequestBlock with
// spectypes.NOT_APPLICABLE, mutating the passed request in place.
// Currently used when the cache hits, because we don't want DR to run in that case.
// NOTE(review): DR presumably stands for data reliability — confirm.
func SetRequestedBlockNotApplicable(request *pairingtypes.RelayPrivateData) {
request.RequestBlock = spectypes.NOT_APPLICABLE
}

func ReplaceRequestedBlock(requestedBlock, latestBlock int64) int64 {
switch requestedBlock {
case spectypes.LATEST_BLOCK:
Expand Down
Loading

0 comments on commit b093d06

Please sign in to comment.