Skip to content

Commit

Permalink
feat(master): support abort disk decommission
Browse files Browse the repository at this point in the history
Signed-off-by: NaturalSelect <[email protected]>
  • Loading branch information
NaturalSelect authored and bboyCH4 committed Jul 5, 2024
1 parent 0a158c3 commit fdec047
Show file tree
Hide file tree
Showing 12 changed files with 113 additions and 29 deletions.
6 changes: 4 additions & 2 deletions cli/cmd/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,8 +367,8 @@ func newClusterQueryDecommissionFailedDisk(client *master.MasterClient) *cobra.C
}()

args[0] = strings.ToLower(args[0])
if args[0] != "auto" && args[0] != "manual" {
err = fmt.Errorf("unknown decommission type %v, not \"auto\" or \"manual\"", args[0])
if args[0] != "auto" && args[0] != "manual" && args[0] != "all" {
err = fmt.Errorf("unknown decommission type %v, not \"auto\", \"manual\" or \"all\"", args[0])
return
}

Expand All @@ -377,6 +377,8 @@ func newClusterQueryDecommissionFailedDisk(client *master.MasterClient) *cobra.C
decommType = 0
case "auto":
decommType = 1
case "all":
decommType = 2
}

diskInfo, err := client.AdminAPI().QueryDecommissionFailedDisk(decommType)
Expand Down
1 change: 1 addition & 0 deletions cli/cmd/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const (
CliOpDecommission = "decommission"
CliOpRecommission = "recommission"
CliOpQueryProgress = "query-progress"
CliOpAbortDecommission = "abort-decommission"
CliOpMigrate = "migrate"
CliOpDownloadZip = "load"
CliOpMetaCompatibility = "meta"
Expand Down
26 changes: 26 additions & 0 deletions cli/cmd/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func newDiskCmd(client *master.MasterClient) *cobra.Command {
newDecommissionDiskCmd(client),
newRecommissionDiskCmd(client),
newQueryDecommissionDiskCmd(client),
newAbortDecommissionDiskCmd(client),
)
return cmd
}
Expand Down Expand Up @@ -208,3 +209,28 @@ func newQueryDecommissionDiskCmd(client *master.MasterClient) *cobra.Command {
}
return cmd
}

// cmdAbortDecommissionDiskShort is the one-line help summary used as the
// Short text of the disk abort-decommission sub-command.
const cmdAbortDecommissionDiskShort = "Abort disk decommission"

// newAbortDecommissionDiskCmd builds the disk sub-command that asks the master
// to abort the decommission of one disk.
//
// The command takes at least two positional arguments: args[0] is the data
// node address and args[1] is the disk. Any further arguments are accepted by
// the validator but not used.
func newAbortDecommissionDiskCmd(client *master.MasterClient) *cobra.Command {
	run := func(_ *cobra.Command, args []string) {
		var err error
		// errout always runs with the final value of err, including nil on
		// success (presumably a no-op for nil — confirm against errout).
		defer func() {
			errout(err)
		}()

		if err = client.AdminAPI().AbortDiskDecommission(args[0], args[1]); err != nil {
			return
		}
		stdout("%v\n", "Abort decommission successfully")
	}

	return &cobra.Command{
		Use:   CliOpAbortDecommission + " [DATA NODE ADDR] [DISK]",
		Short: cmdAbortDecommissionDiskShort,
		Args:  cobra.MinimumNArgs(2),
		Run:   run,
	}
}
2 changes: 1 addition & 1 deletion cli/cmd/fmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func formatClusterView(cv *proto.ClusterView, cn *proto.ClusterNodeInfo, cp *pro
sb.WriteString(fmt.Sprintf(" LoadFactor : %v\n", cn.LoadFactor))
sb.WriteString(fmt.Sprintf(" volDeletionDelayTime : %v h\n", cv.VolDeletionDelayTimeHour))
sb.WriteString(fmt.Sprintf(" EnableAutoDecommission: %v\n", cv.EnableAutoDecommission))
sb.WriteString(fmt.Sprintf(" MarkDiskBrokenThreshold : %v%%\n", cv.MarkDiskBrokenThreshold*100))
sb.WriteString(fmt.Sprintf(" MarkDiskBrokenThreshold : %v\n", strutil.FormatPercent(cv.MarkDiskBrokenThreshold)))
return sb.String()
}

