Skip to content

Commit

Permalink
Guarded state and transaction for region and prime
Browse files Browse the repository at this point in the history
  • Loading branch information
gameofpointers committed Apr 27, 2023
1 parent 9878f2a commit 387c28b
Show file tree
Hide file tree
Showing 21 changed files with 606 additions and 266 deletions.
8 changes: 4 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,10 @@ run-all:
ifeq (,$(wildcard nodelogs))
mkdir nodelogs
endif
@nohup $(BASE_CMD) --miner.etherbase $(PRIME_COINBASE) --port $(PRIME_PORT_TCP) --http.port $(PRIME_PORT_HTTP) --ws.port $(PRIME_PORT_WS) --sub.urls $(PRIME_SUB_URLS) >> nodelogs/prime.log 2>&1 &
@nohup $(BASE_CMD) --miner.etherbase $(REGION_0_COINBASE) --port $(REGION_0_PORT_TCP) --http.port $(REGION_0_PORT_HTTP) --ws.port $(REGION_0_PORT_WS) --dom.url $(REGION_0_DOM_URL):$(PRIME_PORT_WS) --sub.urls $(REGION_0_SUB_URLS) --region 0 >> nodelogs/region-0.log 2>&1 &
@nohup $(BASE_CMD) --miner.etherbase $(REGION_1_COINBASE) --port $(REGION_1_PORT_TCP) --http.port $(REGION_1_PORT_HTTP) --ws.port $(REGION_1_PORT_WS) --dom.url $(REGION_1_DOM_URL):$(PRIME_PORT_WS) --sub.urls $(REGION_1_SUB_URLS) --region 1 >> nodelogs/region-1.log 2>&1 &
@nohup $(BASE_CMD) --miner.etherbase $(REGION_2_COINBASE) --port $(REGION_2_PORT_TCP) --http.port $(REGION_2_PORT_HTTP) --ws.port $(REGION_2_PORT_WS) --dom.url $(REGION_2_DOM_URL):$(PRIME_PORT_WS) --sub.urls $(REGION_2_SUB_URLS) --region 2 >> nodelogs/region-2.log 2>&1 &
@nohup $(BASE_CMD) --port $(PRIME_PORT_TCP) --http.port $(PRIME_PORT_HTTP) --ws.port $(PRIME_PORT_WS) --sub.urls $(PRIME_SUB_URLS) >> nodelogs/prime.log 2>&1 &
@nohup $(BASE_CMD) --port $(REGION_0_PORT_TCP) --http.port $(REGION_0_PORT_HTTP) --ws.port $(REGION_0_PORT_WS) --dom.url $(REGION_0_DOM_URL):$(PRIME_PORT_WS) --sub.urls $(REGION_0_SUB_URLS) --region 0 >> nodelogs/region-0.log 2>&1 &
@nohup $(BASE_CMD) --port $(REGION_1_PORT_TCP) --http.port $(REGION_1_PORT_HTTP) --ws.port $(REGION_1_PORT_WS) --dom.url $(REGION_1_DOM_URL):$(PRIME_PORT_WS) --sub.urls $(REGION_1_SUB_URLS) --region 1 >> nodelogs/region-1.log 2>&1 &
@nohup $(BASE_CMD) --port $(REGION_2_PORT_TCP) --http.port $(REGION_2_PORT_HTTP) --ws.port $(REGION_2_PORT_WS) --dom.url $(REGION_2_DOM_URL):$(PRIME_PORT_WS) --sub.urls $(REGION_2_SUB_URLS) --region 2 >> nodelogs/region-2.log 2>&1 &
@nohup $(BASE_CMD) --miner.etherbase $(ZONE_0_0_COINBASE) --port $(ZONE_0_0_PORT_TCP) --http.port $(ZONE_0_0_PORT_HTTP) --ws.port $(ZONE_0_0_PORT_WS) --dom.url $(ZONE_0_0_DOM_URL):$(REGION_0_PORT_WS) --region 0 --zone 0 >> nodelogs/zone-0-0.log 2>&1 &
@nohup $(BASE_CMD) --miner.etherbase $(ZONE_0_1_COINBASE) --port $(ZONE_0_1_PORT_TCP) --http.port $(ZONE_0_1_PORT_HTTP) --ws.port $(ZONE_0_1_PORT_WS) --dom.url $(ZONE_0_1_DOM_URL):$(REGION_0_PORT_WS) --region 0 --zone 1 >> nodelogs/zone-0-1.log 2>&1 &
@nohup $(BASE_CMD) --miner.etherbase $(ZONE_0_2_COINBASE) --port $(ZONE_0_2_PORT_TCP) --http.port $(ZONE_0_2_PORT_HTTP) --ws.port $(ZONE_0_2_PORT_WS) --dom.url $(ZONE_0_2_DOM_URL):$(REGION_0_PORT_WS) --region 0 --zone 2 >> nodelogs/zone-0-2.log 2>&1 &
Expand Down
9 changes: 6 additions & 3 deletions cmd/go-quai/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,9 +322,12 @@ func startNode(ctx *cli.Context, stack *node.Node, backend quaiapi.Backend) {
if !ok {
utils.Fatalf("Ethereum service not running: %v", err)
}
// Set the gas price to the limits from the CLI and start mining
gasprice := utils.GlobalBig(ctx, utils.MinerGasPriceFlag.Name)
ethBackend.TxPool().SetGasPrice(gasprice)
nodeCtx := common.NodeLocation.Context()
if nodeCtx == common.ZONE_CTX {
// Set the gas price to the limits from the CLI and start mining
gasprice := utils.GlobalBig(ctx, utils.MinerGasPriceFlag.Name)
ethBackend.TxPool().SetGasPrice(gasprice)
}
// start mining
threads := ctx.GlobalInt(utils.MinerThreadsFlag.Name)
if err := ethBackend.StartMining(threads); err != nil {
Expand Down
5 changes: 4 additions & 1 deletion cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -1315,7 +1315,10 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
log.Warn("Disable transaction unindexing for archive node")
}

setEtherbase(ctx, cfg)
// only set etherbase if its a zone chain
if ctx.GlobalIsSet(RegionFlag.Name) && ctx.GlobalIsSet(ZoneFlag.Name) {
setEtherbase(ctx, cfg)
}
setGPO(ctx, &cfg.GPO, ctx.GlobalString(SyncModeFlag.Name) == "light")
setTxPool(ctx, &cfg.TxPool)
setBlake3pow(ctx, cfg)
Expand Down
45 changes: 25 additions & 20 deletions consensus/blake3pow/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,27 +286,29 @@ func (blake3pow *Blake3pow) verifyHeader(chain consensus.ChainHeaderReader, head
}
}

