Skip to content

Commit

Permalink
feat(block manager): allows loading blocks from db when syncing (dyme…
Browse files Browse the repository at this point in the history
  • Loading branch information
mtsitrin authored Jun 4, 2024
1 parent d9dec3e commit 2f49475
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 39 deletions.
92 changes: 53 additions & 39 deletions block/retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@ package block
import (
"context"
"fmt"
"time"

"github.com/avast/retry-go/v4"

"code.cloudfoundry.org/go-diodes"
"github.com/dymensionxyz/dymint/da"
Expand Down Expand Up @@ -36,29 +33,19 @@ func (m *Manager) RetrieveLoop(ctx context.Context) {
// It fetches the batches from the settlement, gets the DA height and gets
// the actual blocks from the DA.
func (m *Manager) syncToTargetHeight(targetHeight uint64) error {
for currH := m.State.Height(); currH < targetHeight; currH = m.State.Height() {

// It's important that we query the state index before fetching the batch, rather
// than e.g. keep it and increment it, because we might be concurrently applying blocks
// and may require a higher index than expected.
stateIndex, err := m.queryStateIndex()
if err != nil {
return fmt.Errorf("query state index: %w", err)
for currH := m.State.NextHeight(); currH <= targetHeight; currH = m.State.NextHeight() {
// if we have the block locally, we don't need to fetch it from the DA
err := m.processLocalBlock(currH)
if err == nil {
m.logger.Info("Synced from local", "store height", currH, "target height", targetHeight)
continue
}

settlementBatch, err := m.SLClient.GetBatchAtIndex(stateIndex)
if err != nil {
return fmt.Errorf("retrieve batch: %w", err)
}

m.logger.Info("Retrieved batch.", "state_index", stateIndex)

err = m.ProcessNextDABatch(settlementBatch.MetaData.DA)
err = m.syncFromDABatch()
if err != nil {
return fmt.Errorf("process next DA batch: %w", err)
}
m.logger.Info("Synced from DA", "store height", currH, "target height", targetHeight)

m.logger.Info("Synced from DA", "store height", m.State.Height(), "target height", targetHeight)
}

err := m.attemptApplyCachedBlocks()
Expand All @@ -69,24 +56,51 @@ func (m *Manager) syncToTargetHeight(targetHeight uint64) error {
return nil
}

// TODO: we could encapsulate the retry in the SL client
func (m *Manager) queryStateIndex() (uint64, error) {
var stateIndex uint64
return stateIndex, retry.Do(
func() error {
res, err := m.SLClient.GetHeightState(m.State.NextHeight())
if err != nil {
m.logger.Debug("sl client get height state", "error", err)
return err
}
stateIndex = res.State.StateIndex
return nil
},
retry.Attempts(0), // try forever
retry.Delay(500*time.Millisecond),
retry.LastErrorOnly(true),
retry.DelayType(retry.FixedDelay),
)
func (m *Manager) syncFromDABatch() error {
// It's important that we query the state index before fetching the batch, rather
// than e.g. keep it and increment it, because we might be concurrently applying blocks
// and may require a higher index than expected.
res, err := m.SLClient.GetHeightState(m.State.NextHeight())
if err != nil {
return fmt.Errorf("retrieve state: %w", err)
}
stateIndex := res.State.StateIndex

settlementBatch, err := m.SLClient.GetBatchAtIndex(stateIndex)
if err != nil {
return fmt.Errorf("retrieve batch: %w", err)
}

m.logger.Info("Retrieved batch.", "state_index", stateIndex)

err = m.ProcessNextDABatch(settlementBatch.MetaData.DA)
if err != nil {
return fmt.Errorf("process next DA batch: %w", err)
}
return nil
}

func (m *Manager) processLocalBlock(height uint64) error {
block, err := m.Store.LoadBlock(height)
if err != nil {
return err
}
commit, err := m.Store.LoadCommit(height)
if err != nil {
return err
}
if err := m.validateBlock(block, commit); err != nil {
return fmt.Errorf("validate block from local store: height: %d: %w", height, err)
}

m.retrieverMu.Lock()
err = m.applyBlock(block, commit, blockMetaData{source: localDbBlock})
if err != nil {
return fmt.Errorf("apply block from local store: height: %d: %w", height, err)
}
m.retrieverMu.Unlock()

return nil
}

func (m *Manager) ProcessNextDABatch(daMetaData *da.DASubmitMetaData) error {
Expand Down
1 change: 1 addition & 0 deletions block/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const (
producedBlock blockSource = "produced"
gossipedBlock blockSource = "gossip"
daBlock blockSource = "da"
localDbBlock blockSource = "local_db"
)

type blockMetaData struct {
Expand Down

0 comments on commit 2f49475

Please sign in to comment.