Skip to content

Commit

Permalink
feat(datanode): Support the functionality of allowing only one extent…
Browse files Browse the repository at this point in the history
… data read at a time per disk

Signed-off-by: chihe <[email protected]>
  • Loading branch information
chihe authored and leonrayang committed Mar 27, 2024
1 parent 74dcb76 commit f66c732
Show file tree
Hide file tree
Showing 8 changed files with 148 additions and 30 deletions.
33 changes: 27 additions & 6 deletions datanode/data_partition_repair.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"hash/crc32"
"math"
"net"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -249,8 +250,13 @@ func (dp *DataPartition) DoRepair(repairTasks []*DataPartitionRepairTask) {
log.LogDebugf("action[DoRepair] leader to repair len[%v], {%v}", len(repairTasks[0].ExtentsToBeRepaired), repairTasks[0].ExtentsToBeRepaired)
for _, extentInfo := range repairTasks[0].ExtentsToBeRepaired {
log.LogDebugf("action[DoRepair] leader to repair len[%v], {%v}", len(repairTasks[0].ExtentsToBeRepaired), extentInfo)
RETRY:
err := dp.streamRepairExtent(extentInfo)
if err != nil {
if strings.Contains(err.Error(), storage.NoDiskReadRepairExtentTokenError.Error()) {
log.LogDebugf("action[DoRepair] retry dp(%v) extent(%v).", dp.partitionID, extentInfo.FileID)
goto RETRY
}
err = errors.Trace(err, "doStreamExtentFixRepair %v", dp.applyRepairKey(int(extentInfo.FileID)))
localExtentInfo, opErr := dp.ExtentStore().Watermark(uint64(extentInfo.FileID))
if opErr != nil {
Expand Down Expand Up @@ -464,9 +470,13 @@ func (dp *DataPartition) NotifyExtentRepair(members []*DataPartitionRepairTask)
// doStreamExtentFixRepair executes the repair on the followers.
func (dp *DataPartition) doStreamExtentFixRepair(wg *sync.WaitGroup, remoteExtentInfo *storage.ExtentInfo) {
defer wg.Done()

RETRY:
err := dp.streamRepairExtent(remoteExtentInfo)
if err != nil {
if strings.Contains(err.Error(), storage.NoDiskReadRepairExtentTokenError.Error()) {
log.LogWarnf("action[DoRepair] retry dp(%v) extent(%v).", dp.partitionID, remoteExtentInfo.FileID)
goto RETRY
}
// only decommission repair need to check err cnt
if dp.isDecommissionRecovering() {
atomic.AddUint64(&dp.recoverErrCnt, 1)
Expand All @@ -492,7 +502,7 @@ func (dp *DataPartition) applyRepairKey(extentID int) (m string) {

// The actual repair of an extent happens here.
func (dp *DataPartition) streamRepairExtent(remoteExtentInfo *storage.ExtentInfo) (err error) {
log.LogDebugf("streamRepairExtent dp %v extent %v remote info %v", dp.partitionID, remoteExtentInfo)
log.LogDebugf("streamRepairExtent dp %v extent %v remote info %v", dp.partitionID, remoteExtentInfo.FileID, remoteExtentInfo)
store := dp.ExtentStore()
if !store.HasExtent(remoteExtentInfo.FileID) {
log.LogDebugf("streamRepairExtent dp %v extent %v not exist", remoteExtentInfo)
Expand Down Expand Up @@ -553,28 +563,39 @@ func (dp *DataPartition) streamRepairExtent(remoteExtentInfo *storage.ExtentInfo
err = errors.Trace(err, "streamRepairExtent dp %v extent %v receive data error,localExtentSize(%v) remoteExtentSize(%v)",
dp.partitionID, remoteExtentInfo.FileID, currFixOffset, dstOffset)
isNetError = true
log.LogWarnf("%v", err.Error())
return
}

if reply.ResultCode != proto.OpOk {
err = errors.Trace(fmt.Errorf("unknow result code"),
"streamRepairExtent dp %v extent %v receive opcode error(%v) ,localExtentSize(%v) remoteExtentSize(%v)",
dp.partitionID, remoteExtentInfo.FileID, string(reply.Data[:intMin(len(reply.Data), int(reply.Size))]), currFixOffset, dstOffset)
return
if reply.ResultCode == proto.OpReadRepairExtentAgain {
log.LogDebugf("streamRepairExtent dp %v extent %v wait for token", dp.partitionID, remoteExtentInfo.FileID)
time.Sleep(time.Second * 1)
isNetError = true
return storage.NoDiskReadRepairExtentTokenError
} else {
err = errors.Trace(fmt.Errorf("unknow result code"),
"streamRepairExtent dp %v extent %v receive opcode error(%v) ,localExtentSize(%v) remoteExtentSize(%v)",
dp.partitionID, remoteExtentInfo.FileID, string(reply.Data[:intMin(len(reply.Data), int(reply.Size))]), currFixOffset, dstOffset)
log.LogWarnf("%v", err.Error())
return
}
}

if reply.ReqID != request.ReqID || reply.PartitionID != request.PartitionID ||
reply.ExtentID != request.ExtentID {
err = errors.Trace(fmt.Errorf("unavali reply"), "streamRepairExtent dp %v extent %v receive invalid "+
"request(%v) reply(%v) ,localExtentSize(%v) remoteExtentSize(%v)", dp.partitionID, remoteExtentInfo.FileID,
request.GetUniqueLogId(), reply.GetUniqueLogId(), currFixOffset, dstOffset)
log.LogWarnf("%v", err.Error())
return
}

if !storage.IsTinyExtent(reply.ExtentID) && (reply.Size == 0 || reply.ExtentOffset != int64(currFixOffset)) {
err = errors.Trace(fmt.Errorf("unavali reply"), "streamRepairExtent dp %v extent %v receive invalid "+
"request(%v) reply(%v) localExtentSize(%v) remoteExtentSize(%v)", dp.partitionID, remoteExtentInfo.FileID,
request.GetUniqueLogId(), reply.GetUniqueLogId(), currFixOffset, dstOffset)
log.LogWarnf("%v", err.Error())
return
}
if loopTimes%100 == 0 {
Expand Down
50 changes: 47 additions & 3 deletions datanode/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,16 @@ type Disk struct {
limitWrite *ioLimiter

// diskPartition info
diskPartition *disk.PartitionStat
DiskErrPartitionSet map[uint64]struct{}
decommission bool
diskPartition *disk.PartitionStat
DiskErrPartitionSet map[uint64]struct{}
decommission bool
extentRepairReadLimit chan struct{}
enableExtentRepairReadLimit bool
}

// Disk-level constants.
const (
	// NOTE(review): the unit and consumer of this value are not visible in this
	// view — presumably it tunes how tiny-extent delete records are synced from
	// the leader per disk; confirm against its callers.
	SyncTinyDeleteRecordFromLeaderOnEveryDisk = 5
	// MaxExtentRepairReadLimit is the buffer capacity of each disk's
	// extentRepairReadLimit token channel. NewDisk seeds that channel with a
	// single token, so at most one extent repair read holds a token per disk.
	MaxExtentRepairReadLimit = 1
)

// PartitionVisitor is the signature of a callback that receives a single
// *DataPartition and returns nothing.
// NOTE(review): presumably used to walk the partitions hosted on a disk —
// confirm against the callers, which are not visible here.
type PartitionVisitor func(dp *DataPartition)
Expand Down Expand Up @@ -138,6 +141,8 @@ func NewDisk(path string, reservedSpace, diskRdonlySpace uint64, maxErrCnt int,
// NOTE: continue execution
err = nil
}
d.extentRepairReadLimit = make(chan struct{}, MaxExtentRepairReadLimit)
d.extentRepairReadLimit <- struct{}{}
return
}

Expand Down Expand Up @@ -746,3 +751,42 @@ func isExpiredPartition(id uint64, partitions []uint64) bool {
}
return true
}

// RequireReadExtentToken reports whether the caller may start an extent repair
// read on this disk.
//
// When the repair-read limit is disabled it returns true without touching the
// token channel. When the limit is enabled it makes a single non-blocking
// attempt to take a token from extentRepairReadLimit and returns whether that
// attempt succeeded; it never waits for a token to become available.
func (d *Disk) RequireReadExtentToken() bool {
	// Snapshot the flag under the read lock, then drop the lock before
	// touching the channel.
	d.RLock()
	limited := d.enableExtentRepairReadLimit
	d.RUnlock()

	if !limited {
		return true
	}

	select {
	case <-d.extentRepairReadLimit:
		return true
	default:
		return false
	}
}

// ReleaseReadExtentToken hands the repair-read token back to the disk once an
// extent repair read has finished.
//
// The token is returned with a non-blocking send that does not consult
// enableExtentRepairReadLimit. The flag can be toggled between the matching
// RequireReadExtentToken call and this one, so gating the release on its
// current value is unsafe in both directions:
//   - limit enabled at acquire, disabled at release: skipping the send would
//     lose the token for good, and every later acquire would fail once the
//     limit is re-enabled;
//   - limit disabled at acquire (no token taken), enabled at release: a
//     blocking send would hang forever on the already-full channel.
//
// If the channel is already full the surplus token is simply dropped, which
// keeps the number of outstanding tokens bounded by the channel capacity.
// After a toggle the limit may be exceeded transiently by one reader, but the
// token count converges back as in-flight reads release.
func (d *Disk) ReleaseReadExtentToken() {
	select {
	case d.extentRepairReadLimit <- struct{}{}:
		// Token returned to the pool.
	default:
		// Pool already full (no token was taken by this caller), or the
		// channel was never initialised: nothing to give back.
	}
}

// SetExtentRepairReadLimitStatus switches the per-disk extent repair read
// limit on (true) or off (false). The flag is written under the disk's write
// lock.
func (d *Disk) SetExtentRepairReadLimitStatus(status bool) {
	d.Lock()
	d.enableExtentRepairReadLimit = status
	d.Unlock()
}

// QueryExtentRepairReadLimitStatus returns the current value of the per-disk
// extent repair read limit flag, read under the disk's read lock.
func (d *Disk) QueryExtentRepairReadLimitStatus() bool {
	d.RLock()
	enabled := d.enableExtentRepairReadLimit
	d.RUnlock()
	return enabled
}
2 changes: 2 additions & 0 deletions datanode/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,8 @@ func (s *DataNode) registerHandler() {
http.HandleFunc("/setDiskQos", s.setDiskQos)
http.HandleFunc("/getDiskQos", s.getDiskQos)
http.HandleFunc("/reloadDataPartition", s.reloadDataPartition)
http.HandleFunc("/setDiskExtentReadLimitStatus", s.setDiskExtentReadLimitStatus)
http.HandleFunc("/queryDiskExtentReadLimitStatus", s.queryDiskExtentReadLimitStatus)
}

func (s *DataNode) startTCPService() (err error) {
Expand Down
39 changes: 39 additions & 0 deletions datanode/server_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,3 +588,42 @@ func (s *DataNode) reloadDataPartition(w http.ResponseWriter, r *http.Request) {
}

}

// setDiskExtentReadLimitStatus is the HTTP handler that turns the extent
// repair read limit on or off for every disk of this data node.
//
// It expects a boolean form parameter named "status" (anything accepted by
// strconv.ParseBool). A form-parsing failure or an unparsable value is
// answered with http.StatusBadRequest; otherwise the flag is applied to each
// disk in s.space.disks and "success" is returned.
//
// NOTE(review): s.space.disks is ranged over without taking any lock here —
// confirm whether the space manager requires one for concurrent access.
func (s *DataNode) setDiskExtentReadLimitStatus(w http.ResponseWriter, r *http.Request) {
	const paramStatus = "status"

	parseErr := r.ParseForm()
	if parseErr != nil {
		msg := fmt.Errorf("parse form fail: %v", parseErr).Error()
		s.buildFailureResp(w, http.StatusBadRequest, msg)
		return
	}

	raw := r.FormValue(paramStatus)
	enabled, convErr := strconv.ParseBool(raw)
	if convErr != nil {
		msg := fmt.Errorf("parse param %v fail: %v", paramStatus, convErr).Error()
		s.buildFailureResp(w, http.StatusBadRequest, msg)
		return
	}

	for _, d := range s.space.disks {
		d.SetExtentRepairReadLimitStatus(enabled)
	}
	s.buildSuccessResp(w, "success")
}

// DiskExtentReadLimitInfo is the per-disk entry of the response built by
// queryDiskExtentReadLimitStatus.
type DiskExtentReadLimitInfo struct {
	// DiskPath is copied from the disk's Path field.
	DiskPath string `json:"diskPath"`
	// ExtentReadLimitStatus is the value returned by the disk's
	// QueryExtentRepairReadLimitStatus: true when the extent repair read
	// limit flag is set for that disk.
	ExtentReadLimitStatus bool `json:"extentReadLimitStatus"`
}

// DiskExtentReadLimitStatusResponse is the payload passed to buildSuccessResp
// by queryDiskExtentReadLimitStatus.
type DiskExtentReadLimitStatusResponse struct {
	// Infos holds one entry per disk visited by the handler; it stays nil
	// when the handler visits no disks.
	Infos []DiskExtentReadLimitInfo `json:"infos"`
}

// queryDiskExtentReadLimitStatus is the HTTP handler that reports, for every
// disk in s.space.disks, its path and whether the extent repair read limit is
// currently enabled. The request itself is not inspected.
//
// NOTE(review): s.space.disks is ranged over without taking any lock here —
// confirm whether the space manager requires one for concurrent access.
func (s *DataNode) queryDiskExtentReadLimitStatus(w http.ResponseWriter, r *http.Request) {
	var infos []DiskExtentReadLimitInfo
	for _, d := range s.space.disks {
		infos = append(infos, DiskExtentReadLimitInfo{
			DiskPath:              d.Path,
			ExtentReadLimitStatus: d.QueryExtentRepairReadLimitStatus(),
		})
	}
	s.buildSuccessResp(w, &DiskExtentReadLimitStatusResponse{Infos: infos})
}
11 changes: 10 additions & 1 deletion datanode/wrap_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -956,7 +956,16 @@ func (s *DataNode) handleExtentRepairReadPacket(p *repl.Packet, connect net.Conn
if err != nil {
return
}

partition := p.Object.(*DataPartition)
if !partition.disk.RequireReadExtentToken() {
err = storage.NoDiskReadRepairExtentTokenError
log.LogDebugf("dp(%v) disk(%v) extent(%v) wait for read extent token",
p.PartitionID, partition.disk.Path, p.ExtentID)
return
}
defer partition.disk.ReleaseReadExtentToken()
log.LogDebugf("dp(%v) disk(%v) extent(%v) get read extent token",
p.PartitionID, partition.disk.Path, p.ExtentID)
s.extentRepairReadPacket(p, connect, isRepairRead)
}

Expand Down
8 changes: 4 additions & 4 deletions proto/packet.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,11 +238,11 @@ const (
OpTxCommitErr uint8 = 0xEC
OpTxRollbackErr uint8 = 0xED
OpTxUnknownOp uint8 = 0xEE

// multiVersion to dp/mp
OpVersionOperation uint8 = 0xD5
OpSplitMarkDelete uint8 = 0xD6
OpTryOtherExtent uint8 = 0xD7
OpVersionOperation uint8 = 0xD5
OpSplitMarkDelete uint8 = 0xD6
OpTryOtherExtent uint8 = 0xD7
OpReadRepairExtentAgain uint8 = 0xEF
)

const (
Expand Down
2 changes: 2 additions & 0 deletions repl/packet.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,8 @@ func (p *Packet) identificationErrorResultCode(errLog string, errMsg string) {
} else if strings.Contains(errMsg, storage.VerNotConsistentError.Error()) {
p.ResultCode = proto.ErrCodeVersionOpError
// log.LogDebugf("action[identificationErrorResultCode] not change ver erro code, (%v)", string(debug.Stack()))
} else if strings.Contains(errMsg, storage.NoDiskReadRepairExtentTokenError.Error()) {
p.ResultCode = proto.OpReadRepairExtentAgain
} else {
log.LogErrorf("action[identificationErrorResultCode] error %v, errmsg %v", errLog, errMsg)
p.ResultCode = proto.OpIntraGroupNetErr
Expand Down
33 changes: 17 additions & 16 deletions storage/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,23 @@ import (
)

// Sentinel errors of the storage package.
//
// Each identifier is declared exactly once: the rendered diff listed both the
// pre-change and the re-aligned post-change lines, which would redeclare every
// name if taken literally.
//
// Callers match some of these errors by message substring rather than by
// identity, so the message texts must remain stable.
var (
	ExtentHasBeenDeletedError  = errors.New("extent has been deleted")
	ParameterMismatchError     = errors.New("parameter mismatch error")
	NoAvailableExtentError     = errors.New("no available extent")
	NoBrokenExtentError        = errors.New("no unavailable extent")
	NoSpaceError               = errors.New("no space left on the device")
	TryAgainError              = errors.New("try again")
	CrcMismatchError           = errors.New("packet Crc is incorrect")
	NoLeaderError              = errors.New("no raft leader")
	ExtentNotFoundError        = errors.New("extent does not exist")
	ExtentExistsError          = errors.New("extent already exists")
	ExtentIsFullError          = errors.New("extent is full")
	BrokenExtentError          = errors.New("extent has been broken")
	BrokenDiskError            = errors.New("disk has broken")
	ForbidWriteError           = errors.New("single replica decommission forbid write")
	VerNotConsistentError      = errors.New("ver not consistent")
	SnapshotNeedNewExtentError = errors.New("snapshot need new extent error")
	// NoDiskReadRepairExtentTokenError is returned by the extent repair read
	// handler when the disk's read token cannot be acquired; the packet layer
	// translates it into the OpReadRepairExtentAgain result code.
	NoDiskReadRepairExtentTokenError = errors.New("no disk read repair extent token")
)

func newParameterError(format string, a ...interface{}) error {
Expand Down

0 comments on commit f66c732

Please sign in to comment.