Skip to content

Commit

Permalink
remove rollback on containers based on VM status; ignore already acce…
Browse files Browse the repository at this point in the history
…pted containers. This change guarantees that a client never sees the container at a given index change
  • Loading branch information
Dan Laine committed Apr 9, 2021
1 parent 2d27d0c commit 9d57662
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 115 deletions.
52 changes: 13 additions & 39 deletions indexer/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ func newIndex(
log logging.Logger,
codec codec.Manager,
clock timer.Clock,
isAcceptedFunc func(containerID ids.ID) bool,
) (Index, error) {
vDB := versiondb.New(baseDB)
indexToContainer := prefixdb.New(indexToContainerPrefix, vDB)
Expand Down Expand Up @@ -105,22 +104,6 @@ func newIndex(
return nil, fmt.Errorf("couldn't parse next accepted index from bytes: %w", err)
}
i.log.Info("next accepted index %d", i.nextAcceptedIndex)

// We may have committed some containers in the index's DB that were not committed at
// the VM's DB. Go back through recently accepted things and make sure they're accepted.
for j := i.nextAcceptedIndex; j >= 1; j-- {
lastAccepted, err := i.getContainerByIndex(j - 1)
if err != nil {
return nil, fmt.Errorf("couldn't get container at index %d: %s", j-1, err)
}
if isAcceptedFunc(lastAccepted.ID) {
// The last accepted container is marked as accepted by the VM. Stop.
break
}
if err := i.removeLastAccepted(lastAccepted.ID); err != nil {
return nil, fmt.Errorf("couldn't remove container: %s", err)
}
}
return i, nil
}

