Skip to content

Commit

Permalink
Pending Etx fetching logic
Browse files Browse the repository at this point in the history
  • Loading branch information
gameofpointers committed Feb 3, 2023
1 parent 0e0033d commit 4499272
Show file tree
Hide file tree
Showing 10 changed files with 234 additions and 30 deletions.
12 changes: 12 additions & 0 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,10 @@ func (c *Core) GetPendingHeader() (*types.Header, error) {
return c.sl.GetPendingHeader()
}

func (c *Core) SubscribeMissingPendingEtxsEvent(ch chan<- common.Hash) event.Subscription {
return c.sl.SubscribeMissingPendingEtxsEvent(ch)
}

func (c *Core) GetManifest(blockHash common.Hash) (types.BlockManifest, error) {
return c.sl.GetManifest(blockHash)
}
Expand All @@ -230,6 +234,14 @@ func (c *Core) AddPendingEtxs(pEtxs types.PendingEtxs) error {
return c.sl.AddPendingEtxs(pEtxs)
}

func (c *Core) GetPendingEtxs(hash common.Hash) *types.PendingEtxs {
return rawdb.ReadPendingEtxs(c.sl.sliceDb, hash)
}

func (c *Core) HasPendingEtxs(hash common.Hash) bool {
return c.GetPendingEtxs(hash) != nil
}

func (c *Core) SendPendingEtxsToDom(pEtxs types.PendingEtxs) error {
return c.sl.SendPendingEtxsToDom(pEtxs)
}
Expand Down
8 changes: 4 additions & 4 deletions core/rawdb/accessors_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1249,17 +1249,17 @@ func WritePendingEtxsRLP(db ethdb.KeyValueWriter, hash common.Hash, rlp rlp.RawV
}

