Skip to content

Commit

Permalink
now both hare and tortoise can update state (spacemeshos#1870)
Browse files Browse the repository at this point in the history
## Motivation
Closes spacemeshos#1805 

## Changes
added a unified method in mesh to update the state `updateStateWithLayer`
Now inputs can be received via 2 code paths, the first is `ValidateLayer` which runs tortoise validation and then update the state
Note: this may result in different states received when running a node and receiving inputs from hare and syncing a node, since potentially, if assumptions are broken , tortoise may contain more blocks than first agreed by hare protocol for a certain layer @barakshani , please re affirm this

## Test Plan
add UT to test receiving inputs from both tortoise and hare,
test state received when syncing from tortoise vs state received from hare
  • Loading branch information
antonlerner committed Mar 22, 2020
1 parent b22fcb3 commit ed30588
Show file tree
Hide file tree
Showing 13 changed files with 309 additions and 88 deletions.
4 changes: 4 additions & 0 deletions cmd/hare/hare.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ func init() {
type mockBlockProvider struct {
}

func (mbp *mockBlockProvider) HandleValidatedLayer(validatedLayer types.LayerID, layer []types.BlockID) {
panic("implement me")
}

func (mbp *mockBlockProvider) LayerBlockIds(layerId types.LayerID) ([]types.BlockID, error) {
return buildSet(), nil
}
Expand Down
3 changes: 3 additions & 0 deletions hare/flows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,9 @@ func newRandBlockID(rng *rand.Rand) (id types.BlockID) {
type mockBlockProvider struct {
}

func (mbp *mockBlockProvider) HandleValidatedLayer(validatedLayer types.LayerID, layer []types.BlockID) {
}

func (mbp *mockBlockProvider) LayerBlockIds(layerId types.LayerID) ([]types.BlockID, error) {
return buildSet(), nil
}
Expand Down
14 changes: 9 additions & 5 deletions hare/hare.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ type TerminationOutput interface {
Completed() bool
}

type orphanBlockProvider interface {
type layers interface {
LayerBlockIds(layerID types.LayerID) ([]types.BlockID, error)
HandleValidatedLayer(validatedLayer types.LayerID, layer []types.BlockID)
}

// checks if the collected output is valid
Expand All @@ -52,7 +53,7 @@ type Hare struct {

sign Signer

obp orphanBlockProvider
msh layers
rolacle Rolacle

networkDelta time.Duration
Expand All @@ -77,7 +78,7 @@ type Hare struct {

// New returns a new Hare struct.
func New(conf config.Config, p2p NetworkService, sign Signer, nid types.NodeId, validate outputValidationFunc,
syncState syncStateFunc, obp orphanBlockProvider, rolacle Rolacle,
syncState syncStateFunc, obp layers, rolacle Rolacle,
layersPerEpoch uint16, idProvider identityProvider, stateQ StateQuerier,
beginLayer chan types.LayerID, logger log.Log) *Hare {
h := new(Hare)
Expand All @@ -96,7 +97,7 @@ func New(conf config.Config, p2p NetworkService, sign Signer, nid types.NodeId,

h.sign = sign

h.obp = obp
h.msh = obp
h.rolacle = rolacle

h.networkDelta = time.Duration(conf.WakeupDelta) * time.Second
Expand Down Expand Up @@ -172,6 +173,8 @@ func (h *Hare) collectOutput(output TerminationOutput) error {

id := output.ID()

h.msh.HandleValidatedLayer(types.LayerID(id), blocks)

if h.outOfBufferRange(id) {
return ErrTooLate
}
Expand All @@ -181,6 +184,7 @@ func (h *Hare) collectOutput(output TerminationOutput) error {
delete(h.outputs, h.oldestResultInBuffer())
}
h.outputs[types.LayerID(id)] = blocks

h.mu.Unlock()

return nil
Expand Down Expand Up @@ -216,7 +220,7 @@ func (h *Hare) onTick(id types.LayerID) {

h.Debug("get hare results")
// retrieve set form orphan blocks
blocks, err := h.obp.LayerBlockIds(h.lastLayer)
blocks, err := h.msh.LayerBlockIds(h.lastLayer)
if err != nil {
h.With().Error("No blocks for consensus", log.LayerId(uint64(id)), log.Err(err))
return
Expand Down
2 changes: 1 addition & 1 deletion hare/hare_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func TestHare_GetResult2(t *testing.T) {
}

h := createHare(n1, log.NewDefault(t.Name()))
h.obp = om
h.msh = om

h.networkDelta = 0

Expand Down
3 changes: 3 additions & 0 deletions hare/orphan_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ type orphanMock struct {
f func() []types.BlockID
}

func (op *orphanMock) HandleValidatedLayer(validatedLayer types.LayerID, layer []types.BlockID) {
}

func (op *orphanMock) GetOrphanBlocks() []types.BlockID {
if op.f != nil {
return op.f()
Expand Down
2 changes: 1 addition & 1 deletion mesh/atxdb_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (t *AtxDbMock) GetATXs(atxIds []types.AtxId) (map[types.AtxId]*types.Activa
}

func (t *AtxDbMock) GetFullAtx(id types.AtxId) (*types.ActivationTx, error) {
panic("implement me")
return t.db[id], nil
}

func (t *AtxDbMock) AddAtx(id types.AtxId, atx *types.ActivationTx) {
Expand Down
163 changes: 117 additions & 46 deletions mesh/mesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,19 +96,23 @@ type Mesh struct {
orphMutex sync.RWMutex
pMutex sync.RWMutex
done chan struct{}
nextValidLayers map[types.LayerID]*types.Layer
maxValidatedLayer types.LayerID
txMutex sync.Mutex
}

func NewMesh(db *MeshDB, atxDb AtxDB, rewardConfig Config, mesh Tortoise, txInvalidator TxMemPoolInValidator, atxInvalidator AtxMemPoolInValidator, pr TxProcessor, logger log.Log) *Mesh {
ll := &Mesh{
Log: logger,
trtl: mesh,
txInvalidator: txInvalidator,
atxInvalidator: atxInvalidator,
TxProcessor: pr,
done: make(chan struct{}),
MeshDB: db,
config: rewardConfig,
AtxDB: atxDb,
Log: logger,
trtl: mesh,
txInvalidator: txInvalidator,
atxInvalidator: atxInvalidator,
TxProcessor: pr,
done: make(chan struct{}),
MeshDB: db,
config: rewardConfig,
AtxDB: atxDb,
nextValidLayers: make(map[types.LayerID]*types.Layer),
}

ll.Validator = &validator{ll, 0}
Expand Down Expand Up @@ -266,20 +270,108 @@ func (v *validator) ValidateLayer(lyr *types.Layer) {
func (m *Mesh) pushLayersToState(oldPbase types.LayerID, newPbase types.LayerID) {
for layerId := oldPbase; layerId < newPbase; layerId++ {
l, err := m.GetLayer(layerId)
// TODO: propagate/handle error
if err != nil || l == nil {
// TODO: propagate/handle error
m.With().Error("failed to get layer", log.LayerId(layerId.Uint64()), log.Err(err))
break
return
}
m.AccumulateRewards(l, m.config)
m.PushTransactions(l)
m.logStateRoot(layerId)
m.setLayerHash(l)
m.setLatestLayerInState(layerId)
validBlocks, invalidBlocks := m.BlocksByValidity(l.Blocks())
m.updateStateWithLayer(layerId, types.NewExistingLayer(layerId, validBlocks))
m.reInsertTxsToPool(validBlocks, invalidBlocks, l.Index())
}
m.persistLayerHash()
}

func (m *Mesh) reInsertTxsToPool(validBlocks, invalidBlocks []*types.Block, l types.LayerID) {
seenTxIds := make(map[types.TransactionId]struct{})
uniqueTxIds(validBlocks, seenTxIds)
returnedTxs := m.getTxs(uniqueTxIds(invalidBlocks, seenTxIds), l)
grouped, accounts := m.removeFromUnappliedTxs(returnedTxs, l)
for account := range accounts {
m.removeRejectedFromAccountTxs(account, grouped, l)
}
for _, tx := range returnedTxs {
err := m.blockBuilder.ValidateAndAddTxToPool(tx)
// We ignore errors here, since they mean that the tx is no longer valid and we shouldn't re-add it
if err == nil {
m.With().Info("transaction from contextually invalid block re-added to mempool",
log.TxId(tx.Id().ShortString()))
}
}
}

func (m *Mesh) applyState(l *types.Layer) {
m.AccumulateRewards(l, m.config)
m.PushTransactions(l)
m.logStateRoot(l.Index())
m.setLayerHash(l)
m.setLatestLayerInState(l.Index())
}

func (m *Mesh) HandleValidatedLayer(validatedLayer types.LayerID, layer []types.BlockID) {
blocks := []*types.Block{}

for _, blockId := range layer {
block, err := m.GetBlock(blockId)
if err != nil {
//stop processing this hare result, wait until tortoise pushes this layer into state
log.Error("hare terminated with block that is not present in mesh")
return
}
blocks = append(blocks, block)
}
lyr := types.NewExistingLayer(validatedLayer, blocks)
invalidBlocks := m.getInvalidBlocksByHare(lyr)
m.updateStateWithLayer(validatedLayer, lyr)
m.reInsertTxsToPool(blocks, invalidBlocks, lyr.Index())
}

func (m *Mesh) getInvalidBlocksByHare(hareLayer *types.Layer) (invalid []*types.Block) {
dbLayer, err := m.GetLayer(hareLayer.Index())
if err != nil {
log.Panic("wtf")
return
}
exists := make(map[types.BlockID]struct{})
for _, block := range hareLayer.Blocks() {
exists[block.Id()] = struct{}{}
}

for _, block := range dbLayer.Blocks() {
if _, has := exists[block.Id()]; !has {
invalid = append(invalid, block)
}
}
return
}

func (m *Mesh) updateStateWithLayer(validatedLayer types.LayerID, layer *types.Layer) {
m.txMutex.Lock()
defer m.txMutex.Unlock()
latest := m.LatestLayerInState()
if validatedLayer <= latest {
log.Info("result received after state has been advanced for layer %v, latest: %v", validatedLayer, latest)
return
}
if m.maxValidatedLayer < validatedLayer {
m.maxValidatedLayer = validatedLayer
}
if validatedLayer > latest+1 {
log.Info("early layer result was received for layer %v, max validated so far %v", validatedLayer, m.maxValidatedLayer)
m.nextValidLayers[validatedLayer] = layer
return
}
m.applyState(layer)
for i := validatedLayer + 1; i <= m.maxValidatedLayer; i++ {
nxtLayer, has := m.nextValidLayers[i]
if !has {
break
}
m.applyState(nxtLayer)
delete(m.nextValidLayers, i)
}
}

func (m *Mesh) setLatestLayerInState(lyr types.LayerID) {
// update validated layer only after applying transactions since loading of state depends on processedLayer param.
m.pMutex.Lock()
Expand Down Expand Up @@ -313,9 +405,8 @@ func (m *Mesh) persistLayerHash() {
}
}

func (m *Mesh) ExtractUniqueOrderedTransactions(l *types.Layer) (validBlockTxs, invalidBlockTxs []*types.Transaction) {
// Separate blocks by validity
validBlocks, invalidBlocks := m.BlocksByValidity(l.Blocks())
func (m *Mesh) ExtractUniqueOrderedTransactions(l *types.Layer) (validBlockTxs []*types.Transaction) {
validBlocks := l.Blocks()

// Deterministically sort valid blocks
types.SortBlocks(validBlocks)
Expand All @@ -332,8 +423,8 @@ func (m *Mesh) ExtractUniqueOrderedTransactions(l *types.Layer) (validBlockTxs,
})

// Get and return unique transactions
seenTxIds := map[types.TransactionId]struct{}{}
return m.getTxs(uniqueTxIds(validBlocks, seenTxIds), l), m.getTxs(uniqueTxIds(invalidBlocks, seenTxIds), l)
seenTxIds := make(map[types.TransactionId]struct{})
return m.getTxs(uniqueTxIds(validBlocks, seenTxIds), l.Index())
}

func toUint64Slice(b []byte) []uint64 {
Expand All @@ -359,35 +450,26 @@ func uniqueTxIds(blocks []*types.Block, seenTxIds map[types.TransactionId]struct
return txIds
}

func (m *Mesh) getTxs(txIds []types.TransactionId, l *types.Layer) []*types.Transaction {
func (m *Mesh) getTxs(txIds []types.TransactionId, l types.LayerID) []*types.Transaction {
txs, missing := m.GetTransactions(txIds)
if len(missing) != 0 {
m.Panic("could not find transactions %v from layer %v", missing, l.Index())
m.Panic("could not find transactions %v from layer %v", missing, l)
}
return txs
}

func (m *Mesh) PushTransactions(l *types.Layer) {
validBlockTxs, invalidBlockTxs := m.ExtractUniqueOrderedTransactions(l)
validBlockTxs := m.ExtractUniqueOrderedTransactions(l)
numFailedTxs, err := m.ApplyTransactions(l.Index(), validBlockTxs)
if err != nil {
m.With().Error("failed to apply transactions",
log.LayerId(l.Index().Uint64()), log.Int("num_failed_txs", numFailedTxs), log.Err(err))
// TODO: We want to panic here once we have a way to "remember" that we didn't apply these txs
// e.g. persist the last layer transactions were applied from and use that instead of `oldBase`
}
m.removeFromUnappliedTxs(validBlockTxs, invalidBlockTxs, l.Index())
for _, tx := range invalidBlockTxs {
err = m.blockBuilder.ValidateAndAddTxToPool(tx)
// We ignore errors here, since they mean that the tx is no longer valid and we shouldn't re-add it
if err == nil {
m.With().Info("transaction from contextually invalid block re-added to mempool",
log.TxId(tx.Id().ShortString()))
}
}
m.removeFromUnappliedTxs(validBlockTxs, l.Index())
m.With().Info("applied transactions",
log.Int("valid_block_txs", len(validBlockTxs)),
log.Int("invalid_block_txs", len(invalidBlockTxs)),
log.LayerId(l.Index().Uint64()),
log.Int("num_failed_txs", numFailedTxs),
)
Expand Down Expand Up @@ -574,17 +656,6 @@ func (m *Mesh) GetOrphanBlocksBefore(l types.LayerID) ([]types.BlockID, error) {
func (m *Mesh) AccumulateRewards(l *types.Layer, params Config) {
ids := make([]types.Address, 0, len(l.Blocks()))
for _, bl := range l.Blocks() {
valid, err := m.ContextualValidity(bl.Id())
if err != nil {
m.With().Error("could not get contextual validity", log.BlockId(bl.Id().String()), log.Err(err))
}
if !valid {
m.With().Info("Withheld reward for contextually invalid block",
log.BlockId(bl.Id().String()),
log.LayerId(l.Index().Uint64()),
)
continue
}
if bl.ATXID == *types.EmptyAtxId {
m.With().Info("skipping reward distribution for block with no ATX",
log.LayerId(uint64(bl.LayerIndex)), log.BlockId(bl.Id().String()))
Expand All @@ -605,7 +676,7 @@ func (m *Mesh) AccumulateRewards(l *types.Layer, params Config) {
}

// aggregate all blocks' rewards
txs, _ := m.ExtractUniqueOrderedTransactions(l)
txs := m.ExtractUniqueOrderedTransactions(l)

totalReward := &big.Int{}
for _, tx := range txs {
Expand Down
Loading

0 comments on commit ed30588

Please sign in to comment.