Skip to content

Commit

Permalink
enhance: Optimize raft health monitoring policies.
Browse files Browse the repository at this point in the history
Signed-off-by: true1064 <[email protected]>
  • Loading branch information
true1064 authored and Victor1319 committed Oct 10, 2022
1 parent c5a63ed commit e2b2241
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 28 deletions.
7 changes: 4 additions & 3 deletions authnode/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (m *Server) initFsm() {
m.fsm.restore()
}

func (m *Server) createRaftServer() (err error) {
func (m *Server) createRaftServer(cfg *config.Config) (err error) {
raftCfg := &raftstore.Config{
NodeID: m.id,
RaftPath: m.walDir,
Expand All @@ -167,7 +167,7 @@ func (m *Server) createRaftServer() (err error) {
TickInterval: m.tickInterval,
ElectionTick: m.electionTick,
}
if m.raftStore, err = raftstore.NewRaftStore(raftCfg); err != nil {
if m.raftStore, err = raftstore.NewRaftStore(raftCfg, cfg); err != nil {
return errors.Trace(err, "NewRaftStore failed! id[%v] walPath[%v]", m.id, m.walDir)
}
m.initFsm()
Expand Down Expand Up @@ -195,7 +195,8 @@ func (m *Server) Start(cfg *config.Config) (err error) {
log.LogErrorf("Start: init RocksDB fail: err(%v)", err)
return
}
if err = m.createRaftServer(); err != nil {

if err = m.createRaftServer(cfg); err != nil {
log.LogError(errors.Stack(err))
return
}
Expand Down
2 changes: 1 addition & 1 deletion datanode/partition_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ func (s *DataNode) startRaftServer(cfg *config.Config) (err error) {
TickInterval: s.tickInterval,
RecvBufSize: s.raftRecvBufSize,
}
s.raftStore, err = raftstore.NewRaftStore(raftConf)
s.raftStore, err = raftstore.NewRaftStore(raftConf, cfg)
if err != nil {
err = errors.NewErrorf("new raftStore: %s", err.Error())
log.LogErrorf("action[startRaftServer] cannot start raft server err(%v)", err)
Expand Down
6 changes: 3 additions & 3 deletions master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (m *Server) Start(cfg *config.Config) (err error) {
return
}

if err = m.createRaftServer(); err != nil {
if err = m.createRaftServer(cfg); err != nil {
log.LogError(errors.Stack(err))
return
}
Expand Down Expand Up @@ -301,7 +301,7 @@ func (m *Server) checkConfig(cfg *config.Config) (err error) {
return
}

func (m *Server) createRaftServer() (err error) {
func (m *Server) createRaftServer(cfg *config.Config) (err error) {
raftCfg := &raftstore.Config{
NodeID: m.id,
RaftPath: m.walDir,
Expand All @@ -312,7 +312,7 @@ func (m *Server) createRaftServer() (err error) {
ElectionTick: m.electionTick,
RecvBufSize: m.raftRecvBufSize,
}
if m.raftStore, err = raftstore.NewRaftStore(raftCfg); err != nil {
if m.raftStore, err = raftstore.NewRaftStore(raftCfg, cfg); err != nil {
return errors.Trace(err, "NewRaftStore failed! id[%v] walPath[%v]", m.id, m.walDir)
}
syslog.Printf("peers[%v],tickInterval[%v],electionTick[%v]\n", m.config.peers, m.tickInterval, m.electionTick)
Expand Down
2 changes: 1 addition & 1 deletion metanode/metanode.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func doStart(s common.Server, cfg *config.Config) (err error) {
return
}

if err = m.startRaftServer(); err != nil {
if err = m.startRaftServer(cfg); err != nil {
return
}
if err = m.newMetaManager(); err != nil {
Expand Down
5 changes: 3 additions & 2 deletions metanode/raft_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ import (
"strconv"

"github.com/cubefs/cubefs/raftstore"
"github.com/cubefs/cubefs/util/config"
"github.com/cubefs/cubefs/util/errors"
)

// StartRaftServer initializes the address resolver and the raftStore server instance.
func (m *MetaNode) startRaftServer() (err error) {
func (m *MetaNode) startRaftServer(cfg *config.Config) (err error) {
_, err = os.Stat(m.raftDir)
if err != nil {
if !os.IsNotExist(err) {
Expand All @@ -48,7 +49,7 @@ func (m *MetaNode) startRaftServer() (err error) {
RecvBufSize: m.raftRecvBufSize,
NumOfLogsToRetain: raftstore.DefaultNumOfLogsToRetain * 2,
}
m.raftStore, err = raftstore.NewRaftStore(raftConf)
m.raftStore, err = raftstore.NewRaftStore(raftConf, cfg)
if err != nil {
err = errors.NewErrorf("new raftStore: %s", err.Error())
}
Expand Down
115 changes: 98 additions & 17 deletions raftstore/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,70 @@ package raftstore
import (
"fmt"
"github.com/cubefs/cubefs/depends/tiglabs/raft/proto"
"github.com/cubefs/cubefs/util/config"
"github.com/cubefs/cubefs/util/exporter"
"github.com/cubefs/cubefs/util/log"
"time"
)

const (
reportDuration = time.Minute * 3
zombieThreshold = time.Minute * 3
raftNotHealthyThreshold = time.Second * 30
defaultReportDuration = time.Minute * 3
defaultZombieThreshold = time.Minute * 3
defaultNoLeaderThreshold = time.Second * 30
)

// Configuration keys (integer values, interpreted as whole seconds) that
// override the raft monitor thresholds; read once by setMonitorConf.
const (
// cfgZombieThresholdSec: seconds of peer silence before the first zombie report.
cfgZombieThresholdSec = "raftMonZombieThrSec"
// cfgZombieTooLongThresholdSec: minimum seconds between repeated zombie reports.
cfgZombieTooLongThresholdSec = "raftMonZombieTooLongThrSec"
// cfgNoLeaderThresholdSec: seconds without a leader before the first report.
cfgNoLeaderThresholdSec = "raftMonNoLeaderThrSec"
// cfgNoLeaderTooLongThresholdSec: minimum seconds between repeated no-leader reports.
cfgNoLeaderTooLongThresholdSec = "raftMonNoLeaderTooLongThrSec"
)

// monitorConf holds the effective thresholds used by the raft health
// monitor. Durations below a *Threshold value trigger no report; the
// *TooLongThreshold values space out repeated reports for the same subject.
type monitorConf struct {
// ZombieThreshold: minimum unresponsive duration before a peer is reported.
ZombieThreshold time.Duration
// ZombieTooLongThreshold: minimum gap between repeated zombie reports.
ZombieTooLongThreshold time.Duration
// NoLeaderThreshold: minimum leaderless duration before a partition is reported.
NoLeaderThreshold time.Duration
// NoLeaderTooLongThreshold: minimum gap between repeated no-leader reports.
NoLeaderTooLongThreshold time.Duration
}

// gMonConf is the process-wide monitor configuration. It starts with the
// compile-time defaults and may be overwritten once by setMonitorConf when
// the raft store is created.
var gMonConf = monitorConf{
ZombieThreshold: defaultZombieThreshold,
ZombieTooLongThreshold: defaultReportDuration,
NoLeaderThreshold: defaultNoLeaderThreshold,
NoLeaderTooLongThreshold: defaultReportDuration,
}

// setMonitorConf applies optional overrides from cfg (values in whole
// seconds) to the global raft monitor thresholds. A nil cfg, a missing key,
// or a non-positive value leaves the corresponding default untouched.
// The effective values are logged so operators can confirm the policy.
func setMonitorConf(cfg *config.Config) {
	if cfg == nil {
		return
	}

	// Each entry maps a config key to the threshold it overrides.
	overrides := []struct {
		key string
		dst *time.Duration
	}{
		{cfgZombieThresholdSec, &gMonConf.ZombieThreshold},
		{cfgZombieTooLongThresholdSec, &gMonConf.ZombieTooLongThreshold},
		{cfgNoLeaderThresholdSec, &gMonConf.NoLeaderThreshold},
		{cfgNoLeaderTooLongThresholdSec, &gMonConf.NoLeaderTooLongThreshold},
	}
	for _, o := range overrides {
		if sec := cfg.GetInt64(o.key); sec > 0 {
			*o.dst = time.Duration(sec) * time.Second
		}
	}

	log.LogInfof("set raft monitor cfg: zombieThreshold:[%v], zombieTooLongThreshold:[%v],"+
		" noLeaderThreshold:[%v], noLeaderTooLongThreshold:[%v]",
		gMonConf.ZombieThreshold, gMonConf.ZombieTooLongThreshold,
		gMonConf.NoLeaderThreshold, gMonConf.NoLeaderTooLongThreshold)
}

type zombiePeer struct {
partitionID uint64
peer proto.Peer
Expand All @@ -32,43 +85,71 @@ func newMonitor() *monitor {
}

func (d *monitor) MonitorZombie(id uint64, peer proto.Peer, replicasMsg string, du time.Duration) {
if du < zombieThreshold {
if du < gMonConf.ZombieThreshold {
return
}
needReport := false

needReport := true
var errMsg string

zombiePeer := zombiePeer{
partitionID: id,
peer: peer,
}
oldDu := d.zombieDurations[zombiePeer]
if oldDu == 0 || du < oldDu || du-oldDu > reportDuration {
d.zombieDurations[zombiePeer] = du
needReport = true

if oldDu == 0 || du < oldDu {
// peer became zombie recently
errMsg = fmt.Sprintf("[MonitorZombie] raft peer zombie, "+
"partitionID[%d] replicaID[%v] replicasMsg[%s] zombiePeer[%v] zombieDuration[%v]",
id, peer.PeerID, replicasMsg, peer, du)
} else if du-oldDu > gMonConf.ZombieTooLongThreshold {
// peer keeping zombie for too long
errMsg = fmt.Sprintf("[MonitorZombieTooLong] raft peer zombie too long, "+
"partitionID[%d] replicaID[%v] replicasMsg[%s] zombiePeer[%v] zombieDuration[%v]",
id, peer.PeerID, replicasMsg, peer, du)
} else {
// peer keeping zombie, but it's not time for another too-long-report yet
needReport = false
}

if !needReport {
return
}
errMsg := fmt.Sprintf("[MonitorZombie] raft partitionID[%d] replicaID[%v] replicasMsg[%s] zombiePeer[%v] zombieDuration[%v]",
id, peer.PeerID, replicasMsg, peer, du)

d.zombieDurations[zombiePeer] = du
log.LogError(errMsg)
exporter.Warning(errMsg)
}

func (d *monitor) MonitorElection(id uint64, replicaMsg string, du time.Duration) {
if du < raftNotHealthyThreshold {
if du < gMonConf.NoLeaderThreshold {
return
}
needReport := false
needReport := true
var errMsg string

oldDu := d.noLeaderDurations[id]
if oldDu == 0 || du < oldDu || du-oldDu > reportDuration {
d.noLeaderDurations[id] = du
needReport = true

if oldDu == 0 || du < oldDu {
// became no leader recently
errMsg = fmt.Sprintf("[RaftNoLeader] raft no leader partitionID[%d]_replicas[%v]_Duration[%v]",
id, replicaMsg, du)
} else if du-oldDu > gMonConf.NoLeaderTooLongThreshold {
// keeping no leader for too long
errMsg = fmt.Sprintf("[RaftNoLeaderTooLong] raft no leader too long, "+
"partitionID[%d]_replicas[%v]_Duration[%v]",
id, replicaMsg, du)
} else {
// keeping not health, but it's not time for another too-long-report yet
needReport = false
}

if !needReport {
return
}
errMsg := fmt.Sprintf("[MonitorElection] raft status not health partitionID[%d]_replicas[%v]_noLeaderDuration[%v]",
id, replicaMsg, du)

d.noLeaderDurations[id] = du
log.LogError(errMsg)
exporter.Warning(errMsg)
}
4 changes: 3 additions & 1 deletion raftstore/raftstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cubefs/cubefs/depends/tiglabs/raft/proto"
"github.com/cubefs/cubefs/depends/tiglabs/raft/storage/wal"
raftlog "github.com/cubefs/cubefs/depends/tiglabs/raft/util/log"
utilConfig "github.com/cubefs/cubefs/util/config"
)

// RaftStore defines the interface for the raft store.
Expand Down Expand Up @@ -95,10 +96,11 @@ func newRaftLogger(dir string) {
}

// NewRaftStore returns a new raft store instance.
func NewRaftStore(cfg *Config) (mr RaftStore, err error) {
func NewRaftStore(cfg *Config, extendCfg *utilConfig.Config) (mr RaftStore, err error) {
resolver := NewNodeResolver()

newRaftLogger(cfg.RaftPath)
setMonitorConf(extendCfg)

rc := raft.DefaultConfig()
rc.NodeID = cfg.NodeID
Expand Down

0 comments on commit e2b2241

Please sign in to comment.