Skip to content

Commit

Permalink
Added slices running flags to the node to perform Peer bucketing
Browse files Browse the repository at this point in the history
  • Loading branch information
gameofpointers committed May 17, 2023
1 parent 85562a9 commit 48cf1e1
Show file tree
Hide file tree
Showing 14 changed files with 140 additions and 61 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ include network.env
BASE_CMD = nice -n -20 ./build/bin/go-quai --$(NETWORK) --syncmode $(SYNCMODE) --verbosity $(VERBOSITY) --nonce $(NONCE)
BASE_CMD += --http --http.vhosts=* --http.addr $(HTTP_ADDR) --http.api $(HTTP_API)
BASE_CMD += --ws --ws.addr $(WS_ADDR) --ws.api $(WS_API)
BASE_CMD += --slices $(SLICES)
ifeq ($(ENABLE_ARCHIVE),true)
BASE_CMD += --gcmode archive
endif
Expand Down
1 change: 1 addition & 0 deletions cmd/go-quai/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ var (
utils.NodeKeyFileFlag,
utils.NodeKeyHexFlag,
utils.DNSDiscoveryFlag,
utils.SlicesRunningFlag,
utils.ColosseumFlag,
utils.DeveloperFlag,
utils.DeveloperPeriodFlag,
Expand Down
25 changes: 25 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@ var (
Usage: "Explicitly set network id (integer)(For testnets: use --garden)",
Value: ethconfig.Defaults.NetworkId,
}
SlicesRunningFlag = cli.StringFlag{
Name: "slices",
Usage: "All the slices that are running on this node",
}
ColosseumFlag = cli.BoolFlag{
Name: "colosseum",
Usage: "Quai Colosseum testnet",
Expand Down Expand Up @@ -913,6 +917,24 @@ func makeSubUrls(ctx *cli.Context) []string {
return strings.Split(ctx.GlobalString(SubUrls.Name), ",")
}

// setSlicesRunning sets the slices running flag
func setSlicesRunning(ctx *cli.Context, cfg *ethconfig.Config) {
slices := strings.Split(ctx.GlobalString(SlicesRunningFlag.Name), ",")

// Sanity checks
if len(slices) == 0 {
Fatalf("no slices are specified")
}
if len(slices) > common.NumRegionsInPrime*common.NumZonesInRegion {
Fatalf("number of slices exceed the current ontology")
}
slicesRunning := []common.Location{}
for _, slice := range slices {
slicesRunning = append(slicesRunning, common.Location{slice[1] - 48, slice[3] - 48})
}
cfg.SlicesRunning = slicesRunning
}

// MakeDatabaseHandles raises out the number of allowed file handles per process
// for Geth and returns half of the allowance to assign to the database.
func MakeDatabaseHandles() int {
Expand Down Expand Up @@ -1270,6 +1292,9 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
// set the subordinate chain websocket urls
setSubUrls(ctx, cfg)

// set the slices that the node is running
setSlicesRunning(ctx, cfg)

// Cap the cache allowance and tune the garbage collector
mem, err := gopsutil.VirtualMemory()
if err == nil {
Expand Down
2 changes: 1 addition & 1 deletion core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ func (c *Core) GetTerminiByHash(hash common.Hash) []common.Hash {
return c.sl.hc.GetTerminiByHash(hash)
}

func (c *Core) SubscribeMissingPendingEtxsEvent(ch chan<- common.Hash) event.Subscription {
func (c *Core) SubscribeMissingPendingEtxsEvent(ch chan<- types.HashAndLocation) event.Subscription {
return c.sl.hc.SubscribeMissingPendingEtxsEvent(ch)
}

Expand Down
26 changes: 13 additions & 13 deletions core/headerchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package core
import (
"errors"
"fmt"
sync "github.com/sasha-s/go-deadlock"
"io"
"math/big"
sync "github.com/sasha-s/go-deadlock"
"sync/atomic"
"time"

Expand Down Expand Up @@ -171,7 +171,7 @@ func (hc *HeaderChain) CollectSubRollup(b *types.Block) (types.Transactions, err
go hc.backfillPETXs(b.Header(), b.SubManifest())
return nil, ErrPendingEtxNotFound
}
subRollup = append(subRollup, pendingEtxs...)
subRollup = append(subRollup, pendingEtxs.Etxs...)
}
} else {
// Start backfilling the missing pending ETXs needed to process this block
Expand All @@ -186,7 +186,7 @@ func (hc *HeaderChain) CollectSubRollup(b *types.Block) (types.Transactions, err
go hc.backfillPETXs(b.Header(), b.SubManifest())
return nil, ErrPendingEtxNotFound
}
subRollup = append(subRollup, pendingEtxs...)
subRollup = append(subRollup, pendingEtxs.Etxs...)
}
}
// Rolluphash is specifically for zone rollup, which can only be validated by region
Expand All @@ -200,18 +200,18 @@ func (hc *HeaderChain) CollectSubRollup(b *types.Block) (types.Transactions, err
}

// GetPendingEtxs gets the pendingEtxs form the
func (hc *HeaderChain) GetPendingEtxs(hash common.Hash) (types.Transactions, error) {
var pendingEtxs types.Transactions
func (hc *HeaderChain) GetPendingEtxs(hash common.Hash) (*types.PendingEtxs, error) {
var pendingEtxs types.PendingEtxs
// Look for pending ETXs first in pending ETX cache, then in database
if res, ok := hc.pendingEtxs.Get(hash); ok && res != nil {
pendingEtxs = res.(types.Transactions)
pendingEtxs = res.(types.PendingEtxs)
} else if res := rawdb.ReadPendingEtxs(hc.headerDb, hash); res != nil {
pendingEtxs = res.Etxs
pendingEtxs = *res
} else {
log.Trace("unable to find pending etxs for hash in manifest", "hash:", hash.String())
return nil, ErrPendingEtxNotFound
}
return pendingEtxs, nil
return &pendingEtxs, nil
}

func (hc *HeaderChain) GetPendingEtxsRollup(hash common.Hash) (types.BlockManifest, error) {
Expand Down Expand Up @@ -239,9 +239,9 @@ func (hc *HeaderChain) backfillPETXs(header *types.Header, subManifest types.Blo
// and then fetch the pending etx for each of the rollup hashes
if manifest, err := hc.GetPendingEtxsRollup(hash); err == nil {
for _, pEtxHash := range manifest {
if _, err := hc.GetPendingEtxs(pEtxHash); err != nil {
if pEtx, err := hc.GetPendingEtxs(pEtxHash); err != nil {
// Send the pendingEtxs to the feed for broadcast
hc.missingPendingEtxsFeed.Send(pEtxHash)
hc.missingPendingEtxsFeed.Send(types.HashAndLocation{Hash: pEtxHash, Location: pEtx.Header.Location()})
}
}
} else {
Expand All @@ -250,7 +250,7 @@ func (hc *HeaderChain) backfillPETXs(header *types.Header, subManifest types.Blo
} else if nodeCtx == common.REGION_CTX {
if _, err := hc.GetPendingEtxs(hash); err != nil {
// Send the pendingEtxs to the feed for broadcast
hc.missingPendingEtxsFeed.Send(hash)
hc.missingPendingEtxsFeed.Send(types.HashAndLocation{Hash: hash, Location: header.Location()})
}
}
}
Expand Down Expand Up @@ -429,7 +429,7 @@ func (hc *HeaderChain) AddPendingEtxs(pEtxs types.PendingEtxs) error {
// Write to pending ETX database
rawdb.WritePendingEtxs(hc.headerDb, pEtxs)
// Also write to cache for faster access
hc.pendingEtxs.Add(pEtxs.Header.Hash(), pEtxs.Etxs)
hc.pendingEtxs.Add(pEtxs.Header.Hash(), pEtxs)
} else {
return ErrPendingEtxAlreadyKnown
}
Expand Down Expand Up @@ -885,7 +885,7 @@ func (hc *HeaderChain) SubscribeChainSideEvent(ch chan<- ChainSideEvent) event.S
return hc.scope.Track(hc.chainSideFeed.Subscribe(ch))
}

func (hc *HeaderChain) SubscribeMissingPendingEtxsEvent(ch chan<- common.Hash) event.Subscription {
func (hc *HeaderChain) SubscribeMissingPendingEtxsEvent(ch chan<- types.HashAndLocation) event.Subscription {
return hc.scope.Track(hc.missingPendingEtxsFeed.Subscribe(ch))
}

Expand Down
5 changes: 5 additions & 0 deletions core/types/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -1069,3 +1069,8 @@ type HashAndNumber struct {
Hash common.Hash
Number uint64
}

type HashAndLocation struct {
Hash common.Hash
Location common.Location
}
17 changes: 9 additions & 8 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,14 +195,15 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
// Permit the downloader to use the trie cache allowance during fast sync
cacheLimit := cacheConfig.TrieCleanLimit + cacheConfig.TrieDirtyLimit + cacheConfig.SnapshotLimit
if eth.handler, err = newHandler(&handlerConfig{
Database: chainDb,
Core: eth.core,
TxPool: eth.core.TxPool(),
Network: config.NetworkId,
Sync: config.SyncMode,
BloomCache: uint64(cacheLimit),
EventMux: eth.eventMux,
Whitelist: config.Whitelist,
Database: chainDb,
Core: eth.core,
TxPool: eth.core.TxPool(),
Network: config.NetworkId,
Sync: config.SyncMode,
BloomCache: uint64(cacheLimit),
EventMux: eth.eventMux,
Whitelist: config.Whitelist,
SlicesRunning: config.SlicesRunning,
}); err != nil {
return nil, err
}
Expand Down
3 changes: 3 additions & 0 deletions eth/ethconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,9 @@ type Config struct {

// Sub node websocket urls
SubUrls []string

// Slices running on the node
SlicesRunning []common.Location
}

// CreateConsensusEngine creates a consensus engine for the given chain configuration.
Expand Down
74 changes: 39 additions & 35 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,19 +100,21 @@ type txPool interface {
// handlerConfig is the collection of initialization parameters to create a full
// node network handler.
type handlerConfig struct {
Database ethdb.Database // Database for direct sync insertions
Core *core.Core // Core to serve data from
TxPool txPool // Transaction pool to propagate from
Network uint64 // Network identifier to adfvertise
Sync downloader.SyncMode // Whether to fast or full sync
BloomCache uint64 // Megabytes to alloc for fast sync bloom
EventMux *event.TypeMux // Legacy event mux, deprecate for `feed`
Whitelist map[uint64]common.Hash // Hard coded whitelist for sync challenged
Database ethdb.Database // Database for direct sync insertions
Core *core.Core // Core to serve data from
TxPool txPool // Transaction pool to propagate from
Network uint64 // Network identifier to adfvertise
Sync downloader.SyncMode // Whether to fast or full sync
BloomCache uint64 // Megabytes to alloc for fast sync bloom
EventMux *event.TypeMux // Legacy event mux, deprecate for `feed`
Whitelist map[uint64]common.Hash // Hard coded whitelist for sync challenged
SlicesRunning []common.Location // Slices run by the node
}

type handler struct {
networkID uint64
forkFilter forkid.Filter // Fork ID filter, constant across the lifetime of the node
networkID uint64
forkFilter forkid.Filter // Fork ID filter, constant across the lifetime of the node
slicesRunning []common.Location // Slices running on the node

acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing)

Expand All @@ -132,7 +134,7 @@ type handler struct {
minedBlockSub *event.TypeMuxSubscription
missingBodyCh chan *types.Header
missingBodySub event.Subscription
missingPendingEtxsCh chan common.Hash
missingPendingEtxsCh chan types.HashAndLocation
missingPendingEtxsSub event.Subscription
missingParentCh chan common.Hash
missingParentSub event.Subscription
Expand Down Expand Up @@ -163,16 +165,17 @@ func newHandler(config *handlerConfig) (*handler, error) {
config.EventMux = new(event.TypeMux) // Nicety initialization for tests
}
h := &handler{
networkID: config.Network,
forkFilter: forkid.NewFilter(config.Core),
eventMux: config.EventMux,
database: config.Database,
txpool: config.TxPool,
core: config.Core,
peers: newPeerSet(),
whitelist: config.Whitelist,
txsyncCh: make(chan *txsync),
quitSync: make(chan struct{}),
networkID: config.Network,
slicesRunning: config.SlicesRunning,
forkFilter: forkid.NewFilter(config.Core),
eventMux: config.EventMux,
database: config.Database,
txpool: config.TxPool,
core: config.Core,
peers: newPeerSet(),
whitelist: config.Whitelist,
txsyncCh: make(chan *txsync),
quitSync: make(chan struct{}),
}

h.downloader = downloader.New(config.Database, h.eventMux, h.core, h.removePeer)
Expand Down Expand Up @@ -229,7 +232,7 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
entropy = head.CalcS()
)
forkID := forkid.NewID(h.core.Config(), h.core.Genesis().Hash(), h.core.CurrentHeader().Number().Uint64())
if err := peer.Handshake(h.networkID, entropy, hash, genesis.Hash(), forkID, h.forkFilter); err != nil {
if err := peer.Handshake(h.networkID, h.slicesRunning, entropy, hash, genesis.Hash(), forkID, h.forkFilter); err != nil {
peer.Log().Debug("Ethereum handshake failed", "err", err)
return err
}
Expand Down Expand Up @@ -327,7 +330,7 @@ func (h *handler) Start(maxPeers int) {

// broadcast pending etxs
h.wg.Add(1)
h.missingPendingEtxsCh = make(chan common.Hash, missingPendingEtxsChanSize)
h.missingPendingEtxsCh = make(chan types.HashAndLocation, missingPendingEtxsChanSize)
h.missingPendingEtxsSub = h.core.SubscribeMissingPendingEtxsEvent(h.missingPendingEtxsCh)
go h.missingPendingEtxsLoop()

Expand Down Expand Up @@ -536,9 +539,7 @@ func (h *handler) missingPEtxsRollupLoop() {
// Check if any of the peers have the body
for _, peer := range h.selectSomePeers() {
log.Trace("Fetching the missing pending etxs rollup from", "peer", peer.ID(), "hash", hash)
if err := peer.RequestOnePendingEtxsRollup(hash); err != nil {
return
}
peer.RequestOnePendingEtxsRollup(hash)
}

case <-h.missingPEtxsRollupSub.Err():
Expand All @@ -552,13 +553,18 @@ func (h *handler) missingPendingEtxsLoop() {
defer h.wg.Done()
for {
select {
case hash := <-h.missingPendingEtxsCh:
case hashAndLocation := <-h.missingPendingEtxsCh:
// Only ask from peers running the slice for the missing pending etxs
// In the future, peers not responding before the timeout has to be punished
peersRunningSlice := h.peers.peerRunningSlice(hashAndLocation.Location)
// If the node doesn't have any peer running that slice, add a warning
if len(peersRunningSlice) == 0 {
log.Warn("Node doesn't have peers for given Location", "location", hashAndLocation.Location)
}
// Check if any of the peers have the body
for _, peer := range h.selectSomePeers() {
log.Trace("Fetching the missing pending etxs from", "peer", peer.ID(), "hash", hash)
if err := peer.RequestOnePendingEtxs(hash); err != nil {
return
}
for _, peer := range peersRunningSlice {
log.Trace("Fetching the missing pending etxs from", "peer", peer.ID(), "hash", hashAndLocation.Hash)
peer.RequestOnePendingEtxs(hashAndLocation.Hash)
}
case <-h.missingPendingEtxsSub.Err():
return
Expand All @@ -575,9 +581,7 @@ func (h *handler) missingParentLoop() {
// Check if any of the peers have the body
for _, peer := range h.selectSomePeers() {
log.Trace("Fetching the missing parent from", "peer", peer.ID(), "hash", hash)
if err := peer.RequestBlockByHash(hash); err != nil {
return
}
peer.RequestBlockByHash(hash)
}
case <-h.missingParentSub.Err():
return
Expand Down
22 changes: 22 additions & 0 deletions eth/peerset.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,28 @@ func (ps *peerSet) peerWithHighestEntropy() *eth.Peer {
return bestPeer
}

func (ps *peerSet) peerRunningSlice(location common.Location) []*eth.Peer {
ps.lock.RLock()
defer ps.lock.RUnlock()

containsLocation := func(s []common.Location, e common.Location) bool {
for _, a := range s {
if common.Location.Equal(a, e) {
return true
}
}
return false
}

var peersRunningSlice []*eth.Peer
for _, p := range ps.peers {
if containsLocation(p.Peer.SlicesRunning(), location) {
peersRunningSlice = append(peersRunningSlice, p.Peer)
}
}
return peersRunningSlice
}

// close disconnects all peers.
func (ps *peerSet) close() {
ps.lock.Lock()
Expand Down
Loading

0 comments on commit 48cf1e1

Please sign in to comment.