Skip to content

Commit

Permalink
fix(cli): CLI can display DPs that encountered IO errors during loading
Browse files Browse the repository at this point in the history
Signed-off-by: chihe <[email protected]>
  • Loading branch information
chihe authored and bboyCH4 committed Jul 5, 2024
1 parent d0ec3fe commit 65ee50a
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 74 deletions.
7 changes: 2 additions & 5 deletions cli/cmd/datapartition.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,17 +252,14 @@ The "reset" command will be released in next version`,
stdoutln()
stdoutln("[Partition with disk error replicas]:")
stdoutln(diskErrorReplicaPartitionInfoTableHeader)
sort.SliceStable(diagnosis.DiskErrorDataPartitionIDs, func(i, j int) bool {
return diagnosis.DiskErrorDataPartitionIDs[i] < diagnosis.DiskErrorDataPartitionIDs[j]
})
for _, pid := range diagnosis.DiskErrorDataPartitionIDs {
for pid, infos := range diagnosis.DiskErrorDataPartitionInfos.DiskErrReplicas {
var partition *proto.DataPartitionInfo
if partition, err = client.AdminAPI().GetDataPartition("", pid); err != nil {
err = fmt.Errorf("Partition not found, err:[%v] ", err)
return
}
if partition != nil {
stdoutln(formatDiskErrorReplicaDpInfoRow(partition))
stdoutln(formatDiskErrorReplicaDpInfoRow(partition, infos))
}
}
},
Expand Down
24 changes: 16 additions & 8 deletions cli/cmd/fmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -979,17 +979,17 @@ func formatDataPartitionDecommissionProgress(info *proto.DecommissionDataPartiti
return sb.String()
}

func formatDiskErrorReplicaDpInfoRow(partition *proto.DataPartitionInfo) string {
func formatDiskErrorReplicaDpInfoRow(partition *proto.DataPartitionInfo, infos []proto.DiskErrReplicaInfo) string {
sb := strings.Builder{}
sb.WriteString("[")
firstItem := true
for _, replica := range partition.Replicas {
if replica.TriggerDiskError {
if !firstItem {
sb.WriteString(",")
}

sb.WriteString(fmt.Sprintf("%v(%v)", replica.Addr, replica.DiskPath))
for _, info := range infos {
if !firstItem {
sb.WriteString(",")
}
// if the dp is not loaded, removing the replica cannot delete the dp from the disk heartbeat report
if replicaInHost(partition.Hosts, info.Addr) {
sb.WriteString(fmt.Sprintf("%v(%v)", info.Addr, info.Disk))
firstItem = false
}
}
Expand All @@ -1008,3 +1008,11 @@ func formatDecommissionFailedDiskInfo(info *proto.DecommissionFailedDiskInfo) st
sb.WriteString(fmt.Sprintf("AutoDecommission: %v\n", info.IsAutoDecommission))
return sb.String()
}
// replicaInHost reports whether the given replica address appears in the
// partition's host list.
func replicaInHost(hosts []string, replica string) bool {
	for i := range hosts {
		if hosts[i] == replica {
			return true
		}
	}
	return false
}
35 changes: 16 additions & 19 deletions datanode/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package datanode
import (
"context"
"fmt"
syslog "log"
"os"
"path"
"regexp"
Expand All @@ -31,7 +30,6 @@ import (
"golang.org/x/time/rate"

"github.com/cubefs/cubefs/proto"
"github.com/cubefs/cubefs/util/exporter"
"github.com/cubefs/cubefs/util/loadutil"
"github.com/cubefs/cubefs/util/log"
"github.com/cubefs/cubefs/util/strutil"
Expand Down Expand Up @@ -85,7 +83,7 @@ type Disk struct {

// diskPartition info
diskPartition *disk.PartitionStat
DiskErrPartitionSet map[uint64]struct{}
DiskErrPartitionSet sync.Map
decommission bool
extentRepairReadLimit chan struct{}
enableExtentRepairReadLimit bool
Expand Down Expand Up @@ -137,8 +135,6 @@ func NewDisk(path string, reservedSpace, diskRdonlySpace uint64, maxErrCnt int,
d.limitRead = newIOLimiter(space.dataNode.diskReadFlow, space.dataNode.diskReadIocc)
d.limitWrite = newIOLimiter(space.dataNode.diskWriteFlow, space.dataNode.diskWriteIocc)

d.DiskErrPartitionSet = make(map[uint64]struct{})

err = d.initDecommissionStatus()
if err != nil {
log.LogErrorf("action[NewDisk]: failed to load disk decommission status")
Expand Down Expand Up @@ -437,7 +433,7 @@ func (d *Disk) doDiskError() {

func (d *Disk) triggerDiskError(rwFlag uint8, dpId uint64) {
mesg := fmt.Sprintf("disk path %v error on %v, dpId %v", d.Path, LocalIP, dpId)
exporter.Warning(mesg)
//exporter.Warning(mesg)
log.LogWarnf(mesg)

if rwFlag == WriteFlag {
Expand All @@ -456,7 +452,7 @@ func (d *Disk) triggerDiskError(rwFlag uint8, dpId uint64) {
msg := fmt.Sprintf("set disk unavailable for too many disk error, "+
"disk path(%v), ip(%v), diskErrCnt(%v), diskErrPartitionCnt(%v) threshold(%v)",
d.Path, LocalIP, diskErrCnt, diskErrPartitionCnt, d.dataNode.diskUnavailablePartitionErrorCount)
exporter.Warning(msg)
//exporter.Warning(msg)
log.LogWarnf(msg)
d.doDiskError()
}
Expand All @@ -471,7 +467,7 @@ func (d *Disk) updateSpaceInfo() (err error) {
if d.Status == proto.Unavailable {
mesg := fmt.Sprintf("disk path %v error on %v", d.Path, LocalIP)
log.LogErrorf(mesg)
exporter.Warning(mesg)
// exporter.Warning(mesg)
// d.ForceExitRaftStore()
} else if d.Available <= 0 {
d.Status = proto.ReadOnly
Expand All @@ -498,7 +494,7 @@ func (d *Disk) AttachDataPartition(dp *DataPartition) {
func (d *Disk) DetachDataPartition(dp *DataPartition) {
d.Lock()
delete(d.partitionMap, dp.partitionID)
delete(d.DiskErrPartitionSet, dp.partitionID)
d.DiskErrPartitionSet.Delete(dp.partitionID)
d.Unlock()

d.computeUsage()
Expand Down Expand Up @@ -643,8 +639,11 @@ func (d *Disk) RestorePartition(visitor PartitionVisitor) (err error) {
mesg := fmt.Sprintf("action[RestorePartition] new partition(%v) err(%v) ",
partitionID, err.Error())
log.LogError(mesg)
exporter.Warning(mesg)
syslog.Println(mesg)
if IsDiskErr(err.Error()) {
d.triggerDiskError(ReadFlag, partitionID)
}
//exporter.Warning(mesg)
//syslog.Println(mesg)
return
}
if visitor != nil {
Expand Down Expand Up @@ -725,22 +724,20 @@ func (d *Disk) getSelectWeight() float64 {
}

func (d *Disk) AddDiskErrPartition(dpId uint64) {
if _, ok := d.DiskErrPartitionSet[dpId]; !ok {
log.LogWarnf("[AddDiskErrPartition] mark dp(%v) as broken", dpId)
d.DiskErrPartitionSet[dpId] = struct{}{}
}
d.DiskErrPartitionSet.Store(dpId, struct{}{})
}

func (d *Disk) GetDiskErrPartitionList() (diskErrPartitionList []uint64) {
diskErrPartitionList = make([]uint64, 0)
for k := range d.DiskErrPartitionSet {
diskErrPartitionList = append(diskErrPartitionList, k)
}
d.DiskErrPartitionSet.Range(func(key, value interface{}) bool {
diskErrPartitionList = append(diskErrPartitionList, key.(uint64))
return true
})
return diskErrPartitionList
}

func (d *Disk) GetDiskErrPartitionCount() uint64 {
return uint64(len(d.DiskErrPartitionSet))
return uint64(len(d.GetDiskErrPartitionList()))
}

// isExpiredPartition return whether one partition is expired
Expand Down
48 changes: 22 additions & 26 deletions master/api_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1904,24 +1904,23 @@ func (m *Server) decommissionDataPartition(w http.ResponseWriter, r *http.Reques

func (m *Server) diagnoseDataPartition(w http.ResponseWriter, r *http.Request) {
var (
err error
rstMsg *proto.DataPartitionDiagnosis
inactiveNodes []string
corruptDps []*DataPartition
lackReplicaDps []*DataPartition
badReplicaDps []*DataPartition
repFileCountDifferDps []*DataPartition
repUsedSizeDifferDps []*DataPartition
excessReplicaDPs []*DataPartition
diskErrorReplicaDPs []*DataPartition
corruptDpIDs []uint64
lackReplicaDpIDs []uint64
badReplicaDpIDs []uint64
repFileCountDifferDpIDs []uint64
repUsedSizeDifferDpIDs []uint64
excessReplicaDpIDs []uint64
badDataPartitionInfos []proto.BadPartitionRepairView
diskErrorDataPartitionIDs []uint64
err error
rstMsg *proto.DataPartitionDiagnosis
inactiveNodes []string
corruptDps []*DataPartition
lackReplicaDps []*DataPartition
badReplicaDps []*DataPartition
repFileCountDifferDps []*DataPartition
repUsedSizeDifferDps []*DataPartition
excessReplicaDPs []*DataPartition
corruptDpIDs []uint64
lackReplicaDpIDs []uint64
badReplicaDpIDs []uint64
repFileCountDifferDpIDs []uint64
repUsedSizeDifferDpIDs []uint64
excessReplicaDpIDs []uint64
badDataPartitionInfos []proto.BadPartitionRepairView
diskErrorDataPartitionInfos proto.DiskErrPartitionView
)
metric := exporter.NewTPCnt(apiToMetricsName(proto.AdminDiagnoseDataPartition))
defer func() {
Expand All @@ -1940,15 +1939,14 @@ func (m *Server) diagnoseDataPartition(w http.ResponseWriter, r *http.Request) {
repFileCountDifferDpIDs = make([]uint64, 0)
repUsedSizeDifferDpIDs = make([]uint64, 0)
excessReplicaDpIDs = make([]uint64, 0)
diskErrorDataPartitionIDs = make([]uint64, 0)

if inactiveNodes, err = m.cluster.checkInactiveDataNodes(); err != nil {
sendErrReply(w, r, newErrHTTPReply(err))
return
}

if lackReplicaDps, badReplicaDps, repFileCountDifferDps, repUsedSizeDifferDps, excessReplicaDPs,
corruptDps, diskErrorReplicaDPs, err = m.cluster.checkReplicaOfDataPartitions(ignoreDiscardDp); err != nil {
corruptDps, err = m.cluster.checkReplicaOfDataPartitions(ignoreDiscardDp); err != nil {
sendErrReply(w, r, newErrHTTPReply(err))
return
}
Expand All @@ -1971,11 +1969,9 @@ func (m *Server) diagnoseDataPartition(w http.ResponseWriter, r *http.Request) {
excessReplicaDpIDs = append(excessReplicaDpIDs, dp.PartitionID)
}

for _, dp := range diskErrorReplicaDPs {
diskErrorDataPartitionIDs = append(diskErrorDataPartitionIDs, dp.PartitionID)
}
// badDataPartitions = m.cluster.getBadDataPartitionsView()
badDataPartitionInfos = m.cluster.getBadDataPartitionsRepairView()
diskErrorDataPartitionInfos = m.cluster.getDiskErrDataPartitionsView()
rstMsg = &proto.DataPartitionDiagnosis{
InactiveDataNodes: inactiveNodes,
CorruptDataPartitionIDs: corruptDpIDs,
Expand All @@ -1985,14 +1981,14 @@ func (m *Server) diagnoseDataPartition(w http.ResponseWriter, r *http.Request) {
RepFileCountDifferDpIDs: repFileCountDifferDpIDs,
RepUsedSizeDifferDpIDs: repUsedSizeDifferDpIDs,
ExcessReplicaDpIDs: excessReplicaDpIDs,
DiskErrorDataPartitionIDs: diskErrorDataPartitionIDs,
DiskErrorDataPartitionInfos: diskErrorDataPartitionInfos,
}
log.LogInfof("diagnose dataPartition[%v] inactiveNodes:[%v], corruptDpIDs:[%v], "+
"lackReplicaDpIDs:[%v], BadReplicaDataPartitionIDs[%v], "+
"repFileCountDifferDpIDs:[%v], RepUsedSizeDifferDpIDs[%v], excessReplicaDpIDs[%v], diskErrorDataPartitionIDs[%v]",
"repFileCountDifferDpIDs:[%v], RepUsedSizeDifferDpIDs[%v], excessReplicaDpIDs[%v]",
m.cluster.Name, inactiveNodes, corruptDpIDs,
lackReplicaDpIDs, badReplicaDpIDs,
repFileCountDifferDpIDs, repUsedSizeDifferDpIDs, excessReplicaDpIDs, diskErrorDataPartitionIDs)
repFileCountDifferDpIDs, repUsedSizeDifferDpIDs, excessReplicaDpIDs)
sendOkReply(w, r, newSuccessHTTPReply(rstMsg))
}

Expand Down
38 changes: 26 additions & 12 deletions master/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1215,13 +1215,11 @@ func (c *Cluster) checkLackReplicaDataPartitions() (lackReplicaDataPartitions []

func (c *Cluster) checkReplicaOfDataPartitions(ignoreDiscardDp bool) (
lackReplicaDPs []*DataPartition, unavailableReplicaDPs []*DataPartition, repFileCountDifferDps []*DataPartition,
repUsedSizeDifferDps []*DataPartition, excessReplicaDPs []*DataPartition, noLeaderDPs []*DataPartition,
hasBadDiskErrorReplicaDPs []*DataPartition, err error) {
repUsedSizeDifferDps []*DataPartition, excessReplicaDPs []*DataPartition, noLeaderDPs []*DataPartition, err error) {
noLeaderDPs = make([]*DataPartition, 0)
lackReplicaDPs = make([]*DataPartition, 0)
unavailableReplicaDPs = make([]*DataPartition, 0)
excessReplicaDPs = make([]*DataPartition, 0)
hasBadDiskErrorReplicaDPs = make([]*DataPartition, 0)

vols := c.copyVols()
for _, vol := range vols {
Expand Down Expand Up @@ -1261,15 +1259,11 @@ func (c *Cluster) checkReplicaOfDataPartitions(ignoreDiscardDp bool) (
}

recordReplicaUnavailable := false
hasBadDiskErrorReplica := false
for _, replica := range dp.Replicas {
if !recordReplicaUnavailable && replica.Status == proto.Unavailable {
unavailableReplicaDPs = append(unavailableReplicaDPs, dp)
recordReplicaUnavailable = true
}
if replica.TriggerDiskError {
hasBadDiskErrorReplica = true
}
if dp.IsDoingDecommission() {
continue
}
Expand All @@ -1292,18 +1286,15 @@ func (c *Cluster) checkReplicaOfDataPartitions(ignoreDiscardDp bool) (
if repFileCountDiff > c.cfg.diffReplicaFileCount {
repFileCountDifferDps = append(repFileCountDifferDps, dp)
}
if hasBadDiskErrorReplica {
hasBadDiskErrorReplicaDPs = append(hasBadDiskErrorReplicaDPs, dp)
}
}
}

log.LogInfof("clusterID[%v] lackReplicaDp count:[%v], unavailableReplicaDp count:[%v], "+
"repFileCountDifferDps count[%v], repUsedSizeDifferDps count[%v], "+
"excessReplicaDPs count[%v], noLeaderDPs count[%v] hasBadDiskErrorReplicaDPs count[%v]",
"excessReplicaDPs count[%v], noLeaderDPs count[%v] ",
c.Name, len(lackReplicaDPs), len(unavailableReplicaDPs),
len(repFileCountDifferDps), len(repUsedSizeDifferDps),
len(excessReplicaDPs), len(noLeaderDPs), len(hasBadDiskErrorReplicaDPs))
len(excessReplicaDPs), len(noLeaderDPs))
return
}

Expand Down Expand Up @@ -5042,3 +5033,26 @@ func (c *Cluster) removeDPFromBadDataPartitionIDs(addr, diskPath string, partiti
c.BadDataPartitionIds.Store(key, newBadPartitionIDs)
return nil
}

// getDiskErrDataPartitionsView collects, per data partition ID, the replicas
// that currently sit on disks reported as bad. It walks every registered data
// node and, under the node's read lock, gathers each bad disk's partition
// list into the returned view.
func (c *Cluster) getDiskErrDataPartitionsView() (dps proto.DiskErrPartitionView) {
	dps.DiskErrReplicas = make(map[uint64][]proto.DiskErrReplicaInfo)
	c.dataNodes.Range(func(_, value interface{}) bool {
		dn, ok := value.(*DataNode)
		if !ok {
			// skip unexpected entries in the node map
			return true
		}
		dn.RLock()
		defer dn.RUnlock()
		for _, badDisk := range dn.BadDiskStats {
			for _, id := range badDisk.DiskErrPartitionList {
				info := proto.DiskErrReplicaInfo{Addr: dn.Addr, Disk: badDisk.DiskPath}
				dps.DiskErrReplicas[id] = append(dps.DiskErrReplicas[id], info)
			}
		}
		return true
	})
	return
}
14 changes: 11 additions & 3 deletions proto/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,14 @@ type BadPartitionView struct {
PartitionIDs []uint64
}

// DiskErrPartitionView groups, by data partition ID, the replicas that
// reside on disks reported as having errors.
type DiskErrPartitionView struct {
	DiskErrReplicas map[uint64][]DiskErrReplicaInfo
}

// DiskErrReplicaInfo identifies one replica located on a bad disk: the data
// node address and the disk path on that node.
type DiskErrReplicaInfo struct {
	Addr string
	Disk string
}
type ClusterStatInfo struct {
DataNodeStatInfo *NodeStatInfo
MetaNodeStatInfo *NodeStatInfo
Expand Down Expand Up @@ -323,9 +331,9 @@ type DataPartitionDiagnosis struct {
RepUsedSizeDifferDpIDs []uint64
ExcessReplicaDpIDs []uint64
// BadDataPartitionIDs []BadPartitionView
BadDataPartitionInfos []BadPartitionRepairView
BadReplicaDataPartitionIDs []uint64
DiskErrorDataPartitionIDs []uint64
BadDataPartitionInfos []BadPartitionRepairView
BadReplicaDataPartitionIDs []uint64
DiskErrorDataPartitionInfos DiskErrPartitionView
}

// meta partition diagnosis represents the inactive meta nodes, corrupt meta partitions, and meta partitions lack of replicas
Expand Down
2 changes: 1 addition & 1 deletion raftstore/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (d *monitor) MonitorZombie(id uint64, peer proto.Peer, replicasMsg string,
d.zombieDurations[zombiePeer] = du
d.zombieDurationMutex.Unlock()
log.LogError(errMsg)
exporter.Warning(errMsg)
// exporter.Warning(errMsg)
}

func (d *monitor) MonitorElection(id uint64, replicaMsg string, du time.Duration) {
Expand Down

0 comments on commit 65ee50a

Please sign in to comment.