fix(manager): more robust error handling and health status (dymensionxyz#696)

Co-authored-by: Michael Tsitrin <[email protected]>
Co-authored-by: Omri <[email protected]>
3 people authored Apr 18, 2024
1 parent 18f98d2 commit ab41f13
Showing 33 changed files with 585 additions and 604 deletions.
39 changes: 16 additions & 23 deletions block/block.go
@@ -22,7 +22,6 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
     // TODO: add switch case to have defined behavior for each case.
     // validate block height
     if block.Header.Height != m.store.NextHeight() {
-        m.logger.Error("Block not applied. wrong height", "block height", block.Header.Height, "expected height", m.store.NextHeight())
         return types.ErrInvalidBlockHeight
     }

@@ -31,67 +30,63 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
     // Check if the app's last block height is the same as the currently produced block height
     isBlockAlreadyApplied, err := m.isHeightAlreadyApplied(block.Header.Height)
     if err != nil {
-        return err
+        return fmt.Errorf("check if block is already applied: %w", err)
     }
     // In case the following true, it means we crashed after the commit and before updating the store height.
     // In that case we'll want to align the store with the app state and continue to the next block.
     if isBlockAlreadyApplied {
         err := m.UpdateStateFromApp()
         if err != nil {
-            return err
+            return fmt.Errorf("update state from app: %w", err)
         }
         m.logger.Debug("Aligned with app state required. Skipping to next block", "height", block.Header.Height)
         return nil
     }
     // Start applying the block assuming no inconsistency was found.
     _, err = m.store.SaveBlock(block, commit, nil)
     if err != nil {
-        m.logger.Error("save block", "error", err)
-        return err
+        return fmt.Errorf("save block: %w", err)
     }

     responses, err := m.executor.ExecuteBlock(m.lastState, block)
     if err != nil {
-        m.logger.Error("execute valid block", "error", err)
-        return err
+        return fmt.Errorf("execute block: %w", err)
     }

     newState, err := m.executor.UpdateStateFromResponses(responses, m.lastState, block)
     if err != nil {
-        return err
+        return fmt.Errorf("update state from responses: %w", err)
     }

     batch := m.store.NewBatch()

     batch, err = m.store.SaveBlockResponses(block.Header.Height, responses, batch)
     if err != nil {
         batch.Discard()
-        return err
+        return fmt.Errorf("save block responses: %w", err)
     }

     m.lastState = newState
     batch, err = m.store.UpdateState(m.lastState, batch)
     if err != nil {
         batch.Discard()
-        return err
+        return fmt.Errorf("update state: %w", err)
     }
     batch, err = m.store.SaveValidators(block.Header.Height, m.lastState.Validators, batch)
     if err != nil {
         batch.Discard()
-        return err
+        return fmt.Errorf("save validators: %w", err)
     }

     err = batch.Commit()
     if err != nil {
-        m.logger.Error("persist batch to disk", "error", err)
-        return err
+        return fmt.Errorf("commit batch to disk: %w", err)
     }

     // Commit block to app
     retainHeight, err := m.executor.Commit(&newState, block, responses)
     if err != nil {
-        m.logger.Error("commit to the block", "error", err)
-        return err
+        return fmt.Errorf("commit block: %w", err)
     }

     // Prune old heights, if requested by ABCI app.
@@ -112,8 +107,7 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta

     _, err = m.store.UpdateState(newState, nil)
     if err != nil {
-        m.logger.Error("update state", "error", err)
-        return err
+        return fmt.Errorf("final update state: %w", err)
     }
     m.lastState = newState

@@ -141,8 +135,7 @@ func (m *Manager) attemptApplyCachedBlocks() error {
         // Note: cached <block,commit> pairs have passed basic validation, so no need to validate again
         err := m.applyBlock(prevCachedBlock, prevCachedCommit, blockMetaData{source: gossipedBlock})
         if err != nil {
-            m.logger.Debug("applying cached block", "err", err)
-            return err
+            return fmt.Errorf("apply cached block: expected height: %d: %w", expectedHeight, err)
         }
         m.logger.Debug("applied cached block", "height", expectedHeight)
     }
@@ -212,12 +205,12 @@ func (m *Manager) gossipBlock(ctx context.Context, block types.Block, commit typ
     gossipedBlock := p2p.GossipedBlock{Block: block, Commit: commit}
     gossipedBlockBytes, err := gossipedBlock.MarshalBinary()
     if err != nil {
-        m.logger.Error("marshal block", "error", err)
-        return err
+        return fmt.Errorf("marshal binary: %w: %w", err, ErrNonRecoverable)
     }
     if err := m.p2pClient.GossipBlock(ctx, gossipedBlockBytes); err != nil {
-        m.logger.Error("gossip block", "error", err)
-        return err
+        // Although this boils down to publishing on a topic, we don't want to speculate too much on what
+        // could cause that to fail, so we assume recoverable.
+        return fmt.Errorf("p2p gossip block: %w: %w", err, ErrRecoverable)
     }
     return nil
 }
8 changes: 8 additions & 0 deletions block/errors.go
@@ -0,0 +1,8 @@
+package block
+
+import "errors"
+
+var (
+    ErrNonRecoverable = errors.New("non recoverable")
+    ErrRecoverable    = errors.New("recoverable")
+)
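These two sentinels are what the new `%w`-chained wrapping in gossipBlock attaches to its errors. Not part of the commit: a minimal, self-contained sketch of how a caller could classify such errors, assuming Go 1.20+ (which allows multiple `%w` verbs in `fmt.Errorf`); the `doGossip` helper and its failure cause are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the sentinels from block/errors.go, so this sketch is self-contained.
var (
	ErrNonRecoverable = errors.New("non recoverable")
	ErrRecoverable    = errors.New("recoverable")
)

// doGossip is a hypothetical operation that tags its failure as recoverable,
// mirroring the double-%w wrapping used in gossipBlock.
func doGossip() error {
	cause := errors.New("topic publish timed out")
	return fmt.Errorf("p2p gossip block: %w: %w", cause, ErrRecoverable)
}

func main() {
	err := doGossip()
	switch {
	case errors.Is(err, ErrNonRecoverable):
		fmt.Println("halting:", err)
	case errors.Is(err, ErrRecoverable):
		// errors.Is walks the whole wrap chain, so the sentinel is found
		// even though the original cause is wrapped alongside it.
		fmt.Println("retrying later:", err)
	default:
		fmt.Println("unclassified:", err)
	}
}
```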
34 changes: 18 additions & 16 deletions block/manager.go
@@ -9,17 +9,18 @@ import (
     "sync/atomic"
     "time"

+    uevent "github.com/dymensionxyz/dymint/utils/event"
+
     "code.cloudfoundry.org/go-diodes"

+    "github.com/dymensionxyz/dymint/node/events"
+    "github.com/dymensionxyz/dymint/p2p"
     "github.com/libp2p/go-libp2p/core/crypto"
+
     tmcrypto "github.com/tendermint/tendermint/crypto"
     "github.com/tendermint/tendermint/libs/pubsub"
     tmtypes "github.com/tendermint/tendermint/types"

-    "github.com/dymensionxyz/dymint/node/events"
-    "github.com/dymensionxyz/dymint/p2p"
-    "github.com/dymensionxyz/dymint/utils"
-
     "github.com/tendermint/tendermint/proxy"

     "github.com/dymensionxyz/dymint/config"
@@ -156,16 +157,16 @@ func (m *Manager) Start(ctx context.Context, isAggregator bool) error {
         return err
     }

+    m.StartEventListener(ctx, isAggregator)

     if isAggregator {
         go m.ProduceBlockLoop(ctx)
         go m.SubmitLoop(ctx)
     } else {
-        go m.RetriveLoop(ctx)
+        go m.RetrieveLoop(ctx)
         go m.SyncTargetLoop(ctx)
     }

-    m.EventListener(ctx, isAggregator)

     return nil
 }

@@ -210,22 +211,23 @@ func getAddress(key crypto.PrivKey) ([]byte, error) {
     return tmcrypto.AddressHash(rawKey), nil
 }

-// EventListener registers events to callbacks.
-func (m *Manager) EventListener(ctx context.Context, isAggregator bool) {
+// StartEventListener registers events to callbacks.
+func (m *Manager) StartEventListener(ctx context.Context, isAggregator bool) {
     if isAggregator {
-        go utils.SubscribeAndHandleEvents(ctx, m.pubsub, "nodeHealthStatusHandler", events.EventQueryHealthStatus, m.healthStatusEventCallback, m.logger)
+        go uevent.MustSubscribe(ctx, m.pubsub, "nodeHealth", events.QueryHealthStatus, m.onNodeHealthStatus, m.logger)
     } else {
-        go utils.SubscribeAndHandleEvents(ctx, m.pubsub, "ApplyBlockLoop", p2p.EventQueryNewNewGossipedBlock, m.applyBlockCallback, m.logger, 100)
+        go uevent.MustSubscribe(ctx, m.pubsub, "applyBlockLoop", p2p.EventQueryNewNewGossipedBlock, m.onNewGossipedBlock, m.logger, 100)
     }
 }

-func (m *Manager) healthStatusEventCallback(event pubsub.Message) {
-    eventData := event.Data().(*events.EventDataHealthStatus)
-    m.logger.Info("Received health status event", "eventData", eventData)
-    m.shouldProduceBlocksCh <- eventData.Healthy
+func (m *Manager) onNodeHealthStatus(event pubsub.Message) {
+    eventData := event.Data().(*events.DataHealthStatus)
+    m.logger.Info("received health status event", "eventData", eventData)
+    m.shouldProduceBlocksCh <- eventData.Error == nil
 }

-func (m *Manager) applyBlockCallback(event pubsub.Message) {
+// onNewGossippedBlock will take a block and apply it
+func (m *Manager) onNewGossipedBlock(event pubsub.Message) {
     m.logger.Debug("Received new block event", "eventData", event.Data(), "cachedBlocks", len(m.prevBlock))
     eventData := event.Data().(p2p.GossipedBlock)
     block := eventData.Block
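The health-status callback above replaces the old Healthy boolean with an error field: a nil Error now means healthy, and that is what gets pushed onto shouldProduceBlocksCh to gate block production. Not part of the commit: a self-contained sketch of that convention, using a local stand-in for events.DataHealthStatus rather than the real dymint events package.

```go
package main

import (
	"errors"
	"fmt"
)

// DataHealthStatus is a local stand-in for the events.DataHealthStatus shape used in
// the diff: a nil Error means the node is healthy.
type DataHealthStatus struct {
	Error error
}

type manager struct {
	shouldProduceBlocksCh chan bool
}

// onNodeHealthStatus mirrors the manager callback; the real one receives a
// pubsub.Message and type-asserts the payload.
func (m *manager) onNodeHealthStatus(eventData *DataHealthStatus) {
	m.shouldProduceBlocksCh <- eventData.Error == nil
}

func main() {
	m := &manager{shouldProduceBlocksCh: make(chan bool, 1)}

	// Unhealthy event: block production should pause.
	m.onNodeHealthStatus(&DataHealthStatus{Error: errors.New("unhealthy")})
	fmt.Println("produce blocks:", <-m.shouldProduceBlocksCh) // false

	// Healthy event: production resumes.
	m.onNodeHealthStatus(&DataHealthStatus{})
	fmt.Println("produce blocks:", <-m.shouldProduceBlocksCh) // true
}
```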
24 changes: 12 additions & 12 deletions block/manager_test.go
@@ -183,7 +183,7 @@ func TestProduceNewBlock(t *testing.T) {
     manager, err := getManager(getManagerConfig(), nil, nil, 1, 1, 0, proxyApp, nil)
     require.NoError(t, err)
     // Produce block
-    err = manager.produceBlock(context.Background(), true)
+    err = manager.produceAndGossipBlock(context.Background(), true)
     require.NoError(t, err)
     // Validate state is updated with the commit hash
     assert.Equal(t, uint64(1), manager.store.Height())
@@ -210,7 +210,7 @@ func TestProducePendingBlock(t *testing.T) {
     _, err = manager.store.SaveBlock(block, &block.LastCommit, nil)
     require.NoError(t, err)
     // Produce block
-    err = manager.produceBlock(context.Background(), true)
+    err = manager.produceAndGossipBlock(context.Background(), true)
     require.NoError(t, err)
     // Validate state is updated with the block that was saved in the store
     assert.Equal(t, block.Header.Hash(), *(*[32]byte)(manager.lastState.LastBlockID.Hash))
@@ -244,26 +244,26 @@ func TestBlockProductionNodeHealth(t *testing.T) {
     }{
         {
             name:                  "HealthyEventBlocksProduced",
-            healthStatusEvent:     map[string][]string{events.EventNodeTypeKey: {events.EventHealthStatus}},
-            healthStatusEventData: &events.EventDataHealthStatus{Healthy: true, Error: nil},
+            healthStatusEvent:     events.HealthStatusList,
+            healthStatusEventData: &events.DataHealthStatus{},
             shouldProduceBlocks:   true,
         },
         {
             name:                  "UnhealthyEventBlocksNotProduced",
-            healthStatusEvent:     map[string][]string{events.EventNodeTypeKey: {events.EventHealthStatus}},
-            healthStatusEventData: &events.EventDataHealthStatus{Healthy: false, Error: errors.New("Unhealthy")},
+            healthStatusEvent:     events.HealthStatusList,
+            healthStatusEventData: &events.DataHealthStatus{Error: errors.New("unhealthy")},
             shouldProduceBlocks:   false,
         },
         {
             name:                  "UnhealthyEventBlocksStillNotProduced",
-            healthStatusEvent:     map[string][]string{events.EventNodeTypeKey: {events.EventHealthStatus}},
-            healthStatusEventData: &events.EventDataHealthStatus{Healthy: false, Error: errors.New("Unhealthy")},
+            healthStatusEvent:     events.HealthStatusList,
+            healthStatusEventData: &events.DataHealthStatus{Error: errors.New("unhealthy")},
             shouldProduceBlocks:   false,
         },
         {
             name:                  "HealthyEventBlocksProduced",
-            healthStatusEvent:     map[string][]string{events.EventNodeTypeKey: {events.EventHealthStatus}},
-            healthStatusEventData: &events.EventDataHealthStatus{Healthy: true, Error: nil},
+            healthStatusEvent:     events.HealthStatusList,
+            healthStatusEventData: &events.DataHealthStatus{},
             shouldProduceBlocks:   true,
         },
     }
@@ -382,7 +382,7 @@ func TestProduceBlockFailAfterCommit(t *testing.T) {
             })
             mockStore.ShouldFailSetHeight = tc.shouldFailSetSetHeight
             mockStore.ShoudFailUpdateState = tc.shouldFailUpdateState
-            _ = manager.produceBlock(context.Background(), true)
+            _ = manager.produceAndGossipBlock(context.Background(), true)
             require.Equal(tc.expectedStoreHeight, manager.store.Height(), tc.name)
             require.Equal(tc.expectedStateAppHash, manager.lastState.AppHash, tc.name)
             storeState, err := manager.store.LoadState()
@@ -435,7 +435,7 @@ func TestCreateNextDABatchWithBytesLimit(t *testing.T) {
         t.Run(tc.name, func(t *testing.T) {
             // Produce blocks
             for i := 0; i < tc.blocksToProduce; i++ {
-                err := manager.produceBlock(ctx, true)
+                err := manager.produceAndGossipBlock(ctx, true)
                 assert.NoError(err)
             }

