Skip to content

Commit

Permalink
feat: proposer and sequencer set decoupling (dymensionxyz#1163)
Browse files Browse the repository at this point in the history
  • Loading branch information
keruch authored Oct 28, 2024
1 parent 314003c commit 753c630
Show file tree
Hide file tree
Showing 41 changed files with 597 additions and 790 deletions.
17 changes: 11 additions & 6 deletions block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
m.logger.Error("save block blocksync", "err", err)
}

responses, err := m.Executor.ExecuteBlock(m.State, block)
responses, err := m.Executor.ExecuteBlock(block)
if err != nil {
return fmt.Errorf("execute block: %w", err)
}
Expand Down Expand Up @@ -133,14 +133,16 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
m.Executor.UpdateStateAfterCommit(m.State, responses, appHash, block.Header.Height, block.Header.Hash())
}

// check if the proposer needs to be changed
switchRole := m.Executor.UpdateProposerFromBlock(m.State, block)
// save the proposer to store to be queried over RPC
proposer := m.State.GetProposer()
if proposer == nil {
return fmt.Errorf("logic error: got nil proposer while applying block")
}

// save sequencers to store to be queried over RPC
batch := m.Store.NewBatch()
batch, err = m.Store.SaveSequencers(block.Header.Height, &m.State.Sequencers, batch)
batch, err = m.Store.SaveProposer(block.Header.Height, *proposer, batch)
if err != nil {
return fmt.Errorf("save sequencers: %w", err)
return fmt.Errorf("save proposer: %w", err)
}

batch, err = m.Store.SaveState(m.State, batch)
Expand All @@ -157,6 +159,9 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta

m.blockCache.Delete(block.Header.Height)

// check if the proposer needs to be changed and change it if that's the case
switchRole := m.Executor.UpdateProposerFromBlock(m.State, m.Sequencers, block)

