Skip to content
This repository was archived by the owner on May 15, 2024. It is now read-only.

Commit

Permalink
Removed cancellation of all outstanding requests in discovery service (
Browse files Browse the repository at this point in the history
…hyperledger#62)

* Reworking of discovery client and services

1. Moved DisoveryClient interface to pkg/fab/discovery + mocks, all users can rely on this interface.
2. Reworked pkg/fab/discovery Send, now it returns a channel of Responses, cancellation of the context is delegated to users of this client. Error method was added to Response, so user can check if an error happened.
3. Fixed tests
4. Reworked chservice, localservice, fabricselection by using chan Response
5. Updated DiscoveryError by adding IsAccessDenied method, no need to expose const and compare msg every time.

Signed-off-by: kopaygorodsky <[email protected]>
  • Loading branch information
kopaygorodsky authored Apr 16, 2020
1 parent 594a8dc commit f7729f1
Showing 17 changed files with 500 additions and 257 deletions.
52 changes: 33 additions & 19 deletions pkg/client/common/discovery/dynamicdiscovery/chservice.go
Original file line number Diff line number Diff line change
@@ -9,6 +9,7 @@ package dynamicdiscovery
import (
discclient "github.com/hyperledger/fabric-sdk-go/internal/github.com/hyperledger/fabric/discovery/client"
"github.com/hyperledger/fabric-sdk-go/pkg/client/common/random"
"github.com/hyperledger/fabric-sdk-go/pkg/common/errors/multi"
coptions "github.com/hyperledger/fabric-sdk-go/pkg/common/options"
contextAPI "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context"
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
@@ -73,17 +74,32 @@ func (s *ChannelService) doQueryPeers() ([]fab.Peer, error) {
}

reqCtx, cancel := reqContext.NewRequest(ctx, reqContext.WithTimeout(s.responseTimeout))

defer cancel()

req := fabdiscovery.NewRequest().OfChannel(s.channelID).AddPeersQuery()
responses, err := s.discoveryClient().Send(reqCtx, req, targets...)
responsesCh, err := s.discoveryClient().Send(reqCtx, req, targets...)

if err != nil {
if len(responses) == 0 {
return nil, errors.Wrapf(err, "error calling discover service send")
return nil, errors.Wrapf(err, "error calling discover service send")
}

var respErrors []error

for resp := range responsesCh {
peers, err := s.evaluate(ctx, resp)

if err == nil {
//got successful response, cancel all outstanding requests to other targets
cancel()

return peers, nil
}
logger.Warnf("Received %d response(s) and one or more errors from discovery client: %s", len(responses), err)

respErrors = append(respErrors, err)
}
return s.evaluate(ctx, responses)

return nil, errors.Wrap(multi.New(respErrors...), "no successful response received from any peer")
}

func (s *ChannelService) getTargets(ctx contextAPI.Client) ([]fab.PeerConfig, error) {
@@ -99,26 +115,24 @@ func (s *ChannelService) getTargets(ctx contextAPI.Client) ([]fab.PeerConfig, er
}

// evaluate validates the responses and returns the peers
func (s *ChannelService) evaluate(ctx contextAPI.Client, responses []fabdiscovery.Response) ([]fab.Peer, error) {
if len(responses) == 0 {
return nil, errors.New("no successful response received from any peer")
func (s *ChannelService) evaluate(clientCtx contextAPI.Client, response fabdiscovery.Response) ([]fab.Peer, error) {
if err := response.Error(); err != nil {
logger.Warnf("error from discovery request [%s]: %s", response.Target(), err)
return nil, newDiscoveryError(err, response.Target())
}

endpoints, err := response.ForChannel(s.channelID).Peers()

if err != nil {
logger.Warnf("error getting peers from discovery response. target: %s. %s", response.Target(), err)
return nil, newDiscoveryError(err, response.Target())
}

// TODO: In a future patch:
// - validate the signatures in the responses
// For now just pick the first successful response

var lastErr error
for _, response := range responses {
endpoints, err := response.ForChannel(s.channelID).Peers()
if err != nil {
lastErr = DiscoveryError(err)
logger.Warnf("error getting peers from discovery response: %s", lastErr)
continue
}
return s.asPeers(ctx, endpoints), nil
}
return nil, lastErr
return s.asPeers(clientCtx, endpoints), nil
}

func (s *ChannelService) asPeers(ctx contextAPI.Client, endpoints []*discclient.Peer) []fab.Peer {
68 changes: 43 additions & 25 deletions pkg/client/common/discovery/dynamicdiscovery/chservice_test.go
Original file line number Diff line number Diff line change
@@ -9,12 +9,12 @@ SPDX-License-Identifier: Apache-2.0
package dynamicdiscovery

import (
"errors"
fabDiscovery "github.com/hyperledger/fabric-sdk-go/pkg/fab/discovery"
"github.com/pkg/errors"
"testing"
"time"

"github.com/hyperledger/fabric-sdk-go/pkg/client/common/discovery"
clientmocks "github.com/hyperledger/fabric-sdk-go/pkg/client/common/mocks"
contextAPI "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context"
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
pfab "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
@@ -25,10 +25,6 @@ import (
"github.com/stretchr/testify/require"
)

const (
peer1MSP2 = "peer1.org2.com:9999"
)

func TestDiscoveryService(t *testing.T) {
ctx := mocks.NewMockContext(mspmocks.NewMockSigningIdentity("test", mspID1))
config := &config{
@@ -54,14 +50,15 @@ func TestDiscoveryService(t *testing.T) {
}
ctx.SetEndpointConfig(config)

discClient := clientmocks.NewMockDiscoveryClient()
discClient := fabDiscovery.NewMockDiscoveryClient()
discClient.SetResponses(
&clientmocks.MockDiscoverEndpointResponse{
&fabDiscovery.MockDiscoverEndpointResponse{
PeerEndpoints: []*discmocks.MockDiscoveryPeerEndpoint{},
Target: peer1MSP1,
},
)

SetClientProvider(func(ctx contextAPI.Client) (DiscoveryClient, error) {
SetClientProvider(func(ctx contextAPI.Client) (fabDiscovery.Client, error) {
return discClient, nil
})

@@ -71,10 +68,17 @@ func TestDiscoveryService(t *testing.T) {
WithRefreshInterval(10*time.Millisecond),
WithResponseTimeout(100*time.Millisecond),
WithErrorHandler(
func(ctxt fab.ClientContext, channelID string, err error) {
derr, ok := err.(DiscoveryError)
if ok && derr.Error() == AccessDenied {
service.Close()
func(ctx fab.ClientContext, channelID string, err error) {
derr, ok := errors.Cause(err).(DiscoveryError)

if ok {
//peer1MSP1 or peer1MSP2, depending on request
assert.NotEmpty(t, derr.Target())
assert.NotEmpty(t, derr.Error())

if derr.IsAccessDenied() {
service.Close()
}
}
},
),
@@ -87,14 +91,15 @@ func TestDiscoveryService(t *testing.T) {
assert.Equal(t, 0, len(peers))

discClient.SetResponses(
&clientmocks.MockDiscoverEndpointResponse{
&fabDiscovery.MockDiscoverEndpointResponse{
PeerEndpoints: []*discmocks.MockDiscoveryPeerEndpoint{
{
MSPID: mspID1,
Endpoint: peer1MSP1,
LedgerHeight: 5,
},
},
Target: peer1MSP2,
},
)

@@ -105,7 +110,7 @@ func TestDiscoveryService(t *testing.T) {
assert.Equalf(t, 1, len(peers), "Expected 1 peer")

discClient.SetResponses(
&clientmocks.MockDiscoverEndpointResponse{
&fabDiscovery.MockDiscoverEndpointResponse{
PeerEndpoints: []*discmocks.MockDiscoveryPeerEndpoint{
{
MSPID: mspID1,
@@ -118,6 +123,7 @@ func TestDiscoveryService(t *testing.T) {
LedgerHeight: 15,
},
},
Target: peer1MSP1,
},
)

@@ -134,8 +140,9 @@ func TestDiscoveryService(t *testing.T) {

// Non-fatal error
discClient.SetResponses(
&clientmocks.MockDiscoverEndpointResponse{
Error: errors.New("some transient error"),
&fabDiscovery.MockDiscoverEndpointResponse{
Error: errors.New("some transient error"),
Target: peer1MSP1,
},
)

@@ -146,10 +153,11 @@ func TestDiscoveryService(t *testing.T) {
require.NoError(t, err)
assert.Equalf(t, 2, len(peers), "Expected 2 peers")

// Fatal error (access denied can be due due a user being revoked)
// Fatal error (access denied can be due a user being revoked)
discClient.SetResponses(
&clientmocks.MockDiscoverEndpointResponse{
Error: errors.New(AccessDenied),
&fabDiscovery.MockDiscoverEndpointResponse{
Error: errors.New(AccessDenied),
Target: peer1MSP1,
},
)

@@ -159,6 +167,16 @@ func TestDiscoveryService(t *testing.T) {
_, err = service.GetPeers()
require.Error(t, err)
assert.Equal(t, "Discovery client has been closed", err.Error())

ctx = mocks.NewMockContext(mspmocks.NewMockSigningIdentity("test", mspID1))
ctx.SetEndpointConfig(mocks.NewMockEndpointConfig())

service, err = NewChannelService(ctx, mocks.NewMockMembership(), "noChannelPeers")
require.NoError(t, err)

_, err = service.GetPeers()
require.Error(t, err)
assert.Contains(t, err.Error(), "no channel peers configured for channel [noChannelPeers]")
}

func TestDiscoveryServiceWithNewOrgJoined(t *testing.T) {
@@ -188,14 +206,14 @@ func TestDiscoveryServiceWithNewOrgJoined(t *testing.T) {
}
ctx.SetEndpointConfig(config)

discClient := clientmocks.NewMockDiscoveryClient()
discClient := fabDiscovery.NewMockDiscoveryClient()
discClient.SetResponses(
&clientmocks.MockDiscoverEndpointResponse{
&fabDiscovery.MockDiscoverEndpointResponse{
PeerEndpoints: []*discmocks.MockDiscoveryPeerEndpoint{},
},
)

SetClientProvider(func(ctx contextAPI.Client) (DiscoveryClient, error) {
SetClientProvider(func(ctx contextAPI.Client) (fabDiscovery.Client, error) {
return discClient, nil
})

@@ -214,7 +232,7 @@ func TestDiscoveryServiceWithNewOrgJoined(t *testing.T) {
assert.Equal(t, 0, len(peers))

discClient.SetResponses(
&clientmocks.MockDiscoverEndpointResponse{
&fabDiscovery.MockDiscoverEndpointResponse{
PeerEndpoints: []*discmocks.MockDiscoveryPeerEndpoint{
{
MSPID: mspID1,
@@ -232,7 +250,7 @@ func TestDiscoveryServiceWithNewOrgJoined(t *testing.T) {
assert.Equalf(t, 1, len(peers), "Expected 1 peer")

discClient.SetResponses(
&clientmocks.MockDiscoverEndpointResponse{
&fabDiscovery.MockDiscoverEndpointResponse{
PeerEndpoints: []*discmocks.MockDiscoveryPeerEndpoint{
{
MSPID: mspID1,
Original file line number Diff line number Diff line change
@@ -10,9 +10,10 @@ package dynamicdiscovery

import (
contextAPI "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context"
"github.com/hyperledger/fabric-sdk-go/pkg/fab/discovery"
)

// SetClientProvider overrides the discovery client provider for unit tests
func SetClientProvider(provider func(ctx contextAPI.Client) (DiscoveryClient, error)) {
func SetClientProvider(provider func(ctx contextAPI.Client) (discovery.Client, error)) {
clientProvider = provider
}
43 changes: 43 additions & 0 deletions pkg/client/common/discovery/dynamicdiscovery/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
Copyright SecureKey Technologies Inc. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package dynamicdiscovery

import (
"strings"

"github.com/pkg/errors"
)

const (
// AccessDenied indicates that the user does not have permission to perform the operation
AccessDenied = "access denied"
)

// DiscoveryError is an error originating at the Discovery service
type DiscoveryError struct {
error
target string
}

//Error returns string representation with target
func (e DiscoveryError) Error() string {
return errors.Wrapf(e.error, "target [%s]", e.target).Error()
}

//Target returns url of the peer
func (e DiscoveryError) Target() string {
return e.target
}

//IsAccessDenied checks if response contains access denied msg
func (e DiscoveryError) IsAccessDenied() bool {
return strings.Contains(e.Error(), AccessDenied)
}

func newDiscoveryError(err error, target string) error {
return DiscoveryError{target: target, error: err}
}
Original file line number Diff line number Diff line change
@@ -12,7 +12,7 @@ import (
"testing"
"time"

clientmocks "github.com/hyperledger/fabric-sdk-go/pkg/client/common/mocks"
"github.com/hyperledger/fabric-sdk-go/pkg/fab/discovery"
contextAPI "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context"
pfab "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
discmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/discovery/mocks"
@@ -28,6 +28,7 @@ const (
mspID2 = "Org2MSP"

peer1MSP1 = "peer1.org1.com:9999"
peer1MSP2 = "peer1.org2.com:9999"
)

func TestLocalProvider(t *testing.T) {
@@ -46,14 +47,14 @@ func TestLocalProvider(t *testing.T) {
}
config.SetCustomNetworkPeerCfg([]pfab.NetworkPeer{peer1Org1, peer1Org2})

discClient := clientmocks.NewMockDiscoveryClient()
discClient := discovery.NewMockDiscoveryClient()
discClient.SetResponses(
&clientmocks.MockDiscoverEndpointResponse{
&discovery.MockDiscoverEndpointResponse{
PeerEndpoints: []*discmocks.MockDiscoveryPeerEndpoint{},
},
)

SetClientProvider(func(ctx contextAPI.Client) (DiscoveryClient, error) {
SetClientProvider(func(ctx contextAPI.Client) (discovery.Client, error) {
return discClient, nil
})

Loading

0 comments on commit f7729f1

Please sign in to comment.