Expand All @@ -141,6 +124,19 @@ func (i *index) Accept(ctx *snow.Context, containerID ids.ID, containerBytes []b
i.lock.Lock()
defer i.lock.Unlock()

// It may be the case that in a previous run of this node, this index committed [containerID]
// as accepted and then the node shut down before the VM committed [containerID] as accepted.
// In that case, when the node restarts Accept will be called with the same container.
// Make sure we don't index the same container twice in that event.
_, err := i.containerToIndex.Get(containerID[:])
if err == nil {
ctx.Log.Debug("not indexing already accepted container %s", containerID)
return nil
}
if err != database.ErrNotFound {
return fmt.Errorf("couldn't get whether %s is accepted: %w", containerID, err)
}

ctx.Log.Debug("indexing %d --> container %s", i.nextAcceptedIndex, containerID)
// Persist index --> Container
nextAcceptedIndexBytes := wrappers.PackLong(i.nextAcceptedIndex)
Expand Down Expand Up @@ -319,25 +315,3 @@ func (i *index) GetLastAccepted() (Container, error) {
func (i *index) lastAcceptedIndex() (uint64, bool) {
return i.nextAcceptedIndex - 1, i.nextAcceptedIndex != 0
}

// Remove the last accepted container, [containerID], from the databases.
// Assumes [p.nextAcceptedIndex] >= 1.
// Assumes [containerID] is actually the ID of the last accepted container.
func (i *index) removeLastAccepted(containerID ids.ID) error {
i.lock.Lock()
defer i.lock.Unlock()

if err := i.containerToIndex.Delete(containerID[:]); err != nil {
return err
}
indexBytes := wrappers.PackLong(i.nextAcceptedIndex - 1)
if err := i.indexToContainer.Delete(indexBytes); err != nil {
return fmt.Errorf("couldn't remove last accepted: %w", err)
}
i.nextAcceptedIndex--
indexBytes = wrappers.PackLong(i.nextAcceptedIndex)
if err := i.vDB.Put(nextAcceptedIndexKey, indexBytes); err != nil {
return fmt.Errorf("couldn't put next accepted key: %s", err)
}
return i.vDB.Commit()
}
60 changes: 14 additions & 46 deletions indexer/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ import (
"github.com/stretchr/testify/assert"
)

func testIsAccepted(_ ids.ID) bool { return true }

func TestIndex(t *testing.T) {
// Setup
pageSize := uint64(64)
Expand All @@ -28,7 +26,7 @@ func TestIndex(t *testing.T) {
db := versiondb.New(baseDB)
ctx := snow.DefaultContextTest()

indexIntf, err := newIndex(db, logging.NoLog{}, codec, timer.Clock{}, testIsAccepted)
indexIntf, err := newIndex(db, logging.NoLog{}, codec, timer.Clock{})
assert.NoError(err)
idx := indexIntf.(*index)

Expand Down Expand Up @@ -82,7 +80,7 @@ func TestIndex(t *testing.T) {
assert.NoError(db.Commit())
assert.NoError(idx.Close())
db = versiondb.New(baseDB)
indexIntf, err = newIndex(db, logging.NoLog{}, codec, timer.Clock{}, testIsAccepted)
indexIntf, err = newIndex(db, logging.NoLog{}, codec, timer.Clock{})
assert.NoError(err)
idx = indexIntf.(*index)

Expand Down Expand Up @@ -117,7 +115,7 @@ func TestIndexGetContainerByRangeMaxPageSize(t *testing.T) {
assert.NoError(err)
db := memdb.New()
ctx := snow.DefaultContextTest()
indexIntf, err := newIndex(db, logging.NoLog{}, codec, timer.Clock{}, testIsAccepted)
indexIntf, err := newIndex(db, logging.NoLog{}, codec, timer.Clock{})
assert.NoError(err)
idx := indexIntf.(*index)

Expand Down Expand Up @@ -151,55 +149,25 @@ func TestIndexGetContainerByRangeMaxPageSize(t *testing.T) {
assert.EqualValues(containers[0], containers2[MaxFetchedByRange-2])
}

func TestIndexRollBackAccepted(t *testing.T) {
func TestDontIndexSameContainerTwice(t *testing.T) {
// Setup
assert := assert.New(t)
codec := codec.NewDefaultManager()
err := codec.RegisterCodec(codecVersion, linearcodec.NewDefault())
assert.NoError(err)
baseDB := memdb.New()
db := versiondb.New(baseDB)
db := memdb.New()
ctx := snow.DefaultContextTest()
indexIntf, err := newIndex(db, logging.NoLog{}, codec, timer.Clock{}, testIsAccepted)
idx, err := newIndex(db, logging.NoLog{}, codec, timer.Clock{})
assert.NoError(err)
idx := indexIntf.(*index)

// Accept 3 containers
containers := []ids.ID{}
for i := uint64(0); i < 3; i++ {
id := ids.GenerateTestID()
containers = append(containers, id)
assert.NoError(idx.Accept(ctx, id, utils.RandomBytes(32)))
}

// Close the index
assert.NoError(db.Commit())
assert.NoError(idx.Close())

// Re-open but with a function that says that only the first container is accepted
db = versiondb.New(baseDB)
isAccepted := func(containerID ids.ID) bool {
assert.Contains(containers, containerID)
return containerID == containers[0]
}
indexIntf, err = newIndex(db, logging.NoLog{}, codec, timer.Clock{}, isAccepted)
// Accept the same container twice
containerID := ids.GenerateTestID()
assert.NoError(idx.Accept(ctx, containerID, []byte{1, 2, 3}))
assert.NoError(idx.Accept(ctx, containerID, []byte{4, 5, 6}))
_, err = idx.GetContainerByIndex(1)
assert.Error(err, "should not have accepted same container twice")
gotContainer, err := idx.GetContainerByID(containerID)
assert.NoError(err)
idx = indexIntf.(*index)
assert.EqualValues(gotContainer.Bytes, []byte{1, 2, 3}, "should not have accepted same container twice")

// Should say that the only accepted container is containers[0]
index, ok := idx.lastAcceptedIndex()
assert.True(ok)
assert.EqualValues(0, index)
container, err := idx.GetLastAccepted()
assert.NoError(err)
assert.Equal(containers[0], container.ID)
assert.EqualValues(1, idx.nextAcceptedIndex)
_, err = idx.GetContainerByIndex(1)
assert.Error(err)
_, err = idx.GetContainerByIndex(2)
assert.Error(err)
_, err = idx.GetContainerByID(containers[1])
assert.Error(err)
_, err = idx.GetContainerByID(containers[2])
assert.Error(err)
}
35 changes: 5 additions & 30 deletions indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/ava-labs/avalanchego/database"
"github.com/ava-labs/avalanchego/database/prefixdb"
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/snow/choices"
"github.com/ava-labs/avalanchego/snow/engine/avalanche"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/snow/engine/snowman"
Expand Down Expand Up @@ -222,17 +221,9 @@ func (i *indexer) RegisterChain(name string, ctx *snow.Context, engineIntf inter
return
}

switch engine := engineIntf.(type) {
switch engineIntf.(type) {
case snowman.Engine:
isAcceptedFunc := func(blkID ids.ID) bool {
ctx.Lock.Lock()
defer ctx.Lock.Unlock()

blk, err := engine.GetVM().GetBlock(blkID)
return err == nil && blk.Status() == choices.Accepted
}

index, err := i.registerChainHelper(chainID, blockPrefix, name, "block", i.consensusDispatcher, isAcceptedFunc)
index, err := i.registerChainHelper(chainID, blockPrefix, name, "block", i.consensusDispatcher)
if err != nil {
i.log.Fatal("couldn't create block index for %s: %s", name, err)
if err := i.close(); err != nil {
Expand All @@ -242,15 +233,7 @@ func (i *indexer) RegisterChain(name string, ctx *snow.Context, engineIntf inter
}
i.blockIndices[chainID] = index
case avalanche.Engine:
isVtxAcceptedFunc := func(vtxID ids.ID) bool {
ctx.Lock.Lock()
defer ctx.Lock.Unlock()

vtx, err := engine.GetVtx(vtxID)
return err == nil && vtx.Status() == choices.Accepted
}

vtxIndex, err := i.registerChainHelper(chainID, vtxPrefix, name, "vtx", i.consensusDispatcher, isVtxAcceptedFunc)
vtxIndex, err := i.registerChainHelper(chainID, vtxPrefix, name, "vtx", i.consensusDispatcher)
if err != nil {
i.log.Fatal("couldn't create vertex index for %s: %s", name, err)
if err := i.close(); err != nil {
Expand All @@ -260,14 +243,7 @@ func (i *indexer) RegisterChain(name string, ctx *snow.Context, engineIntf inter
}
i.vtxIndices[chainID] = vtxIndex

isTxAcceptedFunc := func(txID ids.ID) bool {
ctx.Lock.Lock()
defer ctx.Lock.Unlock()

tx, err := engine.GetVM().GetTx(txID)
return err == nil && tx.Status() == choices.Accepted
}
txIndex, err := i.registerChainHelper(chainID, txPrefix, name, "tx", i.decisionDispatcher, isTxAcceptedFunc)
txIndex, err := i.registerChainHelper(chainID, txPrefix, name, "tx", i.decisionDispatcher)
if err != nil {
i.log.Fatal("couldn't create tx index for %s: %s", name, err)
if err := i.close(); err != nil {
Expand All @@ -291,13 +267,12 @@ func (i *indexer) registerChainHelper(
prefixEnd byte,
name, endpoint string,
dispatcher *triggers.EventDispatcher,
isAcceptedF func(containerID ids.ID) bool,
) (Index, error) {
prefix := make([]byte, hashing.HashLen+wrappers.ByteLen)
copy(prefix, chainID[:])
prefix[hashing.HashLen] = prefixEnd
indexDB := prefixdb.New(prefix, i.db)
index, err := newIndex(indexDB, i.log, i.codec, i.clock, isAcceptedF)
index, err := newIndex(indexDB, i.log, i.codec, i.clock)
if err != nil {
_ = indexDB.Close()
return nil, err
Expand Down

0 comments on commit 9d57662

Please sign in to comment.