// Verify that the gas limit is <= 2^63-1
cap := uint64(0x7fffffffffffffff)
if header.GasLimit() > cap {
return fmt.Errorf("invalid gasLimit: have %v, max %v", header.GasLimit(), cap)
}
// Verify that the gasUsed is <= gasLimit
if header.GasUsed() > header.GasLimit() {
return fmt.Errorf("invalid gasUsed: have %d, gasLimit %d", header.GasUsed(), header.GasLimit())
}
// Verify the block's gas usage and (if applicable) verify the base fee.
if !chain.Config().IsLondon(header.Number()) {
// Verify BaseFee not present before EIP-1559 fork.
if header.BaseFee() != nil {
return fmt.Errorf("invalid baseFee before fork: have %d, expected 'nil'", header.BaseFee())
if nodeCtx == common.ZONE_CTX {
// Verify that the gas limit is <= 2^63-1
cap := uint64(0x7fffffffffffffff)
if header.GasLimit() > cap {
return fmt.Errorf("invalid gasLimit: have %v, max %v", header.GasLimit(), cap)
}
// Verify that the gasUsed is <= gasLimit
if header.GasUsed() > header.GasLimit() {
return fmt.Errorf("invalid gasUsed: have %d, gasLimit %d", header.GasUsed(), header.GasLimit())
}
if err := misc.VerifyGaslimit(parent.GasLimit(), header.GasLimit()); err != nil {
// Verify the block's gas usage and (if applicable) verify the base fee.
if !chain.Config().IsLondon(header.Number()) {
// Verify BaseFee not present before EIP-1559 fork.
if header.BaseFee() != nil {
return fmt.Errorf("invalid baseFee before fork: have %d, expected 'nil'", header.BaseFee())
}
if err := misc.VerifyGaslimit(parent.GasLimit(), header.GasLimit()); err != nil {
return err
}
} else if err := misc.VerifyEip1559Header(chain.Config(), parent, header); err != nil {
// Verify the header's EIP-1559 attributes.
return err
}
} else if err := misc.VerifyEip1559Header(chain.Config(), parent, header); err != nil {
// Verify the header's EIP-1559 attributes.
return err
}
// Verify that the block number is parent's +1
if diff := new(big.Int).Sub(header.Number(), parent.Number()); diff.Cmp(big.NewInt(1)) != 0 {
Expand Down Expand Up @@ -436,8 +438,11 @@ func (blake3pow *Blake3pow) Finalize(chain consensus.ChainHeaderReader, header *
// FinalizeAndAssemble implements consensus.Engine, accumulating the block and
// uncle rewards, setting the final state and assembling the block.
func (blake3pow *Blake3pow) FinalizeAndAssemble(chain consensus.ChainHeaderReader, header *types.Header, state *state.StateDB, txs []*types.Transaction, uncles []*types.Header, etxs []*types.Transaction, subManifest types.BlockManifest, receipts []*types.Receipt) (*types.Block, error) {
// Finalize block
blake3pow.Finalize(chain, header, state, txs, uncles)
nodeCtx := common.NodeLocation.Context()
if nodeCtx == common.ZONE_CTX {
// Finalize block
blake3pow.Finalize(chain, header, state, txs, uncles)
}

// Header seems complete, assemble into a block and return
return types.NewBlock(header, txs, uncles, etxs, subManifest, receipts, trie.NewStackTrie(nil)), nil
Expand Down
57 changes: 35 additions & 22 deletions core/block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,37 +55,50 @@ func NewBlockValidator(config *params.ChainConfig, headerChain *HeaderChain, eng
func (v *BlockValidator) ValidateBody(block *types.Block) error {
nodeCtx := common.NodeLocation.Context()
// Check whether the block's known, and if not, that it's linkable
if v.hc.bc.processor.HasBlockAndState(block.Hash(), block.NumberU64()) {
return ErrKnownBlock
}
// Header validity is known at this point, check the uncles and transactions
header := block.Header()
if err := v.engine.VerifyUncles(v.hc, block); err != nil {
return err
}
if hash := types.CalcUncleHash(block.Uncles()); hash != header.UncleHash() {
return fmt.Errorf("uncle root hash mismatch: have %x, want %x", hash, header.UncleHash())
}
if hash := types.DeriveSha(block.Transactions(), trie.NewStackTrie(nil)); hash != header.TxHash() {
return fmt.Errorf("transaction root hash mismatch: have %x, want %x", hash, header.TxHash())
}
if hash := types.DeriveSha(block.ExtTransactions(), trie.NewStackTrie(nil)); hash != header.EtxHash() {
return fmt.Errorf("external transaction root hash mismatch: have %x, want %x", hash, header.EtxHash())
if nodeCtx == common.ZONE_CTX {
if v.hc.bc.processor.HasBlockAndState(block.Hash(), block.NumberU64()) {
return ErrKnownBlock
}
}
// Subordinate manifest must match ManifestHash in subordinate context, _iff_
// we have a subordinate (i.e. if we are not a zone)
if nodeCtx < common.ZONE_CTX {
if nodeCtx != common.ZONE_CTX {
// Region nodes should have body with zero length txs and etxs
if len(block.Transactions()) != 0 {
return fmt.Errorf("region body has non zero transactions")
}
if len(block.ExtTransactions()) != 0 {
return fmt.Errorf("region body has non zero etx transactions")
}
if len(block.Uncles()) != 0 {
return fmt.Errorf("region body has non zero uncles")
}
subManifestHash := types.DeriveSha(block.SubManifest(), trie.NewStackTrie(nil))
if subManifestHash == types.EmptyRootHash || subManifestHash != header.ManifestHash(nodeCtx+1) {
// If we have a subordinate chain, it is impossible for the subordinate manifest to be empty
return ErrBadSubManifest
}
}
if !v.hc.bc.processor.HasBlockAndState(block.ParentHash(), block.NumberU64()-1) {
if !v.hc.bc.HasBlock(block.ParentHash(), block.NumberU64()-1) {
return consensus.ErrUnknownAncestor
} else {
// Header validity is known at this point, check the uncles and transactions
header := block.Header()
if err := v.engine.VerifyUncles(v.hc, block); err != nil {
return err
}
if hash := types.CalcUncleHash(block.Uncles()); hash != header.UncleHash() {
return fmt.Errorf("uncle root hash mismatch: have %x, want %x", hash, header.UncleHash())
}
if hash := types.DeriveSha(block.Transactions(), trie.NewStackTrie(nil)); hash != header.TxHash() {
return fmt.Errorf("transaction root hash mismatch: have %x, want %x", hash, header.TxHash())
}
if hash := types.DeriveSha(block.ExtTransactions(), trie.NewStackTrie(nil)); hash != header.EtxHash() {
return fmt.Errorf("external transaction root hash mismatch: have %x, want %x", hash, header.EtxHash())
}
if !v.hc.bc.processor.HasBlockAndState(block.ParentHash(), block.NumberU64()-1) {
if !v.hc.bc.HasBlock(block.ParentHash(), block.NumberU64()-1) {
return consensus.ErrUnknownAncestor
}
return consensus.ErrPrunedAncestor
}
return consensus.ErrPrunedAncestor
}
return nil
}
Expand Down
28 changes: 15 additions & 13 deletions core/bodydb.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package core

import (
"errors"
"sync"
"time"

Expand Down Expand Up @@ -44,7 +43,7 @@ type BodyDb struct {
}

func NewBodyDb(db ethdb.Database, engine consensus.Engine, hc *HeaderChain, chainConfig *params.ChainConfig, cacheConfig *CacheConfig, vmConfig vm.Config) (*BodyDb, error) {

nodeCtx := common.NodeLocation.Context()
blockCache, _ := lru.New(blockCacheLimit)
bodyCache, _ := lru.New(bodyCacheLimit)
bodyRLPCache, _ := lru.New(bodyCacheLimit)
Expand All @@ -58,7 +57,10 @@ func NewBodyDb(db ethdb.Database, engine consensus.Engine, hc *HeaderChain, chai
bodyRLPCache: bodyRLPCache,
}

bc.processor = NewStateProcessor(chainConfig, hc, engine, vmConfig, cacheConfig)
// only start the state processor in zone
if nodeCtx == common.ZONE_CTX {
bc.processor = NewStateProcessor(chainConfig, hc, engine, vmConfig, cacheConfig)
}

return bc, nil
}
Expand All @@ -69,20 +71,20 @@ func (bc *BodyDb) Append(batch ethdb.Batch, block *types.Block, newInboundEtxs t
defer bc.chainmu.Unlock()

stateApply := time.Now()
// Process our block
logs, err := bc.processor.Apply(batch, block, newInboundEtxs)
if err != nil {
return nil, err
nodeCtx := common.NodeLocation.Context()
var logs []*types.Log
var err error
if nodeCtx == common.ZONE_CTX {
// Process our block
logs, err = bc.processor.Apply(batch, block, newInboundEtxs)
if err != nil {
return nil, err
}
rawdb.WriteTxLookupEntriesByBlock(batch, block)
}
log.Info("Time taken to", "apply state:", common.PrettyDuration(time.Since(stateApply)))

if block.Hash() != block.Header().Hash() {
log.Info("BodyDb Append, Roots Mismatch:", "block.Hash:", block.Hash(), "block.Header.Hash", block.Header().Hash(), "parentHeader.Number:", block.NumberU64())
return nil, errors.New("state roots do not match header, append fail")
}
rawdb.WriteBlock(batch, block)
rawdb.WriteTxLookupEntriesByBlock(batch, block)

return logs, nil
}

Expand Down
55 changes: 29 additions & 26 deletions core/slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,10 @@ func NewSlice(db ethdb.Database, config *Config, txConfig *TxPoolConfig, isLocal
}
sl.validator = NewBlockValidator(chainConfig, sl.hc, engine)

sl.txPool = NewTxPool(*txConfig, chainConfig, sl.hc)
// tx pool is only used in zone
if nodeCtx == common.ZONE_CTX {
sl.txPool = NewTxPool(*txConfig, chainConfig, sl.hc)
}
sl.miner = New(sl.hc, sl.txPool, config, db, chainConfig, engine, isLocalBlock)

sl.phCache = make(map[common.Hash]types.PendingHeader)
Expand Down Expand Up @@ -544,11 +547,11 @@ func (sl *Slice) computePendingHeader(localPendingHeaderWithTermini types.Pendin
var newPh *types.Header

if exists {
newPh = sl.combinePendingHeader(localPendingHeaderWithTermini.Header, cachedPendingHeaderWithTermini.Header, nodeCtx)
newPh = sl.combinePendingHeader(localPendingHeaderWithTermini.Header, cachedPendingHeaderWithTermini.Header, nodeCtx, true)
return types.PendingHeader{Header: newPh, Termini: localPendingHeaderWithTermini.Termini}
} else {
if domOrigin {
newPh = sl.combinePendingHeader(localPendingHeaderWithTermini.Header, domPendingHeader, nodeCtx)
newPh = sl.combinePendingHeader(localPendingHeaderWithTermini.Header, domPendingHeader, nodeCtx, true)
return types.PendingHeader{Header: newPh, Termini: localPendingHeaderWithTermini.Termini}
}
return localPendingHeaderWithTermini
Expand All @@ -557,21 +560,14 @@ func (sl *Slice) computePendingHeader(localPendingHeaderWithTermini types.Pendin

// updatePhCacheFromDom combines the recieved pending header with the pending header stored locally at a given terminus for specified context
func (sl *Slice) updatePhCacheFromDom(pendingHeader types.PendingHeader, terminiIndex int, indices []int) error {

nodeCtx := common.NodeLocation.Context()
hash := pendingHeader.Termini[terminiIndex]
localPendingHeader, exists := sl.phCache[hash]

if exists {
combinedPendingHeader := types.CopyHeader(localPendingHeader.Header)
for _, i := range indices {
combinedPendingHeader = sl.combinePendingHeader(pendingHeader.Header, combinedPendingHeader, i)
}
if nodeCtx == common.ZONE_CTX {
combinedPendingHeader.SetDifficulty(localPendingHeader.Header.Difficulty())
combinedPendingHeader.SetLocation(common.NodeLocation)
combinedPendingHeader = sl.combinePendingHeader(pendingHeader.Header, combinedPendingHeader, i, false)
}

sl.writeToPhCacheAndPickPhHead(types.PendingHeader{Header: combinedPendingHeader, Termini: localPendingHeader.Termini})

return nil
Expand All @@ -594,6 +590,7 @@ func (sl *Slice) writeToPhCacheAndPickPhHead(pendingHeaderWithTermini types.Pend
var deepCopyPendingHeaderWithTermini types.PendingHeader
newPhEntropy := pendingHeaderWithTermini.Header.CalcPhS()
deepCopyPendingHeaderWithTermini = types.PendingHeader{Header: types.CopyHeader(pendingHeaderWithTermini.Header), Termini: pendingHeaderWithTermini.Termini, Entropy: newPhEntropy}
deepCopyPendingHeaderWithTermini.Header.SetLocation(common.NodeLocation)
deepCopyPendingHeaderWithTermini.Header.SetTime(uint64(time.Now().Unix()))
if exist {
if sl.poem(newPhEntropy, oldPh.Entropy) {
Expand Down Expand Up @@ -736,7 +733,7 @@ func (sl *Slice) ConstructLocalMinedBlock(header *types.Header) (*types.Block, e
}

// combinePendingHeader updates the pending header at the given index with the value from given header.
func (sl *Slice) combinePendingHeader(header *types.Header, slPendingHeader *types.Header, index int) *types.Header {
func (sl *Slice) combinePendingHeader(header *types.Header, slPendingHeader *types.Header, index int, inSlice bool) *types.Header {
// copying the slPendingHeader and updating the copy to remove any shared memory access issues
combinedPendingHeader := types.CopyHeader(slPendingHeader)

Expand All @@ -746,18 +743,21 @@ func (sl *Slice) combinePendingHeader(header *types.Header, slPendingHeader *typ
combinedPendingHeader.SetManifestHash(header.ManifestHash(index), index)
combinedPendingHeader.SetParentEntropy(header.ParentEntropy(index), index)
combinedPendingHeader.SetParentDeltaS(header.ParentDeltaS(index), index)
combinedPendingHeader.SetEtxRollupHash(header.EtxRollupHash())
combinedPendingHeader.SetDifficulty(header.Difficulty())
combinedPendingHeader.SetUncleHash(header.UncleHash())
combinedPendingHeader.SetTxHash(header.TxHash())
combinedPendingHeader.SetEtxHash(header.EtxHash())
combinedPendingHeader.SetReceiptHash(header.ReceiptHash())
combinedPendingHeader.SetRoot(header.Root())
combinedPendingHeader.SetCoinbase(header.Coinbase())
combinedPendingHeader.SetBloom(header.Bloom())
combinedPendingHeader.SetBaseFee(header.BaseFee())
combinedPendingHeader.SetGasLimit(header.GasLimit())
combinedPendingHeader.SetGasUsed(header.GasUsed())

if inSlice {
combinedPendingHeader.SetEtxRollupHash(header.EtxRollupHash())
combinedPendingHeader.SetDifficulty(header.Difficulty())
combinedPendingHeader.SetUncleHash(header.UncleHash())
combinedPendingHeader.SetTxHash(header.TxHash())
combinedPendingHeader.SetEtxHash(header.EtxHash())
combinedPendingHeader.SetReceiptHash(header.ReceiptHash())
combinedPendingHeader.SetRoot(header.Root())
combinedPendingHeader.SetCoinbase(header.Coinbase())
combinedPendingHeader.SetBloom(header.Bloom())
combinedPendingHeader.SetBaseFee(header.BaseFee())
combinedPendingHeader.SetGasLimit(header.GasLimit())
combinedPendingHeader.SetGasUsed(header.GasUsed())
}

return combinedPendingHeader
}
Expand All @@ -775,7 +775,7 @@ func (sl *Slice) NewGenesisPendingHeader(domPendingHeader *types.Header) {
if nodeCtx == common.PRIME_CTX {
domPendingHeader = types.CopyHeader(localPendingHeader)
} else {
domPendingHeader = sl.combinePendingHeader(localPendingHeader, domPendingHeader, nodeCtx)
domPendingHeader = sl.combinePendingHeader(localPendingHeader, domPendingHeader, nodeCtx, true)
domPendingHeader.SetLocation(common.NodeLocation)
}

Expand Down Expand Up @@ -844,6 +844,7 @@ func (sl *Slice) loadLastState() error {

// Stop stores the phCache and the sl.pendingHeader hash value to the db.
func (sl *Slice) Stop() {
nodeCtx := common.NodeLocation.Context()
// write the ph head hash to the db.
rawdb.WriteBestPhKey(sl.sliceDb, sl.bestPhKey)
// Write the ph cache to the dd.
Expand All @@ -855,7 +856,9 @@ func (sl *Slice) Stop() {
close(sl.quit)

sl.hc.Stop()
sl.txPool.Stop()
if nodeCtx == common.ZONE_CTX {
sl.txPool.Stop()
}
sl.miner.Stop()
}

Expand Down
Loading

0 comments on commit 387c28b

Please sign in to comment.