Skip to content

Commit

Permalink
enhance: lock protect needed while access metaPartitions info of volu…
Browse files Browse the repository at this point in the history
…me in master

Signed-off-by: leonrayang <[email protected]>
  • Loading branch information
leonrayang committed Mar 16, 2023
1 parent 2f979d6 commit 34cd246
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 1 deletion.
6 changes: 6 additions & 0 deletions master/api_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1814,6 +1814,9 @@ func newSimpleView(vol *Vol) *proto.SimpleVolView {
volInodeCount uint64
volDentryCount uint64
)
vol.mpsLock.RLock()
defer vol.mpsLock.RUnlock()

for _, mp := range vol.MetaPartitions {
volDentryCount = volDentryCount + mp.DentryCount
volInodeCount = volInodeCount + mp.InodeCount
Expand Down Expand Up @@ -3754,6 +3757,9 @@ func volStat(vol *Vol, countByMeta bool) (stat *proto.VolStatInfo) {
}

stat.UsedRatio = strconv.FormatFloat(float64(stat.UsedSize)/float64(stat.TotalSize), 'f', 2, 32)

vol.mpsLock.RLock()
defer vol.mpsLock.RUnlock()
for _, mp := range vol.MetaPartitions {
stat.InodeCount += mp.InodeCount
}
Expand Down
11 changes: 11 additions & 0 deletions master/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,13 +588,16 @@ func (c *Cluster) getNotConsistentIDMetaNodes() (metaNodes []*InvalidNodeView) {
func (c *Cluster) hasNotConsistentIDMetaPartitions(metanode *MetaNode) (notConsistent bool, oldID uint64) {
safeVols := c.allVols()
for _, vol := range safeVols {
vol.mpsLock.RLock()
for _, mp := range vol.MetaPartitions {
for _, peer := range mp.Peers {
if peer.Addr == metanode.Addr && peer.ID != metanode.ID {
vol.mpsLock.RUnlock()
return true, peer.ID
}
}
}
vol.mpsLock.RUnlock()
}
return
}
Expand Down Expand Up @@ -1376,6 +1379,7 @@ func (c *Cluster) getAllMetaPartitionByMetaNode(addr string) (partitions []*Meta
partitions = make([]*MetaPartition, 0)
safeVols := c.allVols()
for _, vol := range safeVols {
vol.mpsLock.RLock()
for _, mp := range vol.MetaPartitions {
for _, host := range mp.Hosts {
if host == addr {
Expand All @@ -1384,6 +1388,7 @@ func (c *Cluster) getAllMetaPartitionByMetaNode(addr string) (partitions []*Meta
}
}
}
vol.mpsLock.RUnlock()
}

return
Expand Down Expand Up @@ -1411,12 +1416,14 @@ func (c *Cluster) getAllMetaPartitionIDByMetaNode(addr string) (partitionIDs []u
safeVols := c.allVols()
for _, vol := range safeVols {
for _, mp := range vol.MetaPartitions {
vol.mpsLock.RLock()
for _, host := range mp.Hosts {
if host == addr {
partitionIDs = append(partitionIDs, mp.PartitionID)
break
}
}
vol.mpsLock.RUnlock()
}
}

Expand All @@ -1428,12 +1435,14 @@ func (c *Cluster) getAllMetaPartitionsByMetaNode(addr string) (partitions []*Met
safeVols := c.allVols()
for _, vol := range safeVols {
for _, mp := range vol.MetaPartitions {
vol.mpsLock.RLock()
for _, host := range mp.Hosts {
if host == addr {
partitions = append(partitions, mp)
break
}
}
vol.mpsLock.RUnlock()
}
}
return
Expand Down Expand Up @@ -2786,7 +2795,9 @@ func (c *Cluster) getDataPartitionCount() (count int) {
func (c *Cluster) getMetaPartitionCount() (count int) {
vols := c.copyVols()
for _, vol := range vols {
vol.mpsLock.RLock()
count = count + len(vol.MetaPartitions)
vol.mpsLock.RUnlock()
}
return count
}
Expand Down
2 changes: 2 additions & 0 deletions master/cluster_stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,11 @@ func (c *Cluster) updateVolStatInfo() {
}

var inodeCount uint64
vol.mpsLock.RLock()
for _, mp := range vol.MetaPartitions {
inodeCount += mp.InodeCount
}
vol.mpsLock.RUnlock()

c.volStatInfo.Store(vol.Name, newVolStatInfo(vol.Name, total, used, cacheTotal, cacheUsed, inodeCount))
}
Expand Down
2 changes: 2 additions & 0 deletions master/cluster_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,11 +300,13 @@ func (c *Cluster) checkLackReplicaMetaPartitions() (lackReplicaMetaPartitions []
lackReplicaMetaPartitions = make([]*MetaPartition, 0)
vols := c.copyVols()
for _, vol := range vols {
vol.mpsLock.RLock()
for _, mp := range vol.MetaPartitions {
if mp.ReplicaNum > uint8(len(mp.Hosts)) {
lackReplicaMetaPartitions = append(lackReplicaMetaPartitions, mp)
}
}
vol.mpsLock.RUnlock()
}
log.LogInfof("clusterID[%v] lackReplicaMetaPartitions count:[%v]", c.Name, len(lackReplicaMetaPartitions))
return
Expand Down
6 changes: 5 additions & 1 deletion master/monitor_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,12 +246,13 @@ func (mm *monitorMetrics) setMpAndDpMetrics() {
dpMissingLeaderCount++
}
}

vol.mpsLock.RLock()
for _, mp := range vol.MetaPartitions {
if !mp.isLeaderExist() {
mpMissingLeaderCount++
}
}
vol.mpsLock.RUnlock()
}

mm.dataPartitionCount.Set(float64(dpCount))
Expand Down Expand Up @@ -356,8 +357,10 @@ func (mm *monitorMetrics) setMpInconsistentErrorMetric() {
defer mm.cluster.volMutex.RUnlock()

for _, vol := range mm.cluster.vols {
vol.mpsLock.RLock()
for _, mp := range vol.MetaPartitions {
if mp.IsRecover || mp.EqualCheckPass {
vol.mpsLock.RUnlock()
continue
}
idStr := strconv.FormatUint(mp.PartitionID, 10)
Expand All @@ -366,6 +369,7 @@ func (mm *monitorMetrics) setMpInconsistentErrorMetric() {
log.LogWarnf("setMpInconsistentErrorMetric.mp %v SetWithLabelValues id %v vol %v", mp.PartitionID, idStr, vol.Name)
delete(deleteMps, idStr)
}
vol.mpsLock.RUnlock()
}

for k, v := range deleteMps {
Expand Down
9 changes: 9 additions & 0 deletions master/vol.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,8 @@ func (vol *Vol) initMetaPartitions(c *Cluster, count int) (err error) {
break
}
}
vol.mpsLock.RLock()
defer vol.mpsLock.RUnlock()
if len(vol.MetaPartitions) != count {
err = fmt.Errorf("action[initMetaPartitions] vol[%v] init meta partition failed,mpCount[%v],expectCount[%v],err[%v]",
vol.Name, len(vol.MetaPartitions), count, err)
Expand Down Expand Up @@ -451,6 +453,10 @@ func (vol *Vol) checkMetaPartitions(c *Cluster) {

func (vol *Vol) checkSplitMetaPartition(c *Cluster) {
maxPartitionID := vol.maxPartitionID()

vol.mpsLock.RLock()
defer vol.mpsLock.RUnlock()

partition, ok := vol.MetaPartitions[maxPartitionID]
if !ok {
return
Expand Down Expand Up @@ -687,6 +693,9 @@ func (vol *Vol) sendViewCacheToFollower(c *Cluster) {
func (vol *Vol) ebsUsedSpace() uint64 {

size := uint64(0)
vol.mpsLock.RLock()
defer vol.mpsLock.RUnlock()

for _, pt := range vol.MetaPartitions {
size += pt.dataSize()
}
Expand Down

0 comments on commit 34cd246

Please sign in to comment.