Skip to content

Commit

Permalink
Atomic block processing (dymensionxyz#92)
Browse files Browse the repository at this point in the history
* Atomic block processing

* discard current batch if proccess failed

* prevent starting of new batch from canceling the previous one

* add store batch test

* fix lint issues

* fix lint isues

* add mtx lock

* fix lock issue

* fix lock issues

* fix data race issue

* move currentBatch to store object

* discard current batch in casses of errors

* fix review issues

* fix lint issue

* remove wrape functions and make batch param optional param
  • Loading branch information
ItzhakBokris authored Sep 25, 2022
1 parent cd0d2a4 commit 83e432f
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 49 deletions.
46 changes: 35 additions & 11 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func NewManager(
}

updateState(&s, res)
if err := store.UpdateState(s); err != nil {
if _, err := store.UpdateState(s, nil); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -372,34 +372,47 @@ func (m *Manager) applyBlock(ctx context.Context, block *types.Block, commit *ty
)
if block.Header.Height > m.store.Height() {
m.logger.Info("Syncing block", "height", block.Header.Height)

batch := m.store.NewBatch()

newState, responses, err := m.executor.ApplyBlock(ctx, m.lastState, block)
if err != nil {
batch.Discard()
m.logger.Error("failed to ApplyBlock", "error", err)
return err
}
err = m.store.SaveBlock(block, commit)
batch, err = m.store.SaveBlock(block, commit, batch)
if err != nil {
batch.Discard()
m.logger.Error("failed to save block", "error", err)
return err
}
var appHash []byte
err = m.executor.Commit(ctx, &newState, block, responses)
if err != nil {
batch.Discard()
m.logger.Error("failed to Commit", "error", err)
return err
}
m.store.SetHeight(block.Header.Height)

err = m.store.SaveBlockResponses(block.Header.Height, responses)
batch, err = m.store.SaveBlockResponses(block.Header.Height, responses, batch)
if err != nil {
batch.Discard()
m.logger.Error("failed to save block responses", "error", err)
return err
}

err = batch.Commit()
if err != nil {
m.logger.Error("failed to persist batch to disk", "error", err)
return err
}

copy(newState.AppHash[:], appHash)

m.lastState = newState
err = m.store.UpdateState(m.lastState)
_, err = m.store.UpdateState(m.lastState, nil)
if err != nil {
m.logger.Error("failed to save updated state", "error", err)
return err
Expand Down Expand Up @@ -440,7 +453,6 @@ func (m *Manager) fetchBatch(daHeight uint64) (da.ResultRetrieveBatch, error) {
}

func (m *Manager) publishBlock(ctx context.Context) error {

var lastCommit *types.Commit
var lastHeaderHash [32]byte
var err error
Expand All @@ -463,7 +475,7 @@ func (m *Manager) publishBlock(ctx context.Context) error {
}

var block *types.Block

batch := m.store.NewBatch()
// Check if there's an already stored block at a newer height
// If there is use that instead of creating a new block
var commit *types.Commit
Expand Down Expand Up @@ -492,42 +504,54 @@ func (m *Manager) publishBlock(ctx context.Context) error {
}

// SaveBlock commits the DB tx
err = m.store.SaveBlock(block, commit)
batch, err = m.store.SaveBlock(block, commit, batch)
if err != nil {
batch.Discard()
return err
}
}

// Apply the block but DONT commit
newState, responses, err := m.executor.ApplyBlock(ctx, m.lastState, block)
if err != nil {
batch.Discard()
return err
}

// Commit the new state and block which writes to disk on the proxy app
err = m.executor.Commit(ctx, &newState, block, responses)
if err != nil {
batch.Discard()
return err
}

// SaveBlockResponses commits the DB tx
err = m.store.SaveBlockResponses(block.Header.Height, responses)
batch, err = m.store.SaveBlockResponses(block.Header.Height, responses, batch)
if err != nil {
batch.Discard()
return err
}

// After this call m.lastState is the NEW state returned from ApplyBlock
m.lastState = newState

// UpdateState commits the DB tx
err = m.store.UpdateState(m.lastState)
batch, err = m.store.UpdateState(m.lastState, batch)
if err != nil {
batch.Discard()
return err
}

// SaveValidators commits the DB tx
err = m.store.SaveValidators(block.Header.Height, m.lastState.Validators)
batch, err = m.store.SaveValidators(block.Header.Height, m.lastState.Validators, batch)
if err != nil {
batch.Discard()
return err
}

err = batch.Commit()
if err != nil {
m.logger.Error("failed to persist batch to disk", "error", err)
return err
}

Expand Down Expand Up @@ -573,7 +597,7 @@ func (m *Manager) submitNextBatch(ctx context.Context) {

func (m *Manager) updateStateIndex(stateIndex uint64) error {
atomic.StoreUint64(&m.lastState.SLStateIndex, stateIndex)
err := m.store.UpdateState(m.lastState)
_, err := m.store.UpdateState(m.lastState, nil)
if err != nil {
m.logger.Error("Failed to update state", "error", err)
return err
Expand Down
4 changes: 2 additions & 2 deletions block/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestInitialState(t *testing.T) {
// Init empty store and full store
emptyStore := store.New(store.NewDefaultInMemoryKVStore())
fullStore := store.New(store.NewDefaultInMemoryKVStore())
err := fullStore.UpdateState(sampleState)
_, err := fullStore.UpdateState(sampleState, nil)
require.NoError(t, err)

cases := []struct {
Expand Down Expand Up @@ -201,7 +201,7 @@ func getManager(genesisHeight int64, storeInitialHeight int64, storeLastBlockHei
// And updating the state according to the genesis.
state := testutil.GenerateState(storeInitialHeight, storeLastBlockHeight)
store := store.New(store.NewDefaultInMemoryKVStore())
if err := store.UpdateState(state); err != nil {
if _, err := store.UpdateState(state, nil); err != nil {
return nil, err
}
key, _, _ := crypto.GenerateEd25519Key(rand.Reader)
Expand Down
14 changes: 7 additions & 7 deletions rpc/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func TestGetBlock(t *testing.T) {
require.NoError(err)

block := getRandomBlock(1, 10)
err = rpc.node.Store.SaveBlock(block, &types.Commit{})
_, err = rpc.node.Store.SaveBlock(block, &types.Commit{}, nil)
rpc.node.Store.SetHeight(block.Header.Height)
require.NoError(err)

Expand All @@ -274,7 +274,7 @@ func TestGetCommit(t *testing.T) {
require.NoError(err)

for _, b := range blocks {
err = rpc.node.Store.SaveBlock(b, &types.Commit{Height: b.Header.Height})
_, err = rpc.node.Store.SaveBlock(b, &types.Commit{Height: b.Header.Height}, nil)
rpc.node.Store.SetHeight(b.Header.Height)
require.NoError(err)
}
Expand Down Expand Up @@ -309,10 +309,10 @@ func TestBlockSearch(t *testing.T) {
heights := []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
for _, h := range heights {
block := getRandomBlock(uint64(h), 5)
err := rpc.node.Store.SaveBlock(block, &types.Commit{
_, err := rpc.node.Store.SaveBlock(block, &types.Commit{
Height: uint64(h),
HeaderHash: block.Header.Hash(),
})
}, nil)
require.NoError(err)
}
indexBlocks(t, rpc, heights)
Expand Down Expand Up @@ -373,7 +373,7 @@ func TestGetBlockByHash(t *testing.T) {
require.NoError(err)

block := getRandomBlock(1, 10)
err = rpc.node.Store.SaveBlock(block, &types.Commit{})
_, err = rpc.node.Store.SaveBlock(block, &types.Commit{}, nil)
require.NoError(err)
abciBlock, err := abciconv.ToABCIBlock(block)
require.NoError(err)
Expand Down Expand Up @@ -567,10 +567,10 @@ func TestBlockchainInfo(t *testing.T) {
heights := []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
for _, h := range heights {
block := getRandomBlock(uint64(h), 5)
err := rpc.node.Store.SaveBlock(block, &types.Commit{
_, err := rpc.node.Store.SaveBlock(block, &types.Commit{
Height: uint64(h),
HeaderHash: block.Header.Hash(),
})
}, nil)
rpc.node.Store.SetHeight(block.Header.Height)
require.NoError(err)
}
Expand Down
65 changes: 45 additions & 20 deletions store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ func New(kv KVStore) Store {
}
}

// NewBatch creates a new db batch.
func (s *DefaultStore) NewBatch() Batch {
return s.db.NewBatch()
}

// SetHeight sets the height saved in the Store if it is higher than the existing height
func (s *DefaultStore) SetHeight(height uint64) {
storeHeight := atomic.LoadUint64(&s.height)
Expand All @@ -55,33 +60,40 @@ func (s *DefaultStore) Height() uint64 {

// SaveBlock adds block to the store along with corresponding commit.
// Stored height is updated if block height is greater than stored value.
func (s *DefaultStore) SaveBlock(block *types.Block, commit *types.Commit) error {
func (s *DefaultStore) SaveBlock(block *types.Block, commit *types.Commit, batch Batch) (Batch, error) {
hash := block.Header.Hash()
blockBlob, err := block.MarshalBinary()
if err != nil {
return fmt.Errorf("failed to marshal Block to binary: %w", err)
return batch, fmt.Errorf("failed to marshal Block to binary: %w", err)
}

commitBlob, err := commit.MarshalBinary()
if err != nil {
return fmt.Errorf("failed to marshal Commit to binary: %w", err)
return batch, fmt.Errorf("failed to marshal Commit to binary: %w", err)
}

bb := s.db.NewBatch()
bb := batch
if bb == nil {
bb = s.db.NewBatch()
}
err = multierr.Append(err, bb.Set(getBlockKey(hash), blockBlob))
err = multierr.Append(err, bb.Set(getCommitKey(hash), commitBlob))
err = multierr.Append(err, bb.Set(getIndexKey(block.Header.Height), hash[:]))

if err != nil {
bb.Discard()
return err
if batch == nil {
bb.Discard()
}
return batch, err
}

if err = bb.Commit(); err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
if batch == nil {
if err = bb.Commit(); err != nil {
return nil, fmt.Errorf("failed to commit transaction: %w", err)
}
}

return nil
return batch, nil
}

// LoadBlock returns block at given height, or error if it's not found in Store.
Expand Down Expand Up @@ -112,12 +124,16 @@ func (s *DefaultStore) LoadBlockByHash(hash [32]byte) (*types.Block, error) {
}

// SaveBlockResponses saves block responses (events, tx responses, validator set updates, etc) in Store.
func (s *DefaultStore) SaveBlockResponses(height uint64, responses *tmstate.ABCIResponses) error {
func (s *DefaultStore) SaveBlockResponses(height uint64, responses *tmstate.ABCIResponses, batch Batch) (Batch, error) {
data, err := responses.Marshal()
if err != nil {
return fmt.Errorf("failed to marshal response: %w", err)
return batch, fmt.Errorf("failed to marshal response: %w", err)
}
if batch == nil {
return nil, s.db.Set(getResponsesKey(height), data)
}
return s.db.Set(getResponsesKey(height), data)
err = batch.Set(getResponsesKey(height), data)
return batch, err
}

// LoadBlockResponses returns block results at given height, or error if it's not found in Store.
Expand Down Expand Up @@ -159,16 +175,21 @@ func (s *DefaultStore) LoadCommitByHash(hash [32]byte) (*types.Commit, error) {

// UpdateState updates state saved in Store. Only one State is stored.
// If there is no State in Store, state will be saved.
func (s *DefaultStore) UpdateState(state types.State) error {
func (s *DefaultStore) UpdateState(state types.State, batch Batch) (Batch, error) {
pbState, err := state.ToProto()
if err != nil {
return fmt.Errorf("failed to marshal state to JSON: %w", err)
return batch, fmt.Errorf("failed to marshal state to JSON: %w", err)
}
data, err := pbState.Marshal()
if err != nil {
return err
return batch, err
}
return s.db.Set(getStateKey(), data)

if batch == nil {
return nil, s.db.Set(getStateKey(), data)
}
err = batch.Set(getStateKey(), data)
return batch, err
}

// LoadState returns last state saved with UpdateState.
Expand All @@ -190,17 +211,21 @@ func (s *DefaultStore) LoadState() (types.State, error) {
}

// SaveValidators stores validator set for given block height in store.
func (s *DefaultStore) SaveValidators(height uint64, validatorSet *tmtypes.ValidatorSet) error {
func (s *DefaultStore) SaveValidators(height uint64, validatorSet *tmtypes.ValidatorSet, batch Batch) (Batch, error) {
pbValSet, err := validatorSet.ToProto()
if err != nil {
return fmt.Errorf("failed to marshal ValidatorSet to protobuf: %w", err)
return batch, fmt.Errorf("failed to marshal ValidatorSet to protobuf: %w", err)
}
blob, err := pbValSet.Marshal()
if err != nil {
return fmt.Errorf("failed to marshal ValidatorSet: %w", err)
return batch, fmt.Errorf("failed to marshal ValidatorSet: %w", err)
}

return s.db.Set(getValidatorsKey(height), blob)
if batch == nil {
return nil, s.db.Set(getValidatorsKey(height), blob)
}
err = batch.Set(getValidatorsKey(height), blob)
return batch, err
}

// LoadValidators loads validator set at given block height from store.
Expand Down
Loading

0 comments on commit 83e432f

Please sign in to comment.