Skip to content

Commit

Permalink
Simplify proposerVM indexer (ava-labs#1185)
Browse files Browse the repository at this point in the history
  • Loading branch information
abi87 authored Feb 8, 2022
1 parent 81901ab commit 3de2a65
Showing 10 changed files with 145 additions and 276 deletions.
7 changes: 4 additions & 3 deletions vms/proposervm/block.go
Original file line number Diff line number Diff line change
@@ -11,7 +11,6 @@ import (
"github.com/ava-labs/avalanchego/snow/choices"
"github.com/ava-labs/avalanchego/snow/consensus/snowman"
"github.com/ava-labs/avalanchego/vms/proposervm/block"
"github.com/ava-labs/avalanchego/vms/proposervm/indexer"
"github.com/ava-labs/avalanchego/vms/proposervm/proposer"
)

@@ -34,7 +33,9 @@ var (
)

type Block interface {
indexer.WrappingBlock
snowman.Block

getInnerBlk() snowman.Block

verifyPreForkChild(child *preForkBlock) error
verifyPostForkChild(child *postForkBlock) error
@@ -230,7 +231,7 @@ func (p *postForkCommonComponents) buildChild(
return child, nil
}

func (p *postForkCommonComponents) GetInnerBlk() snowman.Block {
func (p *postForkCommonComponents) getInnerBlk() snowman.Block {
return p.innerBlk
}

7 changes: 5 additions & 2 deletions vms/proposervm/block_server.go
Original file line number Diff line number Diff line change
@@ -5,13 +5,16 @@ package proposervm

import (
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/snow/consensus/snowman"
"github.com/ava-labs/avalanchego/vms/proposervm/indexer"
)

var _ indexer.BlockServer = &VM{}

// GetWrappingBlk implements BlockServer interface
func (vm *VM) GetWrappingBlk(blkID ids.ID) (indexer.WrappingBlock, error) {
// GetFullPostForkBlock implements BlockServer interface
// Note: this is a contention heavy call that should be avoided
// for frequent/repeated indexer ops
func (vm *VM) GetFullPostForkBlock(blkID ids.ID) (snowman.Block, error) {
vm.ctx.Lock.Lock()
defer vm.ctx.Lock.Unlock()

5 changes: 4 additions & 1 deletion vms/proposervm/indexer/block_server.go
Original file line number Diff line number Diff line change
@@ -6,12 +6,15 @@ package indexer
import (
"github.com/ava-labs/avalanchego/database/versiondb"
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/snow/consensus/snowman"
)

// BlockServer represents all requests heightIndexer can issue
// against ProposerVM. All methods must be thread-safe.
type BlockServer interface {
versiondb.Commitable

GetWrappingBlk(blkID ids.ID) (WrappingBlock, error)
// Note: this is a contention heavy call that should be avoided
// for frequent/repeated indexer ops
GetFullPostForkBlock(blkID ids.ID) (snowman.Block, error)
}
17 changes: 8 additions & 9 deletions vms/proposervm/indexer/block_server_test.go
Original file line number Diff line number Diff line change
@@ -22,19 +22,18 @@ var (
type TestBlockServer struct {
T *testing.T

CantGetWrappingBlk bool
CantCommit bool
CantGetFullPostForkBlock bool
CantCommit bool

GetWrappingBlkF func(blkID ids.ID) (WrappingBlock, error)
GetInnerBlkF func(id ids.ID) (snowman.Block, error)
CommitF func() error
GetFullPostForkBlockF func(blkID ids.ID) (snowman.Block, error)
CommitF func() error
}

func (tsb *TestBlockServer) GetWrappingBlk(blkID ids.ID) (WrappingBlock, error) {
if tsb.GetWrappingBlkF != nil {
return tsb.GetWrappingBlkF(blkID)
func (tsb *TestBlockServer) GetFullPostForkBlock(blkID ids.ID) (snowman.Block, error) {
if tsb.GetFullPostForkBlockF != nil {
return tsb.GetFullPostForkBlockF(blkID)
}
if tsb.CantGetWrappingBlk && tsb.T != nil {
if tsb.CantGetFullPostForkBlock && tsb.T != nil {
tsb.T.Fatal(errGetWrappingBlk)
}
return nil, errGetWrappingBlk
48 changes: 28 additions & 20 deletions vms/proposervm/indexer/height_indexer.go
Original file line number Diff line number Diff line change
@@ -36,20 +36,20 @@ type HeightIndexer interface {
func NewHeightIndexer(
server BlockServer,
log logging.Logger,
indexState state.HeightIndex,
indexState state.State,
) HeightIndexer {
return newHeightIndexer(server, log, indexState)
}

func newHeightIndexer(
server BlockServer,
log logging.Logger,
indexState state.HeightIndex,
indexState state.State,
) *heightIndexer {
return &heightIndexer{
server: server,
log: log,
indexState: indexState,
state: indexState,
commitFrequency: defaultCommitFrequency,
}
}
@@ -58,8 +58,8 @@ type heightIndexer struct {
server BlockServer
log logging.Logger

jobDone utils.AtomicBool
indexState state.HeightIndex
jobDone utils.AtomicBool
state state.State

commitFrequency int
}
@@ -75,7 +75,7 @@ func (hi *heightIndexer) IsRepaired() bool {
// the process has limited memory footprint, can be resumed from periodic checkpoints
// and works asynchronously without blocking the VM.
func (hi *heightIndexer) RepairHeightIndex(ctx context.Context) error {
startBlkID, err := hi.indexState.GetCheckpoint()
startBlkID, err := hi.state.GetCheckpoint()
if err == database.ErrNotFound {
hi.jobDone.SetValue(true)
return nil // nothing to do
@@ -84,7 +84,15 @@ func (hi *heightIndexer) RepairHeightIndex(ctx context.Context) error {
return err
}

if err := hi.doRepair(ctx, startBlkID); err != nil {
// retrieve checkpoint height. We explicitly track block height
// in doRepair to avoid heavier DB reads.
startBlk, err := hi.server.GetFullPostForkBlock(startBlkID)
if err != nil {
return err
}

startHeight := startBlk.Height()
if err := hi.doRepair(ctx, startBlkID, startHeight); err != nil {
return fmt.Errorf("could not repair height index: %w", err)
}
if err := hi.flush(); err != nil {
@@ -96,29 +104,30 @@ func (hi *heightIndexer) RepairHeightIndex(ctx context.Context) error {
// if height index needs repairing, doRepair would do that. It
// iterates back via parents, checking and rebuilding height indexing.
// Note: batch commit is deferred to doRepair caller
func (hi *heightIndexer) doRepair(ctx context.Context, currentProBlkID ids.ID) error {
func (hi *heightIndexer) doRepair(ctx context.Context, currentProBlkID ids.ID, currentHeight uint64) error {
var (
start = time.Now()
lastLogTime = start
indexedBlks int
lastIndexedBlks int
previousHeight uint64
)
for {
if err := ctx.Err(); err != nil {
return err
}

processingStart := time.Now()
currentAcceptedBlk, err := hi.server.GetWrappingBlk(currentProBlkID)
currentAcceptedBlk, _, err := hi.state.GetBlock(currentProBlkID)
if err == database.ErrNotFound {
// We have visited all the proposerVM blocks. Because we previously
// verified that we needed to perform a repair, we know that this
// will not happen on the first iteration. This guarantees that
// [previousHeight] will be correctly initialized.
if err := hi.indexState.SetForkHeight(previousHeight); err != nil {
// forkHeight will be correctly initialized.
forkHeight := currentHeight + 1
if err := hi.state.SetForkHeight(forkHeight); err != nil {
return err
}
if err := hi.indexState.DeleteCheckpoint(); err != nil {
if err := hi.state.DeleteCheckpoint(); err != nil {
return err
}
hi.jobDone.SetValue(true)
@@ -128,7 +137,7 @@ func (hi *heightIndexer) doRepair(ctx context.Context, currentProBlkID ids.ID) e
"indexing finished after %d blocks, duration %v, with fork height %d",
indexedBlks,
time.Since(start),
previousHeight,
forkHeight,
)
return nil
}
@@ -140,7 +149,7 @@ func (hi *heightIndexer) doRepair(ctx context.Context, currentProBlkID ids.ID) e
if indexedBlks-lastIndexedBlks > hi.commitFrequency {
// Note: checkpoint must be the lowest block in the batch. This ensures that
// checkpoint is the highest un-indexed block from which process would restart.
if err := hi.indexState.SetCheckpoint(currentProBlkID); err != nil {
if err := hi.state.SetCheckpoint(currentProBlkID); err != nil {
return err
}

@@ -156,8 +165,7 @@ func (hi *heightIndexer) doRepair(ctx context.Context, currentProBlkID ids.ID) e
}

// Rebuild height block index.
currentHeight := currentAcceptedBlk.Height()
if err := hi.indexState.SetBlockIDAtHeight(currentHeight, currentProBlkID); err != nil {
if err := hi.state.SetBlockIDAtHeight(currentHeight, currentProBlkID); err != nil {
return err
}

@@ -174,8 +182,8 @@ func (hi *heightIndexer) doRepair(ctx context.Context, currentProBlkID ids.ID) e
}

// keep checking the parent
currentProBlkID = currentAcceptedBlk.Parent()
previousHeight = currentHeight
currentProBlkID = currentAcceptedBlk.ParentID()
currentHeight--

processingDuration := time.Since(processingStart)
// Sleep [sleepDurationMultiplier]x (5x) the amount of time we spend processing the block
@@ -186,7 +194,7 @@ func (hi *heightIndexer) doRepair(ctx context.Context, currentProBlkID ids.ID) e

// flush writes the commits to the underlying DB
func (hi *heightIndexer) flush() error {
if err := hi.indexState.Commit(); err != nil {
if err := hi.state.Commit(); err != nil {
return err
}
return hi.server.Commit()
Loading

0 comments on commit 3de2a65

Please sign in to comment.