Skip to content

Commit

Permalink
Merge pull request cubefs#899 from awzhgw/speedRecovery
Browse files Browse the repository at this point in the history
Enhancement: datanode accelerate recovery rate
  • Loading branch information
awzhgw authored Sep 8, 2020
2 parents 00ebf48 + d22d8dd commit c916d3e
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 26 deletions.
4 changes: 4 additions & 0 deletions datanode/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ const (
IsReleased = 1
)

const (
MinAvaliTinyExtentCnt = 5
)

// Sector size
const (
DiskSectorSize = 512
Expand Down
6 changes: 3 additions & 3 deletions datanode/data_partition_repair.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func (dp *DataPartition) sendAllTinyExtentsToC(extentType uint8, availableTinyEx
func (dp *DataPartition) brokenTinyExtents() (brokenTinyExtents []uint64) {
brokenTinyExtents = make([]uint64, 0)
extentsToBeRepaired := MinTinyExtentsToRepair
if dp.extentStore.AvailableTinyExtentCnt() == 0 {
if dp.extentStore.AvailableTinyExtentCnt() <= MinAvaliTinyExtentCnt {
extentsToBeRepaired = storage.TinyExtentCount
}
for i := 0; i < extentsToBeRepaired; i++ {
Expand Down Expand Up @@ -342,7 +342,7 @@ func (dp *DataPartition) buildExtentCreationTasks(repairTasks []*DataPartitionRe
if extentInfo.IsDeleted {
continue
}
if dp.ExtentStore().IsDeletedNormalExtent(extentID){
if dp.ExtentStore().IsDeletedNormalExtent(extentID) {
continue
}
ei := &storage.ExtentInfo{Source: extentInfo.Source, FileID: extentID, Size: extentInfo.Size}
Expand Down Expand Up @@ -372,7 +372,7 @@ func (dp *DataPartition) buildExtentRepairTasks(repairTasks []*DataPartitionRepa
if extentInfo.IsDeleted {
continue
}
if dp.ExtentStore().IsDeletedNormalExtent(extentID){
if dp.ExtentStore().IsDeletedNormalExtent(extentID) {
continue
}
if extentInfo.Size < maxFileInfo.Size {
Expand Down
4 changes: 2 additions & 2 deletions datanode/limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ func fininshDoExtentRepair() {
}

func setDoExtentRepair(value int) {
if value<=0 {
value=MaxExtentRepairLimit
if value <= 0 {
value = MaxExtentRepairLimit
}
close(extentRepairLimiteRater)
if value > MaxExtentRepairLimit {
Expand Down
9 changes: 5 additions & 4 deletions datanode/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ func (dp *DataPartition) computeUsage() {
if time.Now().Unix()-dp.intervalToUpdatePartitionSize < IntervalToUpdatePartitionSize {
return
}
dp.used=int(dp.ExtentStore().GetStoreUsedSize())
dp.used = int(dp.ExtentStore().GetStoreUsedSize())
dp.intervalToUpdatePartitionSize = time.Now().Unix()
}

Expand Down Expand Up @@ -552,7 +552,7 @@ func (dp *DataPartition) LaunchRepair(extentType uint8) {
if dp.partitionStatus == proto.Unavailable {
return
}
if err := dp.updateReplicas(); err != nil {
if err := dp.updateReplicas(false); err != nil {
log.LogErrorf("action[LaunchRepair] partition(%v) err(%v).", dp.partitionID, err)
return
}
Expand All @@ -565,8 +565,8 @@ func (dp *DataPartition) LaunchRepair(extentType uint8) {
dp.repair(extentType)
}

func (dp *DataPartition) updateReplicas() (err error) {
if time.Now().Unix()-dp.intervalToUpdateReplicas <= IntervalToUpdateReplica {
func (dp *DataPartition) updateReplicas(isForce bool) (err error) {
if !isForce && time.Now().Unix()-dp.intervalToUpdateReplicas <= IntervalToUpdateReplica {
return
}
dp.isLeader = false
Expand Down Expand Up @@ -797,6 +797,7 @@ func (dp *DataPartition) ChangeRaftMember(changeType raftProto.ConfChangeType, p
resp, err = dp.raftPartition.ChangeMember(changeType, peer, context)
return
}

//
func (dp *DataPartition) canRemoveSelf() (canRemove bool, err error) {
var partition *proto.DataPartitionInfo
Expand Down
4 changes: 2 additions & 2 deletions datanode/partition_op_by_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,8 @@ func (dp *DataPartition) ApplyRandomWrite(command []byte, raftApplyID uint64) (r
if err == nil {
break
}
if strings.Contains(err.Error(),storage.ExtentNotFoundError.Error()){
err=nil
if strings.Contains(err.Error(), storage.ExtentNotFoundError.Error()) {
err = nil
return
}
log.LogErrorf("[ApplyRandomWrite] ApplyID(%v) Partition(%v)_Extent(%v)_ExtentOffset(%v)_Size(%v) apply err(%v) retry(%v)", raftApplyID, dp.partitionID, opItem.extentID, opItem.offset, opItem.size, err, i)
Expand Down
7 changes: 7 additions & 0 deletions datanode/partition_raftfsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package datanode
import (
"encoding/json"
"fmt"
"github.com/chubaofs/chubaofs/storage"
"net"
"sync/atomic"
"time"
Expand Down Expand Up @@ -53,6 +54,12 @@ func (dp *DataPartition) ApplyMemberChange(confChange *raftproto.ConfChange, ind
return
}
isUpdated, err = dp.addRaftNode(req, index)
if isUpdated && err == nil {
dp.updateReplicas(true)
if dp.isLeader {
dp.ExtentStore().MoveAllToBrokenTinyExtentC(storage.TinyExtentCount)
}
}
case raftproto.ConfRemoveNode:
req := &proto.RemoveDataPartitionRaftMemberRequest{}
if err = json.Unmarshal(confChange.Context, req); err != nil {
Expand Down
30 changes: 15 additions & 15 deletions storage/extent_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,8 +359,8 @@ func (s *ExtentStore) Read(extentID uint64, offset, size int64, nbuf []byte, isR
}

func (s *ExtentStore) tinyDelete(extentID uint64, offset, size int64) (err error) {
e,err:=s.extentWithHeaderByExtentID(extentID)
if err!=nil {
e, err := s.extentWithHeaderByExtentID(extentID)
if err != nil {
return nil
}
if offset+size > e.dataSize {
Expand Down Expand Up @@ -474,26 +474,26 @@ const (
DiskSectorSize = 512
)

func (s *ExtentStore)GetStoreUsedSize()(used int64){
func (s *ExtentStore) GetStoreUsedSize() (used int64) {
extentInfoSlice := make([]*ExtentInfo, 0, len(s.extentInfoMap))
s.eiMutex.RLock()
for _, extentID := range s.extentInfoMap {
extentInfoSlice = append(extentInfoSlice, extentID)
}
s.eiMutex.RUnlock()
for _,einfo:=range extentInfoSlice{
for _, einfo := range extentInfoSlice {
if einfo.IsDeleted {
continue
}
if IsTinyExtent(einfo.FileID){
if IsTinyExtent(einfo.FileID) {
stat := new(syscall.Stat_t)
err := syscall.Stat(fmt.Sprintf("%v/%v", s.dataPath, einfo.FileID), stat)
if err != nil {
continue
}
used +=(stat.Blocks * DiskSectorSize)
}else {
used +=int64(einfo.Size)
used += (stat.Blocks * DiskSectorSize)
} else {
used += int64(einfo.Size)
}
}
return
Expand Down Expand Up @@ -565,7 +565,7 @@ func (s *ExtentStore) initTinyExtent() (err error) {
if err == nil || strings.Contains(err.Error(), syscall.EEXIST.Error()) || err == ExtentExistsError {
err = nil
s.brokenTinyExtentC <- extentID
s.brokenTinyExtentMap.Store(extentID,true)
s.brokenTinyExtentMap.Store(extentID, true)
continue
}
return err
Expand All @@ -588,18 +588,18 @@ func (s *ExtentStore) GetAvailableTinyExtent() (extentID uint64, err error) {

// SendToAvailableTinyExtentC sends the extent to the channel that stores the available tiny extents.
func (s *ExtentStore) SendToAvailableTinyExtentC(extentID uint64) {
if _,ok:=s.availableTinyExtentMap.Load(extentID);!ok {
if _, ok := s.availableTinyExtentMap.Load(extentID); !ok {
s.availableTinyExtentC <- extentID
s.availableTinyExtentMap.Store(extentID,true)
s.availableTinyExtentMap.Store(extentID, true)
}
}

// SendAllToBrokenTinyExtentC sends all the extents to the channel that stores the broken extents.
func (s *ExtentStore) SendAllToBrokenTinyExtentC(extentIds []uint64) {
for _, extentID := range extentIds {
if _,ok:=s.brokenTinyExtentMap.Load(extentID);!ok {
if _, ok := s.brokenTinyExtentMap.Load(extentID); !ok {
s.brokenTinyExtentC <- extentID
s.brokenTinyExtentMap.Store(extentID,true)
s.brokenTinyExtentMap.Store(extentID, true)
}

}
Expand Down Expand Up @@ -628,9 +628,9 @@ func (s *ExtentStore) MoveAllToBrokenTinyExtentC(cnt int) {

// SendToBrokenTinyExtentC sends the given extent id to the channel.
func (s *ExtentStore) SendToBrokenTinyExtentC(extentID uint64) {
if _,ok:=s.brokenTinyExtentMap.Load(extentID);!ok {
if _, ok := s.brokenTinyExtentMap.Load(extentID); !ok {
s.brokenTinyExtentC <- extentID
s.brokenTinyExtentMap.Store(extentID,true)
s.brokenTinyExtentMap.Store(extentID, true)
}

}
Expand Down

0 comments on commit c916d3e

Please sign in to comment.