Skip to content

Commit

Permalink
feat: add pruning mechanism that deletes old blocks and commits (dyme…
Browse files Browse the repository at this point in the history
  • Loading branch information
mtsitrin authored Jun 14, 2023
1 parent 7cf1203 commit 72e5acf
Show file tree
Hide file tree
Showing 15 changed files with 424 additions and 139 deletions.
17 changes: 15 additions & 2 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func NewManager(
return nil, err
}

// TODO(mtsitrin): Probably should be validated and manage default on config init
// TODO((#119): Probably should be validated and manage default on config init.
// TODO(omritoptix): Think about the default batchSize and default DABlockTime proper location.
if conf.DABlockTime == 0 {
logger.Info("WARNING: using default DA block time", "DABlockTime", config.DefaultNodeConfig.DABlockTime)
Expand Down Expand Up @@ -431,6 +431,7 @@ func (m *Manager) syncUntilTarget(ctx context.Context, syncTarget uint64) {
// In case the following doesn't hold true, it means we crashed after the commit and before updating the store height.
// In that case we'll want to align the store with the app state and continue to the next block.
func (m *Manager) applyBlock(ctx context.Context, block *types.Block, commit *types.Commit, blockMetaData blockMetaData) error {
//TODO: make it more go idiomatic, indent left the main logic
if block.Header.Height == m.store.Height()+1 {
m.logger.Info("Applying block", "height", block.Header.Height, "source", blockMetaData.source)

Expand Down Expand Up @@ -488,16 +489,28 @@ func (m *Manager) applyBlock(ctx context.Context, block *types.Block, commit *ty
}

// Commit block to app
err = m.executor.Commit(ctx, &newState, block, responses)
retainHeight, err := m.executor.Commit(ctx, &newState, block, responses)
if err != nil {
m.logger.Error("Failed to commit to the block", "error", err)
return err
}

// Prune old heights, if requested by ABCI app.
if retainHeight > 0 {
pruned, err := m.pruneBlocks(retainHeight)
if err != nil {
m.logger.Error("failed to prune blocks", "retain_height", retainHeight, "err", err)
} else {
m.logger.Debug("pruned blocks", "pruned", pruned, "retain_height", retainHeight)
}
}

// Update the state with the new app hash, last validators and store height from the commit.
// Every one of those, if happens before commit, prevents us from re-executing the block in case failed during commit.
newState.LastValidators = m.lastState.Validators.Copy()
newState.LastStoreHeight = block.Header.Height
newState.BaseHeight = m.store.Base()

_, err = m.store.UpdateState(newState, nil)
if err != nil {
m.logger.Error("Failed to update state", "error", err)
Expand Down
23 changes: 23 additions & 0 deletions block/pruning.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package block

import (
"fmt"
"sync/atomic"
)

func (m *Manager) pruneBlocks(retainHeight int64) (uint64, error) {
syncTarget := atomic.LoadUint64(&m.syncTarget)

if retainHeight > int64(syncTarget) {
return 0, fmt.Errorf("cannot prune uncommitted blocks")
}

pruned, err := m.store.PruneBlocks(retainHeight)
if err != nil {
return 0, fmt.Errorf("failed to prune block store: %w", err)
}

//TODO: prune state/indexer and state/txindexer??

return pruned, nil
}
1 change: 1 addition & 0 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ func NewNode(ctx context.Context, conf config.NodeConfig, p2pKey crypto.PrivKey,
baseKV = store.NewDefaultKVStore(conf.RootDir, conf.DBPath, "dymint")
}
mainKV := store.NewPrefixKV(baseKV, mainPrefix)
// TODO: dalcKV is needed for mock only. Initilize only if mock used
dalcKV := store.NewPrefixKV(baseKV, dalcPrefix)
indexerKV := store.NewPrefixKV(baseKV, indexerPrefix)

Expand Down
1 change: 1 addition & 0 deletions proto/types/dymint/state.proto
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,5 @@ message State {
bytes app_hash = 15;

uint64 last_store_height = 16 [(gogoproto.customname) = "LastStoreHeight"];
uint64 base_height = 17;
}
19 changes: 10 additions & 9 deletions state/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,19 +168,20 @@ func (e *BlockExecutor) UpdateStateFromResponses(resp *tmstate.ABCIResponses, st
}

// Commit commits the block
func (e *BlockExecutor) Commit(ctx context.Context, state *types.State, block *types.Block, resp *tmstate.ABCIResponses) error {
appHash, err := e.commit(ctx, state, block, resp.DeliverTxs)
func (e *BlockExecutor) Commit(ctx context.Context, state *types.State, block *types.Block, resp *tmstate.ABCIResponses) (int64, error) {
appHash, retainHeight, err := e.commit(ctx, state, block, resp.DeliverTxs)
if err != nil {
return err
return 0, err
}

copy(state.AppHash[:], appHash[:])

err = e.publishEvents(resp, block, *state)
if err != nil {
e.logger.Error("failed to fire block events", "error", err)
return 0, err
}
return nil
return retainHeight, nil
}

func (e *BlockExecutor) updateState(state types.State, block *types.Block, abciResponses *tmstate.ABCIResponses, validatorUpdates []*tmtypes.Validator) (types.State, error) {
Expand Down Expand Up @@ -233,28 +234,28 @@ func (e *BlockExecutor) GetAppInfo() (*abcitypes.ResponseInfo, error) {
return e.proxyAppQueryConn.InfoSync(abcitypes.RequestInfo{})
}

func (e *BlockExecutor) commit(ctx context.Context, state *types.State, block *types.Block, deliverTxs []*abci.ResponseDeliverTx) ([]byte, error) {
func (e *BlockExecutor) commit(ctx context.Context, state *types.State, block *types.Block, deliverTxs []*abci.ResponseDeliverTx) ([]byte, int64, error) {
e.mempool.Lock()
defer e.mempool.Unlock()

err := e.mempool.FlushAppConn()
if err != nil {
return nil, err
return nil, 0, err
}

resp, err := e.proxyAppConsensusConn.CommitSync()
if err != nil {
return nil, err
return nil, 0, err
}

maxBytes := state.ConsensusParams.Block.MaxBytes
maxGas := state.ConsensusParams.Block.MaxGas
err = e.mempool.Update(int64(block.Header.Height), fromDymintTxs(block.Data.Txs), deliverTxs, mempool.PreCheckMaxBytes(maxBytes), mempool.PostCheckMaxGas(maxGas))
if err != nil {
return nil, err
return nil, 0, err
}

return resp.Data, err
return resp.Data, resp.RetainHeight, err
}

func (e *BlockExecutor) validateBlock(state types.State, block *types.Block) error {
Expand Down
10 changes: 7 additions & 3 deletions state/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func TestApplyBlock(t *testing.T) {
require.NoError(err)
require.NotNil(newState)
assert.Equal(int64(1), newState.LastBlockHeight)
err = executor.Commit(context.Background(), &newState, block, resp)
_, err = executor.Commit(context.Background(), &newState, block, resp)
require.NoError(err)
assert.Equal(mockAppHash, newState.AppHash)
newState.LastStoreHeight = uint64(newState.LastBlockHeight)
Expand Down Expand Up @@ -212,7 +212,11 @@ func TestApplyBlock(t *testing.T) {

// Apply the block with an invalid commit
err = executor.Validate(state, block, invalidCommit, proposer)
require.Error(types.ErrInvalidSignature)

// FIXME: This test didn't check for specific error. It was just checking for error.
// If checking for this specific error, it fails
// require.ErrorIs(err, types.ErrInvalidSignature)
require.Error(err)

// Create a valid commit for the block
signature, err = proposerKey.Sign(abciHeaderBytes)
Expand All @@ -233,7 +237,7 @@ func TestApplyBlock(t *testing.T) {
require.NoError(err)
require.NotNil(newState)
assert.Equal(int64(2), newState.LastBlockHeight)
err = executor.Commit(context.Background(), &newState, block, resp)
_, err = executor.Commit(context.Background(), &newState, block, resp)
require.NoError(err)

// wait for at least 4 Tx events, for up to 3 second.
Expand Down
73 changes: 73 additions & 0 deletions store/pruning.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package store

import "fmt"

// PruneBlocks removes block up to (but not including) a height. It returns number of blocks pruned.
func (s *DefaultStore) PruneBlocks(heightInt int64) (uint64, error) {
if heightInt <= 0 {
return 0, fmt.Errorf("height must be greater than 0")
}

height := uint64(heightInt)
if height > s.Height() {
return 0, fmt.Errorf("cannot prune beyond the latest height %v", s.height)
}
base := s.Base()
if height < base {
return 0, fmt.Errorf("cannot prune to height %v, it is lower than base height %v",
height, base)
}

pruned := uint64(0)
batch := s.db.NewBatch()
defer batch.Discard()

flush := func(batch Batch, base uint64) error {
err := batch.Commit()
if err != nil {
return fmt.Errorf("failed to prune up to height %v: %w", base, err)
}
s.SetBase(base)
return nil
}

for h := base; h < height; h++ {
hash, err := s.loadHashFromIndex(h)
if err != nil {
continue
}
if err := batch.Delete(getBlockKey(hash)); err != nil {
return 0, err
}
if err := batch.Delete(getCommitKey(hash)); err != nil {
return 0, err
}
if err := batch.Delete(getIndexKey(h)); err != nil {
return 0, err
}
if err := batch.Delete(getResponsesKey(h)); err != nil {
return 0, err
}
if err := batch.Delete(getValidatorsKey(h)); err != nil {
return 0, err
}

pruned++

// flush every 1000 blocks to avoid batches becoming too large
if pruned%1000 == 0 && pruned > 0 {
err := flush(batch, h)
if err != nil {
return 0, err
}
batch = s.db.NewBatch()
defer batch.Discard()
}
}

err := flush(batch, height)
if err != nil {
return 0, err
}
return pruned, nil
}
90 changes: 90 additions & 0 deletions store/pruning_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package store_test

import (
"testing"

"github.com/dymensionxyz/dymint/store"
"github.com/dymensionxyz/dymint/testutil"
"github.com/dymensionxyz/dymint/types"
"github.com/stretchr/testify/assert"
)

func TestStorePruning(t *testing.T) {
t.Parallel()

pruningHeight := uint64(3)

cases := []struct {
name string
blocks []*types.Block
pruningHeight uint64
expectedBase uint64
expectedHeight uint64
shouldError bool
}{
{"blocks with pruning", []*types.Block{
testutil.GetRandomBlock(1, 0),
testutil.GetRandomBlock(2, 0),
testutil.GetRandomBlock(3, 0),
testutil.GetRandomBlock(4, 0),
testutil.GetRandomBlock(5, 0),
}, pruningHeight, pruningHeight, 5, false},
{"blocks out of order", []*types.Block{
testutil.GetRandomBlock(2, 0),
testutil.GetRandomBlock(3, 0),
testutil.GetRandomBlock(1, 0),
}, pruningHeight, pruningHeight, 3, false},
{"with a gap", []*types.Block{
testutil.GetRandomBlock(1, 0),
testutil.GetRandomBlock(9, 0),
testutil.GetRandomBlock(10, 0),
}, pruningHeight, pruningHeight, 10, false},
{"pruning beyond latest height", []*types.Block{
testutil.GetRandomBlock(1, 0),
testutil.GetRandomBlock(2, 0),
}, pruningHeight, 1, 2, true}, // should error because pruning height > latest height
{"pruning height 0", []*types.Block{
testutil.GetRandomBlock(1, 0),
testutil.GetRandomBlock(2, 0),
testutil.GetRandomBlock(3, 0),
}, 0, 1, 3, true},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
assert := assert.New(t)
bstore := store.New(store.NewDefaultInMemoryKVStore())
assert.Equal(uint64(0), bstore.Height())

for _, block := range c.blocks {
_, err := bstore.SaveBlock(block, &types.Commit{}, nil)
bstore.SetHeight(block.Header.Height)
assert.NoError(err)
}

_, err := bstore.PruneBlocks(int64(c.pruningHeight))
if c.shouldError {
assert.Error(err)
} else {
assert.NoError(err)
assert.Equal(pruningHeight, bstore.Base())
assert.Equal(c.expectedHeight, bstore.Height())
assert.Equal(c.expectedBase, bstore.Base())

// Check if pruned blocks are really removed from the store
for h := uint64(1); h < pruningHeight; h++ {
_, err := bstore.LoadBlock(h)
assert.Error(err, "Block at height %d should be pruned", h)

_, err = bstore.LoadBlockResponses(h)
assert.Error(err, "BlockResponse at height %d should be pruned", h)

_, err = bstore.LoadCommit(h)
assert.Error(err, "Commit at height %d should be pruned", h)
}
}
})
}
}

//TODO: prune twice
Loading

0 comments on commit 72e5acf

Please sign in to comment.