Skip to content

Commit

Permalink
fix(raft): when raft replicas or raft partitions are deleted, delete …
Browse files Browse the repository at this point in the history
…them from raft monitor accordingly

Signed-off-by: true1064 <[email protected]>
  • Loading branch information
true1064 authored and baijiaruo1 committed Jul 20, 2023
1 parent 249e12d commit 36f704d
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 7 deletions.
3 changes: 3 additions & 0 deletions depends/tiglabs/raft/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,7 @@ type Monitor interface {
MonitorZombie(id uint64, peer proto.Peer, replicasMsg string, du time.Duration)
// If raft election fails continuously, MonitorElection will be called.
MonitorElection(id uint64, replicaMsg string, du time.Duration)

RemovePeer(id uint64, peer proto.Peer)
RemovePartition(id uint64, peers []proto.Peer)
}
4 changes: 4 additions & 0 deletions depends/tiglabs/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,10 @@ func (s *raft) runApply() {
switch cmd := apply.command.(type) {
case *proto.ConfChange:
resp, err = s.raftConfig.StateMachine.ApplyMemberChange(cmd, apply.index)
if cmd.Type == proto.ConfRemoveNode && err == nil {
s.raftFsm.mo.RemovePeer(s.raftFsm.id, cmd.Peer)
}

case []byte:
resp, err = s.raftConfig.StateMachine.Apply(cmd, apply.index)
}
Expand Down
6 changes: 6 additions & 0 deletions depends/tiglabs/raft/raft_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,12 @@ func (r *raftFsm) doRandomSeed() {
}

// StopFsm removes this partition and the peers of its replicas from the
// monitor, then closes stopCh.
func (r *raftFsm) StopFsm() {
	// Start with length 0 and capacity len(r.replicas): make([]T, n) followed
	// by append would leave n zero-valued peers at the front of the slice and
	// hand them to RemovePartition alongside the real ones.
	peers := make([]proto.Peer, 0, len(r.replicas))
	// Use a distinct loop variable so the receiver r is not shadowed.
	for _, replica := range r.replicas {
		peers = append(peers, replica.peer)
	}

	r.mo.RemovePartition(r.id, peers)
	close(r.stopCh)
}

Expand Down
57 changes: 50 additions & 7 deletions raftstore/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/cubefs/cubefs/util/config"
"github.com/cubefs/cubefs/util/exporter"
"github.com/cubefs/cubefs/util/log"
"sync"
"time"
)

Expand Down Expand Up @@ -73,15 +74,20 @@ type zombiePeer struct {
}

// monitor remembers the durations last recorded by MonitorZombie and
// MonitorElection. Each map is guarded by its own read/write mutex.
type monitor struct {
	// zombieDurations holds the last recorded duration per (partition, peer)
	// pair; access it only while holding zombieDurationMutex.
	zombieDurations     map[zombiePeer]time.Duration
	zombieDurationMutex sync.RWMutex

	// noLeaderDurations holds the last recorded duration per partition id;
	// access it only while holding noLeaderDurationsMutex.
	noLeaderDurations      map[uint64]time.Duration
	noLeaderDurationsMutex sync.RWMutex
}

// newMonitor returns a monitor with both duration maps allocated and empty.
// The mutex fields are left at their zero value, which is an unlocked mutex.
func newMonitor() *monitor {
	// Single construction path: the earlier text returned a literal and then
	// carried a second, unreachable initialisation sequence after the return.
	return &monitor{
		zombieDurations:   make(map[zombiePeer]time.Duration),
		noLeaderDurations: make(map[uint64]time.Duration),
	}
}

func (d *monitor) MonitorZombie(id uint64, peer proto.Peer, replicasMsg string, du time.Duration) {
Expand All @@ -96,7 +102,10 @@ func (d *monitor) MonitorZombie(id uint64, peer proto.Peer, replicasMsg string,
partitionID: id,
peer: peer,
}

d.zombieDurationMutex.RLock()
oldDu := d.zombieDurations[zombiePeer]
d.zombieDurationMutex.RUnlock()

if oldDu == 0 || du < oldDu {
// peer became zombie recently
Expand All @@ -116,8 +125,9 @@ func (d *monitor) MonitorZombie(id uint64, peer proto.Peer, replicasMsg string,
if !needReport {
return
}

d.zombieDurationMutex.Lock()
d.zombieDurations[zombiePeer] = du
d.zombieDurationMutex.Unlock()
log.LogError(errMsg)
exporter.Warning(errMsg)
}
Expand All @@ -129,7 +139,9 @@ func (d *monitor) MonitorElection(id uint64, replicaMsg string, du time.Duration
needReport := true
var errMsg string

d.noLeaderDurationsMutex.RLock()
oldDu := d.noLeaderDurations[id]
d.noLeaderDurationsMutex.RUnlock()

if oldDu == 0 || du < oldDu {
// became no leader recently
Expand All @@ -149,7 +161,38 @@ func (d *monitor) MonitorElection(id uint64, replicaMsg string, du time.Duration
return
}

d.noLeaderDurationsMutex.Lock()
d.noLeaderDurations[id] = du
d.noLeaderDurationsMutex.Unlock()
log.LogError(errMsg)
exporter.Warning(errMsg)
}

// RemovePeer deletes the zombie-duration entry stored for peer p of partition
// id, if there is one, and logs the removal. It does nothing when no entry
// exists for that pair.
func (d *monitor) RemovePeer(id uint64, p proto.Peer) {
	key := zombiePeer{partitionID: id, peer: p}

	d.zombieDurationMutex.Lock()
	defer d.zombieDurationMutex.Unlock()

	if _, tracked := d.zombieDurations[key]; !tracked {
		return
	}
	delete(d.zombieDurations, key)
	log.LogInfof("remove peer from raft monitor, partitionID: %v, peer: %v", id, p)
}

// RemovePartition deletes the no-leader duration entry stored for partition
// id, logging the removal when an entry existed, and then calls RemovePeer
// for every peer in peers.
func (d *monitor) RemovePartition(id uint64, peers []proto.Peer) {
	d.noLeaderDurationsMutex.Lock()
	if _, tracked := d.noLeaderDurations[id]; tracked {
		delete(d.noLeaderDurations, id)
		log.LogInfof("remove partition from raft monitor, partitionID: %v, peers: %v", id, peers)
	}
	// Release the partition lock before touching per-peer state, which is
	// guarded by a different mutex inside RemovePeer.
	d.noLeaderDurationsMutex.Unlock()

	for i := range peers {
		d.RemovePeer(id, peers[i])
	}
}

0 comments on commit 36f704d

Please sign in to comment.