Skip to content

Commit

Permalink
consensus for {safe,finalized} and rewrite tags
Browse files Browse the repository at this point in the history
  • Loading branch information
felipe-op committed May 26, 2023
1 parent a680097 commit 750edbf
Show file tree
Hide file tree
Showing 8 changed files with 423 additions and 77 deletions.
6 changes: 5 additions & 1 deletion proxyd/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,11 @@ func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch
backends = bg.loadBalancedConsensusGroup()

// We also rewrite block tags to enforce compliance with consensus
rctx := RewriteContext{latest: bg.Consensus.GetConsensusBlockNumber()}
rctx := RewriteContext{
latest: bg.Consensus.GetLatestBlockNumber(),
finalized: bg.Consensus.GetFinalizedBlockNumber(),
safe: bg.Consensus.GetSafeBlockNumber(),
}

for i, req := range rpcReqs {
res := RPCRes{JSONRPC: JSONRPCVersion, ID: req.ID}
Expand Down
117 changes: 85 additions & 32 deletions proxyd/consensus_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,12 @@ type backendState struct {

latestBlockNumber hexutil.Uint64
latestBlockHash string
peerCount uint64
inSync bool

finalizedBlockNumber hexutil.Uint64
safeBlockNumber hexutil.Uint64

peerCount uint64
inSync bool

lastUpdate time.Time

Expand All @@ -65,9 +69,19 @@ func (cp *ConsensusPoller) GetConsensusGroup() []*Backend {
return g
}

// GetConsensusBlockNumber returns the agreed block number in a consensus
func (ct *ConsensusPoller) GetConsensusBlockNumber() hexutil.Uint64 {
return ct.tracker.GetConsensusBlockNumber()
// GetLatestBlockNumber returns the `latest` agreed block number in a consensus
func (ct *ConsensusPoller) GetLatestBlockNumber() hexutil.Uint64 {
return ct.tracker.GetLatestBlockNumber()
}

// GetFinalizedBlockNumber returns the `finalized` agreed block number in a consensus
func (ct *ConsensusPoller) GetFinalizedBlockNumber() hexutil.Uint64 {
return ct.tracker.GetFinalizedBlockNumber()
}

// GetSafeBlockNumber returns the `safe` agreed block number in a consensus
func (ct *ConsensusPoller) GetSafeBlockNumber() hexutil.Uint64 {
return ct.tracker.GetSafeBlockNumber()
}

func (cp *ConsensusPoller) Shutdown() {
Expand Down Expand Up @@ -261,7 +275,19 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
log.Warn("error updating backend", "name", be.Name, "err", err)
}

changed, updateDelay := cp.setBackendState(be, peerCount, inSync, latestBlockNumber, latestBlockHash)
finalizedBlockNumber, _, err := cp.fetchBlock(ctx, be, "finalized")
if err != nil {
log.Warn("error updating backend", "name", be.Name, "err", err)
}

safeBlockNumber, _, err := cp.fetchBlock(ctx, be, "safe")
if err != nil {
log.Warn("error updating backend", "name", be.Name, "err", err)
}

changed, updateDelay := cp.setBackendState(be, peerCount, inSync,
latestBlockNumber, latestBlockHash,
finalizedBlockNumber, safeBlockNumber)

if changed {
RecordBackendLatestBlock(be, latestBlockNumber)
Expand All @@ -272,21 +298,27 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
"inSync", inSync,
"latestBlockNumber", latestBlockNumber,
"latestBlockHash", latestBlockHash,
"finalizedBlockNumber", finalizedBlockNumber,
"safeBlockNumber", safeBlockNumber,
"updateDelay", updateDelay)
}
}

// UpdateBackendGroupConsensus resolves the current group consensus based on the state of the backends
func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
var highestBlock hexutil.Uint64
var lowestBlock hexutil.Uint64
var lowestBlockHash string
var highestLatestBlock hexutil.Uint64

var lowestLatestBlock hexutil.Uint64
var lowestLatestBlockHash string

currentConsensusBlockNumber := cp.GetConsensusBlockNumber()
var lowestFinalizedBlock hexutil.Uint64
var lowestSafeBlock hexutil.Uint64

currentConsensusBlockNumber := cp.GetLatestBlockNumber()

// find the highest block, in order to use it defining the highest non-lagging ancestor block
for _, be := range cp.backendGroup.Backends {
peerCount, inSync, backendLatestBlockNumber, _, lastUpdate, _ := cp.getBackendState(be)
peerCount, inSync, backendLatestBlockNumber, _, _, _, lastUpdate, _ := cp.getBackendState(be)

if !be.skipPeerCountCheck && peerCount < cp.minPeerCount {
continue
Expand All @@ -298,14 +330,14 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
continue
}

if backendLatestBlockNumber > highestBlock {
highestBlock = backendLatestBlockNumber
if backendLatestBlockNumber > highestLatestBlock {
highestLatestBlock = backendLatestBlockNumber
}
}

// find the highest common ancestor block
for _, be := range cp.backendGroup.Backends {
peerCount, inSync, backendLatestBlockNumber, backendLatestBlockHash, lastUpdate, _ := cp.getBackendState(be)
peerCount, inSync, backendLatestBlockNumber, backendLatestBlockHash, backendFinalizedBlockNumber, backendSafeBlockNumber, lastUpdate, _ := cp.getBackendState(be)

if !be.skipPeerCountCheck && peerCount < cp.minPeerCount {
continue
Expand All @@ -318,32 +350,40 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
}

// check if backend is lagging behind the highest block
if backendLatestBlockNumber < highestBlock && uint64(highestBlock-backendLatestBlockNumber) > cp.maxBlockLag {
if backendLatestBlockNumber < highestLatestBlock && uint64(highestLatestBlock-backendLatestBlockNumber) > cp.maxBlockLag {
continue
}

if lowestBlock == 0 || backendLatestBlockNumber < lowestBlock {
lowestBlock = backendLatestBlockNumber
lowestBlockHash = backendLatestBlockHash
if lowestLatestBlock == 0 || backendLatestBlockNumber < lowestLatestBlock {
lowestLatestBlock = backendLatestBlockNumber
lowestLatestBlockHash = backendLatestBlockHash
}

if lowestFinalizedBlock == 0 || backendFinalizedBlockNumber < lowestFinalizedBlock {
lowestFinalizedBlock = backendFinalizedBlockNumber
}

if lowestSafeBlock == 0 || backendSafeBlockNumber < lowestSafeBlock {
lowestSafeBlock = backendSafeBlockNumber
}
}

// no block to propose (i.e. initializing consensus)
if lowestBlock == 0 {
if lowestLatestBlock == 0 {
return
}

proposedBlock := lowestBlock
proposedBlockHash := lowestBlockHash
proposedBlock := lowestLatestBlock
proposedBlockHash := lowestLatestBlockHash
hasConsensus := false

// check if everybody agrees on the same block hash
consensusBackends := make([]*Backend, 0, len(cp.backendGroup.Backends))
consensusBackendsNames := make([]string, 0, len(cp.backendGroup.Backends))
filteredBackendsNames := make([]string, 0, len(cp.backendGroup.Backends))

if lowestBlock > currentConsensusBlockNumber {
log.Debug("validating consensus on block", "lowestBlock", lowestBlock)
if lowestLatestBlock > currentConsensusBlockNumber {
log.Debug("validating consensus on block", "lowestLatestBlock", lowestLatestBlock)
}

broken := false
Expand All @@ -362,7 +402,7 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
- in sync
*/

peerCount, inSync, latestBlockNumber, _, lastUpdate, bannedUntil := cp.getBackendState(be)
peerCount, inSync, latestBlockNumber, _, _, _, lastUpdate, bannedUntil := cp.getBackendState(be)
notUpdated := lastUpdate.Add(cp.maxUpdateThreshold).Before(time.Now())
isBanned := time.Now().Before(bannedUntil)
notEnoughPeers := !be.skipPeerCountCheck && peerCount < cp.minPeerCount
Expand Down Expand Up @@ -410,7 +450,9 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
log.Info("consensus broken", "currentConsensusBlockNumber", currentConsensusBlockNumber, "proposedBlock", proposedBlock, "proposedBlockHash", proposedBlockHash)
}

cp.tracker.SetConsensusBlockNumber(proposedBlock)
cp.tracker.SetLatestBlockNumber(proposedBlock)
cp.tracker.SetFinalizedBlockNumber(lowestFinalizedBlock)
cp.tracker.SetSafeBlockNumber(lowestSafeBlock)
cp.consensusGroupMux.Lock()
cp.consensusGroup = consensusBackends
cp.consensusGroupMux.Unlock()
Expand Down Expand Up @@ -512,27 +554,38 @@ func (cp *ConsensusPoller) isInSync(ctx context.Context, be *Backend) (result bo
return res, nil
}

func (cp *ConsensusPoller) getBackendState(be *Backend) (peerCount uint64, inSync bool, blockNumber hexutil.Uint64, blockHash string, lastUpdate time.Time, bannedUntil time.Time) {
func (cp *ConsensusPoller) getBackendState(be *Backend) (peerCount uint64, inSync bool,
latestBlockNumber hexutil.Uint64, latestBlockHash string,
finalizedBlockNumber hexutil.Uint64,
safeBlockNumber hexutil.Uint64,
lastUpdate time.Time, bannedUntil time.Time) {
bs := cp.backendState[be]
defer bs.backendStateMux.Unlock()
bs.backendStateMux.Lock()
peerCount = bs.peerCount
inSync = bs.inSync
blockNumber = bs.latestBlockNumber
blockHash = bs.latestBlockHash
latestBlockNumber = bs.latestBlockNumber
latestBlockHash = bs.latestBlockHash
finalizedBlockNumber = bs.finalizedBlockNumber
safeBlockNumber = bs.safeBlockNumber
lastUpdate = bs.lastUpdate
bannedUntil = bs.bannedUntil
return
}

func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, inSync bool, blockNumber hexutil.Uint64, blockHash string) (changed bool, updateDelay time.Duration) {
func (cp *ConsensusPoller) setBackendState(be *Backend, peerCount uint64, inSync bool,
latestBlockNumber hexutil.Uint64, latestBlockHash string,
finalizedBlockNumber hexutil.Uint64,
safeBlockNumber hexutil.Uint64) (changed bool, updateDelay time.Duration) {
bs := cp.backendState[be]
bs.backendStateMux.Lock()
changed = bs.latestBlockHash != blockHash
changed = bs.latestBlockHash != latestBlockHash
bs.peerCount = peerCount
bs.inSync = inSync
bs.latestBlockNumber = blockNumber
bs.latestBlockHash = blockHash
bs.latestBlockNumber = latestBlockNumber
bs.latestBlockHash = latestBlockHash
bs.finalizedBlockNumber = finalizedBlockNumber
bs.safeBlockNumber = safeBlockNumber
updateDelay = time.Since(bs.lastUpdate)
bs.lastUpdate = time.Now()
bs.backendStateMux.Unlock()
Expand Down
78 changes: 63 additions & 15 deletions proxyd/consensus_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,35 +13,68 @@ import (
// ConsensusTracker abstracts how we store and retrieve the current consensus
// allowing it to be stored locally in-memory or in a shared Redis cluster
type ConsensusTracker interface {
GetConsensusBlockNumber() hexutil.Uint64
SetConsensusBlockNumber(blockNumber hexutil.Uint64)
GetLatestBlockNumber() hexutil.Uint64
SetLatestBlockNumber(blockNumber hexutil.Uint64)
GetFinalizedBlockNumber() hexutil.Uint64
SetFinalizedBlockNumber(blockNumber hexutil.Uint64)
GetSafeBlockNumber() hexutil.Uint64
SetSafeBlockNumber(blockNumber hexutil.Uint64)
}

// InMemoryConsensusTracker store and retrieve in memory, async-safe
type InMemoryConsensusTracker struct {
consensusBlockNumber hexutil.Uint64
latestBlockNumber hexutil.Uint64
finalizedBlockNumber hexutil.Uint64
safeBlockNumber hexutil.Uint64
mutex sync.Mutex
}

func NewInMemoryConsensusTracker() ConsensusTracker {
return &InMemoryConsensusTracker{
consensusBlockNumber: 0,
mutex: sync.Mutex{},
mutex: sync.Mutex{},
}
}

func (ct *InMemoryConsensusTracker) GetConsensusBlockNumber() hexutil.Uint64 {
func (ct *InMemoryConsensusTracker) GetLatestBlockNumber() hexutil.Uint64 {
defer ct.mutex.Unlock()
ct.mutex.Lock()

return ct.consensusBlockNumber
return ct.latestBlockNumber
}

func (ct *InMemoryConsensusTracker) SetConsensusBlockNumber(blockNumber hexutil.Uint64) {
func (ct *InMemoryConsensusTracker) SetLatestBlockNumber(blockNumber hexutil.Uint64) {
defer ct.mutex.Unlock()
ct.mutex.Lock()

ct.consensusBlockNumber = blockNumber
ct.latestBlockNumber = blockNumber
}

func (ct *InMemoryConsensusTracker) GetFinalizedBlockNumber() hexutil.Uint64 {
defer ct.mutex.Unlock()
ct.mutex.Lock()

return ct.finalizedBlockNumber
}

func (ct *InMemoryConsensusTracker) SetFinalizedBlockNumber(blockNumber hexutil.Uint64) {
defer ct.mutex.Unlock()
ct.mutex.Lock()

ct.finalizedBlockNumber = blockNumber
}

func (ct *InMemoryConsensusTracker) GetSafeBlockNumber() hexutil.Uint64 {
defer ct.mutex.Unlock()
ct.mutex.Lock()

return ct.safeBlockNumber
}

func (ct *InMemoryConsensusTracker) SetSafeBlockNumber(blockNumber hexutil.Uint64) {
defer ct.mutex.Unlock()
ct.mutex.Lock()

ct.safeBlockNumber = blockNumber
}

// RedisConsensusTracker uses a Redis `client` to store and retrieve consensus, async-safe
Expand All @@ -59,14 +92,29 @@ func NewRedisConsensusTracker(ctx context.Context, r *redis.Client, namespace st
}
}

func (ct *RedisConsensusTracker) key() string {
return fmt.Sprintf("consensus_latest_block:%s", ct.backendGroup)
func (ct *RedisConsensusTracker) key(tag string) string {
return fmt.Sprintf("consensus:%s:%s", ct.backendGroup, tag)
}

func (ct *RedisConsensusTracker) GetConsensusBlockNumber() hexutil.Uint64 {
return hexutil.Uint64(hexutil.MustDecodeUint64(ct.client.Get(ct.ctx, ct.key()).Val()))
func (ct *RedisConsensusTracker) GetLatestBlockNumber() hexutil.Uint64 {
return hexutil.Uint64(hexutil.MustDecodeUint64(ct.client.Get(ct.ctx, ct.key("latest")).Val()))
}

func (ct *RedisConsensusTracker) SetLatestBlockNumber(blockNumber hexutil.Uint64) {
ct.client.Set(ct.ctx, ct.key("latest"), blockNumber, 0)
}

func (ct *RedisConsensusTracker) GetFinalizedBlockNumber() hexutil.Uint64 {
return hexutil.Uint64(hexutil.MustDecodeUint64(ct.client.Get(ct.ctx, ct.key("finalized")).Val()))
}

func (ct *RedisConsensusTracker) SetFinalizedBlockNumber(blockNumber hexutil.Uint64) {
ct.client.Set(ct.ctx, ct.key("finalized"), blockNumber, 0)
}
func (ct *RedisConsensusTracker) GetSafeBlockNumber() hexutil.Uint64 {
return hexutil.Uint64(hexutil.MustDecodeUint64(ct.client.Get(ct.ctx, ct.key("safe")).Val()))
}

func (ct *RedisConsensusTracker) SetConsensusBlockNumber(blockNumber hexutil.Uint64) {
ct.client.Set(ct.ctx, ct.key(), blockNumber, 0)
func (ct *RedisConsensusTracker) SetSafeBlockNumber(blockNumber hexutil.Uint64) {
ct.client.Set(ct.ctx, ct.key("safe"), blockNumber, 0)
}
Loading

0 comments on commit 750edbf

Please sign in to comment.