Skip to content

Commit

Permalink
Add TxID to GossipTracker (ava-labs#2318)
Browse files Browse the repository at this point in the history
Co-authored-by: Stephen Buttolph <[email protected]>
  • Loading branch information
joshua-kim and StephenButtolph authored Dec 5, 2022
1 parent d55aa59 commit 8ed8985
Show file tree
Hide file tree
Showing 11 changed files with 216 additions and 138 deletions.
18 changes: 8 additions & 10 deletions network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,49 +498,47 @@ func (n *network) Version() (message.OutboundMessage, error) {
)
}

func (n *network) Peers(peerID ids.NodeID) ([]ids.NodeID, []ips.ClaimedIPPort, error) {
func (n *network) Peers(peerID ids.NodeID) ([]ips.ClaimedIPPort, error) {
// Only select validators that we haven't already sent to this peer
unknownValidators, ok, err := n.gossipTracker.GetUnknown(peerID, int(n.config.PeerListNumValidatorIPs))
if err != nil {
return nil, nil, err
return nil, err
}
if !ok {
n.peerConfig.Log.Debug(
"unable to find peer to gossip to",
zap.Stringer("nodeID", peerID),
)
return nil, nil, nil
return nil, nil
}

validatorIDs := make([]ids.NodeID, 0, len(unknownValidators))
validatorIPs := make([]ips.ClaimedIPPort, 0, len(unknownValidators))

for _, validatorID := range unknownValidators {
for _, validator := range unknownValidators {
n.peersLock.RLock()
p, ok := n.connectedPeers.GetByID(validatorID)
p, ok := n.connectedPeers.GetByID(validator.NodeID)
n.peersLock.RUnlock()
if !ok {
n.peerConfig.Log.Debug(
"unable to find validator in connected peers",
zap.Stringer("nodeID", validatorID),
zap.Stringer("nodeID", validator.NodeID),
)
continue
}

peerIP := p.IP()
validatorIDs = append(validatorIDs, validatorID)
validatorIPs = append(validatorIPs,
ips.ClaimedIPPort{
Cert: p.Cert(),
IPPort: peerIP.IP.IP,
Timestamp: peerIP.IP.Timestamp,
Signature: peerIP.Signature,
TxID: ids.ID{}, // TODO: populate this field
TxID: validator.TxID,
},
)
}

return validatorIDs, validatorIPs, nil
return validatorIPs, nil
}

// Dispatch starts accepting connections from other nodes attempting to connect
Expand Down
4 changes: 2 additions & 2 deletions network/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,13 +217,13 @@ func newFullyConnectedTestNetwork(t *testing.T, handlers []router.InboundHandler
}

beacons := validators.NewSet()
err = beacons.Add(nodeIDs[0], nil, ids.Empty, 1)
err = beacons.Add(nodeIDs[0], nil, ids.GenerateTestID(), 1)
require.NoError(err)

primaryVdrs := validators.NewSet()
primaryVdrs.RegisterCallbackListener(&gossipTrackerCallback)
for _, nodeID := range nodeIDs {
err := primaryVdrs.Add(nodeID, nil, ids.Empty, 1)
err := primaryVdrs.Add(nodeID, nil, ids.GenerateTestID(), 1)
require.NoError(err)
}

Expand Down
104 changes: 58 additions & 46 deletions network/peer/gossip_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,32 +55,36 @@ type GossipTracker interface {
StopTrackingPeer(peerID ids.NodeID) bool

// AddValidator adds a validator that can be gossiped about
// bool: False if [validatorID] was already present. True otherwise.
AddValidator(validatorID ids.NodeID) bool
// bool: False if a validator with the same node ID or txID as [validator]
// is present. True otherwise.
AddValidator(validator ValidatorID) bool
// RemoveValidator removes a validator that can be gossiped about
// bool: False if [validatorID] was already not present. True otherwise.
// bool: False if [validator] was already not present. True otherwise.
RemoveValidator(validatorID ids.NodeID) bool

// AddKnown adds [validatorIDs] to the peers validatorIDs by [peerID]
// AddKnown adds [txIDs] to the txIDs known by [peerID]
// Returns:
// bool: False if [peerID] is not tracked. True otherwise.
AddKnown(peerID ids.NodeID, validatorIDs []ids.NodeID) bool
AddKnown(peerID ids.NodeID, txIDs []ids.ID) bool
// GetUnknown gets the peers that we haven't sent to this peer
// Returns:
// []ids.NodeID: a slice of [limit] validatorIDs that [peerID] doesn't know
// about.
// []ValidatorID: a slice of [limit] ValidatorIDs that [peerID] doesn't
// know about.
// bool: False if [peerID] is not tracked. True otherwise.
GetUnknown(peerID ids.NodeID, limit int) ([]ids.NodeID, bool, error)
GetUnknown(peerID ids.NodeID, limit int) ([]ValidatorID, bool, error)
}

type gossipTracker struct {
lock sync.RWMutex
// a mapping of each peer => the validators we have sent them
trackedPeers map[ids.NodeID]ids.BigBitSet
// a mapping of txIDs => the validator added to the validiator set by that
// tx.
txIDsToNodeIDs map[ids.ID]ids.NodeID
// a mapping of validators => the index they occupy in the bitsets
validatorsToIndices map[ids.NodeID]int
nodeIDsToIndices map[ids.NodeID]int
// each validator in the index it occupies in the bitset
validatorIndices []ids.NodeID
validatorIDs []ValidatorID
// a mapping of each peer => the validators they know about
trackedPeers map[ids.NodeID]ids.BigBitSet

metrics gossipTrackerMetrics
}
Expand All @@ -96,9 +100,10 @@ func NewGossipTracker(
}

return &gossipTracker{
trackedPeers: make(map[ids.NodeID]ids.BigBitSet),
validatorsToIndices: make(map[ids.NodeID]int),
metrics: m,
txIDsToNodeIDs: make(map[ids.ID]ids.NodeID),
nodeIDsToIndices: make(map[ids.NodeID]int),
trackedPeers: make(map[ids.NodeID]ids.BigBitSet),
metrics: m,
}, nil
}

Expand Down Expand Up @@ -145,23 +150,26 @@ func (g *gossipTracker) StopTrackingPeer(peerID ids.NodeID) bool {
return true
}

func (g *gossipTracker) AddValidator(validatorID ids.NodeID) bool {
func (g *gossipTracker) AddValidator(validator ValidatorID) bool {
g.lock.Lock()
defer g.lock.Unlock()

// only add validators that are not already present
if _, ok := g.validatorsToIndices[validatorID]; ok {
if _, ok := g.txIDsToNodeIDs[validator.TxID]; ok {
return false
}
if _, ok := g.nodeIDsToIndices[validator.NodeID]; ok {
return false
}

// add the validator to the MSB of the bitset.
msb := len(g.validatorsToIndices)
g.validatorsToIndices[validatorID] = msb
g.validatorIndices = append(g.validatorIndices, validatorID)
msb := len(g.validatorIDs)
g.txIDsToNodeIDs[validator.TxID] = validator.NodeID
g.nodeIDsToIndices[validator.NodeID] = msb
g.validatorIDs = append(g.validatorIDs, validator)

// emit metrics
g.metrics.validatorsToIndicesSize.Set(float64(len(g.validatorsToIndices)))
g.metrics.validatorIndices.Set(float64(len(g.validatorIndices)))
g.metrics.validatorsSize.Set(float64(len(g.validatorIDs)))

return true
}
Expand All @@ -171,26 +179,28 @@ func (g *gossipTracker) RemoveValidator(validatorID ids.NodeID) bool {
defer g.lock.Unlock()

// only remove validators that are already present
indexToRemove, ok := g.validatorsToIndices[validatorID]
indexToRemove, ok := g.nodeIDsToIndices[validatorID]
if !ok {
return false
}
validatorToRemove := g.validatorIDs[indexToRemove]

// swap the validator-to-be-removed with the validator in the last index
// if the element we're swapping with is ourselves, we can skip this swap
// since we only need to delete instead
lastIndex := len(g.validatorIndices) - 1
lastIndex := len(g.validatorIDs) - 1
if indexToRemove != lastIndex {
lastPeer := g.validatorIndices[lastIndex]
lastValidator := g.validatorIDs[lastIndex]

g.validatorIndices[indexToRemove] = lastPeer
g.validatorsToIndices[lastPeer] = indexToRemove
g.nodeIDsToIndices[lastValidator.NodeID] = indexToRemove
g.validatorIDs[indexToRemove] = lastValidator
}

delete(g.validatorsToIndices, validatorID)
g.validatorIndices = g.validatorIndices[:lastIndex]
delete(g.txIDsToNodeIDs, validatorToRemove.TxID)
delete(g.nodeIDsToIndices, validatorID)
g.validatorIDs = g.validatorIDs[:lastIndex]

// invariant: we must remove the validator from everyone else's validator
// Invariant: We must remove the validator from everyone else's validator
// bitsets to make sure that each validator occupies the same position in
// each bitset.
for _, knownPeers := range g.trackedPeers {
Expand All @@ -206,19 +216,18 @@ func (g *gossipTracker) RemoveValidator(validatorID ids.NodeID) bool {
}

// emit metrics
g.metrics.validatorsToIndicesSize.Set(float64(len(g.validatorsToIndices)))
g.metrics.validatorIndices.Set(float64(len(g.validatorIndices)))
g.metrics.validatorsSize.Set(float64(len(g.validatorIDs)))

return true
}

// AddKnown invariants:
// 1. [peerID] SHOULD only be a nodeID that has been tracked with
// StartTrackingPeer().
// 2. [validatorIDs] SHOULD only be a slice of nodeIDs that has been added with
// AddValidator. Trying to learn about validatorIDs that aren't registered
// 2. [txIDs] SHOULD only be a slice of txIDs that have been added with
// AddValidator. Trying to learn about txIDs that aren't registered
// yet will result in dropping the unregistered ID.
func (g *gossipTracker) AddKnown(peerID ids.NodeID, validatorIDs []ids.NodeID) bool {
func (g *gossipTracker) AddKnown(peerID ids.NodeID, txIDs []ids.ID) bool {
g.lock.Lock()
defer g.lock.Unlock()

Expand All @@ -227,22 +236,25 @@ func (g *gossipTracker) AddKnown(peerID ids.NodeID, validatorIDs []ids.NodeID) b
return false
}

for _, nodeID := range validatorIDs {
// sanity check that this node we learned about is actually a validator
idx, ok := g.validatorsToIndices[nodeID]
for _, txID := range txIDs {
nodeID, ok := g.txIDsToNodeIDs[txID]
if !ok {
// if we try to learn about a validator that we don't know about,
// silently continue.
// We don't know about this txID, this can happen due to differences
// between our current validator set and the peer's current
// validator set.
continue
}

knownPeers.Add(idx)
// Because we fetched the nodeID from [g.txIDsToNodeIDs], we are
// guaranteed that the index is populated.
index := g.nodeIDsToIndices[nodeID]
knownPeers.Add(index)
}

return true
}

func (g *gossipTracker) GetUnknown(peerID ids.NodeID, limit int) ([]ids.NodeID, bool, error) {
func (g *gossipTracker) GetUnknown(peerID ids.NodeID, limit int) ([]ValidatorID, bool, error) {
if limit <= 0 {
return nil, false, nil
}
Expand All @@ -259,22 +271,22 @@ func (g *gossipTracker) GetUnknown(peerID ids.NodeID, limit int) ([]ids.NodeID,
// We select a random sample of bits to gossip to avoid starving out a
// validator from being gossiped for ane extended period of time.
s := sampler.NewUniform()
if err := s.Initialize(uint64(len(g.validatorIndices))); err != nil {
if err := s.Initialize(uint64(len(g.validatorIDs))); err != nil {
return nil, false, err
}

// Calculate the unknown information we need to send to this peer. We do
// this by computing the difference between the validators we know about
// and the validators we know we've sent to [peerID].
result := make([]ids.NodeID, 0, limit)
for i := 0; i < len(g.validatorIndices) && len(result) < limit; i++ {
result := make([]ValidatorID, 0, limit)
for i := 0; i < len(g.validatorIDs) && len(result) < limit; i++ {
drawn, err := s.Next()
if err != nil {
return nil, false, err
}

if !knownPeers.Contains(int(drawn)) {
result = append(result, g.validatorIndices[drawn])
result = append(result, g.validatorIDs[drawn])
}
}

Expand Down
9 changes: 7 additions & 2 deletions network/peer/gossip_tracker_callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,17 @@ type GossipTrackerCallback struct {
func (g *GossipTrackerCallback) OnValidatorAdded(
nodeID ids.NodeID,
_ *bls.PublicKey,
_ ids.ID,
txID ids.ID,
_ uint64,
) {
if !g.GossipTracker.AddValidator(nodeID) {
vdr := ValidatorID{
NodeID: nodeID,
TxID: txID,
}
if !g.GossipTracker.AddValidator(vdr) {
g.Log.Error("failed to add a validator",
zap.Stringer("nodeID", nodeID),
zap.Stringer("txID", txID),
)
}
}
Expand Down
21 changes: 6 additions & 15 deletions network/peer/gossip_tracker_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@ import (
)

type gossipTrackerMetrics struct {
trackedPeersSize prometheus.Gauge
validatorsToIndicesSize prometheus.Gauge
validatorIndices prometheus.Gauge
trackedPeersSize prometheus.Gauge
validatorsSize prometheus.Gauge
}

func newGossipTrackerMetrics(registerer prometheus.Registerer, namespace string) (gossipTrackerMetrics, error) {
Expand All @@ -24,27 +23,19 @@ func newGossipTrackerMetrics(registerer prometheus.Registerer, namespace string)
Help: "amount of peers that are being tracked",
},
),
validatorsToIndicesSize: prometheus.NewGauge(
validatorsSize: prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "validators_to_indices_size",
Help: "amount of validators this node is tracking in validatorsToIndices",
},
),
validatorIndices: prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "validator_indices_size",
Help: "amount of validators this node is tracking in validatorIndices",
Name: "validators_size",
Help: "number of validators this node is tracking",
},
),
}

errs := wrappers.Errs{}
errs.Add(
registerer.Register(m.trackedPeersSize),
registerer.Register(m.validatorsToIndicesSize),
registerer.Register(m.validatorIndices),
registerer.Register(m.validatorsSize),
)

return m, errs.Err
Expand Down
Loading

0 comments on commit 8ed8985

Please sign in to comment.