Skip to content

Commit

Permalink
Add SDK Sampling interface (ava-labs#1877)
Browse files Browse the repository at this point in the history
  • Loading branch information
joshua-kim authored Aug 23, 2023
1 parent a37bb0a commit a3c4649
Show file tree
Hide file tree
Showing 8 changed files with 501 additions and 111 deletions.
11 changes: 5 additions & 6 deletions network/p2p/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ type Client struct {
handlerPrefix []byte
router *Router
sender common.AppSender
// nodeSampler is used to select nodes to route AppRequestAny to
nodeSampler NodeSampler
}

// AppRequestAny issues an AppRequest to an arbitrary node decided by Client.
Expand All @@ -51,15 +53,12 @@ func (c *Client) AppRequestAny(
appRequestBytes []byte,
onResponse AppResponseCallback,
) error {
c.router.lock.RLock()
peers := c.router.peers.Sample(1)
c.router.lock.RUnlock()

if len(peers) != 1 {
sampled := c.nodeSampler.Sample(ctx, 1)
if len(sampled) != 1 {
return ErrNoPeers
}

nodeIDs := set.Of(peers[0])
nodeIDs := set.Of(sampled...)
return c.AppRequest(ctx, nodeIDs, appRequestBytes, onResponse)
}

Expand Down
17 changes: 17 additions & 0 deletions network/p2p/node_sampler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package p2p

import (
"context"

"github.com/ava-labs/avalanchego/ids"
)

// NodeSampler samples nodes in network
type NodeSampler interface {
// Sample returns at most [limit] nodes. This may return fewer nodes if
// fewer than [limit] are available.
Sample(ctx context.Context, limit int) []ids.NodeID
}
50 changes: 50 additions & 0 deletions network/p2p/peers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package p2p

import (
"context"
"sync"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/snow/validators"
"github.com/ava-labs/avalanchego/utils/set"
"github.com/ava-labs/avalanchego/version"
)

var (
_ validators.Connector = (*Peers)(nil)
_ NodeSampler = (*Peers)(nil)
)

// Peers contains a set of nodes that we are connected to.
type Peers struct {
lock sync.RWMutex
peers set.SampleableSet[ids.NodeID]
}

func (p *Peers) Connected(_ context.Context, nodeID ids.NodeID, _ *version.Application) error {
p.lock.Lock()
defer p.lock.Unlock()

p.peers.Add(nodeID)

return nil
}

func (p *Peers) Disconnected(_ context.Context, nodeID ids.NodeID) error {
p.lock.Lock()
defer p.lock.Unlock()

p.peers.Remove(nodeID)

return nil
}

func (p *Peers) Sample(_ context.Context, limit int) []ids.NodeID {
p.lock.RLock()
defer p.lock.RUnlock()

return p.peers.Sample(limit)
}
148 changes: 148 additions & 0 deletions network/p2p/peers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package p2p

import (
"context"
"testing"

"github.com/stretchr/testify/require"

"go.uber.org/mock/gomock"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/math"
"github.com/ava-labs/avalanchego/utils/set"
)

// Sample should always return up to [limit] peers, and less if fewer than
// [limit] peers are available.
func TestPeersSample(t *testing.T) {
nodeID1 := ids.GenerateTestNodeID()
nodeID2 := ids.GenerateTestNodeID()
nodeID3 := ids.GenerateTestNodeID()

tests := []struct {
name string
connected set.Set[ids.NodeID]
disconnected set.Set[ids.NodeID]
limit int
}{
{
name: "no peers",
limit: 1,
},
{
name: "one peer connected",
connected: set.Of(nodeID1),
limit: 1,
},
{
name: "multiple peers connected",
connected: set.Of(nodeID1, nodeID2, nodeID3),
limit: 1,
},
{
name: "peer connects and disconnects - 1",
connected: set.Of(nodeID1),
disconnected: set.Of(nodeID1),
limit: 1,
},
{
name: "peer connects and disconnects - 2",
connected: set.Of(nodeID1, nodeID2),
disconnected: set.Of(nodeID2),
limit: 1,
},
{
name: "peer connects and disconnects - 2",
connected: set.Of(nodeID1, nodeID2, nodeID3),
disconnected: set.Of(nodeID1, nodeID2),
limit: 1,
},
{
name: "less than limit peers",
connected: set.Of(nodeID1, nodeID2, nodeID3),
limit: 4,
},
{
name: "limit peers",
connected: set.Of(nodeID1, nodeID2, nodeID3),
limit: 3,
},
{
name: "more than limit peers",
connected: set.Of(nodeID1, nodeID2, nodeID3),
limit: 2,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
require := require.New(t)
peers := &Peers{}

for connected := range tt.connected {
require.NoError(peers.Connected(context.Background(), connected, nil))
}

for disconnected := range tt.disconnected {
require.NoError(peers.Disconnected(context.Background(), disconnected))
}

sampleable := set.Set[ids.NodeID]{}
sampleable.Union(tt.connected)
sampleable.Difference(tt.disconnected)

sampled := peers.Sample(context.Background(), tt.limit)
require.Len(sampled, math.Min(tt.limit, len(sampleable)))
require.Subset(sampleable, sampled)
})
}
}

func TestAppRequestAnyNodeSelection(t *testing.T) {
tests := []struct {
name string
peers []ids.NodeID
expected error
}{
{
name: "no peers",
expected: ErrNoPeers,
},
{
name: "has peers",
peers: []ids.NodeID{ids.GenerateTestNodeID()},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
require := require.New(t)
ctrl := gomock.NewController(t)
mockAppSender := common.NewMockSender(ctrl)

expectedCalls := 0
if tt.expected == nil {
expectedCalls = 1
}
mockAppSender.EXPECT().SendAppRequest(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(expectedCalls)

r := NewRouter(logging.NoLog{}, mockAppSender)
peers := &Peers{}
for _, peer := range tt.peers {
require.NoError(peers.Connected(context.Background(), peer, nil))
}

client, err := r.RegisterAppProtocol(1, nil, peers)
require.NoError(err)

err = client.AppRequestAny(context.Background(), []byte("foobar"), nil)
require.ErrorIs(err, tt.expected)
})
}
}
26 changes: 3 additions & 23 deletions network/p2p/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,14 @@ import (
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/message"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/snow/validators"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/set"
"github.com/ava-labs/avalanchego/version"
)

var (
ErrExistingAppProtocol = errors.New("existing app protocol")
ErrUnrequestedResponse = errors.New("unrequested response")

_ common.AppHandler = (*Router)(nil)
_ validators.Connector = (*Router)(nil)
_ common.AppHandler = (*Router)(nil)
)

// Router routes incoming application messages to the corresponding registered
Expand All @@ -42,7 +38,6 @@ type Router struct {
pendingAppRequests map[uint32]AppResponseCallback
pendingCrossChainAppRequests map[uint32]CrossChainAppResponseCallback
requestID uint32
peers set.SampleableSet[ids.NodeID]
}

// NewRouter returns a new instance of Router
Expand All @@ -56,26 +51,10 @@ func NewRouter(log logging.Logger, sender common.AppSender) *Router {
}
}

func (r *Router) Connected(_ context.Context, nodeID ids.NodeID, _ *version.Application) error {
r.lock.Lock()
defer r.lock.Unlock()

r.peers.Add(nodeID)
return nil
}

func (r *Router) Disconnected(_ context.Context, nodeID ids.NodeID) error {
r.lock.Lock()
defer r.lock.Unlock()

r.peers.Remove(nodeID)
return nil
}

// RegisterAppProtocol reserves an identifier for an application protocol and
// returns a Client that can be used to send messages for the corresponding
// protocol.
func (r *Router) RegisterAppProtocol(handlerID uint64, handler Handler) (*Client, error) {
func (r *Router) RegisterAppProtocol(handlerID uint64, handler Handler, nodeSampler NodeSampler) (*Client, error) {
r.lock.Lock()
defer r.lock.Unlock()

Expand All @@ -94,6 +73,7 @@ func (r *Router) RegisterAppProtocol(handlerID uint64, handler Handler) (*Client
handlerPrefix: binary.AppendUvarint(nil, handlerID),
sender: r.sender,
router: r,
nodeSampler: nodeSampler,
}, nil
}

Expand Down
Loading

0 comments on commit a3c4649

Please sign in to comment.