Skip to content

Commit

Permalink
send previndex in heartbeat, all part2b test passed (testbackup ocass…
Browse files Browse the repository at this point in the history
…ionally run out of time)
  • Loading branch information
JiayuZzz committed Nov 22, 2018
1 parent 47f0382 commit bccb440
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 20 deletions.
2 changes: 1 addition & 1 deletion src/raft/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ func (cfg *config) one(cmd int, expectedServers int, retry bool) int {
t1 := time.Now()
for time.Since(t1).Seconds() < 2 {
nd, cmd1 := cfg.nCommitted(index)
//DPrintf("%v servers think %v is committed\n",nd,index)
DPrintf("%v servers think %v is committed\n",nd,index)
if nd > 0 && nd >= expectedServers {
// committed
if cmd2, ok := cmd1.(int); ok && cmd2 == cmd {
Expand Down
47 changes: 28 additions & 19 deletions src/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
return
}

// if candidate's log is more up-to-date
var upToDate bool
lastIndex := len(rf.log)-1
if rf.log[lastIndex].Term < args.LastLogTerm {
Expand All @@ -215,10 +216,11 @@ func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
}

if !upToDate {
DPrintf("%v reject %v because log is not up-to-date\n",rf.me,args.CandidateID)
DPrintf("server %v reject grant candidate %v in term % v because log is not up-to-date\n",rf.me,args.CandidateID,args.Term)
return
}

// if candidate's term is greater, grant
if args.Term > rf.currentTerm {
rf.votedFor = args.CandidateID
rf.currentTerm = args.Term
Expand All @@ -237,11 +239,11 @@ func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
reply.Success = true
// success only if leader is valid and prevEntry matched
reply.Success = false

if args.Term<rf.currentTerm {
reply.Term = rf.currentTerm
reply.Success = false
return
}
rf.resetTimerCH <- struct{}{}
Expand All @@ -255,20 +257,23 @@ func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply
}

reply.Term = rf.currentTerm
if args.Entries!=nil {
// check prev
if args.PrevLogIndex<len(rf.log) && rf.log[args.PrevLogIndex].Term == args.PrevLogTerm {

last:=0 // last entry matched
prevEntryMatch := args.PrevLogIndex<len(rf.log) && rf.log[args.PrevLogIndex].Term == args.PrevLogTerm

if prevEntryMatch {
last = args.PrevLogIndex
reply.Success = true
if args.Entries!=nil {
rf.log = rf.log[0:args.PrevLogIndex+1] // delete conflict entries
rf.log = append(rf.log, args.Entries[0])
DPrintf("%v receive %v, at: %v\n", rf.me, args.Term, len(rf.log)-1)
} else {
reply.Success = false // not match
last+=1
DPrintf("server %v receive entry term %v, at index: %v\n", rf.me, args.Term, len(rf.log)-1)
}
}

if args.LeaderCommit > rf.commitIndex {
rf.commitToIndex(min(args.LeaderCommit, len(rf.log)-1))
DPrintf("%v set commit to %v", rf.me, rf.commitIndex)
if args.LeaderCommit > rf.commitIndex && prevEntryMatch {
rf.commitToIndex(min(args.LeaderCommit, last))
}
}

Expand Down Expand Up @@ -450,6 +455,7 @@ func (rf *Raft) startElection() {
if rf.votedCnt == len(rf.peers)/2 { // already vote for itself
//fmt.Printf("%v became leader for term %v\n",rf.me, rf.currentTerm)
rf.electionTimer.Stop() // stop timer until expired
DPrintf("server %v become leader\n",rf.me)
rf.setRole(Leader)
rf.leaderInit()
go rf.broadcast() // broadcast as leader
Expand Down Expand Up @@ -507,21 +513,23 @@ func (rf *Raft) broadcast() {
continue
}

preIndex := rf.nextIndex[i] - 1

args := AppendEntriesArgs{
rf.currentTerm,
nil,
rf.commitIndex,
0,
0,
preIndex,
rf.log[preIndex].Term,
}

if rf.nextIndex[i] < len(rf.log) {
// Todo: append more entries
args.Entries = append(args.Entries, rf.log[rf.nextIndex[i]])
preIndex := rf.nextIndex[i] - 1
args.PrevLogIndex = preIndex
args.PrevLogTerm = rf.log[preIndex].Term
DPrintf("leader %v to send in term %v, pre index %v, to %v\n", rf.me, args.Entries[0].Term,preIndex, i)
//preIndex := rf.nextIndex[i] - 1
//args.PrevLogIndex = preIndex
//args.PrevLogTerm = rf.log[preIndex].Term
DPrintf("leader %v to send in term %v, pre index %v, to server %v\n", rf.me, args.Entries[0].Term,preIndex, i)
}

go rf.startAppendEntries(i, args)
Expand All @@ -547,14 +555,15 @@ func (rf *Raft) tryCommit() {
// commit index and all indices preceding index
func (rf *Raft) commitToIndex(index int) {
if rf.commitIndex<index {
for i:=rf.commitIndex+1;i<=index;i++ {
for i:=rf.commitIndex+1;i<=index&&i<len(rf.log);i++ {
rf.commitIndex = i
msg:=ApplyMsg{
true,
rf.log[i].Command,
i,
}
rf.applyCh<-msg
DPrintf("server %v set commit to index %v", rf.me, rf.commitIndex)
}
}
}
Expand Down

0 comments on commit bccb440

Please sign in to comment.