Skip to content

Commit

Permalink
Add sdk gossip handler metrics (ava-labs#1997)
Browse files Browse the repository at this point in the history
  • Loading branch information
joshua-kim authored Sep 11, 2023
1 parent d1e9c0f commit afa2b7a
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 24 deletions.
43 changes: 37 additions & 6 deletions network/p2p/gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"context"
"time"

"github.com/prometheus/client_golang/prometheus"

"go.uber.org/zap"

"google.golang.org/protobuf/proto"
Expand All @@ -15,6 +17,7 @@ import (
"github.com/ava-labs/avalanchego/network/p2p"
"github.com/ava-labs/avalanchego/proto/pb/sdk"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/wrappers"
)

// GossipableAny exists to help create non-nil pointers to a concrete Gossipable
Expand All @@ -25,6 +28,7 @@ type GossipableAny[T any] interface {
}

type Config struct {
Namespace string
Frequency time.Duration
PollSize int
}
Expand All @@ -34,20 +38,41 @@ func NewGossiper[T any, U GossipableAny[T]](
log logging.Logger,
set Set[U],
client *p2p.Client,
) *Gossiper[T, U] {
return &Gossiper[T, U]{
metrics prometheus.Registerer,
) (*Gossiper[T, U], error) {
g := &Gossiper[T, U]{
config: config,
log: log,
set: set,
client: client,
receivedN: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: config.Namespace,
Name: "gossip_received_n",
Help: "amount of gossip received (n)",
}),
receivedBytes: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: config.Namespace,
Name: "gossip_received_bytes",
Help: "amount of gossip received (bytes)",
}),
}

errs := wrappers.Errs{}
errs.Add(
metrics.Register(g.receivedN),
metrics.Register(g.receivedBytes),
)

return g, errs.Err
}

type Gossiper[T any, U GossipableAny[T]] struct {
config Config
log logging.Logger
set Set[U]
client *p2p.Client
config Config
log logging.Logger
set Set[U]
client *p2p.Client
receivedN prometheus.Counter
receivedBytes prometheus.Counter
}