Expand Down
58 changes: 49 additions & 9 deletions master/api_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6812,17 +6812,57 @@ func (m *Server) QueryDecommissionFailedDisk(w http.ResponseWriter, r *http.Requ
disks := make([]*proto.DecommissionFailedDiskInfo, 0)
m.cluster.DecommissionDisks.Range(func(key, value interface{}) bool {
d := value.(*DecommissionDisk)
if d.GetDecommissionStatus() == DecommissionFail && d.Type == uint32(decommType) {
disks = append(disks, &proto.DecommissionFailedDiskInfo{
SrcAddr: d.SrcAddr,
DiskPath: d.DiskPath,
DecommissionRaftForce: d.DecommissionRaftForce,
DecommissionRetry: d.DecommissionRetry,
DecommissionDpTotal: d.DecommissionDpTotal,
IsAutoDecommission: d.Type == AutoDecommission,
})
if d.GetDecommissionStatus() == DecommissionFail {
if d.Type == uint32(decommType) || decommType == int(AllDecommission) {
disks = append(disks, &proto.DecommissionFailedDiskInfo{
SrcAddr: d.SrcAddr,
DiskPath: d.DiskPath,
DecommissionRaftForce: d.DecommissionRaftForce,
DecommissionRetry: d.DecommissionRetry,
DecommissionDpTotal: d.DecommissionDpTotal,
IsAutoDecommission: d.Type == AutoDecommission,
})
}
}
return true
})
sendOkReply(w, r, newSuccessHTTPReply(disks))
}

// abortDecommissionDisk handles proto.AdminAbortDecommissionDisk. It reads the
// data node address and disk path from the request, looks up the matching
// entry in m.cluster.DecommissionDisks and aborts it.
//
// Replies:
//   - proto.ErrCodeParamError when the address or disk path cannot be
//     extracted, or when no decommission entry exists for that disk;
//   - proto.ErrCodePersistenceByRaft when DecommissionDisk.Abort fails;
//   - a success reply otherwise.
func (m *Server) abortDecommissionDisk(w http.ResponseWriter, r *http.Request) {
	var (
		err  error
		addr string
		disk string
	)

	metric := exporter.NewTPCnt(apiToMetricsName(proto.AdminAbortDecommissionDisk))
	// The closure reads err when the handler returns, so every failure path
	// below must leave its error in err for the metric to be accurate.
	defer func() {
		doStatAndMetric(proto.AdminAbortDecommissionDisk, metric, err, nil)
	}()

	addr, err = parseAndExtractNodeAddr(r)
	if err != nil {
		sendErrReply(w, r, &proto.HTTPReply{Code: proto.ErrCodeParamError, Msg: err.Error()})
		return
	}
	disk, err = extractDiskPath(r)
	if err != nil {
		sendErrReply(w, r, &proto.HTTPReply{Code: proto.ErrCodeParamError, Msg: err.Error()})
		return
	}

	// NOTE(review): this key layout is expected to match
	// DecommissionDisk.GenerateKey — confirm they stay in sync.
	key := fmt.Sprintf("%v_%v", addr, disk)
	val, ok := m.cluster.DecommissionDisks.Load(key)
	if !ok {
		// Store the failure in err so the deferred metric does not count this
		// rejected request as a success.
		err = fmt.Errorf("decommission datanode %v disk %v not found", addr, disk)
		sendErrReply(w, r, &proto.HTTPReply{Code: proto.ErrCodeParamError, Msg: err.Error()})
		return
	}
	dd := val.(*DecommissionDisk)
	err = dd.Abort(m.cluster)
	if err != nil {
		sendErrReply(w, r, &proto.HTTPReply{Code: proto.ErrCodePersistenceByRaft, Msg: err.Error()})
		return
	}
	sendOkReply(w, r, newSuccessHTTPReply(fmt.Sprintf("cancel decommission datanode(%v) disk(%v) success", addr, disk)))
}
17 changes: 17 additions & 0 deletions master/disk_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package master
import (
"fmt"
"math"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -241,6 +242,7 @@ type DecommissionDisk struct {
DiskDisable bool
Type uint32
DecommissionCompleteTime int64
UpdateMutex sync.Mutex `json:"-"`
}

func (dd *DecommissionDisk) GenerateKey() string {
Expand Down Expand Up @@ -278,6 +280,9 @@ func (dd *DecommissionDisk) updateDecommissionStatus(c *Cluster, debug bool) (ui
return DecommissionPause, float64(0)
}

dd.UpdateMutex.Lock()
defer dd.UpdateMutex.Unlock()

defer func() {
c.syncUpdateDecommissionDisk(dd)
}()
Expand Down Expand Up @@ -336,6 +341,18 @@ func (dd *DecommissionDisk) updateDecommissionStatus(c *Cluster, debug bool) (ui
return DecommissionRunning, progress
}

// Abort removes this disk's decommission entry. While holding dd.UpdateMutex
// it first calls c.syncDeleteDecommissionDisk and, only if that succeeds,
// deletes the entry from c.DecommissionDisks under dd.GenerateKey().
//
// It returns the error from c.syncDeleteDecommissionDisk; on error the entry
// is left in c.DecommissionDisks.
//
// NOTE(review): updateDecommissionStatus takes the same mutex and calls
// c.syncUpdateDecommissionDisk(dd) on return. A caller already waiting on the
// lock could therefore write dd back after Abort releases it — confirm this
// cannot resurrect an aborted disk.
func (dd *DecommissionDisk) Abort(c *Cluster) (err error) {
	dd.UpdateMutex.Lock()
	defer dd.UpdateMutex.Unlock()

	if err = c.syncDeleteDecommissionDisk(dd); err == nil {
		c.DecommissionDisks.Delete(dd.GenerateKey())
	}
	return err
}

// GetDecommissionStatus returns the current value of dd.DecommissionStatus,
// read with an atomic load.
func (dd *DecommissionDisk) GetDecommissionStatus() uint32 {
	status := atomic.LoadUint32(&dd.DecommissionStatus)
	return status
}
Expand Down
3 changes: 3 additions & 0 deletions master/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,9 @@ func (m *Server) registerAPIRoutes(router *mux.Router) {
router.NewRoute().Methods(http.MethodGet).
Path(proto.AdminQueryDecommissionFailedDisk).
HandlerFunc(m.QueryDecommissionFailedDisk)
router.NewRoute().Methods(http.MethodGet, http.MethodPost).
Path(proto.AdminAbortDecommissionDisk).
HandlerFunc(m.abortDecommissionDisk)

// multi version snapshot APIs
router.NewRoute().Methods(http.MethodGet).
Expand Down
1 change: 1 addition & 0 deletions proto/admin_proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ const (
AdminUpdateDecommissionLimit = "/admin/updateDecommissionLimit"
AdminQueryDecommissionLimit = "/admin/queryDecommissionLimit"
AdminQueryDecommissionFailedDisk = "/admin/queryDecommissionFailedDisk"
AdminAbortDecommissionDisk = "/admin/abortDecommissionDisk"
// #nosec G101
AdminQueryDecommissionToken = "/admin/queryDecommissionToken"
AdminSetFileStats = "/admin/setFileStatsEnable"
Expand Down
5 changes: 2 additions & 3 deletions sdk/data/wrapper/data_partition_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,8 @@ type DataPartitionSelector interface {
// Count return number of data partitions held by selector.
Count() int

// GetAllDp return data partitions held by selector
GetAllDp() (dp []*DataPartition)

GetDpCount() (count int)
}

var (
Expand Down Expand Up @@ -135,7 +134,7 @@ func (w *Wrapper) refreshDpSelector(refreshPolicy RefreshDpPolicy, partitions []

log.LogInfof("[refreshDpSelector] refresh dp, partition count(%v)", len(partitions))
if refreshPolicy == UpdateDpPolicy {
minDpCount := w.refreshMinDpCount(dpSelector.GetDpCount())
minDpCount := w.refreshMinDpCount(dpSelector.Count())
// NOTE: if decrease more than 1/3 dp at once
if len(partitions) < minDpCount {
oldDps := dpSelector.GetAllDp()
Expand Down
7 changes: 0 additions & 7 deletions sdk/data/wrapper/default_random_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,10 +175,3 @@ func (s *DefaultRandomSelector) GetAllDp() (dps []*DataPartition) {
copy(dps, s.partitions)
return
}

// GetDpCount returns the number of entries in s.partitions, read while
// holding the selector's read lock.
func (s *DefaultRandomSelector) GetDpCount() (count int) {
	s.RLock()
	count = len(s.partitions)
	s.RUnlock()
	return count
}
7 changes: 0 additions & 7 deletions sdk/data/wrapper/k_faster_random_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,13 +152,6 @@ func (s *KFasterRandomSelector) GetAllDp() (dps []*DataPartition) {
return
}

// GetDpCount returns the number of entries in s.partitions, read while
// holding the selector's read lock.
func (s *KFasterRandomSelector) GetDpCount() (count int) {
	s.RLock()
	count = len(s.partitions)
	s.RUnlock()
	return count
}

func (s *KFasterRandomSelector) Count() int {
s.RLock()
defer s.RUnlock()
Expand Down
9 changes: 9 additions & 0 deletions sdk/master/api_admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -748,3 +748,12 @@ func (api *AdminAPI) QueryDecommissionFailedDisk(decommType int) (diskInfo []*pr
err = api.mc.requestWith(&diskInfo, request)
return
}

// AbortDiskDecommission sends a POST request to
// proto.AdminAbortDecommissionDisk with the data node address in the "addr"
// parameter and the disk in the "disk" parameter, and returns the error
// reported by the master client, if any.
func (api *AdminAPI) AbortDiskDecommission(addr string, disk string) (err error) {
	req := newRequest(post, proto.AdminAbortDecommissionDisk)
	req.addParam("addr", addr)
	req.addParam("disk", disk)
	return api.mc.request(req)
}

0 comments on commit fdec047

Please sign in to comment.