Skip to content

Commit

Permalink
fix build repair task panic
Browse files Browse the repository at this point in the history
Signed-off-by: mingwei <[email protected]>
  • Loading branch information
SEANSAN authored and heymingwei committed Apr 2, 2021
1 parent b7affad commit 91458a6
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 15 deletions.
38 changes: 23 additions & 15 deletions datanode/data_partition_repair.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,10 @@ func (dp *DataPartition) repair(extentType uint8) {
}
}

repairTasks := make([]*DataPartitionRepairTask, dp.getReplicaLen())
err := dp.buildDataPartitionRepairTask(repairTasks, extentType, tinyExtents)

// Fix for the replica index panic: build the repair tasks from a copy of dp's replica list.
replica := dp.getReplicaCopy()
repairTasks := make([]*DataPartitionRepairTask, len(replica))
err := dp.buildDataPartitionRepairTask(repairTasks, extentType, tinyExtents, replica)
if err != nil {
log.LogErrorf(errors.Stack(err))
log.LogErrorf("action[repair] partition(%v) err(%v).",
Expand Down Expand Up @@ -131,26 +132,25 @@ func (dp *DataPartition) repair(extentType uint8) {
dp.extentStore.BrokenTinyExtentCnt(), (end-start)/int64(time.Millisecond), MasterClient.Nodes())
}

func (dp *DataPartition) buildDataPartitionRepairTask(repairTasks []*DataPartitionRepairTask, extentType uint8, tinyExtents []uint64) (err error) {
func (dp *DataPartition) buildDataPartitionRepairTask(repairTasks []*DataPartitionRepairTask, extentType uint8, tinyExtents []uint64, replica []string) (err error) {
// get the local extent info
extents, leaderTinyDeleteRecordFileSize, err := dp.getLocalExtentInfo(extentType, tinyExtents)
if err != nil {
return err
}
// new repair task for the leader

repairTasks[0] = NewDataPartitionRepairTask(extents, leaderTinyDeleteRecordFileSize, dp.getReplicaAddr(0), dp.getReplicaAddr(0))
repairTasks[0].addr = dp.getReplicaAddr(0)
repairTasks[0] = NewDataPartitionRepairTask(extents, leaderTinyDeleteRecordFileSize, replica[0], replica[0])
repairTasks[0].addr = replica[0]

// new repair tasks for the followers
for index := 1; index < dp.getReplicaLen(); index++ {
extents, err := dp.getRemoteExtentInfo(extentType, tinyExtents, dp.getReplicaAddr(index))
for index := 1; index < len(replica); index++ {
extents, err := dp.getRemoteExtentInfo(extentType, tinyExtents, replica[index])
if err != nil {
log.LogErrorf("buildDataPartitionRepairTask PartitionID(%v) on (%v) err(%v)", dp.partitionID, dp.getReplicaAddr(index), err)
log.LogErrorf("buildDataPartitionRepairTask PartitionID(%v) on (%v) err(%v)", dp.partitionID, replica[index], err)
continue
}
repairTasks[index] = NewDataPartitionRepairTask(extents, leaderTinyDeleteRecordFileSize, dp.getReplicaAddr(index), dp.getReplicaAddr(0))
repairTasks[index].addr = dp.getReplicaAddr(index)
repairTasks[index] = NewDataPartitionRepairTask(extents, leaderTinyDeleteRecordFileSize, replica[index], replica[0])
repairTasks[index].addr = replica[index]
}

return
Expand Down Expand Up @@ -227,7 +227,7 @@ func (dp *DataPartition) DoRepair(repairTasks []*DataPartitionRepairTask) {
store := dp.extentStore
for _, extentInfo := range repairTasks[0].ExtentsToBeCreated {
if !AutoRepairStatus {
log.LogWarnf("AutoRepairStatus is False,so cannot Create extent(%v)", extentInfo.String())
log.LogWarnf("AutoRepairStatus is False,so cannot Create extent(%v),pid=%d", extentInfo.String(), dp.partitionID)
continue
}
if dp.ExtentStore().IsDeletedNormalExtent(extentInfo.FileID) {
Expand Down Expand Up @@ -398,7 +398,10 @@ func (dp *DataPartition) buildExtentRepairTasks(repairTasks []*DataPartitionRepa
func (dp *DataPartition) notifyFollower(wg *sync.WaitGroup, index int, members []*DataPartitionRepairTask) (err error) {
p := repl.NewPacketToNotifyExtentRepair(dp.partitionID) // notify all the followers to repair
var conn *net.TCPConn
target := dp.getReplicaAddr(index)
//target := dp.getReplicaAddr(index)
// Fix for the repair panic: dp's replicas may have changed, so use the address stored in the task.
target := members[index].addr

p.Data, _ = json.Marshal(members[index])
p.Size = uint32(len(p.Data))
conn, err = gConnPool.GetConnect(target)
Expand All @@ -423,9 +426,14 @@ func (dp *DataPartition) notifyFollower(wg *sync.WaitGroup, index int, members [
func (dp *DataPartition) NotifyExtentRepair(members []*DataPartitionRepairTask) (err error) {
wg := new(sync.WaitGroup)
for i := 1; i < len(members); i++ {
if members[i] == nil {
if members[i] == nil || !dp.IsExsitReplica(members[i].addr) {
if members[i] != nil {
log.LogInfof("notify extend repair is change ,index(%v),pid(%v),task_member_add(%v),IsExsitReplica(%v)",
i, dp.partitionID, members[i].addr, dp.IsExsitReplica(members[i].addr))
}
continue
}

wg.Add(1)
go dp.notifyFollower(wg, i, members)
}
Expand Down
11 changes: 11 additions & 0 deletions datanode/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,17 @@ func (dp *DataPartition) Replicas() []string {
return dp.replicas
}

// getReplicaCopy returns a newly allocated slice holding the current contents
// of dp.replicas. The read is performed while holding replicasLock for reading,
// and the returned slice does not share storage with dp.replicas, so callers
// may index or modify it without touching the partition's own list.
func (dp *DataPartition) getReplicaCopy() []string {
	dp.replicasLock.RLock()
	defer dp.replicasLock.RUnlock()

	// Appending onto an exactly-sized, empty buffer yields a non-nil slice
	// with the same length and capacity as the source list.
	snapshot := make([]string, 0, len(dp.replicas))
	return append(snapshot, dp.replicas...)
}

func (dp *DataPartition) getReplicaAddr(index int) string {
dp.replicasLock.RLock()
defer dp.replicasLock.RUnlock()
Expand Down

0 comments on commit 91458a6

Please sign in to comment.