Skip to content

Commit

Permalink
bugfix: added another data path for block request in the case of regi…
Browse files Browse the repository at this point in the history
…on doesn't get the broadcast

Added another data path if dom doesnt receive a broadcast, sub will inform them
  • Loading branch information
gameofpointers committed Aug 23, 2023
1 parent 6ad1030 commit 9009086
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 1 deletion.
43 changes: 42 additions & 1 deletion core/core.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package core

import (
"context"
"errors"
"io"
"math/big"
Expand Down Expand Up @@ -194,6 +195,18 @@ func (c *Core) serviceBlocks(hashNumberList []types.HashAndNumber) {
}
parentHeader := c.GetHeader(header.ParentHash(), header.NumberU64()-1)
if parentHeader != nil {
// If parent header is dom, send a signal to dom to request for the block if it doesnt have it
_, parentHeaderOrder, err := c.sl.engine.CalcOrder(parentHeader)
if err != nil {
log.Warn("Error calculating the parent block order in serviceBlocks", "Hash", parentHeader.Hash(), "Number", parentHeader.NumberArray())
continue
}
nodeCtx := common.NodeLocation.Context()
if parentHeaderOrder < nodeCtx && c.GetHeaderByHash(parentHeader.Hash()) == nil {
log.Info("Requesting the dom to get the block if it doesnt have and try to append", "Hash", parentHeader.Hash(), "Order", parentHeaderOrder)
// send a signal to the required dom to fetch the block if it doesnt have it, or its not in its appendqueue
go c.sl.domClient.RequestDomToAppendOrFetch(context.Background(), parentHeader.Hash(), parentHeaderOrder)
}
// Using a empty block to append here because append only takes in a
// header and we read the block inside append again, so to save the
// ram, we are using the header
Expand All @@ -209,6 +222,30 @@ func (c *Core) serviceBlocks(hashNumberList []types.HashAndNumber) {
}
}

func (c *Core) RequestDomToAppendOrFetch(hash common.Hash, order int) {
// TODO: optimize to check if the block is in the appendqueue or already
// appended to reduce the network bandwidth utilization
nodeCtx := common.NodeLocation.Context()
if nodeCtx == common.PRIME_CTX {
// If prime all you can do it to ask for the block
_, exists := c.appendQueue.Get(hash)
if !exists {
log.Warn("Block sub asked doesnt exist in append queue, so request the peers for it", "Hash", hash, "Order", order)
c.sl.missingParentFeed.Send(hash) // Using the missing parent feed to ask for the block
}
} else if nodeCtx == common.REGION_CTX {
if order < nodeCtx { // Prime block
go c.sl.domClient.RequestDomToAppendOrFetch(context.Background(), hash, order)
}
_, exists := c.appendQueue.Get(hash)
if !exists {
log.Warn("Block sub asked doesnt exist in append queue, so request the peers for it", "Hash", hash, "Order", order)
c.sl.missingParentFeed.Send(hash) // Using the missing parent feed to ask for the block
}
}

}

// addToAppendQueue adds a block to the append queue
func (c *Core) addToAppendQueue(block *types.Block) error {
c.appendQueue.ContainsOrAdd(block.Hash(), blockNumberAndRetryCounter{block.NumberU64(), 0})
Expand Down Expand Up @@ -310,8 +347,12 @@ func (c *Core) WriteBlock(block *types.Block) {
if err != nil {
return
}
if order == common.NodeLocation.Context() {
nodeCtx := common.NodeLocation.Context()
if order == nodeCtx {
c.addToAppendQueue(block)
// If a dom block comes in and we havent appended it yet
} else if order < nodeCtx && c.GetHeaderByHash(block.Hash()) == nil {
go c.sl.domClient.RequestDomToAppendOrFetch(context.Background(), block.Hash(), order)
}
}
if c.GetHeaderOrCandidateByHash(block.Hash()) == nil {
Expand Down
2 changes: 2 additions & 0 deletions core/slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,8 @@ func (sl *Slice) relayPh(block *types.Block, pendingHeaderWithTermini types.Pend
bestPh.Header().SetLocation(common.NodeLocation)
sl.miner.worker.pendingHeaderFeed.Send(bestPh.Header())
return
} else {
log.Warn("Pending Header for Best ph key does not exist", "best ph key", sl.bestPhKey)
}
} else if !domOrigin {
for _, i := range sl.randomRelayArray() {
Expand Down
4 changes: 4 additions & 0 deletions eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,10 @@ func (b *QuaiAPIBackend) UpdateDom(oldTerminus common.Hash, newTerminus common.H
b.eth.core.UpdateDom(oldTerminus, newTerminus, location)
}

func (b *QuaiAPIBackend) RequestDomToAppendOrFetch(hash common.Hash, order int) {
b.eth.core.RequestDomToAppendOrFetch(hash, order)
}

func (b *QuaiAPIBackend) ProcessingState() bool {
return b.eth.core.ProcessingState()
}
Expand Down
1 change: 1 addition & 0 deletions internal/quaiapi/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ type Backend interface {
PendingBlock() *types.Block
SubRelayPendingHeader(pendingHeader types.PendingHeader, location common.Location, subReorg bool)
UpdateDom(oldTerminus common.Hash, newTerminus common.Hash, location common.Location)
RequestDomToAppendOrFetch(hash common.Hash, order int)
NewGenesisPendingHeader(pendingHeader *types.Header)
GetPendingHeader() (*types.Header, error)
GetManifest(blockHash common.Hash) (types.BlockManifest, error)
Expand Down
12 changes: 12 additions & 0 deletions internal/quaiapi/quai_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,18 @@ func (s *PublicBlockChainQuaiAPI) UpdateDom(ctx context.Context, raw json.RawMes
s.b.UpdateDom(domUpdate.OldTerminus, domUpdate.NewTerminus, domUpdate.Location)
}

type RequestDomToAppendOrFetchArgs struct {
Hash common.Hash
Order int
}

func (s *PublicBlockChainQuaiAPI) RequestDomToAppendOrFetch(ctx context.Context, raw json.RawMessage) {
var requestDom RequestDomToAppendOrFetchArgs
if err := json.Unmarshal(raw, &requestDom); err != nil {
return
}
s.b.RequestDomToAppendOrFetch(requestDom.Hash, requestDom.Order)
}
func (s *PublicBlockChainQuaiAPI) NewGenesisPendingHeader(ctx context.Context, raw json.RawMessage) {
var pendingHeader *types.Header
if err := json.Unmarshal(raw, &pendingHeader); err != nil {
Expand Down
7 changes: 7 additions & 0 deletions quaiclient/quaiclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,13 @@ func (ec *Client) UpdateDom(ctx context.Context, oldTerminus common.Hash, newTer
ec.c.CallContext(ctx, nil, "quai_updateDom", data)
}

func (ec *Client) RequestDomToAppendOrFetch(ctx context.Context, hash common.Hash, order int) {
data := map[string]interface{}{"Hash": hash}
data["Order"] = order

ec.c.CallContext(ctx, nil, "quai_requestDomToAppendOrFetch", data)
}

func (ec *Client) NewGenesisPendingHeader(ctx context.Context, header *types.Header) {
ec.c.CallContext(ctx, nil, "quai_newGenesisPendingHeader", header.RPCMarshalHeader())
}
Expand Down

0 comments on commit 9009086

Please sign in to comment.