Skip to content

Commit

Permalink
feat: PRT Block Hash Retry Archive After Redesign Part 3 - Add Cachin…
Browse files Browse the repository at this point in the history
…g Logic to successful attempts (#1731)

* feat: PRT Block Hash Retry Archive

* fix deref

* feat: PRT-Block-Hash-Cache-after-redesign-part-2

* fix lint

* WIP

* Part 2 complete?

* fix lint

* WIP

* apply archive only on 2nd relay.

* using parse msg to create a new protocol message with archive.

* lint

* removing sdk tests

* feature is done?

* ocd

* fix test

* Feature finished + tests

* setting finalized so we cache for longer hashes that are archive.

* fix test

* remove extensions from all flows and save RouterKey in singleConsumerSession

* version merge

* rename function for better description on functionality

* bigger consistency timeout to succeed on actions

* adding initialized flag

* append existing extensions to archive

* WIP

* finished state machine

* fix nil deref

* fix tests

* increase timeout

* fix possible race on protocol message during async code

* add comments and rename

* claude is great

* rename

* update function

* adding upgrade functionality and state change

* LINTUSH

* add used default value

* fix bug

* fix edge cases

* more info

---------

Co-authored-by: omerlavanet <[email protected]>
Co-authored-by: Omer <[email protected]>
  • Loading branch information
3 people authored Nov 24, 2024
1 parent c9c52cb commit f41ce88
Show file tree
Hide file tree
Showing 14 changed files with 434 additions and 103 deletions.
17 changes: 17 additions & 0 deletions protocol/chainlib/chain_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,26 @@ type baseChainMessageContainer struct {
timeoutOverride time.Duration
forceCacheRefresh bool
parseDirective *spectypes.ParseDirective // setting the parse directive related to the api, can be nil
usedDefaultValue bool

inputHashCache []byte
// resultErrorParsingMethod passed by each api interface message to parse the result of the message
// and validate it doesn't contain a node error
resultErrorParsingMethod func(data []byte, httpStatusCode int) (hasError bool, errorMessage string)
}

func (bcmc *baseChainMessageContainer) UpdateEarliestInMessage(incomingEarliest int64) bool {
updatedSuccessfully := false
if bcmc.earliestRequestedBlock != spectypes.EARLIEST_BLOCK {
// check earliest is not unset (0) or incoming is lower than current value
if bcmc.earliestRequestedBlock == 0 || bcmc.earliestRequestedBlock > incomingEarliest {
bcmc.earliestRequestedBlock = incomingEarliest
updatedSuccessfully = true
}
}
return updatedSuccessfully
}

func (bcnc *baseChainMessageContainer) GetRequestedBlocksHashes() []string {
return bcnc.requestedBlockHashes
}
Expand Down Expand Up @@ -158,6 +171,10 @@ func (bcnc *baseChainMessageContainer) OverrideExtensions(extensionNames []strin
}
}

func (bcnc *baseChainMessageContainer) GetUsedDefaultValue() bool {
return bcnc.usedDefaultValue
}