func (g *Gossiper[_, _]) Gossip(ctx context.Context) {
Expand Down Expand Up @@ -112,7 +137,10 @@ func (g *Gossiper[T, U]) handleResponse(
return
}

receivedBytes := 0
for _, bytes := range response.Gossip {
receivedBytes += len(bytes)

gossipable := U(new(T))
if err := gossipable.Unmarshal(bytes); err != nil {
g.log.Debug(
Expand All @@ -139,4 +167,7 @@ func (g *Gossiper[T, U]) handleResponse(
continue
}
}

g.receivedN.Add(float64(len(response.Gossip)))
g.receivedBytes.Add(float64(receivedBytes))
}
60 changes: 45 additions & 15 deletions network/p2p/gossip/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"

"github.com/stretchr/testify/require"

"go.uber.org/mock/gomock"
Expand All @@ -20,9 +22,19 @@ import (
"github.com/ava-labs/avalanchego/utils/set"
)

func TestGossiperShutdown(_ *testing.T) {
func TestGossiperShutdown(t *testing.T) {
require := require.New(t)

config := Config{Frequency: time.Second}
gossiper := NewGossiper[testTx](config, logging.NoLog{}, nil, nil)
metrics := prometheus.NewRegistry()
gossiper, err := NewGossiper[testTx](
config,
logging.NoLog{},
nil,
nil,
metrics,
)
require.NoError(err)
ctx, cancel := context.WithCancel(context.Background())

wg := &sync.WaitGroup{}
Expand All @@ -40,7 +52,7 @@ func TestGossiperShutdown(_ *testing.T) {
func TestGossiperGossip(t *testing.T) {
tests := []struct {
name string
maxResponseSize int
config HandlerConfig
requester []*testTx // what we have
responder []*testTx // what the peer we're requesting gossip from has
expectedPossibleValues []*testTx // possible values we can have
Expand All @@ -50,38 +62,48 @@ func TestGossiperGossip(t *testing.T) {
name: "no gossip - no one knows anything",
},
{
name: "no gossip - requester knows more than responder",
maxResponseSize: 1024,
name: "no gossip - requester knows more than responder",
config: HandlerConfig{
TargetResponseSize: 1024,
},
requester: []*testTx{{id: ids.ID{0}}},
expectedPossibleValues: []*testTx{{id: ids.ID{0}}},
expectedLen: 1,
},
{
name: "no gossip - requester knows everything responder knows",
maxResponseSize: 1024,
name: "no gossip - requester knows everything responder knows",
config: HandlerConfig{
TargetResponseSize: 1024,
},
requester: []*testTx{{id: ids.ID{0}}},
responder: []*testTx{{id: ids.ID{0}}},
expectedPossibleValues: []*testTx{{id: ids.ID{0}}},
expectedLen: 1,
},
{
name: "gossip - requester knows nothing",
maxResponseSize: 1024,
name: "gossip - requester knows nothing",
config: HandlerConfig{
TargetResponseSize: 1024,
},
responder: []*testTx{{id: ids.ID{0}}},
expectedPossibleValues: []*testTx{{id: ids.ID{0}}},
expectedLen: 1,
},
{
name: "gossip - requester knows less than responder",
maxResponseSize: 1024,
name: "gossip - requester knows less than responder",
config: HandlerConfig{
TargetResponseSize: 1024,
},
requester: []*testTx{{id: ids.ID{0}}},
responder: []*testTx{{id: ids.ID{0}}, {id: ids.ID{1}}},
expectedPossibleValues: []*testTx{{id: ids.ID{0}}, {id: ids.ID{1}}},
expectedLen: 2,
},
{
name: "gossip - target response size exceeded",
maxResponseSize: 32,
name: "gossip - target response size exceeded",
config: HandlerConfig{
TargetResponseSize: 32,
},
responder: []*testTx{{id: ids.ID{0}}, {id: ids.ID{1}}, {id: ids.ID{2}}},
expectedPossibleValues: []*testTx{{id: ids.ID{0}}, {id: ids.ID{1}}, {id: ids.ID{2}}},
expectedLen: 2,
Expand All @@ -107,7 +129,8 @@ func TestGossiperGossip(t *testing.T) {
peers := &p2p.Peers{}
require.NoError(peers.Connected(context.Background(), ids.EmptyNodeID, nil))

handler := NewHandler[*testTx](responseSet, tt.maxResponseSize)
handler, err := NewHandler[*testTx](responseSet, tt.config, prometheus.NewRegistry())
require.NoError(err)
_, err = responseRouter.RegisterAppProtocol(0x0, handler, peers)
require.NoError(err)

Expand Down Expand Up @@ -146,7 +169,14 @@ func TestGossiperGossip(t *testing.T) {
Frequency: 500 * time.Millisecond,
PollSize: 1,
}
gossiper := NewGossiper[testTx, *testTx](config, logging.NoLog{}, requestSet, requestClient)
gossiper, err := NewGossiper[testTx, *testTx](
config,
logging.NoLog{},
requestSet,
requestClient,
prometheus.NewRegistry(),
)
require.NoError(err)
received := set.Set[*testTx]{}
requestSet.onAdd = func(tx *testTx) {
received.Add(tx)
Expand Down
42 changes: 39 additions & 3 deletions network/p2p/gossip/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,14 @@ import (

bloomfilter "github.com/holiman/bloomfilter/v2"

"github.com/prometheus/client_golang/prometheus"

"google.golang.org/protobuf/proto"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/network/p2p"
"github.com/ava-labs/avalanchego/proto/pb/sdk"
"github.com/ava-labs/avalanchego/utils/wrappers"
)

var (
Expand All @@ -23,18 +26,48 @@ var (
ErrInvalidID = errors.New("invalid id")
)

func NewHandler[T Gossipable](set Set[T], targetResponseSize int) *Handler[T] {
return &Handler[T]{
type HandlerConfig struct {
Namespace string
TargetResponseSize int
}

func NewHandler[T Gossipable](
set Set[T],
config HandlerConfig,
metrics prometheus.Registerer,
) (*Handler[T], error) {
h := &Handler[T]{
Handler: p2p.NoOpHandler{},
set: set,
targetResponseSize: targetResponseSize,
targetResponseSize: config.TargetResponseSize,
sentN: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: config.Namespace,
Name: "gossip_sent_n",
Help: "amount of gossip sent (n)",
}),
sentBytes: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: config.Namespace,
Name: "gossip_sent_bytes",
Help: "amount of gossip sent (bytes)",
}),
}

errs := wrappers.Errs{}
errs.Add(
metrics.Register(h.sentN),
metrics.Register(h.sentBytes),
)

return h, errs.Err
}

type Handler[T Gossipable] struct {
p2p.Handler
set Set[T]
targetResponseSize int

sentN prometheus.Counter
sentBytes prometheus.Counter
}

func (h Handler[T]) AppRequest(_ context.Context, _ ids.NodeID, _ time.Time, requestBytes []byte) ([]byte, error) {
Expand Down Expand Up @@ -86,5 +119,8 @@ func (h Handler[T]) AppRequest(_ context.Context, _ ids.NodeID, _ time.Time, req
Gossip: gossipBytes,
}

h.sentN.Add(float64(len(response.Gossip)))
h.sentBytes.Add(float64(responseSize))

return proto.Marshal(response)
}

0 comments on commit afa2b7a

Please sign in to comment.