Skip to content

Commit

Permalink
Report peer stats and expose RTT information from peerset
Browse files Browse the repository at this point in the history
  • Loading branch information
0xalank committed Mar 27, 2023
1 parent 5810fee commit c150ed3
Show file tree
Hide file tree
Showing 12 changed files with 151 additions and 25 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ endif
ifeq ($(QUAI_MINING),true)
BASE_CMD += --mine --miner.threads $(THREADS)
endif
ifeq ($(QUAI_STATS),true)
BASE_CMD += --quaistats ${STATS_NAME}:${STATS_PASS}@${STATS_HOST}
endif

# Build suburl strings for slice specific subclient groups
# WARNING: Only connect to dom/sub clients over a trusted network.
Expand Down
7 changes: 6 additions & 1 deletion eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,11 @@ func (d *Downloader) Synchronise(id string, head common.Hash, number uint64, mod
return err
}

// PeerSet retrieves the current peer set of the downloader.
func (d *Downloader) PeerSet() *peerSet {
return d.peers
}

// synchronise will select the peer and use it for synchronising. If an empty string is given
// it will use the best peer possible and synchronize if its number is higher than our own. If any of the
// checks fail an error will be returned. This method is synchronous
Expand Down Expand Up @@ -477,7 +482,7 @@ func (d *Downloader) fetchHead(p *peerConnection) (head *types.Header, err error
p.log.Debug("Retrieving remote chain head")

// Request the advertised remote head block and wait for the response
latest, _ := p.peer.Head()
latest, _, _ := p.peer.Head()
fetch := 1
go p.peer.RequestHeadersByHash(latest, fetch, uint64(1), false, true)

Expand Down
17 changes: 16 additions & 1 deletion eth/downloader/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type peerConnection struct {

// LightPeer encapsulates the methods required to synchronise with a remote light peer.
type LightPeer interface {
Head() (common.Hash, uint64)
Head() (common.Hash, uint64, time.Time)
RequestHeadersByHash(common.Hash, int, uint64, bool, bool) error
RequestHeadersByNumber(uint64, int, uint64, uint64, bool, bool) error
}
Expand Down Expand Up @@ -105,6 +105,21 @@ func (p *peerConnection) Reset() {
p.lacking = make(map[common.Hash]struct{})
}

// PeerConnection ID returns the unique identifier of the peer.
func (p *peerConnection) ID() string {
return p.id
}

// Tracker returns the message rate trackers of the peer.
func (p *peerConnection) Tracker() *msgrate.Tracker {
return p.rates
}

// Peer returns the underlying peer entity.
func (p *peerConnection) Peer() Peer {
return p.peer
}

// FetchHeaders sends a header retrieval request to the remote peer.
func (p *peerConnection) FetchHeaders(from uint64, count int) error {
// Short circuit if the peer is already fetching
Expand Down
4 changes: 2 additions & 2 deletions eth/handler_eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,9 @@ func (h *ethHandler) handleBlockBroadcast(peer *eth.Peer, block *types.Block) er
}
}

_, number := peer.Head()
_, number, _ := peer.Head()
if (block.NumberU64() - 1) > number {
peer.SetHead(block.ParentHash(), block.NumberU64()-1)
peer.SetHead(block.ParentHash(), block.NumberU64()-1, block.ReceivedAt)
h.chainSync.handlePeerEvent(peer)
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion eth/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type ethPeer struct {

// info gathers and returns some `eth` protocol metadata known about a peer.
func (p *ethPeer) info() *ethPeerInfo {
hash, number := p.Head()
hash, number, _ := p.Head()

return &ethPeerInfo{
Version: p.Version(),
Expand Down
2 changes: 1 addition & 1 deletion eth/peerset.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func (ps *peerSet) peerWithHighestNumber() *eth.Peer {
bestNumber uint64
)
for _, p := range ps.peers {
if _, number := p.Head(); bestPeer == nil || number > bestNumber {
if _, number, _ := p.Head(); bestPeer == nil || number > bestNumber {
bestPeer, bestNumber = p.Peer, number
}
}
Expand Down
9 changes: 6 additions & 3 deletions eth/protocols/eth/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"math/rand"
"sync"
"time"

mapset "github.com/deckarep/golang-set"
"github.com/dominant-strategies/go-quai/common"
Expand Down Expand Up @@ -78,6 +79,7 @@ type Peer struct {

head common.Hash // Latest advertised head block hash
number uint64 // Latest advertised head block number
receivedHeadAt time.Time // Time when the head was received

knownBlocks mapset.Set // Set of block hashes known to be known by this peer
queuedBlocks chan *blockPropagation // Queue of blocks to broadcast to the peer
Expand Down Expand Up @@ -139,21 +141,22 @@ func (p *Peer) Version() uint {
}

// Head retrieves the current head hash and head number of the peer.
func (p *Peer) Head() (hash common.Hash, number uint64) {
func (p *Peer) Head() (hash common.Hash, number uint64, receivedAt time.Time) {
p.lock.RLock()
defer p.lock.RUnlock()

copy(hash[:], p.head[:])
return hash, p.number
return hash, p.number, p.receivedHeadAt
}

// SetHead updates the head hash and head number of the peer.
func (p *Peer) SetHead(hash common.Hash, number uint64) {
func (p *Peer) SetHead(hash common.Hash, number uint64, receivedAt time.Time) {
p.lock.Lock()
defer p.lock.Unlock()

copy(p.head[:], hash[:])
p.number = number
p.receivedHeadAt = receivedAt
}

// KnownBlock returns whether peer is known to already have a block.
Expand Down
2 changes: 1 addition & 1 deletion eth/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func (cs *chainSyncer) nextSyncOp() *chainSyncOp {
}

func peerToSyncOp(mode downloader.SyncMode, p *eth.Peer) *chainSyncOp {
peerHead, peerNumber := p.Head()
peerHead, peerNumber, _ := p.Head()
return &chainSyncOp{mode: mode, peer: p, number: peerNumber, head: peerHead}
}

Expand Down
8 changes: 7 additions & 1 deletion network.env.dist
Original file line number Diff line number Diff line change
Expand Up @@ -117,4 +117,10 @@ CORS=false
VERBOSITY=3

# Syncmode variable
SYNCMODE=full
SYNCMODE=full

# Stats information
QUAI_STATS=false
STATS_NAME=
STATS_PASS=
STATS_HOST=
7 changes: 7 additions & 0 deletions p2p/msgrate/msgrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,13 @@ func (t *Tracker) Capacity(kind uint64, targetRTT time.Duration) int {
return roundCapacity(1 + capacityOverestimation*throughput)
}

// Roundtrip returns the estimated roundtrip time of the peer.
func (t *Tracker) Roundtrip() time.Duration {
t.lock.RLock()
defer t.lock.RUnlock()
return t.roundtrip
}

// roundCapacity gives the integer value of a capacity.
// The result fits int32, and is guaranteed to be positive.
func roundCapacity(cap float64) int {
Expand Down
4 changes: 4 additions & 0 deletions p2p/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,10 @@ func (p *Peer) LocalAddr() net.Addr {
return p.rw.fd.LocalAddr()
}

func (p *Peer) ConnectedTime() time.Duration {
return time.Duration(mclock.Now() - p.created)
}

// Disconnect terminates the peer connection with the given reason.
// It returns immediately and does not wait until the connection is closed.
func (p *Peer) Disconnect(reason DiscReason) {
Expand Down
111 changes: 97 additions & 14 deletions quaistats/quaistats.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/dominant-strategies/go-quai/log"
"github.com/dominant-strategies/go-quai/node"
"github.com/dominant-strategies/go-quai/p2p"
"github.com/dominant-strategies/go-quai/params"
"github.com/dominant-strategies/go-quai/rpc"
"github.com/gorilla/websocket"
)
Expand All @@ -55,6 +56,9 @@ const (
txChanSize = 4096
// chainHeadChanSize is the size of channel listening to ChainHeadEvent.
chainHeadChanSize = 10

// reportInterval is the time interval between two reports.
reportInterval = 15
)

// backend encompasses the bare-minimum functionality needed for quaistats reporting
Expand All @@ -66,6 +70,7 @@ type backend interface {
GetTd(ctx context.Context, hash common.Hash) *big.Int
Stats() (pending int, queued int)
Downloader() *downloader.Downloader
ChainConfig() *params.ChainConfig
}

// fullNodeBackend encompasses the functionality necessary for a full node
Expand Down Expand Up @@ -94,6 +99,8 @@ type Service struct {

headSub event.Subscription
txSub event.Subscription

chainID *big.Int
}

// connWrapper is a wrapper to prevent concurrent-write or concurrent-read on the
Expand Down Expand Up @@ -181,6 +188,7 @@ func New(node *node.Node, backend backend, engine consensus.Engine, url string)
host: parts[2],
pongCh: make(chan struct{}),
histCh: make(chan []uint64, 1),
chainID: backend.ChainConfig().ChainID,
}

node.RegisterLifecycle(quaistats)
Expand Down Expand Up @@ -307,7 +315,7 @@ func (s *Service) loop(chainHeadCh chan core.ChainHeadEvent, txEventCh chan core
continue
}
// Keep sending status updates until the connection breaks
fullReport := time.NewTicker(15 * time.Second)
fullReport := time.NewTicker(reportInterval * time.Second)

for err == nil {
select {
Expand Down Expand Up @@ -450,6 +458,8 @@ type nodeInfo struct {
OsVer string `json:"os_v"`
Client string `json:"client"`
History bool `json:"canUpdateHistory"`
Chain string `json:"chain"`
ChainID uint64 `json:"chainId"`
}

// authMsg is the authentication infos needed to login to a monitoring server.
Expand Down Expand Up @@ -485,6 +495,8 @@ func (s *Service) login(conn *connWrapper) error {
OsVer: runtime.GOARCH,
Client: "0.1.1",
History: true,
Chain: common.NodeLocation.Name(),
ChainID: s.chainID.Uint64(),
},
Secret: s.pass,
}
Expand Down Expand Up @@ -518,6 +530,9 @@ func (s *Service) report(conn *connWrapper) error {
if err := s.reportStats(conn); err != nil {
return err
}
if err := s.reportPeers(conn); err != nil {
return err
}
return nil
}

Expand Down Expand Up @@ -586,7 +601,8 @@ type blockStats struct {
ManifestHash common.Hash `json:"manifestHash"`
Root common.Hash `json:"stateRoot"`
Uncles uncleStats `json:"uncles"`
Chain []int `json:"chain"`
Chain string `json:"chain"`
ChainID uint64 `json:"chainId"`
}

// txStats is the information to report about individual transactions.
Expand Down Expand Up @@ -679,18 +695,8 @@ func (s *Service) assembleBlockStats(block *types.Block) *blockStats {
ManifestHash: header.ManifestHash(),
Root: header.Root(),
Uncles: uncles,
Chain: IntArrayLocation(common.NodeLocation),
}
}

func IntArrayLocation(l common.Location) []int {
switch l.Context() {
case common.REGION_CTX:
return []int{l.Region()}
case common.ZONE_CTX:
return []int{l.Region(), l.Zone()}
default:
return []int{}
Chain: common.NodeLocation.Name(),
ChainID: s.chainID.Uint64(),
}
}

Expand Down Expand Up @@ -785,6 +791,8 @@ type nodeStats struct {
Peers int `json:"peers"`
GasPrice int `json:"gasPrice"`
Uptime int `json:"uptime"`
Chain string `json:"chain"`
ChainID uint64 `json:"chainId"`
}

// reportStats retrieves various stats about the node at the networking and
Expand Down Expand Up @@ -828,10 +836,85 @@ func (s *Service) reportStats(conn *connWrapper) error {
GasPrice: gasprice,
Syncing: syncing,
Uptime: 100,
Chain: common.NodeLocation.Name(),
ChainID: s.chainID.Uint64(),
},
}
report := map[string][]interface{}{
"emit": {"stats", stats},
}
return conn.WriteJSON(report)
}


// peerStats is the information to report about peers.
type peerStats struct {
Chain string `json:"chain"`
ChainID uint64 `json:"chainId"`
Count int `json:"count"`
PeerData []*peerData `json:"peerData"`
}

// peerStat is the information to report about peers.
type peerData struct {
Chain string `json:"chain"`
Enode string `json:"enode"` // Unique node identifier (enode URL)
SoftwareName string `json:"softwareName"` // Node software version
LocalAddress string `json:"localAddress"`
RemoteAddress string `json:"remoteAddress"`
RTT string `json:"rtt"`
LatestHeight uint64 `json:"latestHeight"`
LatestHash string `json:"latestHash"`
RecvLastBlockTime time.Time `json:"recvLastBlockTime"`
ConnectedTime time.Duration `json:"uptime"`
PeerUptime string `json:"peerUptime"` // TODO: add peer uptime
}

// reportPeers retrieves various stats about the peers for a node.
func (s *Service) reportPeers(conn *connWrapper) error {
// Assemble the node stats and send it to the server
log.Trace("Sending peer details to quaistats")

peerInfo := s.backend.Downloader().PeerSet()

allPeerData := make([]*peerData, 0)

srvPeers := s.server.Peers()

for _, peer := range srvPeers {
peerStat := &peerData{
Enode: peer.ID().String(),
SoftwareName: peer.Fullname(),
LocalAddress: peer.LocalAddr().String(),
RemoteAddress: peer.RemoteAddr().String(),
Chain: common.NodeLocation.Name(),
ConnectedTime: peer.ConnectedTime(),
}

downloaderPeer := peerInfo.Peer(peer.ID().String())
if downloaderPeer != nil {
hash, num, receivedAt := downloaderPeer.Peer().Head()
peerStat.RTT = downloaderPeer.Tracker().Roundtrip().String()
peerStat.LatestHeight = num
peerStat.LatestHash = hash.String()
if receivedAt != *new(time.Time) {
peerStat.RecvLastBlockTime = receivedAt.UTC()
}
}
allPeerData = append(allPeerData, peerStat)
}

peers := map[string]interface{}{
"id": s.node,
"peers": &peerStats{
Chain: common.NodeLocation.Name(),
ChainID: s.chainID.Uint64(),
Count: len(srvPeers),
PeerData: allPeerData,
},
}
report := map[string][]interface{}{
"emit": {"peers", peers},
}
return conn.WriteJSON(report)
}

0 comments on commit c150ed3

Please sign in to comment.