if switchRole {
// TODO: graceful role change (https://github.com/dymensionxyz/dymint/issues/1008)
m.logger.Info("Node changing to proposer role")
Expand Down
10 changes: 6 additions & 4 deletions block/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ type ExecutorI interface {
CreateBlock(height uint64, lastCommit *types.Commit, lastHeaderHash, nextSeqHash [32]byte, state *types.State, maxBlockDataSizeBytes uint64) *types.Block
Commit(state *types.State, block *types.Block, resp *tmstate.ABCIResponses) ([]byte, int64, error)
GetAppInfo() (*abci.ResponseInfo, error)
ExecuteBlock(state *types.State, block *types.Block) (*tmstate.ABCIResponses, error)
ExecuteBlock(block *types.Block) (*tmstate.ABCIResponses, error)
UpdateStateAfterInitChain(s *types.State, res *abci.ResponseInitChain)
UpdateMempoolAfterInitChain(s *types.State)
UpdateStateAfterCommit(s *types.State, resp *tmstate.ABCIResponses, appHash []byte, height uint64, lastHeaderHash [32]byte)
UpdateProposerFromBlock(s *types.State, block *types.Block) bool
UpdateProposerFromBlock(s *types.State, seqSet *types.SequencerSet, block *types.Block) bool
}

var _ ExecutorI = new(Executor)

// Executor creates and applies blocks and maintains state.
type Executor struct {
localAddress []byte
Expand Down Expand Up @@ -163,7 +165,7 @@ func (e *Executor) CreateBlock(
}
copy(block.Header.LastCommitHash[:], types.GetLastCommitHash(lastCommit, &block.Header))
copy(block.Header.DataHash[:], types.GetDataHash(block))
copy(block.Header.SequencerHash[:], state.Sequencers.ProposerHash())
copy(block.Header.SequencerHash[:], state.GetProposerHash())
copy(block.Header.NextSequencersHash[:], nextSeqHash[:])

return block
Expand Down Expand Up @@ -216,7 +218,7 @@ func (e *Executor) commit(state *types.State, block *types.Block, deliverTxs []*
}

// ExecuteBlock executes the block and returns the ABCIResponses. Block should be valid (passed validation checks).
func (e *Executor) ExecuteBlock(state *types.State, block *types.Block) (*tmstate.ABCIResponses, error) {
func (e *Executor) ExecuteBlock(block *types.Block) (*tmstate.ABCIResponses, error) {
abciResponses := new(tmstate.ABCIResponses)
abciResponses.DeliverTxs = make([]*abci.ResponseDeliverTx, len(block.Data.Txs))

Expand Down
28 changes: 14 additions & 14 deletions block/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,19 +63,19 @@ func TestCreateBlock(t *testing.T) {

// Init state
state := &types.State{}
state.Sequencers.SetProposer(types.NewSequencerFromValidator(*tmtypes.NewValidator(tmPubKey, 1)))
state.SetProposer(types.NewSequencerFromValidator(*tmtypes.NewValidator(tmPubKey, 1)))
state.ConsensusParams.Block.MaxBytes = int64(maxBytes)
state.ConsensusParams.Block.MaxGas = 100000
// empty block
block := executor.CreateBlock(1, &types.Commit{}, [32]byte{}, [32]byte(state.Sequencers.ProposerHash()[:]), state, maxBytes)
block := executor.CreateBlock(1, &types.Commit{}, [32]byte{}, [32]byte(state.GetProposerHash()), state, maxBytes)
require.NotNil(block)
assert.Empty(block.Data.Txs)
assert.Equal(uint64(1), block.Header.Height)

// one small Tx
err = mpool.CheckTx([]byte{1, 2, 3, 4}, func(r *abci.Response) {}, mempool.TxInfo{})
require.NoError(err)
block = executor.CreateBlock(2, &types.Commit{}, [32]byte{}, [32]byte(state.Sequencers.ProposerHash()), state, maxBytes)
block = executor.CreateBlock(2, &types.Commit{}, [32]byte{}, [32]byte(state.GetProposerHash()), state, maxBytes)
require.NotNil(block)
assert.Equal(uint64(2), block.Header.Height)
assert.Len(block.Data.Txs, 1)
Expand All @@ -85,7 +85,7 @@ func TestCreateBlock(t *testing.T) {
require.NoError(err)
err = mpool.CheckTx(make([]byte, 100), func(r *abci.Response) {}, mempool.TxInfo{})
require.NoError(err)
block = executor.CreateBlock(3, &types.Commit{}, [32]byte{}, [32]byte(state.Sequencers.ProposerHash()), state, maxBytes)
block = executor.CreateBlock(3, &types.Commit{}, [32]byte{}, [32]byte(state.GetProposerHash()), state, maxBytes)
require.NotNil(block)
assert.Len(block.Data.Txs, 2)
}
Expand Down Expand Up @@ -131,11 +131,11 @@ func TestCreateBlockWithConsensusMessages(t *testing.T) {
require.NoError(err)

state := &types.State{}
state.Sequencers.SetProposer(types.NewSequencerFromValidator(*tmtypes.NewValidator(tmPubKey, 1)))
state.SetProposer(types.NewSequencerFromValidator(*tmtypes.NewValidator(tmPubKey, 1)))
state.ConsensusParams.Block.MaxBytes = int64(maxBytes)
state.ConsensusParams.Block.MaxGas = 100000

block := executor.CreateBlock(1, &types.Commit{}, [32]byte{}, [32]byte(state.Sequencers.ProposerHash()[:]), state, maxBytes)
block := executor.CreateBlock(1, &types.Commit{}, [32]byte{}, [32]byte(state.GetProposerHash()[:]), state, maxBytes)

require.NotNil(block)
assert.Empty(block.Data.Txs)
Expand Down Expand Up @@ -253,7 +253,7 @@ func TestApplyBlock(t *testing.T) {

// Init state
state := &types.State{}
state.Sequencers.SetProposer(types.NewSequencerFromValidator(*tmtypes.NewValidator(tmPubKey, 1)))
state.SetProposer(types.NewSequencerFromValidator(*tmtypes.NewValidator(tmPubKey, 1)))
state.InitialHeight = 1
state.ChainID = chainID
state.SetHeight(0)
Expand All @@ -266,7 +266,7 @@ func TestApplyBlock(t *testing.T) {
// Create first block with one Tx from mempool
_ = mpool.CheckTx([]byte{1, 2, 3, 4}, func(r *abci.Response) {}, mempool.TxInfo{})
require.NoError(err)
block := executor.CreateBlock(1, &types.Commit{Height: 0}, [32]byte{0x01}, [32]byte(state.Sequencers.ProposerHash()), state, maxBytes)
block := executor.CreateBlock(1, &types.Commit{Height: 0}, [32]byte{0x01}, [32]byte(state.GetProposerHash()), state, maxBytes)
require.NotNil(block)
assert.Equal(uint64(1), block.Header.Height)
assert.Len(block.Data.Txs, 1)
Expand All @@ -284,10 +284,10 @@ func TestApplyBlock(t *testing.T) {
}

// Apply the block
err = types.ValidateProposedTransition(state, block, commit, state.Sequencers.GetProposerPubKey())
err = types.ValidateProposedTransition(state, block, commit, state.GetProposerPubKey())
require.NoError(err)

resp, err := executor.ExecuteBlock(state, block)
resp, err := executor.ExecuteBlock(block)
require.NoError(err)
require.NotNil(resp)
appHash, _, err := executor.Commit(state, block, resp)
Expand All @@ -301,7 +301,7 @@ func TestApplyBlock(t *testing.T) {
require.NoError(mpool.CheckTx([]byte{5, 6, 7, 8, 9}, func(r *abci.Response) {}, mempool.TxInfo{}))
require.NoError(mpool.CheckTx([]byte{1, 2, 3, 4, 5}, func(r *abci.Response) {}, mempool.TxInfo{}))
require.NoError(mpool.CheckTx(make([]byte, 9990), func(r *abci.Response) {}, mempool.TxInfo{}))
block = executor.CreateBlock(2, commit, block.Header.Hash(), [32]byte(state.Sequencers.ProposerHash()), state, maxBytes)
block = executor.CreateBlock(2, commit, block.Header.Hash(), [32]byte(state.GetProposerHash()), state, maxBytes)
require.NotNil(block)
assert.Equal(uint64(2), block.Header.Height)
assert.Len(block.Data.Txs, 3)
Expand All @@ -322,7 +322,7 @@ func TestApplyBlock(t *testing.T) {
}

// Apply the block with an invalid commit
err = types.ValidateProposedTransition(state, block, invalidCommit, state.Sequencers.GetProposerPubKey())
err = types.ValidateProposedTransition(state, block, invalidCommit, state.GetProposerPubKey())
require.ErrorContains(err, types.ErrInvalidSignature.Error())

// Create a valid commit for the block
Expand All @@ -335,9 +335,9 @@ func TestApplyBlock(t *testing.T) {
}

// Apply the block
err = types.ValidateProposedTransition(state, block, commit, state.Sequencers.GetProposerPubKey())
err = types.ValidateProposedTransition(state, block, commit, state.GetProposerPubKey())
require.NoError(err)
resp, err = executor.ExecuteBlock(state, block)
resp, err = executor.ExecuteBlock(block)
require.NoError(err)
require.NotNil(resp)
_, _, err = executor.Commit(state, block, resp)
Expand Down
10 changes: 6 additions & 4 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,10 @@ type Manager struct {
LocalKey crypto.PrivKey

// Store and execution
Store store.Store
State *types.State
Executor ExecutorI
Store store.Store
State *types.State
Executor ExecutorI
Sequencers *types.SequencerSet // Sequencers is the set of sequencers that are currently active on the rollapp

// Clients and servers
Pubsub *pubsub.Server
Expand Down Expand Up @@ -143,6 +144,7 @@ func NewManager(
Genesis: genesis,
Store: store,
Executor: exec,
Sequencers: types.NewSequencerSet(),
SLClient: settlementClient,
indexerService: indexerService,
logger: logger.With("module", "block_manager"),
Expand Down Expand Up @@ -364,7 +366,7 @@ func (m *Manager) updateLastFinalizedHeightFromSettlement() error {
}

func (m *Manager) GetProposerPubKey() tmcrypto.PubKey {
return m.State.Sequencers.GetProposerPubKey()
return m.State.GetProposerPubKey()
}

func (m *Manager) UpdateTargetHeight(h uint64) {
Expand Down
5 changes: 3 additions & 2 deletions block/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ func TestProduceOnlyAfterSynced(t *testing.T) {

// TestApplyCachedBlocks checks the flow that happens when we are receiving blocks from p2p and some of the blocks
// are already cached. This means blocks that were gossiped but are bigger than the expected next block height.
// TODO: this test is flaky! https://github.com/dymensionxyz/dymint/issues/1173
func TestApplyCachedBlocks_WithFraudCheck(t *testing.T) {
// Init app
app := testutil.GetAppMock(testutil.EndBlock)
Expand Down Expand Up @@ -464,8 +465,8 @@ func TestProducePendingBlock(t *testing.T) {
require.NoError(t, err)
// Generate block and commit and save it to the store
block := testutil.GetRandomBlock(1, 3)
copy(block.Header.SequencerHash[:], manager.State.Sequencers.ProposerHash())
copy(block.Header.NextSequencersHash[:], manager.State.Sequencers.ProposerHash())
copy(block.Header.SequencerHash[:], manager.State.GetProposerHash())
copy(block.Header.NextSequencersHash[:], manager.State.GetProposerHash())

_, err = manager.Store.SaveBlock(block, &block.LastCommit, nil)
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion block/produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (m *Manager) produceBlock(allowEmpty bool, nextProposerHash *[32]byte) (*ty
}

maxBlockDataSize := uint64(float64(m.Conf.BatchSubmitBytes) * types.MaxBlockSizeAdjustment)
proposerHashForBlock := [32]byte(m.State.Sequencers.ProposerHash())
proposerHashForBlock := [32]byte(m.State.GetProposerHash())
// if nextProposerHash is set, we create a last block
if nextProposerHash != nil {
maxBlockDataSize = 0
Expand Down
23 changes: 12 additions & 11 deletions block/sequencers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ import (
"fmt"
"time"

"github.com/tendermint/tendermint/libs/pubsub"

"github.com/dymensionxyz/dymint/settlement"
"github.com/dymensionxyz/dymint/types"
"github.com/tendermint/tendermint/libs/pubsub"
)

func (m *Manager) MonitorSequencerRotation(ctx context.Context, rotateC chan string) error {
Expand Down Expand Up @@ -66,7 +67,7 @@ func (m *Manager) IsProposer() bool {

// check if recovering from halt
if l2Proposer == nil && hubProposer != nil {
m.State.Sequencers.SetProposer(hubProposer)
m.State.SetProposer(hubProposer)
}

// we run sequencer flow if we're proposer on L2 or hub (can be different during rotation phase, before hub receives the last state update)
Expand Down Expand Up @@ -113,8 +114,8 @@ func (m *Manager) CompleteRotation(ctx context.Context, nextSeqAddr string) erro
// validate nextSeq is in the bonded set
var nextSeqHash [32]byte
if nextSeqAddr != "" {
seq := m.State.Sequencers.GetByAddress(nextSeqAddr)
if seq == nil {
seq, found := m.Sequencers.GetByAddress(nextSeqAddr)
if !found {
return types.ErrMissingProposerPubKey
}
copy(nextSeqHash[:], seq.MustHash())
Expand Down Expand Up @@ -170,18 +171,18 @@ func (m *Manager) UpdateSequencerSetFromSL() error {
if err != nil {
return err
}
m.State.Sequencers.SetSequencers(seqs)
m.logger.Debug("Updated bonded sequencer set.", "newSet", m.State.Sequencers.String())
m.Sequencers.SetSequencers(seqs)
m.logger.Debug("Updated bonded sequencer set.", "newSet", m.Sequencers.String())
return nil
}

// UpdateProposer updates the proposer from the hub
func (m *Manager) UpdateProposer() error {
m.State.Sequencers.SetProposer(m.SLClient.GetProposer())
m.State.SetProposer(m.SLClient.GetProposer())
return nil
}

// UpdateLastSubmittedHeight will update last height submitted height upon events.
// UpdateSequencerSet will update last height submitted height upon events.
// This may be necessary in case we crashed/restarted before getting response for our submission to the settlement layer.
func (m *Manager) UpdateSequencerSet(event pubsub.Message) {
eventData, ok := event.Data().(*settlement.EventDataNewBondedSequencer)
Expand All @@ -190,7 +191,7 @@ func (m *Manager) UpdateSequencerSet(event pubsub.Message) {
return
}

if m.State.Sequencers.GetByAddress(eventData.SeqAddr) != nil {
if _, found := m.Sequencers.GetByAddress(eventData.SeqAddr); found {
m.logger.Debug("Sequencer not added from new bonded sequencer event because already in the list.")
return
}
Expand All @@ -200,6 +201,6 @@ func (m *Manager) UpdateSequencerSet(event pubsub.Message) {
m.logger.Error("Unable to add new sequencer from event. err:%w", err)
return
}
sequencers := append(m.State.Sequencers.Sequencers, newSequencer)
m.State.Sequencers.SetSequencers(sequencers)

m.Sequencers.AppendSequencer(newSequencer)
}
20 changes: 11 additions & 9 deletions block/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (e *Executor) UpdateStateAfterCommit(s *types.State, resp *tmstate.ABCIResp
// In case of a node that a becomes the proposer, we return true to mark the role change
// currently the node will rebooted to apply the new role
// TODO: (https://github.com/dymensionxyz/dymint/issues/1008)
func (e *Executor) UpdateProposerFromBlock(s *types.State, block *types.Block) bool {
func (e *Executor) UpdateProposerFromBlock(s *types.State, seqSet *types.SequencerSet, block *types.Block) bool {
// no sequencer change
if bytes.Equal(block.Header.SequencerHash[:], block.Header.NextSequencersHash[:]) {
return false
Expand All @@ -150,19 +150,21 @@ func (e *Executor) UpdateProposerFromBlock(s *types.State, block *types.Block) b
// the chain will be halted until proposer is set
// TODO: recover from halt (https://github.com/dymensionxyz/dymint/issues/1021)
e.logger.Info("rollapp left with no proposer. chain is halted")
s.Sequencers.SetProposer(nil)
s.SetProposer(nil)
return false
}

// if hash changed, update the active sequencer
err := s.Sequencers.SetProposerByHash(block.Header.NextSequencersHash[:])
if err != nil {
e.logger.Error("update new proposer", "err", err)
panic(fmt.Sprintf("failed to update new proposer: %v", err))
// if hash changed, update the proposer
seq, found := seqSet.GetByHash(block.Header.NextSequencersHash[:])
if !found {
e.logger.Error("cannot find proposer by hash")
panic("cannot find proposer by hash")
}
s.SetProposer(&seq)

localSeq := s.Sequencers.GetByConsAddress(e.localAddress)
if localSeq != nil && bytes.Equal(localSeq.MustHash(), block.Header.NextSequencersHash[:]) {
// check if this node becomes a proposer
localSeq, found := seqSet.GetByConsAddress(e.localAddress)
if found && bytes.Equal(localSeq.MustHash(), block.Header.NextSequencersHash[:]) {
return true
}

Expand Down
Loading

0 comments on commit 753c630

Please sign in to comment.