Skip to content

Commit

Permalink
PRT-1061 state tracker recovery from pruned block (lavanet#1107)
Browse files Browse the repository at this point in the history
* create reset state support for state tracker. triggered on huge block gap

* lint

* reset if our block time reaches 1h delay

* remove commented code

* adding unitest to validate new chain tracker functionality

* fixing timeouts and locks
  • Loading branch information
ranlavanet authored Jan 8, 2024
1 parent 311068b commit b222bad
Show file tree
Hide file tree
Showing 18 changed files with 286 additions and 130 deletions.
7 changes: 2 additions & 5 deletions protocol/chaintracker/chain_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type ChainTracker struct {
blockQueueMu sync.RWMutex
blocksQueue []BlockStore // holds all past hashes up until latest block
forkCallback func(int64) // a function to be called when a fork is detected
newLatestCallback func(int64, string) // a function to be called when a new block is detected
newLatestCallback func(int64, int64, string) // a function to be called when a new block is detected, from what block to what block including gaps
oldBlockCallback func(latestBlockTime time.Time) // a function to be called when an old block is detected
consistencyCallback func(oldBlock int64, block int64)
serverBlockMemory uint64
Expand Down Expand Up @@ -320,10 +320,7 @@ func (cs *ChainTracker) fetchAllPreviousBlocksIfNecessary(ctx context.Context) (
}
if gotNewBlock {
if cs.newLatestCallback != nil {
for i := prev_latest + 1; i <= newLatestBlock; i++ {
// on catch up of several blocks we don't want to miss any callbacks
cs.newLatestCallback(i, latestHash) // TODO: this is calling the latest hash only repeatedly, this is not precise, currently not used anywhere except for prints
}
cs.newLatestCallback(prev_latest, newLatestBlock, latestHash) // TODO: this is calling the latest hash only repeatedly, this is not precise, currently not used anywhere except for prints
}
blocksUpdated := uint64(newLatestBlock - prev_latest)
// update our timer resolution
Expand Down
10 changes: 8 additions & 2 deletions protocol/chaintracker/chain_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,15 +292,20 @@ func TestChainTrackerCallbacks(t *testing.T) {
}
// used to identify if the newLatest callback was called
callbackCalledNewLatest := false
newBlockCallback := func(arg int64, hash string) {
utils.LavaFormatDebug("new latest callback called")
callbackCalledTimes := 0
newBlockCallback := func(blockFrom int64, blockTo int64, hash string) {
callbackCalledNewLatest = true
for block := blockFrom + 1; block <= blockTo; block++ {
callbackCalledTimes++
}
}
chainTrackerConfig := chaintracker.ChainTrackerConfig{BlocksToSave: uint64(fetcherBlocks), AverageBlockTime: TimeForPollingMock, ServerBlockMemory: uint64(mockBlocks), ForkCallback: forkCallback, NewLatestCallback: newBlockCallback}
chainTracker, err := chaintracker.NewChainTracker(context.Background(), mockChainFetcher, chainTrackerConfig)
require.NoError(t, err)
totalAdvancement := 0
t.Run("one long test", func(t *testing.T) {
for _, tt := range tests {
totalAdvancement += int(tt.advancement)
utils.LavaFormatInfo(startedTestStr + tt.name)
callbackCalledFork = false
callbackCalledNewLatest = false
Expand Down Expand Up @@ -335,6 +340,7 @@ func TestChainTrackerCallbacks(t *testing.T) {
} else {
require.False(t, callbackCalledFork)
}
require.Equal(t, totalAdvancement, callbackCalledTimes)
if tt.advancement > 0 {
require.True(t, callbackCalledNewLatest)
} else {
Expand Down
4 changes: 2 additions & 2 deletions protocol/chaintracker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ const (
)

type ChainTrackerConfig struct {
ForkCallback func(block int64) // a function to be called when a fork is detected
NewLatestCallback func(block int64, hash string) // a function to be called when a new block is detected
ForkCallback func(block int64) // a function to be called when a fork is detected
NewLatestCallback func(blockFrom int64, blockTo int64, hash string) // a function to be called when a new block is detected
ConsistencyCallback func(oldBlock int64, block int64)
OldBlockCallback func(latestBlockTime time.Time)
ServerAddress string // if not empty will open up a grpc server for that address
Expand Down
16 changes: 9 additions & 7 deletions protocol/rpcconsumer/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,15 @@ func startTesting(ctx context.Context, clientCtx client.Context, txFactory tx.Fa
if err != nil {
return utils.LavaFormatError("panic severity critical error, failed creating chain proxy, continuing with others endpoints", err, utils.Attribute{Key: "parallelConnections", Value: uint64(parallelConnections)}, utils.Attribute{Key: "rpcProviderEndpoint", Value: rpcProviderEndpoint})
}
printOnNewLatestCallback := func(block int64, hash string) {
utils.LavaFormatInfo("Received a new Block",
utils.Attribute{Key: "block", Value: block},
utils.Attribute{Key: "hash", Value: hash},
utils.Attribute{Key: "Chain", Value: rpcProviderEndpoint.ChainID},
utils.Attribute{Key: "apiInterface", Value: rpcProviderEndpoint.ApiInterface},
)
printOnNewLatestCallback := func(blockFrom int64, blockTo int64, hash string) {
for block := blockFrom + 1; block <= blockTo; block++ {
utils.LavaFormatInfo("Received a new Block",
utils.Attribute{Key: "block", Value: block},
utils.Attribute{Key: "hash", Value: hash},
utils.Attribute{Key: "Chain", Value: rpcProviderEndpoint.ChainID},
utils.Attribute{Key: "apiInterface", Value: rpcProviderEndpoint.ApiInterface},
)
}
}
consistencyErrorCallback := func(oldBlock, newBlock int64) {
utils.LavaFormatError("Consistency issue detected", nil,
Expand Down
6 changes: 4 additions & 2 deletions protocol/rpcprovider/rpcprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,10 @@ func (rpcp *RPCProvider) SetupEndpoint(ctx context.Context, rpcProviderEndpoint
_, averageBlockTime, blocksToFinalization, blocksInFinalizationData := chainParser.ChainBlockStats()
var chainTracker *chaintracker.ChainTracker
// chainTracker accepts a callback to be called on new blocks, we use this to call metrics update on a new block
recordMetricsOnNewBlock := func(block int64, hash string) {
rpcp.providerMetricsManager.SetLatestBlock(chainID, uint64(block))
recordMetricsOnNewBlock := func(blockFrom int64, blockTo int64, hash string) {
for block := blockFrom + 1; block <= blockTo; block++ {
rpcp.providerMetricsManager.SetLatestBlock(chainID, uint64(block))
}
}

// in order to utilize shared resources between chains we need go routines with the same chain to wait for one another here
Expand Down
4 changes: 2 additions & 2 deletions protocol/statetracker/consumer_state_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ func (cst *ConsumerStateTracker) RegisterForPairingUpdates(ctx context.Context,
}

func (cst *ConsumerStateTracker) RegisterFinalizationConsensusForUpdates(ctx context.Context, finalizationConsensus *lavaprotocol.FinalizationConsensus) {
finalizationConsensusUpdater := NewFinalizationConsensusUpdater(cst.stateQuery)
finalizationConsensusUpdater := updaters.NewFinalizationConsensusUpdater(cst.stateQuery)
finalizationConsensusUpdaterRaw := cst.StateTracker.RegisterForUpdates(ctx, finalizationConsensusUpdater)
finalizationConsensusUpdater, ok := finalizationConsensusUpdaterRaw.(*FinalizationConsensusUpdater)
finalizationConsensusUpdater, ok := finalizationConsensusUpdaterRaw.(*updaters.FinalizationConsensusUpdater)
if !ok {
utils.LavaFormatFatal("invalid updater type returned from RegisterForUpdates", nil, utils.Attribute{Key: "updater", Value: finalizationConsensusUpdaterRaw})
}
Expand Down
60 changes: 29 additions & 31 deletions protocol/statetracker/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,41 +51,31 @@ func eventsLookup(ctx context.Context, clientCtx client.Context, blocks, fromBlo
if latestHeight < blocks {
return utils.LavaFormatError("requested blocks is bigger than latest block height", nil, utils.Attribute{Key: "requested", Value: blocks}, utils.Attribute{Key: "latestHeight", Value: latestHeight})
}

readEventsFromBlock := func(block int64, hash string) {
brp, err := updaters.TryIntoTendermintRPC(clientCtx.Client)
if err != nil {
utils.LavaFormatFatal("invalid blockResults provider", err)
}
blockResults, err := brp.BlockResults(ctx, &block)
if err != nil {
utils.LavaFormatError("invalid blockResults status", err)
return
}
for _, event := range blockResults.BeginBlockEvents {
checkEventForShow(eventName, event, hasAttributeName, value, block, showAttributeName)
}
transactionResults := blockResults.TxsResults
for _, tx := range transactionResults {
events := tx.Events
for _, event := range events {
ticker := time.NewTicker(5 * time.Second)
readEventsFromBlock := func(blockFrom int64, blockTo int64, hash string) {
for block := blockFrom; block < blockTo; block++ {
brp, err := updaters.TryIntoTendermintRPC(clientCtx.Client)
if err != nil {
utils.LavaFormatFatal("invalid blockResults provider", err)
}
blockResults, err := brp.BlockResults(ctx, &block)
if err != nil {
utils.LavaFormatError("invalid blockResults status", err)
return
}
for _, event := range blockResults.BeginBlockEvents {
checkEventForShow(eventName, event, hasAttributeName, value, block, showAttributeName)
}
}
}

if blocks > 0 {
if fromBlock <= 0 {
fromBlock = latestHeight - blocks
}
ticker := time.NewTicker(5 * time.Second)
utils.LavaFormatInfo("Reading Events", utils.Attribute{Key: "from", Value: fromBlock}, utils.Attribute{Key: "to", Value: fromBlock + blocks})
for block := fromBlock; block < fromBlock+blocks; block++ {
readEventsFromBlock(block, "")
// if the user aborted stop
transactionResults := blockResults.TxsResults
for _, tx := range transactionResults {
events := tx.Events
for _, event := range events {
checkEventForShow(eventName, event, hasAttributeName, value, block, showAttributeName)
}
}
select {
case <-signalChan:
return nil
return
case <-ticker.C:
if !disableInteractive {
fmt.Printf("Current Block: %d\r", block)
Expand All @@ -94,6 +84,14 @@ func eventsLookup(ctx context.Context, clientCtx client.Context, blocks, fromBlo
}
}
}

if blocks > 0 {
if fromBlock <= 0 {
fromBlock = latestHeight - blocks
}
utils.LavaFormatInfo("Reading Events", utils.Attribute{Key: "from", Value: fromBlock}, utils.Attribute{Key: "to", Value: fromBlock + blocks})
readEventsFromBlock(fromBlock, fromBlock+blocks, "")
}
lavaChainFetcher := chainlib.NewLavaChainFetcher(ctx, clientCtx)
latestBlock, err := lavaChainFetcher.FetchLatestBlockNum(ctx)
if err != nil {
Expand Down
36 changes: 28 additions & 8 deletions protocol/statetracker/state_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type StateTracker struct {

type Updater interface {
Update(int64)
Reset(int64)
UpdaterKey() string
}

Expand Down Expand Up @@ -95,18 +96,37 @@ func (st *StateTracker) GetAverageBlockTime() time.Duration {
return st.AverageBlockTime
}

func (st *StateTracker) newLavaBlock(latestBlock int64, hash string) {
func (st *StateTracker) newLavaBlock(blockFrom int64, blockTo int64, hash string) {
// go over the registered updaters and trigger update
st.registrationLock.RLock()
defer st.registrationLock.RUnlock()
// first update event tracker
err := st.EventTracker.UpdateBlockResults(latestBlock)
if err != nil {
utils.LavaFormatWarning("calling update without updated events tracker", err)
// if we had a huge gap
if time.Duration(blockTo-blockFrom)*st.AverageBlockTime > time.Hour { // if we are 1H behind
// in case we have a huge gap we launch a reset on the state of all the updaters. as the state is no longer valid.
// this can be caused by a huge catch up on blocks after a halt or a sync state on the node. sometimes pruning the blocks the protocol requires.
// therefore we need to reset the state and fetch all information from the chain
// first update the event tracker to latest block.
err := st.EventTracker.UpdateBlockResults(blockTo)
if err != nil {
utils.LavaFormatError("failing to fetch latest result after gap", err, utils.LogAttr("blockFrom", blockFrom), utils.LogAttr("blockTo", blockTo))
}
// reset will try to reset the updaters. if it fails it will retry every update until it succeeds.
for _, updater := range st.newLavaBlockUpdaters {
updater.Reset(blockTo)
}
return // return after state has been reset.
}
// after events were updated we can trigger updaters
for _, updater := range st.newLavaBlockUpdaters {
updater.Update(latestBlock)

for block := blockFrom + 1; block <= blockTo; block++ {
// first update event tracker
err := st.EventTracker.UpdateBlockResults(block)
if err != nil {
utils.LavaFormatWarning("calling update without updated events tracker", err)
}
// after events were updated we can trigger updaters
for _, updater := range st.newLavaBlockUpdaters {
updater.Update(block)
}
}
}

Expand Down
62 changes: 43 additions & 19 deletions protocol/statetracker/updaters/downtime_params_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type DowntimeParamsUpdater struct {
downtimeParamsStateQuery DowntimeParamsStateQuery
downtimeParams *downtimev1.Params
downtimeParamsUpdatables []*DowntimeParamsUpdatable
shouldUpdate bool
}

func NewDowntimeParamsUpdater(downtimeParamsStateQuery DowntimeParamsStateQuery, eventTracker *EventTracker) *DowntimeParamsUpdater {
Expand Down Expand Up @@ -60,28 +61,51 @@ func (dpu *DowntimeParamsUpdater) RegisterDowntimeParamsUpdatable(ctx context.Co
return nil
}

func (dpu *DowntimeParamsUpdater) Update(latestBlock int64) {
func (dpu *DowntimeParamsUpdater) fetchResourcesAndUpdateHandlers() error {
// fetch updated downtime params from consensus
timeoutCtx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
params, err := dpu.downtimeParamsStateQuery.GetDowntimeParams(timeoutCtx)
if err != nil {
return utils.LavaFormatError("Failed fetching latest Downtime params from chain", err)
}

for _, downtimeParamsUpdatable := range dpu.downtimeParamsUpdatables {
// iterate over all updaters and execute their updatable
(*downtimeParamsUpdatable).SetDowntimeParams(*params)
}
return nil
}

// call only when locked.
func (dpu *DowntimeParamsUpdater) updateInner(latestBlock int64) {
err := dpu.fetchResourcesAndUpdateHandlers()
if err == nil {
utils.LavaFormatDebug("Updated Downtime params successfully")
dpu.shouldUpdate = false
} else {
utils.LavaFormatError("Failed updating downtime parameters", err, utils.LogAttr("block", latestBlock))
}
}

func (dpu *DowntimeParamsUpdater) Reset(latestBlock int64) {
dpu.lock.Lock()
defer dpu.lock.Unlock()
paramsUpdated, err := dpu.eventTracker.getLatestDowntimeParamsUpdateEvents(latestBlock)
if paramsUpdated || err != nil {
var params *downtimev1.Params
// fetch updated downtime params from consensus
for i := 0; i < BlockResultRetry; i++ {
params, err = dpu.downtimeParamsStateQuery.GetDowntimeParams(context.Background())
if err == nil {
break
}
time.Sleep(50 * time.Millisecond * time.Duration(i+1))
}

if params == nil {
return
}
utils.LavaFormatDebug("Reset Triggered for Downtime Updater", utils.LogAttr("block", latestBlock))
dpu.shouldUpdate = true
dpu.updateInner(latestBlock)
}

for _, downtimeParamsUpdatable := range dpu.downtimeParamsUpdatables {
// iterate over all updaters and execute their updatable
(*downtimeParamsUpdatable).SetDowntimeParams(*params)
func (dpu *DowntimeParamsUpdater) Update(latestBlock int64) {
dpu.lock.Lock()
defer dpu.lock.Unlock()
if dpu.shouldUpdate {
dpu.updateInner(latestBlock)
} else {
paramsUpdated, err := dpu.eventTracker.getLatestDowntimeParamsUpdateEvents(latestBlock)
if paramsUpdated || err != nil {
dpu.shouldUpdate = true // in case we fail to update now. remember to update next block update
dpu.updateInner(latestBlock)
}
}
}
16 changes: 14 additions & 2 deletions protocol/statetracker/updaters/epoch_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package updaters

import (
"sync"
"time"

"github.com/lavanet/lava/utils"
"golang.org/x/net/context"
Expand Down Expand Up @@ -74,10 +75,12 @@ func (eu *EpochUpdater) UpdaterKey() string {
return CallbackKeyForEpochUpdate
}

func (eu *EpochUpdater) Update(latestBlock int64) {
// our epoch updater always fetches the latest epoch start params.
func (eu *EpochUpdater) updateInner(latestBlock int64) {
eu.lock.Lock()
defer eu.lock.Unlock()
ctx := context.Background()
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
currentEpoch, err := eu.stateQuery.CurrentEpochStart(ctx)
if err != nil {
return // failed to get the current epoch
Expand All @@ -100,3 +103,12 @@ func (eu *EpochUpdater) Update(latestBlock int64) {
epochUpdatable.UpdateOnBlock(currentEpoch, latestBlock)
}
}

func (eu *EpochUpdater) Reset(latestBlock int64) {
utils.LavaFormatDebug("Reset triggered for Epoch Updater", utils.LogAttr("block", latestBlock))
eu.updateInner(latestBlock)
}

func (eu *EpochUpdater) Update(latestBlock int64) {
eu.updateInner(latestBlock)
}
1 change: 1 addition & 0 deletions protocol/statetracker/updaters/event_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func (et *EventTracker) UpdateBlockResults(latestBlock int64) (err error) {
}
latestBlock = res.SyncInfo.LatestBlockHeight
}

brp, err := TryIntoTendermintRPC(et.ClientCtx.Client)
if err != nil {
return utils.LavaFormatError("could not get block result provider", err)
Expand Down
Loading

0 comments on commit b222bad

Please sign in to comment.