forked from hyperledger/fabric
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhandler.go
294 lines (262 loc) · 9.89 KB
/
handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package peer
import (
"fmt"
"sync"
"time"
"github.com/golang/protobuf/proto"
"github.com/looplab/fsm"
"github.com/spf13/viper"
pb "github.com/hyperledger/fabric/protos/peer"
)
// Handler is the peer message handler bound to a single chat stream with a
// remote peer. It drives a small state machine ("created" -> "established")
// from incoming messages and periodically asks the remote side for its peers.
type Handler struct {
// chatMutex is held by SendMessage for the duration of each Send so that
// writes to ChatStream are serialized.
chatMutex sync.Mutex
// ToPeerEndpoint is the remote peer's endpoint, taken from the HelloMessage
// it sent. It is nil until a DISC_HELLO has been processed.
ToPeerEndpoint *pb.PeerEndpoint
// Coordinator is the owning coordinator supplied to NewPeerHandler; it is
// used to register/deregister this handler and to build and look up
// discovery data.
Coordinator MessageHandlerCoordinator
// ChatStream is the stream to the remote peer that SendMessage writes to.
ChatStream ChatStream
// doneChan signals the discovery loop in start to exit; deregister sends
// on it.
doneChan chan struct{}
// FSM is the state machine built in NewPeerHandler; HandleMessage feeds
// message types to it as events.
FSM *fsm.FSM
initiatedStream bool // Was the stream initiated within this Peer
// registered is set once the Coordinator has accepted this handler and is
// cleared again by deregister.
registered bool
// syncBlocks is not referenced in the visible portion of this file.
syncBlocks chan *pb.SyncBlocks
}
// NewPeerHandler builds a Handler for the given chat stream and wires up its
// state machine. It has the shape of a HandlerFactory.
//
// The handler starts in the "created" state; DISC_HELLO moves it to
// "established", and every other listed message type is accepted only while
// established and leaves the state unchanged. When initiatedStream is true
// (this peer opened the stream) a DISC_HELLO is sent right away; a failure to
// build or send it is returned as an error together with a nil handler.
// The nextHandler argument is not used by this implementation.
func NewPeerHandler(coord MessageHandlerCoordinator, stream ChatStream, initiatedStream bool, nextHandler MessageHandler) (MessageHandler, error) {
	const (
		stateCreated     = "created"
		stateEstablished = "established"
	)

	// Event names are the string forms of the message types they react to.
	var (
		evHello      = pb.Message_DISC_HELLO.String()
		evGetPeers   = pb.Message_DISC_GET_PEERS.String()
		evPeers      = pb.Message_DISC_PEERS.String()
		evBlockAdded = pb.Message_SYNC_BLOCK_ADDED.String()
	)

	h := &Handler{
		Coordinator:     coord,
		ChatStream:      stream,
		initiatedStream: initiatedStream,
		doneChan:        make(chan struct{}),
	}

	h.FSM = fsm.NewFSM(
		stateCreated,
		fsm.Events{
			{Name: evHello, Src: []string{stateCreated}, Dst: stateEstablished},
			{Name: evGetPeers, Src: []string{stateEstablished}, Dst: stateEstablished},
			{Name: evPeers, Src: []string{stateEstablished}, Dst: stateEstablished},
			{Name: evBlockAdded, Src: []string{stateEstablished}, Dst: stateEstablished},
			{Name: pb.Message_SYNC_GET_BLOCKS.String(), Src: []string{stateEstablished}, Dst: stateEstablished},
			{Name: pb.Message_SYNC_BLOCKS.String(), Src: []string{stateEstablished}, Dst: stateEstablished},
			{Name: pb.Message_SYNC_STATE_GET_SNAPSHOT.String(), Src: []string{stateEstablished}, Dst: stateEstablished},
			{Name: pb.Message_SYNC_STATE_SNAPSHOT.String(), Src: []string{stateEstablished}, Dst: stateEstablished},
			{Name: pb.Message_SYNC_STATE_GET_DELTAS.String(), Src: []string{stateEstablished}, Dst: stateEstablished},
			{Name: pb.Message_SYNC_STATE_DELTAS.String(), Src: []string{stateEstablished}, Dst: stateEstablished},
		},
		fsm.Callbacks{
			"enter_state":            h.enterState,
			"before_" + evHello:      h.beforeHello,
			"before_" + evGetPeers:   h.beforeGetPeers,
			"before_" + evPeers:      h.beforePeers,
			"before_" + evBlockAdded: h.beforeBlockAdded,
		},
	)

	// Only the side that opened the stream speaks first.
	if !h.initiatedStream {
		return h, nil
	}

	// Send the initial Hello.
	hello, err := h.Coordinator.NewOpenchainDiscoveryHello()
	if err != nil {
		return nil, fmt.Errorf("Error getting new HelloMessage: %s", err)
	}
	if sendErr := h.SendMessage(hello); sendErr != nil {
		return nil, fmt.Errorf("Error creating new Peer Handler, error returned sending %s: %s", pb.Message_DISC_HELLO, sendErr)
	}
	return h, nil
}
// enterState is the FSM "enter_state" callback. It only writes a debug log
// line naming the remote endpoint, the state being entered and the event that
// caused the transition.
func (d *Handler) enterState(e *fsm.Event) {
	remote, state, trigger := d.ToPeerEndpoint, e.Dst, e.Event
	peerLogger.Debugf("The Peer's bi-directional stream to %s is %s, from event %s\n", remote, state, trigger)
}
// deregister removes this handler from the Coordinator and tells the
// discovery loop to stop. It does nothing and returns nil when the handler
// was never registered; otherwise it returns whatever DeregisterHandler
// returned.
func (d *Handler) deregister() error {
	if !d.registered {
		return nil
	}

	result := d.Coordinator.DeregisterHandler(d)
	// The discovery loop only exists for registered handlers, so this is the
	// only place doneChan is signalled. The send waits for the loop to
	// receive it.
	d.doneChan <- struct{}{}
	d.registered = false
	return result
}
// To returns a copy of the PeerEndpoint this Handler is connected to. It
// returns an error (and a zero-value endpoint) when no endpoint has been
// recorded yet.
func (d *Handler) To() (pb.PeerEndpoint, error) {
	if endpoint := d.ToPeerEndpoint; endpoint != nil {
		return *endpoint, nil
	}
	return pb.PeerEndpoint{}, fmt.Errorf("No peer endpoint for handler")
}
// Stop stops this handler by deregistering it from the
// MessageHandlerCoordinator. A deregistration failure is returned wrapped in a
// descriptive error; otherwise Stop returns nil.
func (d *Handler) Stop() error {
	if cause := d.deregister(); cause != nil {
		return fmt.Errorf("Error stopping MessageHandler: %s", cause)
	}
	return nil
}
// beforeHello is the FSM callback that runs before the DISC_HELLO event. It
// performs the handshake with the remote peer:
//
//  1. decodes the HelloMessage carried in the event's *pb.Message argument and
//     records the sender's PeerEndpoint on the handler;
//  2. when security is enabled, verifies the message signature against the
//     endpoint's PkiID;
//  3. when the remote side opened the stream, answers with our own DISC_HELLO;
//  4. registers the handler with the Coordinator, adds the remote address to
//     the discovery list if it is new, and launches the discovery loop.
//
// Any failure cancels the event (so the FSM stays in its current state) with
// an error describing the step that failed. A failure to add the peer to, or
// to store, the discovery list is only logged.
func (d *Handler) beforeHello(e *fsm.Event) {
	peerLogger.Debugf("Received %s, parsing out Peer identification", e.Event)

	// Extract the message defensively: a missing argument, an argument of
	// another type and a typed-nil *pb.Message all leave msg nil.
	var msg *pb.Message
	if len(e.Args) > 0 {
		msg, _ = e.Args[0].(*pb.Message)
	}
	if msg == nil {
		e.Cancel(fmt.Errorf("Received unexpected message type"))
		return
	}

	// Parse out the PeerEndpoint information.
	helloMessage := &pb.HelloMessage{}
	if err := proto.Unmarshal(msg.Payload, helloMessage); err != nil {
		e.Cancel(fmt.Errorf("Error unmarshalling HelloMessage: %s", err))
		return
	}
	// The payload comes from the remote peer, so the endpoint may be absent.
	// Reject it here; the code below dereferences the endpoint for its PkiID
	// and Address.
	if helloMessage.PeerEndpoint == nil {
		e.Cancel(fmt.Errorf("Received HelloMessage without a PeerEndpoint"))
		return
	}

	// Store the PeerEndpoint.
	d.ToPeerEndpoint = helloMessage.PeerEndpoint
	peerLogger.Debugf("Received %s from endpoint=%s", e.Event, helloMessage)

	// If security is enabled, verify the signature on the hello message.
	if SecurityEnabled() {
		if err := d.Coordinator.GetSecHelper().Verify(helloMessage.PeerEndpoint.PkiID, msg.Signature, msg.Payload); err != nil {
			e.Cancel(fmt.Errorf("Error Verifying signature for received HelloMessage: %s", err))
			return
		}
		peerLogger.Debugf("Verified signature for %s", e.Event)
	}

	if !d.initiatedStream {
		// We did not initiate the stream, so answer with our own HELLO.
		peerLogger.Debugf("Received %s, sending back %s", e.Event, pb.Message_DISC_HELLO.String())
		reply, err := d.Coordinator.NewOpenchainDiscoveryHello()
		if err != nil {
			e.Cancel(fmt.Errorf("Error getting new HelloMessage: %s", err))
			return
		}
		if err := d.SendMessage(reply); err != nil {
			e.Cancel(fmt.Errorf("Error sending response to %s: %s", e.Event, err))
			return
		}
	}

	// Register with the coordinator.
	if err := d.Coordinator.RegisterHandler(d); err != nil {
		e.Cancel(fmt.Errorf("Error registering Handler: %s", err))
		return
	}

	// Registered successfully.
	d.registered = true
	otherPeer := d.ToPeerEndpoint.Address
	if !d.Coordinator.GetDiscHelper().FindNode(otherPeer) {
		if ok := d.Coordinator.GetDiscHelper().AddNode(otherPeer); !ok {
			peerLogger.Warningf("Unable to add peer %v to discovery list", otherPeer)
		}
		if err := d.Coordinator.StoreDiscoveryList(); err != nil {
			peerLogger.Error(err)
		}
	}

	// The discovery loop runs until deregister signals doneChan.
	go d.start()
}
// beforeGetPeers is the FSM callback that runs before the DISC_GET_PEERS
// event. It asks the Coordinator for the current peers, marshals them and
// sends them back to the remote side as a DISC_PEERS message. A failure at any
// step cancels the event.
func (d *Handler) beforeGetPeers(e *fsm.Event) {
	peers, err := d.Coordinator.GetPeers()
	if err != nil {
		e.Cancel(fmt.Errorf("Error Getting Peers: %s", err))
		return
	}

	payload, err := proto.Marshal(peers)
	if err != nil {
		e.Cancel(fmt.Errorf("Error Marshalling PeersMessage: %s", err))
		return
	}

	peerLogger.Debugf("Sending back %s", pb.Message_DISC_PEERS.String())
	reply := &pb.Message{Type: pb.Message_DISC_PEERS, Payload: payload}
	if sendErr := d.SendMessage(reply); sendErr != nil {
		// The send error is passed through to the FSM unwrapped.
		e.Cancel(sendErr)
	}
}
// beforePeers is the FSM callback that runs before the DISC_PEERS event. It
// decodes the PeersMessage carried in the event's *pb.Message argument and
// hands it to the Coordinator via PeersDiscovered. The event is cancelled when
// the argument is not a *pb.Message or the payload cannot be unmarshalled.
func (d *Handler) beforePeers(e *fsm.Event) {
	peerLogger.Debugf("Received %s, grabbing peers message", e.Event)

	incoming, isMessage := e.Args[0].(*pb.Message)
	if !isMessage {
		e.Cancel(fmt.Errorf("Received unexpected message type"))
		return
	}

	discovered := &pb.PeersMessage{}
	if decodeErr := proto.Unmarshal(incoming.Payload, discovered); decodeErr != nil {
		e.Cancel(fmt.Errorf("Error unmarshalling PeersMessage: %s", decodeErr))
		return
	}

	peerLogger.Debugf("Received PeersMessage with Peers: %s", discovered)
	d.Coordinator.PeersDiscovered(discovered)

	// Disabled snippet, kept to demonstrate the Broadcast function:
	// if viper.GetString("peer.id") == "jdoe" {
	// d.Coordinator.Broadcast(&pb.Message{Type: pb.Message_UNDEFINED})
	// }
}
// beforeBlockAdded is the FSM callback that runs before the SYNC_BLOCK_ADDED
// event. It logs the event and cancels it when the argument is not a
// *pb.Message. The message is not otherwise used: adding the block and any
// state delta to the ledger is not implemented here.
func (d *Handler) beforeBlockAdded(e *fsm.Event) {
	peerLogger.Debugf("Received message: %s", e.Event)

	if _, isMessage := e.Args[0].(*pb.Message); !isMessage {
		e.Cancel(fmt.Errorf("Received unexpected message type"))
	}
	// Placeholder: the block and any delta state would be added to the ledger
	// at this point.
}
// when reports whether the handler's FSM is currently in stateToCheck.
func (d *Handler) when(stateToCheck string) bool {
	inState := d.FSM.Is(stateToCheck)
	return inState
}
// HandleMessage handles the Openchain messages for the Peer by firing the FSM
// event named after the message type, with the message as the event argument.
//
// It returns an error when the FSM cannot accept that event in its current
// state, or when firing the event fails with anything other than a
// *fsm.NoTransitionError. A NoTransitionError is treated as success.
func (d *Handler) HandleMessage(msg *pb.Message) error {
	peerLogger.Debugf("Handling Message of type: %s ", msg.Type)

	event := msg.Type.String()
	if d.FSM.Cannot(event) {
		return fmt.Errorf("Peer FSM cannot handle message (%s) with payload size (%d) while in state: %s", event, len(msg.Payload), d.FSM.Current())
	}

	fireErr := d.FSM.Event(event, msg)
	if fireErr == nil {
		return nil
	}
	// Only NoTransitionError is tolerated; every other error is a real failure.
	if _, noTransition := fireErr.(*fsm.NoTransitionError); noTransition {
		return nil
	}
	return fmt.Errorf("Peer FSM failed while handling message (%s): current state: %s, error: %s", event, d.FSM.Current(), fireErr)
}
// SendMessage sends a message to the remote PEER through the stream.
//
// Sends are serialized with chatMutex, so all writers should go through this
// method instead of calling Send on the underlying stream directly. A failed
// Send is returned wrapped in a descriptive error.
func (d *Handler) SendMessage(msg *pb.Message) error {
	d.chatMutex.Lock()
	defer d.chatMutex.Unlock()

	peerLogger.Debugf("Sending message to stream of type: %s ", msg.Type)
	if sendErr := d.ChatStream.Send(msg); sendErr != nil {
		return fmt.Errorf("Error Sending message through ChatStream: %s", sendErr)
	}
	return nil
}
// start runs the peer discovery loop: every "peer.discovery.period" it sends
// a DISC_GET_PEERS message to the remote peer, until a value arrives on
// doneChan. Send failures are logged and the loop keeps going. It always
// returns nil.
//
// NOTE(review): time.NewTicker panics on a non-positive duration, so
// "peer.discovery.period" must be configured with a positive value.
func (d *Handler) start() error {
	discPeriod := viper.GetDuration("peer.discovery.period")
	ticker := time.NewTicker(discPeriod)
	// Release the ticker's resources when the loop exits; keeping only its
	// channel would leave the ticker running after the handler has stopped.
	defer ticker.Stop()

	peerLogger.Debug("Starting Peer discovery service")
	for {
		select {
		case <-ticker.C:
			if err := d.SendMessage(&pb.Message{Type: pb.Message_DISC_GET_PEERS}); err != nil {
				peerLogger.Errorf("Error sending %s during handler discovery tick: %s", pb.Message_DISC_GET_PEERS, err)
			}
		case <-d.doneChan:
			peerLogger.Debug("Stopping discovery service")
			return nil
		}
	}
}