diff --git a/datanode/const.go b/datanode/const.go index 184ba23c21..28814bdcd9 100644 --- a/datanode/const.go +++ b/datanode/const.go @@ -74,6 +74,10 @@ const ( IsReleased = 1 ) +const ( + MinAvaliTinyExtentCnt = 5 +) + // Sector size const ( DiskSectorSize = 512 diff --git a/datanode/data_partition_repair.go b/datanode/data_partition_repair.go index 3a47860052..39309a85d7 100644 --- a/datanode/data_partition_repair.go +++ b/datanode/data_partition_repair.go @@ -276,7 +276,7 @@ func (dp *DataPartition) sendAllTinyExtentsToC(extentType uint8, availableTinyEx func (dp *DataPartition) brokenTinyExtents() (brokenTinyExtents []uint64) { brokenTinyExtents = make([]uint64, 0) extentsToBeRepaired := MinTinyExtentsToRepair - if dp.extentStore.AvailableTinyExtentCnt() == 0 { + if dp.extentStore.AvailableTinyExtentCnt() <= MinAvaliTinyExtentCnt { extentsToBeRepaired = storage.TinyExtentCount } for i := 0; i < extentsToBeRepaired; i++ { @@ -342,7 +342,7 @@ func (dp *DataPartition) buildExtentCreationTasks(repairTasks []*DataPartitionRe if extentInfo.IsDeleted { continue } - if dp.ExtentStore().IsDeletedNormalExtent(extentID){ + if dp.ExtentStore().IsDeletedNormalExtent(extentID) { continue } ei := &storage.ExtentInfo{Source: extentInfo.Source, FileID: extentID, Size: extentInfo.Size} @@ -372,7 +372,7 @@ func (dp *DataPartition) buildExtentRepairTasks(repairTasks []*DataPartitionRepa if extentInfo.IsDeleted { continue } - if dp.ExtentStore().IsDeletedNormalExtent(extentID){ + if dp.ExtentStore().IsDeletedNormalExtent(extentID) { continue } if extentInfo.Size < maxFileInfo.Size { diff --git a/datanode/limit.go b/datanode/limit.go index 8e6cecd0d3..443676615b 100644 --- a/datanode/limit.go +++ b/datanode/limit.go @@ -35,8 +35,8 @@ func fininshDoExtentRepair() { } func setDoExtentRepair(value int) { - if value<=0 { - value=MaxExtentRepairLimit + if value <= 0 { + value = MaxExtentRepairLimit } close(extentRepairLimiteRater) if value > MaxExtentRepairLimit { diff --git a/datanode/partition.go b/datanode/partition.go index 72c83fc329..53f335b4aa 100644 --- a/datanode/partition.go +++ b/datanode/partition.go @@ -515,7 +515,7 @@ func (dp *DataPartition) computeUsage() { if time.Now().Unix()-dp.intervalToUpdatePartitionSize < IntervalToUpdatePartitionSize { return } - dp.used=int(dp.ExtentStore().GetStoreUsedSize()) + dp.used = int(dp.ExtentStore().GetStoreUsedSize()) dp.intervalToUpdatePartitionSize = time.Now().Unix() } @@ -552,7 +552,7 @@ func (dp *DataPartition) LaunchRepair(extentType uint8) { if dp.partitionStatus == proto.Unavailable { return } - if err := dp.updateReplicas(); err != nil { + if err := dp.updateReplicas(false); err != nil { log.LogErrorf("action[LaunchRepair] partition(%v) err(%v).", dp.partitionID, err) return } @@ -565,8 +565,8 @@ func (dp *DataPartition) LaunchRepair(extentType uint8) { dp.repair(extentType) } -func (dp *DataPartition) updateReplicas() (err error) { - if time.Now().Unix()-dp.intervalToUpdateReplicas <= IntervalToUpdateReplica { +func (dp *DataPartition) updateReplicas(isForce bool) (err error) { + if !isForce && time.Now().Unix()-dp.intervalToUpdateReplicas <= IntervalToUpdateReplica { return } dp.isLeader = false @@ -797,6 +797,7 @@ func (dp *DataPartition) ChangeRaftMember(changeType raftProto.ConfChangeType, p resp, err = dp.raftPartition.ChangeMember(changeType, peer, context) return } + // func (dp *DataPartition) canRemoveSelf() (canRemove bool, err error) { var partition *proto.DataPartitionInfo diff --git a/datanode/partition_op_by_raft.go b/datanode/partition_op_by_raft.go index 450a66144a..fc1409ec97 100644 --- a/datanode/partition_op_by_raft.go +++ b/datanode/partition_op_by_raft.go @@ -244,8 +244,8 @@ func (dp *DataPartition) ApplyRandomWrite(command []byte, raftApplyID uint64) (r if err == nil { break } - if strings.Contains(err.Error(),storage.ExtentNotFoundError.Error()){ - err=nil + if strings.Contains(err.Error(), storage.ExtentNotFoundError.Error()) { + err = nil return } log.LogErrorf("[ApplyRandomWrite] ApplyID(%v) Partition(%v)_Extent(%v)_ExtentOffset(%v)_Size(%v) apply err(%v) retry(%v)", raftApplyID, dp.partitionID, opItem.extentID, opItem.offset, opItem.size, err, i) diff --git a/datanode/partition_raftfsm.go b/datanode/partition_raftfsm.go index e30e683b3c..c2d180ed3d 100644 --- a/datanode/partition_raftfsm.go +++ b/datanode/partition_raftfsm.go @@ -17,6 +17,7 @@ package datanode import ( "encoding/json" "fmt" + "github.com/chubaofs/chubaofs/storage" "net" "sync/atomic" "time" @@ -53,6 +54,12 @@ func (dp *DataPartition) ApplyMemberChange(confChange *raftproto.ConfChange, ind return } isUpdated, err = dp.addRaftNode(req, index) + if isUpdated && err == nil { + dp.updateReplicas(true) + if dp.isLeader { + dp.ExtentStore().MoveAllToBrokenTinyExtentC(storage.TinyExtentCount) + } + } case raftproto.ConfRemoveNode: req := &proto.RemoveDataPartitionRaftMemberRequest{} if err = json.Unmarshal(confChange.Context, req); err != nil { diff --git a/storage/extent_store.go b/storage/extent_store.go index 81c128daa6..bd5ed5c086 100644 --- a/storage/extent_store.go +++ b/storage/extent_store.go @@ -359,8 +359,8 @@ func (s *ExtentStore) Read(extentID uint64, offset, size int64, nbuf []byte, isR } func (s *ExtentStore) tinyDelete(extentID uint64, offset, size int64) (err error) { - e,err:=s.extentWithHeaderByExtentID(extentID) - if err!=nil { + e, err := s.extentWithHeaderByExtentID(extentID) + if err != nil { return nil } if offset+size > e.dataSize { @@ -474,26 +474,26 @@ const ( DiskSectorSize = 512 ) -func (s *ExtentStore)GetStoreUsedSize()(used int64){ +func (s *ExtentStore) GetStoreUsedSize() (used int64) { extentInfoSlice := make([]*ExtentInfo, 0, len(s.extentInfoMap)) s.eiMutex.RLock() for _, extentID := range s.extentInfoMap { extentInfoSlice = append(extentInfoSlice, extentID) } s.eiMutex.RUnlock() - for _,einfo:=range extentInfoSlice{ + for _, einfo := range extentInfoSlice { if einfo.IsDeleted { continue } - if IsTinyExtent(einfo.FileID){ + if IsTinyExtent(einfo.FileID) { stat := new(syscall.Stat_t) err := syscall.Stat(fmt.Sprintf("%v/%v", s.dataPath, einfo.FileID), stat) if err != nil { continue } - used +=(stat.Blocks * DiskSectorSize) - }else { - used +=int64(einfo.Size) + used += (stat.Blocks * DiskSectorSize) + } else { + used += int64(einfo.Size) } } return @@ -565,7 +565,7 @@ func (s *ExtentStore) initTinyExtent() (err error) { if err == nil || strings.Contains(err.Error(), syscall.EEXIST.Error()) || err == ExtentExistsError { err = nil s.brokenTinyExtentC <- extentID - s.brokenTinyExtentMap.Store(extentID,true) + s.brokenTinyExtentMap.Store(extentID, true) continue } return err @@ -588,18 +588,18 @@ func (s *ExtentStore) GetAvailableTinyExtent() (extentID uint64, err error) { // SendToAvailableTinyExtentC sends the extent to the channel that stores the available tiny extents. func (s *ExtentStore) SendToAvailableTinyExtentC(extentID uint64) { - if _,ok:=s.availableTinyExtentMap.Load(extentID);!ok { + if _, ok := s.availableTinyExtentMap.Load(extentID); !ok { s.availableTinyExtentC <- extentID - s.availableTinyExtentMap.Store(extentID,true) + s.availableTinyExtentMap.Store(extentID, true) } } // SendAllToBrokenTinyExtentC sends all the extents to the channel that stores the broken extents. func (s *ExtentStore) SendAllToBrokenTinyExtentC(extentIds []uint64) { for _, extentID := range extentIds { - if _,ok:=s.brokenTinyExtentMap.Load(extentID);!ok { + if _, ok := s.brokenTinyExtentMap.Load(extentID); !ok { s.brokenTinyExtentC <- extentID - s.brokenTinyExtentMap.Store(extentID,true) + s.brokenTinyExtentMap.Store(extentID, true) } } @@ -628,9 +628,9 @@ func (s *ExtentStore) MoveAllToBrokenTinyExtentC(cnt int) { // SendToBrokenTinyExtentC sends the given extent id to the channel. func (s *ExtentStore) SendToBrokenTinyExtentC(extentID uint64) { - if _,ok:=s.brokenTinyExtentMap.Load(extentID);!ok { + if _, ok := s.brokenTinyExtentMap.Load(extentID); !ok { s.brokenTinyExtentC <- extentID - s.brokenTinyExtentMap.Store(extentID,true) + s.brokenTinyExtentMap.Store(extentID, true) } }