forked from hashicorp/raft
-
Notifications
You must be signed in to change notification settings - Fork 0
/
replication.go
140 lines (124 loc) · 3.64 KB
/
replication.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package raft
import (
"log"
"net"
)
// followerReplication holds the leader-side replication state kept for a
// single follower.
type followerReplication struct {
	// peer is the address AppendEntries RPCs are sent to.
	peer net.Addr
	// inflight is told, via Commit, about each index the peer acknowledges.
	inflight *inflight

	// stopCh ends replicate(); a non-zero value first requests a
	// best-effort replication up to that index.
	stopCh chan uint64
	// triggerCh asks replicate() to push the latest logs right away.
	triggerCh chan struct{}

	// currentTerm is the term stamped on outgoing requests. matchIndex and
	// nextIndex track the peer's log position: nextIndex is the first entry
	// to send next, matchIndex the highest index believed to be replicated.
	currentTerm, matchIndex, nextIndex uint64
}
// replicate is the long-running routine that drives log replication to a
// single follower.
//
// It starts a companion heartbeat goroutine, which is stopped when this
// function returns. Each time triggerCh fires or a randomTimeout of
// r.conf.CommitTimeout elapses, it replicates up to the current last log
// index. It exits when replicateTo reports that replication should stop.
// It also exits when a value arrives on stopCh; a non-zero value first
// triggers one best-effort replication up to that index.
func (r *Raft) replicate(s *followerReplication) {
	// Heartbeats run in their own goroutine; closing this channel on return
	// tells it to exit.
	stopHeartbeat := make(chan struct{})
	defer close(stopHeartbeat)
	r.goFunc(func() { r.heartbeat(s, stopHeartbeat) })

	for {
		var stop bool
		select {
		case <-s.triggerCh:
			stop = r.replicateTo(s, r.getLastLog())
		case <-randomTimeout(r.conf.CommitTimeout):
			stop = r.replicateTo(s, r.getLastLog())
		case maxIndex := <-s.stopCh:
			// Best effort: the result of this final attempt is ignored.
			if maxIndex > 0 {
				r.replicateTo(s, maxIndex)
			}
			return
		}
		if stop {
			return
		}
	}
}
// replicateTo sends AppendEntries RPCs to the follower until its log has
// been brought up to lastIndex, or until an error or a newer term stops
// progress.
//
// Each round ships at most r.conf.MaxAppendEntries entries starting at
// s.nextIndex. On success, the newly acknowledged indexes are reported to
// s.inflight, and matchIndex/nextIndex advance. On rejection, nextIndex is
// walked back, using the follower's reported LastLog as a hint, and the
// round is retried with older entries.
//
// It returns true only when the follower answers with a term newer than the
// request's, meaning this node should stop replicating. Log-store and
// transport errors are logged and return false.
func (r *Raft) replicateTo(s *followerReplication, lastIndex uint64) (shouldStop bool) {
	var l Log
	var req AppendEntriesRequest
	var resp AppendEntriesResponse

	for {
		req = AppendEntriesRequest{
			Term:              s.currentTerm,
			Leader:            r.localAddr,
			LeaderCommitIndex: r.getCommitIndex(),
		}
		// resp is reused across rounds; clear it so a transport that only
		// partially fills it cannot leak a previous round's fields.
		resp = AppendEntriesResponse{}

		// Describe the entry just before nextIndex. There is no log entry 0,
		// so for nextIndex == 1 PrevLogEntry/PrevLogTerm stay at zero.
		if s.nextIndex > 1 {
			if err := r.logs.GetLog(s.nextIndex-1, &l); err != nil {
				log.Printf("[ERR] Failed to get log at index %d: %v",
					s.nextIndex-1, err)
				return false
			}
			req.PrevLogEntry = l.Index
			req.PrevLogTerm = l.Term
		}

		// Append up to MaxAppendEntries entries, but never past lastIndex.
		req.Entries = make([]*Log, 0, r.conf.MaxAppendEntries)
		maxIndex := min(s.nextIndex+uint64(r.conf.MaxAppendEntries)-1, lastIndex)
		for i := s.nextIndex; i <= maxIndex; i++ {
			entry := new(Log)
			if err := r.logs.GetLog(i, entry); err != nil {
				log.Printf("[ERR] Failed to get log at index %d: %v", i, err)
				return false
			}
			req.Entries = append(req.Entries, entry)
		}

		if err := r.trans.AppendEntries(s.peer, &req, &resp); err != nil {
			log.Printf("[ERR] Failed to AppendEntries to %v: %v", s.peer, err)
			return false
		}

		// The follower has seen a newer term: stop replicating.
		if resp.Term > req.Term {
			return true
		}

		if resp.Success {
			// Report only indexes past s.matchIndex. s.matchIndex itself was
			// already handled on an earlier round (and 0 is never a real
			// entry). Starting at s.matchIndex would re-report it on every
			// success, including entry-less rounds where maxIndex equals it.
			// NOTE(review): whether a duplicate Commit for the same peer is
			// harmless depends on inflight's implementation (not visible here).
			for i := s.matchIndex + 1; i <= maxIndex; i++ {
				s.inflight.Commit(i, s.peer)
			}
			s.matchIndex = maxIndex
			s.nextIndex = maxIndex + 1
		} else {
			log.Printf("[WARN] AppendEntries to %v rejected, sending older logs", s.peer)
			// Step back, jumping straight to just past the follower's last
			// log when that is further back, but never below index 1.
			s.nextIndex = max(min(s.nextIndex-1, resp.LastLog+1), 1)
			// Provisional: the follower has not acknowledged this index.
			s.matchIndex = s.nextIndex - 1
		}

		// Stop once everything up to lastIndex has been sent.
		if s.nextIndex > lastIndex {
			return false
		}
	}
}
// heartbeat keeps the follower from timing out by repeatedly sending it an
// AppendEntries request that carries only the leader's term and address.
// It runs apart from replicate(), because that routine could be blocked on
// disk IO while heartbeats still need to go out.
//
// Between sends it waits on randomTimeout(r.conf.HeartbeatTimeout / 4).
// Transport errors are logged and otherwise ignored, and the response is not
// inspected. The loop exits when stopCh is closed or receives a value.
func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) {
	var resp AppendEntriesResponse
	req := AppendEntriesRequest{Term: s.currentTerm, Leader: r.localAddr}

	for {
		select {
		case <-stopCh:
			return
		case <-randomTimeout(r.conf.HeartbeatTimeout / 4):
		}
		if err := r.trans.AppendEntries(s.peer, &req, &resp); err != nil {
			log.Printf("[ERR] Failed to heartbeat to %v: %v", s.peer, err)
		}
	}
}