Skip to content

Commit

Permalink
fix(gossip): validate blocks when receiving them over gossip (dymensi…
Browse files Browse the repository at this point in the history
  • Loading branch information
danwt authored Apr 18, 2024
1 parent 84bb99f commit 18f98d2
Show file tree
Hide file tree
Showing 12 changed files with 165 additions and 171 deletions.
43 changes: 18 additions & 25 deletions block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,20 @@ import (
"context"
"fmt"

"cosmossdk.io/errors"
errorsmod "cosmossdk.io/errors"

"github.com/dymensionxyz/dymint/p2p"
"github.com/dymensionxyz/dymint/types"
tmstate "github.com/tendermint/tendermint/proto/tendermint/state"
tmtypes "github.com/tendermint/tendermint/types"
)

// applyBlock applies the block to the store and the abci app.
// Contract: block and commit must be validated before calling this function!
// steps: save block -> execute block with app -> update state -> commit block to app -> update store height and state hash.
// As the entire process can't be atomic we need to make sure the following condition apply before
// - block height is the expected block height on the store (height + 1).
// - block height is the expected block height on the app (last block height + 1).
func (m *Manager) applyBlock(ctx context.Context, block *types.Block, commit *types.Commit, blockMetaData blockMetaData) error {
func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMetaData blockMetaData) error {
// TODO (#330): allow genesis block with height > 0 to be applied.
// TODO: add switch case to have defined behavior for each case.
// validate block height
Expand Down Expand Up @@ -49,9 +50,9 @@ func (m *Manager) applyBlock(ctx context.Context, block *types.Block, commit *ty
return err
}

responses, err := m.executeBlock(ctx, block, commit)
responses, err := m.executor.ExecuteBlock(m.lastState, block)
if err != nil {
m.logger.Error("execute block", "error", err)
m.logger.Error("execute valid block", "error", err)
return err
}

Expand Down Expand Up @@ -87,7 +88,7 @@ func (m *Manager) applyBlock(ctx context.Context, block *types.Block, commit *ty
}

// Commit block to app
retainHeight, err := m.executor.Commit(ctx, &newState, block, responses)
retainHeight, err := m.executor.Commit(&newState, block, responses)
if err != nil {
m.logger.Error("commit to the block", "error", err)
return err
Expand Down Expand Up @@ -123,7 +124,7 @@ func (m *Manager) applyBlock(ctx context.Context, block *types.Block, commit *ty
return nil
}

func (m *Manager) attemptApplyCachedBlocks(ctx context.Context) error {
func (m *Manager) attemptApplyCachedBlocks() error {
m.applyCachedBlockMutex.Lock()
defer m.applyCachedBlockMutex.Unlock()

Expand All @@ -137,12 +138,13 @@ func (m *Manager) attemptApplyCachedBlocks(ctx context.Context) error {
break
}

m.logger.Debug("Applying cached block", "height", expectedHeight)
err := m.applyBlock(ctx, prevCachedBlock, prevCachedCommit, blockMetaData{source: gossipedBlock})
// Note: cached <block,commit> pairs have passed basic validation, so no need to validate again
err := m.applyBlock(prevCachedBlock, prevCachedCommit, blockMetaData{source: gossipedBlock})
if err != nil {
m.logger.Debug("apply previously cached block", "err", err)
m.logger.Debug("applying cached block", "err", err)
return err
}
m.logger.Debug("applied cached block", "height", expectedHeight)
}

for k := range m.prevBlock {
Expand All @@ -158,7 +160,7 @@ func (m *Manager) attemptApplyCachedBlocks(ctx context.Context) error {
func (m *Manager) isHeightAlreadyApplied(blockHeight uint64) (bool, error) {
proxyAppInfo, err := m.executor.GetAppInfo()
if err != nil {
return false, errors.Wrap(err, "get app info")
return false, errorsmod.Wrap(err, "get app info")
}

isBlockAlreadyApplied := uint64(proxyAppInfo.LastBlockHeight) == blockHeight
Expand All @@ -172,7 +174,7 @@ func (m *Manager) isHeightAlreadyApplied(blockHeight uint64) (bool, error) {
func (m *Manager) UpdateStateFromApp() error {
proxyAppInfo, err := m.executor.GetAppInfo()
if err != nil {
return errors.Wrap(err, "get app info")
return errorsmod.Wrap(err, "get app info")
}

appHeight := uint64(proxyAppInfo.LastBlockHeight)
Expand All @@ -184,35 +186,26 @@ func (m *Manager) UpdateStateFromApp() error {

resp, err := m.store.LoadBlockResponses(appHeight)
if err != nil {
return errors.Wrap(err, "load block responses")
return errorsmod.Wrap(err, "load block responses")
}
copy(m.lastState.LastResultsHash[:], tmtypes.NewResults(resp.DeliverTxs).Hash())

_, err = m.store.UpdateState(m.lastState, nil)
if err != nil {
return errors.Wrap(err, "update state")
return errorsmod.Wrap(err, "update state")
}
if ok := m.store.SetHeight(appHeight); !ok {
return fmt.Errorf("store set height: %d", appHeight)
}
return nil
}

func (m *Manager) executeBlock(ctx context.Context, block *types.Block, commit *types.Commit) (*tmstate.ABCIResponses, error) {
func (m *Manager) validateBlock(block *types.Block, commit *types.Commit) error {
// Currently we're assuming proposer is never nil as it's a pre-condition for
// dymint to start
proposer := m.settlementClient.GetProposer()

if err := m.executor.Validate(m.lastState, block, commit, proposer); err != nil {
return &tmstate.ABCIResponses{}, err
}

responses, err := m.executor.Execute(ctx, m.lastState, block)
if err != nil {
return &tmstate.ABCIResponses{}, err
}

return responses, nil
return types.ValidateProposedTransition(m.lastState, block, commit, proposer)
}

func (m *Manager) gossipBlock(ctx context.Context, block types.Block, commit types.Commit) error {
Expand Down
69 changes: 9 additions & 60 deletions block/executor.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package block

import (
"bytes"
"context"
"encoding/hex"
"errors"
"time"
Expand All @@ -15,7 +13,6 @@ import (
tmtypes "github.com/tendermint/tendermint/types"
"go.uber.org/multierr"

abciconv "github.com/dymensionxyz/dymint/conv/abci"
"github.com/dymensionxyz/dymint/mempool"
"github.com/dymensionxyz/dymint/types"
)
Expand Down Expand Up @@ -136,20 +133,9 @@ func (e *Executor) CreateBlock(height uint64, lastCommit *types.Commit, lastHead
return block
}

// Validate validates block and commit.
func (e *Executor) Validate(state types.State, block *types.Block, commit *types.Commit, proposer *types.Sequencer) error {
if err := e.validateBlock(state, block); err != nil {
return err
}
if err := e.validateCommit(proposer, commit, &block.Header); err != nil {
return err
}
return nil
}

// Commit commits the block
func (e *Executor) Commit(ctx context.Context, state *types.State, block *types.Block, resp *tmstate.ABCIResponses) (int64, error) {
appHash, retainHeight, err := e.commit(ctx, state, block, resp.DeliverTxs)
func (e *Executor) Commit(state *types.State, block *types.Block, resp *tmstate.ABCIResponses) (int64, error) {
appHash, retainHeight, err := e.commit(state, block, resp.DeliverTxs)
if err != nil {
return 0, err
}
Expand All @@ -170,7 +156,7 @@ func (e *Executor) GetAppInfo() (*abci.ResponseInfo, error) {
return e.proxyAppQueryConn.InfoSync(abci.RequestInfo{})
}

func (e *Executor) commit(ctx context.Context, state *types.State, block *types.Block, deliverTxs []*abci.ResponseDeliverTx) ([]byte, int64, error) {
func (e *Executor) commit(state *types.State, block *types.Block, deliverTxs []*abci.ResponseDeliverTx) ([]byte, int64, error) {
e.mempool.Lock()
defer e.mempool.Unlock()

Expand All @@ -196,45 +182,8 @@ func (e *Executor) commit(ctx context.Context, state *types.State, block *types.
return resp.Data, resp.RetainHeight, err
}

func (e *Executor) validateBlock(state types.State, block *types.Block) error {
err := block.ValidateBasic()
if err != nil {
return err
}
if block.Header.Version.App != state.Version.Consensus.App ||
block.Header.Version.Block != state.Version.Consensus.Block {
return errors.New("block version mismatch")
}
if state.LastBlockHeight <= 0 && block.Header.Height != uint64(state.InitialHeight) {
return errors.New("initial block height mismatch")
}
if state.LastBlockHeight > 0 && block.Header.Height != uint64(state.LastStoreHeight)+1 {
return errors.New("block height mismatch")
}
if !bytes.Equal(block.Header.AppHash[:], state.AppHash[:]) {
return errors.New("AppHash mismatch")
}
if !bytes.Equal(block.Header.LastResultsHash[:], state.LastResultsHash[:]) {
return errors.New("LastResultsHash mismatch")
}

return nil
}

func (e *Executor) validateCommit(proposer *types.Sequencer, commit *types.Commit, header *types.Header) error {
abciHeaderPb := abciconv.ToABCIHeaderPB(header)
abciHeaderBytes, err := abciHeaderPb.Marshal()
if err != nil {
return err
}
if err = commit.Validate(proposer, abciHeaderBytes); err != nil {
return err
}
return nil
}

// Execute executes the block and returns the ABCIResponses.
func (e *Executor) Execute(ctx context.Context, state types.State, block *types.Block) (*tmstate.ABCIResponses, error) {
// ExecuteBlock executes the block and returns the ABCIResponses. Block should be valid (passed validation checks).
func (e *Executor) ExecuteBlock(state types.State, block *types.Block) (*tmstate.ABCIResponses, error) {
abciResponses := new(tmstate.ABCIResponses)
abciResponses.DeliverTxs = make([]*abci.ResponseDeliverTx, len(block.Data.Txs))

Expand All @@ -259,7 +208,7 @@ func (e *Executor) Execute(ctx context.Context, state types.State, block *types.
})

hash := block.Hash()
abciHeader := abciconv.ToABCIHeaderPB(&block.Header)
abciHeader := types.ToABCIHeaderPB(&block.Header)
abciHeader.ChainID = e.chainID
abciHeader.ValidatorsHash = state.Validators.Hash()
abciResponses.BeginBlock, err = e.proxyAppConsensusConn.BeginBlockSync(
Expand Down Expand Up @@ -292,13 +241,13 @@ func (e *Executor) Execute(ctx context.Context, state types.State, block *types.
}

func (e *Executor) getLastCommitHash(lastCommit *types.Commit, header *types.Header) []byte {
lastABCICommit := abciconv.ToABCICommit(lastCommit, header)
lastABCICommit := types.ToABCICommit(lastCommit, header)
return lastABCICommit.Hash()
}

func (e *Executor) getDataHash(block *types.Block) []byte {
abciData := tmtypes.Data{
Txs: abciconv.ToABCIBlockDataTxs(&block.Data),
Txs: types.ToABCIBlockDataTxs(&block.Data),
}
return abciData.Hash()
}
Expand All @@ -308,7 +257,7 @@ func (e *Executor) publishEvents(resp *tmstate.ABCIResponses, block *types.Block
return nil
}

abciBlock, err := abciconv.ToABCIBlock(block)
abciBlock, err := types.ToABCIBlock(block)
abciBlock.Header.ValidatorsHash = state.Validators.Hash()
if err != nil {
return err
Expand Down
24 changes: 10 additions & 14 deletions block/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/tendermint/tendermint/proxy"
tmtypes "github.com/tendermint/tendermint/types"

abciconv "github.com/dymensionxyz/dymint/conv/abci"
"github.com/dymensionxyz/dymint/mempool"
mempoolv1 "github.com/dymensionxyz/dymint/mempool/v1"
"github.com/dymensionxyz/dymint/mocks"
Expand Down Expand Up @@ -161,7 +160,7 @@ func TestApplyBlock(t *testing.T) {
PublicKey: proposerKey.PubKey(),
}
// Create commit for the block
abciHeaderPb := abciconv.ToABCIHeaderPB(&block.Header)
abciHeaderPb := types.ToABCIHeaderPB(&block.Header)
abciHeaderBytes, err := abciHeaderPb.Marshal()
require.NoError(err)
signature, err := proposerKey.Sign(abciHeaderBytes)
Expand All @@ -173,16 +172,16 @@ func TestApplyBlock(t *testing.T) {
}

// Apply the block
err = executor.Validate(state, block, commit, proposer)
err = types.ValidateProposedTransition(state, block, commit, proposer)
require.NoError(err)
resp, err := executor.Execute(context.Background(), state, block)
resp, err := executor.ExecuteBlock(state, block)
require.NoError(err)
require.NotNil(resp)
newState, err := executor.UpdateStateFromResponses(resp, state, block)
require.NoError(err)
require.NotNil(newState)
assert.Equal(int64(1), newState.LastBlockHeight)
_, err = executor.Commit(context.Background(), &newState, block, resp)
_, err = executor.Commit(&newState, block, resp)
require.NoError(err)
assert.Equal(mockAppHash, newState.AppHash)
newState.LastStoreHeight = uint64(newState.LastBlockHeight)
Expand All @@ -198,7 +197,7 @@ func TestApplyBlock(t *testing.T) {
assert.Len(block.Data.Txs, 3)

// Get the header bytes
abciHeaderPb = abciconv.ToABCIHeaderPB(&block.Header)
abciHeaderPb = types.ToABCIHeaderPB(&block.Header)
abciHeaderBytes, err = abciHeaderPb.Marshal()
require.NoError(err)

Expand All @@ -213,12 +212,9 @@ func TestApplyBlock(t *testing.T) {
}

// Apply the block with an invalid commit
err = executor.Validate(state, block, invalidCommit, proposer)
err = types.ValidateProposedTransition(newState, block, invalidCommit, proposer)

// FIXME: This test didn't check for specific error. It was just checking for error.
// If checking for this specific error, it fails
// require.ErrorIs(err, types.ErrInvalidSignature)
require.Error(err)
require.ErrorIs(err, types.ErrInvalidSignature)

// Create a valid commit for the block
signature, err = proposerKey.Sign(abciHeaderBytes)
Expand All @@ -230,16 +226,16 @@ func TestApplyBlock(t *testing.T) {
}

// Apply the block
err = executor.Validate(newState, block, commit, proposer)
err = types.ValidateProposedTransition(newState, block, commit, proposer)
require.NoError(err)
resp, err = executor.Execute(context.Background(), state, block)
resp, err = executor.ExecuteBlock(state, block)
require.NoError(err)
require.NotNil(resp)
newState, err = executor.UpdateStateFromResponses(resp, state, block)
require.NoError(err)
require.NotNil(newState)
assert.Equal(int64(2), newState.LastBlockHeight)
_, err = executor.Commit(context.Background(), &newState, block, resp)
_, err = executor.Commit(&newState, block, resp)
require.NoError(err)

// wait for at least 4 Tx events, for up to 3 second.
Expand Down
Loading

0 comments on commit 18f98d2

Please sign in to comment.