Skip to content

Commit

Permalink
2A pass one test
Browse files Browse the repository at this point in the history
  • Loading branch information
chenjialin committed Apr 9, 2023
1 parent dbfd891 commit 991cded
Showing 1 changed file with 33 additions and 19 deletions.
52 changes: 33 additions & 19 deletions src/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,15 +166,19 @@ type AppendEntriesReply struct {

// AppendEntries RPC
func (rf *Raft) AppendEntries(arg *AppendEntriesArgs, reply *AppendEntriesReply) {
fmt.Println("append entries from ", arg.LeaderId)
rf.mu.Lock()
if arg.LeaderTerm < rf.current_term {
reply.Term = rf.current_term
reply.Success = false
rf.voted_for = -1
rf.mu.Unlock()
return
}
// add more condition
reply.Success = true
reply.Term = rf.current_term
rf.current_term = arg.LeaderTerm
rf.mu.Unlock()
rf.hb_chan <- arg.LeaderId
}
Expand Down Expand Up @@ -327,7 +331,6 @@ func (rf *Raft) ticker() {
select {
case h := <-rf.hb_chan:
fmt.Println("heartbeat from ", h)
continue
case <-time.After(time.Duration(sleep_time) * time.Millisecond):
fmt.Println("have not heard from leader after ", sleep_time, "ms...")
rf.mu.Lock()
Expand All @@ -340,6 +343,7 @@ func (rf *Raft) ticker() {
// Candidate then request vote
rf.mu.Lock()
rf.current_term += 1
rf.voted_for = rf.me
rf.mu.Unlock()
var mu sync.Mutex
cond := sync.NewCond(&mu)
Expand All @@ -356,8 +360,10 @@ func (rf *Raft) ticker() {
// send request vote rpc
var args RequestVoteArgs
var reply RequestVoteReply
rf.mu.Lock()
args.CandidateId = rf.me
args.CandidateTerm = rf.current_term
rf.mu.Unlock()
// args.LastLogIndex
// args.LastLogTerm
ok := rf.sendRequestVote(x, &args, &reply)
Expand All @@ -371,14 +377,13 @@ func (rf *Raft) ticker() {
}
}
mu.Lock()
for count < half && finished < len(rf.peers) {
for count < half && finished < len(rf.peers)-1 {
cond.Wait()
}
if count >= half {
fmt.Println("received ", count, " vote, more than half")
rf.mu.Lock()
rf.role = Leader
rf.current_term++
rf.mu.Unlock()
} else {
fmt.Println("reveived ", count, " vote, less than half, become follower")
Expand All @@ -389,29 +394,37 @@ func (rf *Raft) ticker() {
mu.Unlock()
case Leader:
// Leader then send heartbeats
w := sync.WaitGroup{}
for i := 0; i < len(rf.peers); i++ {
if i == rf.me {
continue
}
var args AppendEntriesArgs
var reply AppendEntriesReply
args.LeaderId = rf.me
rf.mu.Lock()
args.LeaderTerm = rf.current_term
rf.mu.Unlock()
ok := rf.sendAppendEntries(i, &args, &reply)
if ok {
fmt.Println("send heartbeat to ", i, ", reply success: ", reply.Success, ", its term: ", reply.Term)
w.Add(1)
go func(x int) {
defer w.Done()
var args AppendEntriesArgs
var reply AppendEntriesReply
args.LeaderId = rf.me
rf.mu.Lock()
if reply.Term > rf.current_term {
rf.role = Follower
}
args.LeaderTerm = rf.current_term
rf.mu.Unlock()
continue
} else {
// try more times
}
fmt.Println("append entries to ", x)
ok := rf.sendAppendEntries(x, &args, &reply)
if ok {
fmt.Println("send heartbeat to ", x, ", reply success: ", reply.Success, ", its term: ", reply.Term)
rf.mu.Lock()
if reply.Term > rf.current_term {
rf.role = Follower
}
rf.mu.Unlock()
} else {
// try more times
fmt.Println("send heartbeat not ok")
}
}(i)
}
w.Wait()
fmt.Println("send all")
time.Sleep(time.Duration(100) * time.Millisecond)
}

Expand Down Expand Up @@ -440,6 +453,7 @@ func Make(peers []*labrpc.ClientEnd, me int,
rf.commit_index = 0
rf.last_applied = 0
rf.role = Follower
rf.hb_chan = make(chan int)

// initialize from state persisted before a crash
rf.readPersist(persister.ReadRaftState())
Expand Down

0 comments on commit 991cded

Please sign in to comment.