Skip to content

Commit

Permalink
[FAB-5849] calibrate state transfer pace
Browse files Browse the repository at this point in the history
Blocks received from peers via state transfer are added to the payload
buffer right away regardless the payload buffer's size.

In cases when state transfer is much faster than the commit process,
blocks pile up in the payload buffer and the peer might be out of memory.

This change set makes the method that handles payload reception from remote
peers to add the payloads through the same code path that receives blocks
from the orderer, which blocks in case the payload buffer is too overpopulated.

Change-Id: I2fc1a916b809311a7d3aa0308b64d2127ad1ee60
Signed-off-by: yacovm <[email protected]>
Signed-off-by: Gari Singh <[email protected]>
Signed-off-by: yacovm <[email protected]>
  • Loading branch information
yacovm committed Oct 2, 2017
1 parent 8d07299 commit 2e27110
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 14 deletions.
11 changes: 6 additions & 5 deletions gossip/state/mocks/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,17 @@ func (*GossipMock) SuspectPeers(s api.PeerSuspector) {
panic("implement me")
}

func (*GossipMock) Send(msg *proto.GossipMessage, peers ...*comm.RemotePeer) {
panic("implement me")
func (g *GossipMock) Send(msg *proto.GossipMessage, peers ...*comm.RemotePeer) {
g.Called(msg, peers)
}

func (*GossipMock) Peers() []discovery.NetworkMember {
panic("implement me")
}

func (*GossipMock) PeersOfChannel(common.ChainID) []discovery.NetworkMember {
return nil
func (g *GossipMock) PeersOfChannel(chainID common.ChainID) []discovery.NetworkMember {
args := g.Called(chainID)
return args.Get(0).([]discovery.NetworkMember)
}

func (*GossipMock) UpdateMetadata(metadata []byte) {
Expand All @@ -60,7 +61,7 @@ func (*GossipMock) Gossip(msg *proto.GossipMessage) {
func (g *GossipMock) Accept(acceptor common.MessageAcceptor, passThrough bool) (<-chan *proto.GossipMessage, <-chan proto.ReceivedMessage) {
args := g.Called(acceptor, passThrough)
if args.Get(0) == nil {
return nil, args.Get(1).(<-chan proto.ReceivedMessage)
return nil, args.Get(1).(chan proto.ReceivedMessage)
}
return args.Get(0).(<-chan *proto.GossipMessage), nil
}
Expand Down
2 changes: 2 additions & 0 deletions gossip/state/mocks/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/hyperledger/fabric/gossip/api"
"github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/discovery"
proto "github.com/hyperledger/fabric/protos/gossip"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
Expand All @@ -34,6 +35,7 @@ func TestGossipMock(t *testing.T) {
return c
}
g.On("Accept", mock.Anything, false).Return(mkChan(), nil)
g.On("PeersOfChannel", mock.Anything).Return([]discovery.NetworkMember{})
a, b := g.Accept(func(o interface{}) bool {
return true
}, false)
Expand Down
5 changes: 3 additions & 2 deletions gossip/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,9 +379,10 @@ func (s *GossipStateProviderImpl) handleStateResponse(msg proto.ReceivedMessage)
if max < payload.SeqNum {
max = payload.SeqNum
}
err := s.payloads.Push(payload)

err := s.addPayload(payload, blocking)
if err != nil {
logger.Warningf("Payload with sequence number %d was received earlier", payload.SeqNum)
logger.Warningf("Payload with sequence number %d wasn't added to payload buffer: %v", payload.SeqNum, err)
}
}
return max, nil
Expand Down
90 changes: 83 additions & 7 deletions gossip/state/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,14 @@ import (
"github.com/hyperledger/fabric/gossip/api"
"github.com/hyperledger/fabric/gossip/comm"
"github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/discovery"
"github.com/hyperledger/fabric/gossip/gossip"
"github.com/hyperledger/fabric/gossip/identity"
"github.com/hyperledger/fabric/gossip/state/mocks"
gutil "github.com/hyperledger/fabric/gossip/util"
pcomm "github.com/hyperledger/fabric/protos/common"
proto "github.com/hyperledger/fabric/protos/gossip"
"github.com/op/go-logging"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
Expand All @@ -63,6 +65,7 @@ type joinChanMsg struct {

func init() {
gutil.SetupTestLogging()
logging.SetLevel(logging.DEBUG, gutil.LoggingStateModule)
}

// SequenceNumber returns the sequence number of the block that the message
Expand Down Expand Up @@ -273,7 +276,7 @@ func TestNilDirectMsg(t *testing.T) {
mc.On("LedgerHeight", mock.Anything).Return(uint64(1), nil)
g := &mocks.GossipMock{}
g.On("Accept", mock.Anything, false).Return(make(<-chan *proto.GossipMessage), nil)
g.On("Accept", mock.Anything, true).Return(nil, make(<-chan proto.ReceivedMessage))
g.On("Accept", mock.Anything, true).Return(nil, make(chan proto.ReceivedMessage))
p := newPeerNodeWithGossip(newGossipConfig(0), mc, noopPeerIdentityAcceptor, g)
defer p.shutdown()
p.s.handleStateRequest(nil)
Expand All @@ -290,7 +293,7 @@ func TestNilAddPayload(t *testing.T) {
mc.On("LedgerHeight", mock.Anything).Return(uint64(1), nil)
g := &mocks.GossipMock{}
g.On("Accept", mock.Anything, false).Return(make(<-chan *proto.GossipMessage), nil)
g.On("Accept", mock.Anything, true).Return(nil, make(<-chan proto.ReceivedMessage))
g.On("Accept", mock.Anything, true).Return(nil, make(chan proto.ReceivedMessage))
p := newPeerNodeWithGossip(newGossipConfig(0), mc, noopPeerIdentityAcceptor, g)
defer p.shutdown()
err := p.s.AddPayload(nil)
Expand All @@ -303,7 +306,7 @@ func TestAddPayloadLedgerUnavailable(t *testing.T) {
mc.On("LedgerHeight", mock.Anything).Return(uint64(1), nil)
g := &mocks.GossipMock{}
g.On("Accept", mock.Anything, false).Return(make(<-chan *proto.GossipMessage), nil)
g.On("Accept", mock.Anything, true).Return(nil, make(<-chan proto.ReceivedMessage))
g.On("Accept", mock.Anything, true).Return(nil, make(chan proto.ReceivedMessage))
p := newPeerNodeWithGossip(newGossipConfig(0), mc, noopPeerIdentityAcceptor, g)
defer p.shutdown()
// Simulate a problem in the ledger
Expand All @@ -324,6 +327,77 @@ func TestAddPayloadLedgerUnavailable(t *testing.T) {
assert.Contains(t, err.Error(), "cannot query ledger")
}

func TestLargeBlockGap(t *testing.T) {
// Scenario: the peer knows of a peer who has a ledger height much higher
// than itself (500 blocks higher).
// The peer needs to ask blocks in a way such that the size of the payload buffer
// never rises above a certain threshold.

mc := &mockCommitter{}
blocksPassedToLedger := make(chan uint64, 200)
mc.On("Commit", mock.Anything).Run(func(arg mock.Arguments) {
blocksPassedToLedger <- arg.Get(0).(*pcomm.Block).Header.Number
})
msgsFromPeer := make(chan proto.ReceivedMessage)
mc.On("LedgerHeight", mock.Anything).Return(uint64(1), nil)
g := &mocks.GossipMock{}
metaState := NewNodeMetastate(500)
md, _ := metaState.Bytes()
membership := []discovery.NetworkMember{
{
PKIid: common.PKIidType("a"),
Endpoint: "a",
Metadata: md,
}}
g.On("PeersOfChannel", mock.Anything).Return(membership)
g.On("Accept", mock.Anything, false).Return(make(<-chan *proto.GossipMessage), nil)
g.On("Accept", mock.Anything, true).Return(nil, msgsFromPeer)
g.On("Send", mock.Anything, mock.Anything).Run(func(arguments mock.Arguments) {
msg := arguments.Get(0).(*proto.GossipMessage)
// The peer requested a state request
req := msg.GetStateRequest()
// Construct a skeleton for the response
res := &proto.GossipMessage{
Nonce: msg.Nonce,
Channel: []byte(util.GetTestChainID()),
Content: &proto.GossipMessage_StateResponse{
StateResponse: &proto.RemoteStateResponse{},
},
}
// Populate the response with payloads according to what the peer asked
for seq := req.StartSeqNum; seq <= req.EndSeqNum; seq++ {
rawblock := pcomm.NewBlock(seq, []byte{})
b, _ := pb.Marshal(rawblock)
payload := &proto.Payload{
SeqNum: seq,
Data: b,
}
res.GetStateResponse().Payloads = append(res.GetStateResponse().Payloads, payload)
}
// Finally, send the response down the channel the peer expects to receive it from
sMsg, _ := res.NoopSign()
msgsFromPeer <- &comm.ReceivedMessageImpl{
SignedGossipMessage: sMsg,
}
})
p := newPeerNodeWithGossip(newGossipConfig(0), mc, noopPeerIdentityAcceptor, g)
defer p.shutdown()

// Process blocks at a speed of 20 Millisecond for each block.
// The imaginative peer that responds to state
// If the payload buffer expands above defMaxBlockDistance*2 + defAntiEntropyBatchSize blocks, fail the test
blockProcessingTime := 20 * time.Millisecond // 10 seconds for total 500 blocks
expectedSequence := 1
for expectedSequence < 500 {
blockSeq := <-blocksPassedToLedger
assert.Equal(t, expectedSequence, int(blockSeq))
// Ensure payload buffer isn't over-populated
assert.True(t, p.s.payloads.Size() <= defMaxBlockDistance*2+defAntiEntropyBatchSize, "payload buffer size is %d", p.s.payloads.Size())
expectedSequence++
time.Sleep(blockProcessingTime)
}
}

func TestOverPopulation(t *testing.T) {
// Scenario: Add to the state provider blocks
// with a gap in between, and ensure that the payload buffer
Expand All @@ -338,7 +412,7 @@ func TestOverPopulation(t *testing.T) {
mc.On("LedgerHeight", mock.Anything).Return(uint64(1), nil)
g := &mocks.GossipMock{}
g.On("Accept", mock.Anything, false).Return(make(<-chan *proto.GossipMessage), nil)
g.On("Accept", mock.Anything, true).Return(nil, make(<-chan proto.ReceivedMessage))
g.On("Accept", mock.Anything, true).Return(nil, make(chan proto.ReceivedMessage))
p := newPeerNode(newGossipConfig(0), mc, noopPeerIdentityAcceptor)
defer p.shutdown()

Expand Down Expand Up @@ -400,7 +474,7 @@ func TestBlockingEnqueue(t *testing.T) {
mc.On("LedgerHeight", mock.Anything).Return(uint64(1), nil)
g := &mocks.GossipMock{}
g.On("Accept", mock.Anything, false).Return(make(<-chan *proto.GossipMessage), nil)
g.On("Accept", mock.Anything, true).Return(nil, make(<-chan proto.ReceivedMessage))
g.On("Accept", mock.Anything, true).Return(nil, make(chan proto.ReceivedMessage))
p := newPeerNode(newGossipConfig(0), mc, noopPeerIdentityAcceptor)
defer p.shutdown()

Expand Down Expand Up @@ -461,7 +535,8 @@ func TestFailures(t *testing.T) {
mc.On("LedgerHeight", mock.Anything).Return(uint64(0), nil)
g := &mocks.GossipMock{}
g.On("Accept", mock.Anything, false).Return(make(<-chan *proto.GossipMessage), nil)
g.On("Accept", mock.Anything, true).Return(nil, make(<-chan proto.ReceivedMessage))
g.On("Accept", mock.Anything, true).Return(nil, make(chan proto.ReceivedMessage))
g.On("PeersOfChannel", mock.Anything).Return([]discovery.NetworkMember{})
assert.Panics(t, func() {
newPeerNodeWithGossip(newGossipConfig(0), mc, noopPeerIdentityAcceptor, g)
})
Expand Down Expand Up @@ -519,7 +594,8 @@ func TestGossipReception(t *testing.T) {
g.On("Accept", mock.Anything, false).Return(rmc, nil).Run(func(_ mock.Arguments) {
signalChan <- struct{}{}
})
g.On("Accept", mock.Anything, true).Return(nil, make(<-chan proto.ReceivedMessage))
g.On("Accept", mock.Anything, true).Return(nil, make(chan proto.ReceivedMessage))
g.On("PeersOfChannel", mock.Anything).Return([]discovery.NetworkMember{})
mc := &mockCommitter{}
receivedChan := make(chan struct{})
mc.On("Commit", mock.Anything).Run(func(arguments mock.Arguments) {
Expand Down

0 comments on commit 2e27110

Please sign in to comment.