Skip to content

Commit

Permalink
feat(p2p): enable blocksync to work with multiple revisions (dymensio…
Browse files Browse the repository at this point in the history
…nxyz#1251)

Co-authored-by: Michael Tsitrin <[email protected]>
  • Loading branch information
srene and mtsitrin authored Dec 1, 2024
1 parent 7ef37e5 commit ab04841
Show file tree
Hide file tree
Showing 32 changed files with 410 additions and 176 deletions.
2 changes: 1 addition & 1 deletion .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ packages:
DataAvailabilityLayerClient:
github.com/dymensionxyz/dymint/p2p:
interfaces:
GetProposerI:
StateGetter:
github.com/dymensionxyz/dymint/block:
interfaces:
ExecutorI:
Expand Down
20 changes: 10 additions & 10 deletions block/fork.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,6 @@ const (

// MonitorForkUpdateLoop monitors the hub for fork updates in a loop
func (m *Manager) MonitorForkUpdateLoop(ctx context.Context) error {
// if instruction already exists no need to check for fork update
if types.InstructionExists(m.RootDir) {
return nil
}

ticker := time.NewTicker(ForkMonitorInterval) // TODO make this configurable
defer ticker.Stop()

Expand All @@ -44,6 +39,11 @@ func (m *Manager) MonitorForkUpdateLoop(ctx context.Context) error {

// checkForkUpdate checks if the hub has a fork update
func (m *Manager) checkForkUpdate(msg string) error {
// if instruction exists no need to check for fork update
if types.InstructionExists(m.RootDir) {
return nil
}

rollapp, err := m.SLClient.GetRollapp()
if err != nil {
return err
Expand Down Expand Up @@ -234,18 +234,18 @@ func (m *Manager) updateStateWhenFork() error {
// Set proposer to nil to force updating it from SL
m.State.SetProposer(nil)
// Upgrade revision on state
state := m.State
state.RevisionStartHeight = instruction.RevisionStartHeight
m.State.RevisionStartHeight = instruction.RevisionStartHeight
// this is necessary to pass ValidateConfigWithRollappParams when DRS upgrade is required
if instruction.RevisionStartHeight == m.State.NextHeight() {
state.SetRevision(instruction.Revision)
m.State.SetRevision(instruction.Revision)
drsVersion, err := version.GetDRSVersion()
if err != nil {
return err
}
state.RollappParams.DrsVersion = drsVersion
m.State.RollappParams.DrsVersion = drsVersion
}
m.State = state
_, err := m.Store.SaveState(m.State, nil)
return err
}
return nil
}
Expand Down
9 changes: 9 additions & 0 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,10 @@ func (m *Manager) Start(ctx context.Context) error {
if err != nil {
return err
}
_, err = m.Store.SaveState(m.State, nil)
if err != nil {
return err
}
}

// checks if the the current node is the proposer either on rollapp or on the hub.
Expand Down Expand Up @@ -317,6 +321,7 @@ func (m *Manager) updateFromLastSettlementState() error {
return err
}

m.P2PClient.UpdateLatestSeenHeight(latestHeight)
if latestHeight >= m.State.NextHeight() {
m.UpdateTargetHeight(latestHeight)
}
Expand Down Expand Up @@ -351,6 +356,10 @@ func (m *Manager) GetProposerPubKey() tmcrypto.PubKey {
return m.State.GetProposerPubKey()
}

func (m *Manager) GetRevision() uint64 {
return m.State.GetRevision()
}

func (m *Manager) UpdateTargetHeight(h uint64) {
for {
currentHeight := m.TargetHeight.Load()
Expand Down
7 changes: 4 additions & 3 deletions block/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func TestApplyCachedBlocks_WithFraudCheck(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
_, manager.Cancel = context.WithCancel(context.Background())
manager.Ctx, manager.Cancel = context.WithCancel(context.Background())
go event.MustSubscribe(
ctx,
manager.Pubsub,
Expand All @@ -274,8 +274,9 @@ func TestApplyCachedBlocks_WithFraudCheck(t *testing.T) {
assert.NoError(t, err)
blockData := p2p.BlockData{Block: *batch.Blocks[0], Commit: *batch.Commits[0]}
msg := pubsub.NewMessage(blockData, map[string][]string{p2p.EventTypeKey: {p2p.EventNewGossipedBlock}})
manager.OnReceivedBlock(msg)

if manager.Ctx.Err() == nil {
manager.OnReceivedBlock(msg)
}
// Wait until daHeight is updated
time.Sleep(time.Millisecond * 500)
}
Expand Down
2 changes: 2 additions & 0 deletions block/modes.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ func (m *Manager) runAsProposer(ctx context.Context, eg *errgroup.Group) error {
m.RunMode = RunModeProposer
// Subscribe to batch events, to update last submitted height in case batch confirmation was lost. This could happen if the sequencer crash/restarted just after submitting a batch to the settlement and by the time we query the last batch, this batch wasn't accepted yet.
go uevent.MustSubscribe(ctx, m.Pubsub, "updateSubmittedHeightLoop", settlement.EventQueryNewSettlementBatchAccepted, m.UpdateLastSubmittedHeight, m.logger)
// Subscribe to P2P received blocks events (used for P2P syncing).
go uevent.MustSubscribe(ctx, m.Pubsub, p2pBlocksyncLoop, p2p.EventQueryNewBlockSyncBlock, m.OnReceivedBlock, m.logger)

// Sequencer must wait till the DA light client is synced. Otherwise it will fail when submitting blocks.
// Full-nodes does not need to wait, but if it tries to fetch blocks from DA heights previous to the DA light client height it will fail, and it will retry till it reaches the height.
Expand Down
2 changes: 1 addition & 1 deletion block/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (m *Manager) saveP2PBlockToBlockSync(block *types.Block, commit *types.Comm
if err != nil {
return fmt.Errorf("marshal binary: %w: %w", err, ErrNonRecoverable)
}
err = m.P2PClient.SaveBlock(context.Background(), block.Header.Height, gossipedBlockBytes)
err = m.P2PClient.SaveBlock(context.Background(), block.Header.Height, block.GetRevision(), gossipedBlockBytes)
if err != nil {
m.logger.Error("Adding block to blocksync store.", "err", err, "height", gossipedBlock.Block.Header.Height)
}
Expand Down
16 changes: 9 additions & 7 deletions block/retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ func (m *Manager) ApplyBatchFromSL(slBatch *settlement.Batch) error {
m.retrieverMu.Lock()
defer m.retrieverMu.Unlock()

var lastAppliedHeight float64
// if batch blocks have already been applied skip, otherwise it will fail in endheight validation (it can happen when syncing from blocksync in parallel).
if m.State.Height() > slBatch.EndHeight {
return nil
}

blockIndex := 0
for _, batch := range batchResp.Batches {
for i, block := range batch.Blocks {
Expand All @@ -37,7 +41,7 @@ func (m *Manager) ApplyBatchFromSL(slBatch *settlement.Batch) error {
}

if block.GetRevision() != m.State.GetRevision() {
err := m.checkForkUpdate("syncing to fork height. please restart the node.")
err := m.checkForkUpdate(fmt.Sprintf("syncing to fork height. received block revision: %d node revision: %d. please restart the node.", block.GetRevision(), m.State.GetRevision()))
return err
}

Expand All @@ -47,8 +51,6 @@ func (m *Manager) ApplyBatchFromSL(slBatch *settlement.Batch) error {
return fmt.Errorf("apply block: height: %d: %w", block.Header.Height, err)
}

lastAppliedHeight = float64(block.Header.Height)

m.blockCache.Delete(block.Header.Height)
}
}
Expand All @@ -58,8 +60,6 @@ func (m *Manager) ApplyBatchFromSL(slBatch *settlement.Batch) error {
return fmt.Errorf("state height mismatch: state height: %d: batch end height: %d", m.State.Height(), slBatch.EndHeight)
}

types.LastReceivedDAHeightGauge.Set(lastAppliedHeight)

return nil
}

Expand All @@ -71,10 +71,12 @@ func (m *Manager) ApplyBatchFromSL(slBatch *settlement.Batch) error {
// if seq produces new block H, it can lead to double signing, as the old block can still be in the p2p network
// ----
// when this scenario encountered previously, we wanted to apply same block instead of producing new one
func (m *Manager) applyLocalBlock(height uint64) error {
func (m *Manager) applyLocalBlock() error {
defer m.retrieverMu.Unlock()
m.retrieverMu.Lock()

height := m.State.NextHeight()

block, err := m.Store.LoadBlock(height)
if err != nil {
return fmt.Errorf("load block: %w", gerrc.ErrNotFound)
Expand Down
1 change: 1 addition & 0 deletions block/slvalidator.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ func (v *SettlementValidator) ValidateStateUpdate(batch *settlement.ResultRetrie
daBlocks := []*types.Block{}
for _, batch := range daBatch.Batches {
daBlocks = append(daBlocks, batch.Blocks...)
types.LastReceivedDAHeightGauge.Set(float64(batch.EndHeight()))
}

// validate DA blocks against the state update
Expand Down
6 changes: 6 additions & 0 deletions block/submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,12 @@ func (m *Manager) CreateBatch(maxBatchSize uint64, startHeight uint64, endHeight
if err != nil {
return nil, fmt.Errorf("load drs version: h: %d: %w", h, err)
}

// check all blocks have the same revision
if len(batch.Blocks) > 0 && batch.Blocks[len(batch.Blocks)-1].GetRevision() != block.GetRevision() {
return nil, fmt.Errorf("create batch: batch includes blocks with different revisions: %w", gerrc.ErrInternal)
}

batch.Blocks = append(batch.Blocks, block)
batch.Commits = append(batch.Commits, commit)
batch.DRSVersion = append(batch.DRSVersion, drsVersion)
Expand Down
14 changes: 10 additions & 4 deletions block/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"

"github.com/dymensionxyz/dymint/da"
"github.com/dymensionxyz/gerr-cosmos/gerrc"
"github.com/tendermint/tendermint/libs/pubsub"

Expand Down Expand Up @@ -60,9 +61,9 @@ func (m *Manager) SettlementSyncLoop(ctx context.Context) error {
}
// if we have the block locally, we don't need to fetch it from the DA.
// it will only happen in case of rollback.
err := m.applyLocalBlock(currH)
err := m.applyLocalBlock()
if err == nil {
m.logger.Info("Synced from local", "store height", currH, "target height", m.LastSettlementHeight.Load())
m.logger.Info("Synced from local", "store height", m.State.Height(), "target height", m.LastSettlementHeight.Load())
continue
}
if !errors.Is(err, gerrc.ErrNotFound) {
Expand All @@ -79,6 +80,11 @@ func (m *Manager) SettlementSyncLoop(ctx context.Context) error {
m.LastBlockTimeInSettlement.Store(settlementBatch.BlockDescriptors[len(settlementBatch.BlockDescriptors)-1].GetTimestamp().UTC().UnixNano())

err = m.ApplyBatchFromSL(settlementBatch.Batch)

// this will keep sync loop alive when DA is down or retrievals are failing because DA issues.
if errors.Is(err, da.ErrRetrieval) {
continue
}
if err != nil {
return fmt.Errorf("process next DA batch. err:%w", err)
}
Expand All @@ -95,8 +101,8 @@ func (m *Manager) SettlementSyncLoop(ctx context.Context) error {

}

// avoid notifying as synced in case if fails before
if m.State.Height() == m.LastSettlementHeight.Load() {
// avoid notifying as synced in case it fails before
if m.State.Height() >= m.LastSettlementHeight.Load() {
m.logger.Info("Synced.", "current height", m.State.Height(), "last submitted height", m.LastSettlementHeight.Load())
// nudge to signal to any listens that we're currently synced with the last settlement height we've seen so far
m.syncedFromSettlement.Nudge()
Expand Down
11 changes: 3 additions & 8 deletions da/celestia/celestia.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,23 +267,18 @@ func (c *DataAvailabilityLayerClient) RetrieveBatches(daMetaData *da.DASubmitMet
return da.ResultRetrieveBatch{}
default:
var resultRetrieveBatch da.ResultRetrieveBatch

err := retry.Do(
func() error {
resultRetrieveBatch = c.retrieveBatches(daMetaData)

if errors.Is(resultRetrieveBatch.Error, da.ErrRetrieval) {
c.logger.Error("Retrieve batch.", "error", resultRetrieveBatch.Error)
return resultRetrieveBatch.Error
}

return nil
return resultRetrieveBatch.Error
},
retry.Attempts(uint(*c.config.RetryAttempts)), //nolint:gosec // RetryAttempts should be always positive
retry.DelayType(retry.FixedDelay),
retry.Delay(c.config.RetryDelay),
)
if err != nil {
c.logger.Error("RetrieveBatches process failed.", "error", err)
c.logger.Error("Retrieve batch", "height", daMetaData.Height, "commitment", hex.EncodeToString(daMetaData.Commitment))
}
return resultRetrieveBatch

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ require (
github.com/blang/semver/v4 v4.0.0 // indirect
github.com/btcsuite/btcd/btcutil v1.1.3 // indirect
github.com/cenkalti/backoff v2.2.1+incompatible // indirect
github.com/cockroachdb/errors v1.11.1 // indirect
github.com/cockroachdb/errors v1.11.1
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect
github.com/cockroachdb/pebble v1.1.0 // indirect
github.com/cockroachdb/redact v1.1.5 // indirect
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

82 changes: 0 additions & 82 deletions mocks/github.com/dymensionxyz/dymint/p2p/mock_GetProposerI.go

This file was deleted.

Loading

0 comments on commit ab04841

Please sign in to comment.