-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathfsm.go
114 lines (105 loc) · 2.54 KB
/
fsm.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package raft
import (
"github.com/dzdx/raft/raftpb"
"context"
"github.com/dzdx/raft/util"
"io"
)
// IFsm is the state machine that the raft node drives. The node calls Apply
// with batches of command futures and calls Snapshot when a snapshot request
// is served.
type IFsm interface {
	// Apply is handed a batch of data futures. The apply loop in this
	// package waits for a response on every future it passes in.
	Apply(ctx context.Context, futures []DataFuture)
	// Snapshot returns a readable stream together with an error value.
	Snapshot(ctx context.Context) (io.ReadCloser, error)
	// Restore consumes a stream and reports an error on failure.
	// NOTE(review): presumably the stream is one produced by Snapshot —
	// confirm against the caller.
	Restore(ctx context.Context, snapshot io.ReadCloser) error
}
// entryToFutures builds the pair of futures that track one log entry: a
// DataFuture holding the entry's Data and an IndexFuture holding its Index.
// Both futures are given the same response channel, which is buffered with a
// capacity of one.
func entryToFutures(entry *raftpb.LogEntry) (*DataFuture, *IndexFuture) {
	shared := make(chan RespWithError, 1)

	byData := &DataFuture{
		Data:   entry.Data,
		future: future{respChan: shared},
	}
	byIndex := &IndexFuture{
		Index:  entry.Index,
		future: future{respChan: shared},
	}
	return byData, byIndex
}
// batchApplyToFSM applies committed-but-unapplied log entries to the state
// machine, batch by batch, until lastApplied has caught up with commitIndex.
//
// Each batch spans at most config.MaxBatchApplyEntries entries. Command
// entries are handed to the FSM through r.waitGroup.Start; noop and
// configuration entries are answered immediately with a nil response. The
// method then waits for the responses in the order the entries were returned
// by the entry store, advancing lastApplied one entry at a time. While
// leaderState is set, each response is also forwarded to the in-flight
// request future registered under the same index, which is removed from the
// in-flight table.
//
// The method returns early, possibly mid-batch, once r.ctx is cancelled.
func (r *RaftNode) batchApplyToFSM() {
	for r.lastApplied < r.commitIndex {
		start := r.lastApplied + 1
		end := util.MinUint64(r.commitIndex, r.lastApplied+uint64(r.config.MaxBatchApplyEntries))
		entries, err := r.entryStore.GetEntries(start, end)
		if err != nil {
			r.logger.Errorf("get entries failed: %s", err.Error())
			// The read is retried, but a persistent storage failure must not
			// keep this loop alive after the node has been asked to stop.
			select {
			case <-r.ctx.Done():
				return
			default:
			}
			continue
		}

		count := len(entries)
		futures := make([]IndexFuture, 0, count)
		applyFutures := make([]DataFuture, 0, count)
		for _, e := range entries {
			dataFuture, indexFuture := entryToFutures(e)
			futures = append(futures, *indexFuture)
			switch e.LogType {
			case raftpb.LogEntry_LogCommand:
				applyFutures = append(applyFutures, *dataFuture)
			case raftpb.LogEntry_LogNoop, raftpb.LogEntry_LogConf:
				// Nothing for the FSM to do; answer right away so the wait
				// loop below can step over this index.
				indexFuture.Respond(nil, nil)
			}
		}

		r.waitGroup.Start(func() {
			r.fsm.Apply(r.ctx, applyFutures)
		})

		for _, applied := range futures {
			select {
			case <-r.ctx.Done():
				return
			case resp := <-applied.Response():
				index := applied.Index
				r.lastApplied = index

				// Read leaderState once so the nil check and the map access
				// below are guaranteed to see the same value.
				leader := r.leaderState
				if leader == nil {
					continue
				}

				// Take the request future out of the in-flight table in a
				// single critical section, then answer it outside the lock.
				r.mutex.Lock()
				reqFuture, ok := leader.inflightingFutures[index]
				if ok {
					delete(leader.inflightingFutures, index)
				}
				r.mutex.Unlock()

				if ok {
					// leader respond request
					reqFuture.Respond(resp.Resp, resp.Err)
				}
			}
		}
	}
	r.logger.Debugf("applied log to %d", r.lastApplied)
}
// runFSM is the FSM worker loop. Each iteration blocks until one of three
// events occurs: a snapshot request arrives and is served, an apply
// notification arrives and pending entries are applied, or r.ctx is cancelled
// and the loop exits.
func (r *RaftNode) runFSM() {
	for {
		select {
		case snapReq := <-r.fsmSnapshotCh:
			r.fsmSnapshot(&snapReq)
		case <-r.notifyApplyCh:
			r.batchApplyToFSM()
		case <-r.ctx.Done():
			return
		}
	}
}
// fsmSnapshot serves a single snapshot request. It asks the FSM for a
// snapshot stream, looks up the log entry at lastApplied to learn its term,
// and responds on req with a snapshotResp carrying that index, term and
// stream.
//
// If either the FSM snapshot or the entry lookup fails, req is answered with
// a nil response and the error. When the lookup fails after a stream was
// already obtained, the stream is closed here because nobody else will
// receive it.
func (r *RaftNode) fsmSnapshot(req *snapshotFuture) {
	reader, err := r.fsm.Snapshot(r.ctx)
	if err != nil {
		// Report the snapshot failure itself; previously this error was
		// overwritten by the entry lookup below and silently lost.
		req.Respond(nil, err)
		return
	}

	entry, err := r.entryStore.GetEntry(r.lastApplied)
	if err != nil {
		if reader != nil {
			if cerr := reader.Close(); cerr != nil {
				r.logger.Errorf("close snapshot reader failed: %s", cerr.Error())
			}
		}
		req.Respond(nil, err)
		return
	}

	resp := &snapshotResp{
		index:  r.lastApplied,
		term:   entry.Term,
		reader: reader,
	}
	req.Respond(resp, nil)
}