// ReadPendingEtxs retreives the pending ETXs corresponding to a given block
func ReadPendingEtxs(db ethdb.Reader, hash common.Hash) types.PendingEtxs {
func ReadPendingEtxs(db ethdb.Reader, hash common.Hash) *types.PendingEtxs {
data := ReadPendingEtxsRLP(db, hash)
if len(data) == 0 {
return types.PendingEtxs{}
return nil
}
pendingEtxs := types.PendingEtxs{}
if err := rlp.Decode(bytes.NewReader(data), &pendingEtxs); err != nil {
log.Error("Invalid pending etxs RLP", "hash", hash, "err", err)
return types.PendingEtxs{}
return nil
}
return pendingEtxs
return &pendingEtxs
}

// WritePendingEtxs stores the pending ETXs corresponding to a given block
Expand Down
15 changes: 11 additions & 4 deletions core/slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,10 @@ type Slice struct {
domUrl string
subClients []*quaiclient.Client

scope event.SubscriptionScope
downloaderWaitFeed event.Feed
missingBodyFeed event.Feed
scope event.SubscriptionScope
downloaderWaitFeed event.Feed
missingBodyFeed event.Feed
missingPendingEtxsFeed event.Feed

pendingEtxs *lru.Cache

Expand Down Expand Up @@ -294,10 +295,12 @@ func (sl *Slice) CollectSubRollups(b *types.Block) ([]types.Transactions, error)
// Look for pending ETXs first in pending ETX cache, then in database
if res, ok := sl.pendingEtxs.Get(hash); ok && res != nil {
pendingEtxs = res.([]types.Transactions)
} else if res := rawdb.ReadPendingEtxs(sl.sliceDb, hash); res.Header != nil {
} else if res := rawdb.ReadPendingEtxs(sl.sliceDb, hash); res != nil {
pendingEtxs = res.Etxs
} else {
log.Warn("unable to find pending etxs for hash in manifest", "hash:", hash.String())
// Send the pendingEtxs to the feed for broadcast
sl.missingPendingEtxsFeed.Send(hash)
return nil, ErrPendingEtxNotFound
}
for ctx := nodeCtx; ctx < common.HierarchyDepth; ctx++ {
Expand Down Expand Up @@ -810,6 +813,10 @@ func (sl *Slice) GetPendingBlockBody(header *types.Header) *types.Body {
return sl.miner.worker.GetPendingBlockBody(header)
}

func (sl *Slice) SubscribeMissingPendingEtxsEvent(ch chan<- common.Hash) event.Subscription {
return sl.scope.Track(sl.missingPendingEtxsFeed.Subscribe(ch))
}

// MakeDomClient creates the quaiclient for the given domurl
func makeDomClient(domurl string) *quaiclient.Client {
if domurl == "" {
Expand Down
3 changes: 3 additions & 0 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@ type Core interface {
// CurrentBlock retrieves the head of local chain.
CurrentBlock() *types.Block

// AddPendingEtxs adds the pendingEtxs to the database.
AddPendingEtxs(pendingEtxs types.PendingEtxs) error

// InsertChain inserts a batch of blocks into the local chain.
InsertChain(types.Blocks) (int, error)

Expand Down
70 changes: 61 additions & 9 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ const (
// missingBodyChanSize is the size of channel listening to missingBodyEvent.
missingBodyChanSize = 10

// missingPendingEtxsChanSize is the size of channel listening to the MissingPendingEtxsEvent
missingPendingEtxsChanSize = 10

// minPeerSend is the threshold for sending the block updates. If
// sqrt of len(peers) is less than 5 we make the block announcement
// to as much as minPeerSend peers otherwise send it to sqrt of len(peers).
Expand Down Expand Up @@ -108,12 +111,14 @@ type handler struct {
txFetcher *fetcher.TxFetcher
peers *peerSet

eventMux *event.TypeMux
txsCh chan core.NewTxsEvent
txsSub event.Subscription
minedBlockSub *event.TypeMuxSubscription
missingBodyCh chan *types.Header
missingBodySub event.Subscription
eventMux *event.TypeMux
txsCh chan core.NewTxsEvent
txsSub event.Subscription
minedBlockSub *event.TypeMuxSubscription
missingBodyCh chan *types.Header
missingBodySub event.Subscription
missingPendingEtxsCh chan common.Hash
missingPendingEtxsSub event.Subscription

whitelist map[uint64]common.Hash

Expand Down Expand Up @@ -281,6 +286,12 @@ func (h *handler) Start(maxPeers int) {
h.txsSub = h.txpool.SubscribeNewTxsEvent(h.txsCh)
go h.txBroadcastLoop()

// broadcast transactions
h.wg.Add(1)
h.missingPendingEtxsCh = make(chan common.Hash, missingPendingEtxsChanSize)
h.missingPendingEtxsSub = h.core.SubscribeMissingPendingEtxsEvent(h.missingPendingEtxsCh)
go h.missingPendingEtxsLoop()

// broadcast mined blocks
h.wg.Add(1)
h.minedBlockSub = h.eventMux.Subscribe(core.NewMinedBlockEvent{})
Expand All @@ -298,9 +309,10 @@ func (h *handler) Start(maxPeers int) {
}

func (h *handler) Stop() {
h.txsSub.Unsubscribe() // quits txBroadcastLoop
h.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop
h.missingBodySub.Unsubscribe() // quits missingBodyLoop
h.txsSub.Unsubscribe() // quits txBroadcastLoop
h.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop
h.missingBodySub.Unsubscribe() // quits missingBodyLoop
h.missingPendingEtxsSub.Unsubscribe() // quits pendingEtxsBroadcastLoop

// Quit chainSync and txsync64.
// After this is done, no new peers will be accepted.
Expand Down Expand Up @@ -456,3 +468,43 @@ func (h *handler) missingBodyLoop() {
}
}
}

// pendingEtxsBroadcastLoop announces new pendingEtxs to connected peers.
func (h *handler) missingPendingEtxsLoop() {
defer h.wg.Done()
for {
select {
case hash := <-h.missingPendingEtxsCh:
// Get the min(sqrt(len(peers)), minPeerRequest)
var peerThreshold int
sqrtNumPeers := int(math.Sqrt(float64(len(h.peers.peers))))
if sqrtNumPeers < minPeerRequest {
if minPeerRequest < len(h.peers.peers) {
peerThreshold = minPeerRequest
} else {
peerThreshold = len(h.peers.peers)
}
} else {
peerThreshold = sqrtNumPeers
}

var allPeers []*ethPeer
for _, peer := range h.peers.peers {
allPeers = append(allPeers, peer)
}
// shuffle the filteredPeers
rand.Seed(time.Now().UnixNano())
rand.Shuffle(len(allPeers), func(i, j int) { allPeers[i], allPeers[j] = allPeers[j], allPeers[i] })

// Check if any of the peers have the body
for _, peer := range allPeers[:peerThreshold] {
log.Trace("Fetching the missing pending etxs from", "peer", peer.ID(), "hash", hash)
if err := peer.RequestOnePendingEtxs(hash); err != nil {
return
}
}
case <-h.missingPendingEtxsSub.Err():
return
}
}
}
17 changes: 17 additions & 0 deletions eth/handler_eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/dominant-strategies/go-quai/eth/protocols/eth"
"github.com/dominant-strategies/go-quai/log"
"github.com/dominant-strategies/go-quai/p2p/enode"
"github.com/dominant-strategies/go-quai/trie"
)

const (
Expand Down Expand Up @@ -94,6 +95,10 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error {
case *eth.PooledTransactionsPacket:
return h.txFetcher.Enqueue(peer.ID(), *packet, true)

case *eth.PendingEtxsPacket:
pendingEtxs := packet.Unpack()
return h.handlePendingEtxs(pendingEtxs)

default:
return fmt.Errorf("unexpected eth packet type: %T", packet)
}
Expand Down Expand Up @@ -194,3 +199,15 @@ func (h *ethHandler) handleBlockBroadcast(peer *eth.Peer, block *types.Block) er
}
return nil
}

func (h *ethHandler) handlePendingEtxs(pendingEtxs types.PendingEtxs) error {
if !pendingEtxs.IsValid(trie.NewStackTrie(nil)) {
log.Warn("PendingEtxs is not valid", pendingEtxs.Etxs, pendingEtxs.Header.EtxHashArray())
return nil
}
err := h.core.AddPendingEtxs(pendingEtxs)
if err != nil {
log.Error("Error in handling pendingEtxs broadcast", "err", err)
}
return nil
}
2 changes: 2 additions & 0 deletions eth/protocols/eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@ var eth66 = map[uint64]msgHandler{
GetReceiptsMsg: handleGetReceipts66,
ReceiptsMsg: handleReceipts66,
GetPooledTransactionsMsg: handleGetPooledTransactions66,
PendingEtxsMsg: handlePendingEtxs,
GetOnePendingEtxsMsg: handleGetOnePendingEtxs66,
PooledTransactionsMsg: handlePooledTransactions66,
GetBlockMsg: handleGetBlock66,
}
Expand Down
28 changes: 28 additions & 0 deletions eth/protocols/eth/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,34 @@ func handleGetBlock66(backend Backend, msg Decoder, peer *Peer) error {
return nil
}

func handlePendingEtxs(backend Backend, msg Decoder, peer *Peer) error {
// Decode the block pending etxs retrieval message
ann := new(PendingEtxsPacket)
if err := msg.Decode(&ann); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}
// Mark the hashes as present at the remote node
peer.markPendingEtxs(ann.PendingEtxs.Header.Hash())

return backend.Handle(peer, ann)
}

func handleGetOnePendingEtxs66(backend Backend, msg Decoder, peer *Peer) error {
// Decode the block pending etxs retrieval message
var query GetOnePendingEtxsPacket66
if err := msg.Decode(&query); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}
requestTracker.Fulfil(peer.id, peer.version, GetOnePendingEtxsMsg, query.RequestId)
pendingEtxs := backend.Core().GetPendingEtxs(query.Hash)
if pendingEtxs == nil {
log.Debug("Couldn't complete a pendingEtxs request for", "Hash", query.Hash)
return nil
}
log.Trace("Completing a pendingEtxs request for", "Hash", pendingEtxs.Header.Hash())
return peer.SendPendingEtxs(*pendingEtxs)
}

func handleNewBlockhashes(backend Backend, msg Decoder, peer *Peer) error {
// A batch of new block announcements just arrived
ann := new(NewBlockHashesPacket)
Expand Down
69 changes: 57 additions & 12 deletions eth/protocols/eth/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package eth

import (
"errors"
"math/rand"
"sync"

Expand All @@ -36,6 +37,10 @@ const (
// before starting to randomly evict them.
maxKnownBlocks = 1024

// maxKnownPendingEtxs is the maximum pendingEtxs Header hashes to keep in the known list
// before starting to randomly evict them.
maxKnownPendingEtxs = 1024

// maxQueuedTxs is the maximum number of transactions to queue up before dropping
// older broadcasts.
maxQueuedTxs = 4096
Expand Down Expand Up @@ -78,6 +83,8 @@ type Peer struct {
queuedBlocks chan *blockPropagation // Queue of blocks to broadcast to the peer
queuedBlockAnns chan *types.Block // Queue of blocks to announce to the peer

knownPendingEtxs mapset.Set // Set of pending etxs hashes known to be known by this peer

txpool TxPool // Transaction pool used by the broadcasters for liveness checks
knownTxs mapset.Set // Set of transaction hashes known to be known by this peer
txBroadcast chan []common.Hash // Channel used to queue transaction propagation requests
Expand All @@ -91,18 +98,19 @@ type Peer struct {
// version.
func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, txpool TxPool) *Peer {
peer := &Peer{
id: p.ID().String(),
Peer: p,
rw: rw,
version: version,
knownTxs: mapset.NewSet(),
knownBlocks: mapset.NewSet(),
queuedBlocks: make(chan *blockPropagation, maxQueuedBlocks),
queuedBlockAnns: make(chan *types.Block, maxQueuedBlockAnns),
txBroadcast: make(chan []common.Hash),
txAnnounce: make(chan []common.Hash),
txpool: txpool,
term: make(chan struct{}),
id: p.ID().String(),
Peer: p,
rw: rw,
version: version,
knownTxs: mapset.NewSet(),
knownBlocks: mapset.NewSet(),
knownPendingEtxs: mapset.NewSet(),
queuedBlocks: make(chan *blockPropagation, maxQueuedBlocks),
queuedBlockAnns: make(chan *types.Block, maxQueuedBlockAnns),
txBroadcast: make(chan []common.Hash),
txAnnounce: make(chan []common.Hash),
txpool: txpool,
term: make(chan struct{}),
}
// Start up all the broadcasters
go peer.broadcastBlocks()
Expand Down Expand Up @@ -178,6 +186,16 @@ func (p *Peer) markTransaction(hash common.Hash) {
p.knownTxs.Add(hash)
}

// markPendingEtxs marks a pendingEtxs header as known for the peer, ensuring that the block will
// never be propagated to this particular peer.
func (p *Peer) markPendingEtxs(hash common.Hash) {
// If we reached the memory allowance, drop a previously known block hash
for p.knownPendingEtxs.Cardinality() >= maxKnownPendingEtxs {
p.knownPendingEtxs.Pop()
}
p.knownPendingEtxs.Add(hash)
}

// SendTransactions sends transactions to the peer and includes the hashes
// in its transaction hash set for future reference.
//
Expand Down Expand Up @@ -545,3 +563,30 @@ func (p *Peer) RequestTxs(hashes []common.Hash) error {
}
return p2p.Send(p.rw, GetPooledTransactionsMsg, GetPooledTransactionsPacket(hashes))
}

// RequestOnePendingEtx fetches a pendingEtx for a given block hash from a remote node.
func (p *Peer) RequestOnePendingEtxs(hash common.Hash) error {
p.Log().Debug("Fetching a pending etx", "hash", hash)
if p.Version() >= ETH66 {
id := rand.Uint64()

requestTracker.Track(p.id, p.version, GetOnePendingEtxsMsg, PendingEtxsMsg, id)
return p2p.Send(p.rw, GetOnePendingEtxsMsg, &GetOnePendingEtxsPacket66{
RequestId: id,
GetOnePendingEtxsPacket: GetOnePendingEtxsPacket{Hash: hash},
})
}
return errors.New("eth65 not supported for this call")
}

// SendNewPendingEtxs propagates an entire block to a remote peer.
func (p *Peer) SendPendingEtxs(pendingEtxs types.PendingEtxs) error {
// Mark all the block hash as known, but ensure we don't overflow our limits
for p.knownPendingEtxs.Cardinality() >= maxKnownPendingEtxs {
p.knownPendingEtxs.Pop()
}
p.knownPendingEtxs.Add(pendingEtxs.Header.Hash())
return p2p.Send(p.rw, PendingEtxsMsg, &PendingEtxsPacket{
PendingEtxs: pendingEtxs,
})
}
Loading

0 comments on commit 4499272

Please sign in to comment.