func (bcnc *baseChainMessageContainer) SetExtension(extension *spectypes.Extension) {
if len(bcnc.extensions) > 0 {
for _, ext := range bcnc.extensions {
Expand Down
3 changes: 3 additions & 0 deletions protocol/chainlib/chainlib.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ type ChainMessage interface {
CheckResponseError(data []byte, httpStatusCode int) (hasError bool, errorMessage string)
GetRawRequestHash() ([]byte, error)
GetRequestedBlocksHashes() []string
UpdateEarliestInMessage(incomingEarliest int64) bool
SetExtension(extension *spectypes.Extension)
GetUsedDefaultValue() bool

ChainMessageForSend
}
Expand Down
23 changes: 23 additions & 0 deletions protocol/chainlib/chainlib_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion protocol/chainlib/extensionslib/extension_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ func (ep *ExtensionParser) ExtensionParsing(addon string, extensionsChainMessage
if len(ep.configuredExtensions) == 0 {
return
}

for extensionKey, extension := range ep.configuredExtensions {
if extensionKey.Addon != addon {
// this extension is not relevant for this api
Expand Down
16 changes: 10 additions & 6 deletions protocol/chainlib/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,9 @@ func (apip *GrpcChainParser) CraftMessage(parsing *spectypes.ParseDirective, con
if err != nil {
return nil, err
}
return apip.newChainMessage(apiCont.api, spectypes.NOT_APPLICABLE, nil, grpcMessage, apiCollection), nil
parsedInput := &parser.ParsedInput{}
parsedInput.SetBlock(spectypes.NOT_APPLICABLE)
return apip.newChainMessage(apiCont.api, parsedInput, grpcMessage, apiCollection), nil
}

// ParseMsg parses message data into chain message object
Expand Down Expand Up @@ -165,18 +167,19 @@ func (apip *GrpcChainParser) ParseMsg(url string, data []byte, connectionType st
utils.LogAttr("overwriteRequestedBlock", overwriteReqBlock),
)
parsedInput.SetBlock(spectypes.NOT_APPLICABLE)
} else {
parsedInput.UsedDefaultValue = false
}
}

parsedBlock := parsedInput.GetBlock()
blockHashes, _ := parsedInput.GetBlockHashes()

nodeMsg := apip.newChainMessage(apiCont.api, parsedBlock, blockHashes, &grpcMessage, apiCollection)
nodeMsg := apip.newChainMessage(apiCont.api, parsedInput, &grpcMessage, apiCollection)
apip.BaseChainParser.ExtensionParsing(apiCollection.CollectionData.AddOn, nodeMsg, extensionInfo)
return nodeMsg, apip.BaseChainParser.Validate(nodeMsg)
}

func (*GrpcChainParser) newChainMessage(api *spectypes.Api, requestedBlock int64, requestedHashes []string, grpcMessage *rpcInterfaceMessages.GrpcMessage, apiCollection *spectypes.ApiCollection) *baseChainMessageContainer {
func (*GrpcChainParser) newChainMessage(api *spectypes.Api, parsedInput *parser.ParsedInput, grpcMessage *rpcInterfaceMessages.GrpcMessage, apiCollection *spectypes.ApiCollection) *baseChainMessageContainer {
requestedBlock := parsedInput.GetBlock()
requestedHashes, _ := parsedInput.GetBlockHashes()
nodeMsg := &baseChainMessageContainer{
api: api,
msg: grpcMessage, // setting the grpc message as a pointer so we can set descriptors for parsing
Expand All @@ -185,6 +188,7 @@ func (*GrpcChainParser) newChainMessage(api *spectypes.Api, requestedBlock int64
apiCollection: apiCollection,
resultErrorParsingMethod: grpcMessage.CheckResponseError,
parseDirective: GetParseDirective(api, apiCollection),
usedDefaultValue: parsedInput.UsedDefaultValue,
}
return nodeMsg
}
Expand Down
18 changes: 13 additions & 5 deletions protocol/chainlib/jsonRPC.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (apip *JsonRPCChainParser) CraftMessage(parsing *spectypes.ParseDirective,
if err != nil {
return nil, err
}
return apip.newChainMessage(apiCont.api, spectypes.NOT_APPLICABLE, nil, msg, apiCollection), nil
return apip.newChainMessage(apiCont.api, spectypes.NOT_APPLICABLE, nil, msg, apiCollection, false), nil
}

// this func parses message data into chain message object
Expand All @@ -111,6 +111,7 @@ func (apip *JsonRPCChainParser) ParseMsg(url string, data []byte, connectionType
var apiCollection *spectypes.ApiCollection
var latestRequestedBlock, earliestRequestedBlock int64 = 0, 0
blockHashes := []string{}
parsedDefault := true
for idx, msg := range msgs {
parsedInput := parser.NewParsedInput()
internalPath := ""
Expand Down Expand Up @@ -145,6 +146,9 @@ func (apip *JsonRPCChainParser) ParseMsg(url string, data []byte, connectionType
if hashes, err := parsedInput.GetBlockHashes(); err == nil {
blockHashes = append(blockHashes, hashes...)
}
if !parsedInput.UsedDefaultValue {
parsedDefault = false
}
} else {
parsedBlock, err := msg.ParseBlock(overwriteReqBlock)
parsedInput.SetBlock(parsedBlock)
Expand All @@ -154,6 +158,8 @@ func (apip *JsonRPCChainParser) ParseMsg(url string, data []byte, connectionType
utils.LogAttr("overwriteReqBlock", overwriteReqBlock),
)
parsedInput.SetBlock(spectypes.NOT_APPLICABLE)
} else {
parsedInput.UsedDefaultValue = false
}
}

Expand Down Expand Up @@ -205,9 +211,9 @@ func (apip *JsonRPCChainParser) ParseMsg(url string, data []byte, connectionType

var nodeMsg *baseChainMessageContainer
if len(msgs) == 1 {
nodeMsg = apip.newChainMessage(api, latestRequestedBlock, blockHashes, &msgs[0], apiCollection)
nodeMsg = apip.newChainMessage(api, latestRequestedBlock, blockHashes, &msgs[0], apiCollection, parsedDefault)
} else {
nodeMsg, err = apip.newBatchChainMessage(api, latestRequestedBlock, earliestRequestedBlock, blockHashes, msgs, apiCollection)
nodeMsg, err = apip.newBatchChainMessage(api, latestRequestedBlock, earliestRequestedBlock, blockHashes, msgs, apiCollection, parsedDefault)
if err != nil {
return nil, err
}
Expand All @@ -216,7 +222,7 @@ func (apip *JsonRPCChainParser) ParseMsg(url string, data []byte, connectionType
return nodeMsg, apip.BaseChainParser.Validate(nodeMsg)
}

func (*JsonRPCChainParser) newBatchChainMessage(serviceApi *spectypes.Api, requestedBlock int64, earliestRequestedBlock int64, requestedBlockHashes []string, msgs []rpcInterfaceMessages.JsonrpcMessage, apiCollection *spectypes.ApiCollection) (*baseChainMessageContainer, error) {
func (*JsonRPCChainParser) newBatchChainMessage(serviceApi *spectypes.Api, requestedBlock int64, earliestRequestedBlock int64, requestedBlockHashes []string, msgs []rpcInterfaceMessages.JsonrpcMessage, apiCollection *spectypes.ApiCollection, usedDefaultValue bool) (*baseChainMessageContainer, error) {
batchMessage, err := rpcInterfaceMessages.NewBatchMessage(msgs)
if err != nil {
return nil, err
Expand All @@ -230,11 +236,12 @@ func (*JsonRPCChainParser) newBatchChainMessage(serviceApi *spectypes.Api, reque
earliestRequestedBlock: earliestRequestedBlock,
resultErrorParsingMethod: rpcInterfaceMessages.CheckResponseErrorForJsonRpcBatch,
parseDirective: nil,
usedDefaultValue: usedDefaultValue,
}
return nodeMsg, err
}

func (*JsonRPCChainParser) newChainMessage(serviceApi *spectypes.Api, requestedBlock int64, requestedBlockHashes []string, msg *rpcInterfaceMessages.JsonrpcMessage, apiCollection *spectypes.ApiCollection) *baseChainMessageContainer {
func (*JsonRPCChainParser) newChainMessage(serviceApi *spectypes.Api, requestedBlock int64, requestedBlockHashes []string, msg *rpcInterfaceMessages.JsonrpcMessage, apiCollection *spectypes.ApiCollection, usedDefaultValue bool) *baseChainMessageContainer {
nodeMsg := &baseChainMessageContainer{
api: serviceApi,
apiCollection: apiCollection,
Expand All @@ -243,6 +250,7 @@ func (*JsonRPCChainParser) newChainMessage(serviceApi *spectypes.Api, requestedB
msg: msg,
resultErrorParsingMethod: msg.CheckResponseError,
parseDirective: GetParseDirective(serviceApi, apiCollection),
usedDefaultValue: usedDefaultValue,
}
return nodeMsg
}
Expand Down
46 changes: 46 additions & 0 deletions protocol/chainlib/protocol_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ package chainlib
import (
"strings"

"github.com/lavanet/lava/v4/protocol/chainlib/extensionslib"
"github.com/lavanet/lava/v4/protocol/common"
"github.com/lavanet/lava/v4/utils"
pairingtypes "github.com/lavanet/lava/v4/x/pairing/types"
"github.com/lavanet/lava/v4/x/spec/types"
)

type UserData struct {
Expand Down Expand Up @@ -35,6 +38,48 @@ func (bpm *BaseProtocolMessage) HashCacheRequest(chainId string) ([]byte, func([
return HashCacheRequest(bpm.relayRequestData, chainId)
}

// addMissingExtensions adds any extensions from updatedProtocolExtensions that are not in currentPrivateDataExtensions
func (bpm *BaseProtocolMessage) addMissingExtensions(updatedProtocolExtensions []*types.Extension, currentPrivateDataExtensions []string) []string {
// Create a map for O(1) lookups
existingExtensions := make(map[string]struct{}, len(currentPrivateDataExtensions))
for _, ext := range currentPrivateDataExtensions {
existingExtensions[ext] = struct{}{}
}

// Add missing extensions
for _, ext := range updatedProtocolExtensions {
if _, exists := existingExtensions[ext.Name]; !exists {
currentPrivateDataExtensions = append(currentPrivateDataExtensions, ext.Name)
if len(updatedProtocolExtensions) == len(currentPrivateDataExtensions) {
break
}
}
}
return currentPrivateDataExtensions
}

func (bpm *BaseProtocolMessage) UpdateEarliestAndValidateExtensionRules(extensionParser *extensionslib.ExtensionParser, earliestBlockHashRequested int64, addon string, seenBlock int64) bool {
if earliestBlockHashRequested >= 0 {
success := bpm.UpdateEarliestInMessage(earliestBlockHashRequested)
// check if we successfully updated the earliest block in the message
if success {
// parse the extensions for the new updated earliest block
extensionParser.ExtensionParsing(addon, bpm, uint64(seenBlock))
updatedProtocolExtensions := bpm.GetExtensions()
currentPrivateDataExtensions := bpm.RelayPrivateData().Extensions
utils.LavaFormatTrace("[Archive Debug] Trying to add extensions", utils.LogAttr("currentProtocolExtensions", updatedProtocolExtensions), utils.LogAttr("currentPrivateDataExtensions", currentPrivateDataExtensions))
if len(updatedProtocolExtensions) > len(currentPrivateDataExtensions) {
// we need to add the missing extension to the private data.
currentPrivateDataExtensions = bpm.addMissingExtensions(updatedProtocolExtensions, currentPrivateDataExtensions)
bpm.RelayPrivateData().Extensions = currentPrivateDataExtensions
utils.LavaFormatTrace("[Archive Debug] After Swap", utils.LogAttr("bpm.RelayPrivateData().Extensions", bpm.RelayPrivateData().Extensions))
return true
}
}
}
return false
}

func (bpm *BaseProtocolMessage) GetBlockedProviders() []string {
if bpm.directiveHeaders == nil {
return nil
Expand Down Expand Up @@ -65,4 +110,5 @@ type ProtocolMessage interface {
HashCacheRequest(chainId string) ([]byte, func([]byte) []byte, error)
GetBlockedProviders() []string
GetUserData() common.UserData
UpdateEarliestAndValidateExtensionRules(extensionParser *extensionslib.ExtensionParser, earliestBlockHashRequested int64, addon string, seenBlock int64) bool
}
25 changes: 14 additions & 11 deletions protocol/chainlib/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,13 @@ func (apip *RestChainParser) CraftMessage(parsing *spectypes.ParseDirective, con
if err != nil {
return nil, err
}
api := apiCont.api
apiCollection, err := apip.getApiCollection(connectionType, apiCont.collectionKey.InternalPath, apiCont.collectionKey.Addon)
if err != nil {
return nil, err
}
return apip.newChainMessage(api, spectypes.NOT_APPLICABLE, nil, restMessage, apiCollection), nil
parsedInput := parser.NewParsedInput()
parsedInput.SetBlock(spectypes.NOT_APPLICABLE)
return apip.newChainMessage(apiCont.api, parsedInput, restMessage, apiCollection), nil
}

// ParseMsg parses message data into chain message object
Expand Down Expand Up @@ -126,26 +127,28 @@ func (apip *RestChainParser) ParseMsg(urlPath string, data []byte, connectionTyp
utils.LogAttr("overwriteRequestedBlock", overwriteReqBlock),
)
parsedInput.SetBlock(spectypes.NOT_APPLICABLE)
} else {
parsedInput.UsedDefaultValue = false
}
}

parsedBlock := parsedInput.GetBlock()
blockHashes, _ := parsedInput.GetBlockHashes()

nodeMsg := apip.newChainMessage(apiCont.api, parsedBlock, blockHashes, &restMessage, apiCollection)
nodeMsg := apip.newChainMessage(apiCont.api, parsedInput, &restMessage, apiCollection)
apip.BaseChainParser.ExtensionParsing(apiCollection.CollectionData.AddOn, nodeMsg, extensionInfo)
return nodeMsg, apip.BaseChainParser.Validate(nodeMsg)
}

func (*RestChainParser) newChainMessage(serviceApi *spectypes.Api, requestBlock int64, requestedHashes []string, restMessage *rpcInterfaceMessages.RestMessage, apiCollection *spectypes.ApiCollection) *baseChainMessageContainer {
func (*RestChainParser) newChainMessage(api *spectypes.Api, parsedInput *parser.ParsedInput, restMessage *rpcInterfaceMessages.RestMessage, apiCollection *spectypes.ApiCollection) *baseChainMessageContainer {
requestedBlock := parsedInput.GetBlock()
requestedHashes, _ := parsedInput.GetBlockHashes()
nodeMsg := &baseChainMessageContainer{
api: serviceApi,
apiCollection: apiCollection,
api: api,
msg: restMessage,
latestRequestedBlock: requestBlock,
latestRequestedBlock: requestedBlock,
requestedBlockHashes: requestedHashes,
apiCollection: apiCollection,
resultErrorParsingMethod: restMessage.CheckResponseError,
parseDirective: GetParseDirective(serviceApi, apiCollection),
parseDirective: GetParseDirective(api, apiCollection),
usedDefaultValue: parsedInput.UsedDefaultValue,
}
return nodeMsg
}
Expand Down
Loading

0 comments on commit f41ce88

Please sign in to comment.