
Commit

Merge pull request cubefs#1332 from shuoranliu/feat-support-exporter-degrade

Feat support exporter degrade
shuoranliu authored Jan 20, 2022
2 parents ac737d0 + b02c9f2 commit 03e4ec6
Showing 6 changed files with 142 additions and 69 deletions.
54 changes: 33 additions & 21 deletions datanode/server.go
@@ -23,6 +23,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"errors"
@@ -76,7 +77,16 @@ const (
ConfigKeyRaftReplica = "raftReplica" // string
CfgTickInterval = "tickInterval" // int
CfgRaftRecvBufSize = "raftRecvBufSize" // int
ConfigExportRatio = "exportRatio" // string

/*
* Metrics Degrade Level
* negative value: turn off metrics collection.
* 0 or 1: full metrics.
* 2: 1/2 of the metrics will be collected.
* 3: 1/3 of the metrics will be collected.
* ...
*/
CfgMetricsDegrade = "metricsDegrade" // int

// smux Config
ConfigKeyEnableSmuxClient = "enableSmuxConnPool" //bool
@@ -115,16 +125,15 @@ type DataNode struct {
getRepairConnFunc func(target string) (net.Conn, error)
putRepairConnFunc func(conn net.Conn, forceClose bool)

metrics *DataNodeMetrics
metricCnt uint32
metricSampleFactor uint32
metricOn bool
metrics *DataNodeMetrics
metricsDegrade int64
metricsCnt uint64

control common.Control
}

func NewServer() *DataNode {
return &DataNode{metricSampleFactor: 1}
return &DataNode{}
}

func (s *DataNode) Start(cfg *config.Config) (err error) {
@@ -222,24 +231,9 @@ func (s *DataNode) parseConfig(cfg *config.Config) (err error) {
var (
port string
regexpPort *regexp.Regexp
ratio float64
)
LocalIP = cfg.GetString(ConfigKeyLocalIP)
port = cfg.GetString(proto.ListenPort)

if cfg.GetString(ConfigExportRatio) != "" {
ratio, _ = strconv.ParseFloat(cfg.GetString(ConfigExportRatio), 64)
if ratio > 0 {
if ratio < 0.01 {
ratio = 0.01
}
if ratio > 1 {
ratio = 1
}
s.metricSampleFactor = 100 / uint32(100*ratio)
}
}

serverPort = port
if regexpPort, err = regexp.Compile("^(\\d)+$"); err != nil {
return fmt.Errorf("Err:no port")
@@ -258,6 +252,7 @@ func (s *DataNode) parseConfig(cfg *config.Config) (err error) {
if s.zoneName == "" {
s.zoneName = DefaultZoneName
}
s.metricsDegrade = cfg.GetInt(CfgMetricsDegrade)

log.LogDebugf("action[parseConfig] load masterAddrs(%v).", MasterClient.Nodes())
log.LogDebugf("action[parseConfig] load port(%v).", s.port)
@@ -419,6 +414,8 @@ func (s *DataNode) registerHandler() {
http.HandleFunc("/getTinyDeleted", s.getTinyDeleted)
http.HandleFunc("/getNormalDeleted", s.getNormalDeleted)
http.HandleFunc("/getSmuxPoolStat", s.getSmuxPoolStat())
http.HandleFunc("/setMetricsDegrade", s.setMetricsDegrade)
http.HandleFunc("/getMetricsDegrade", s.getMetricsDegrade)
}

func (s *DataNode) startTCPService() (err error) {
@@ -641,6 +638,21 @@ func (s *DataNode) closeSmuxConnPool() {
return
}

func (s *DataNode) shallDegrade() bool {
level := atomic.LoadInt64(&s.metricsDegrade)
if level < 0 {
return true
}
if level == 0 {
return false
}
cnt := atomic.LoadUint64(&s.metricsCnt)
if cnt%uint64(level) == 0 {
return false
}
return true
}

func IsDiskErr(errMsg string) bool {
if strings.Contains(errMsg, syscall.EIO.Error()) || strings.Contains(errMsg, syscall.EROFS.Error()) {
return true
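The degrade-level comment in server.go above defines the sampling contract: a negative level turns metrics collection off, 0 or 1 keeps full metrics, and level N keeps roughly one out of every N samples. The level is read at startup from the `metricsDegrade` config key and can be changed at runtime through the HTTP handlers added in server_handler.go below. A minimal sketch of the sampling arithmetic, assuming the counter is bumped once per packet (hypothetical `sampler` type and `main` driver; the real code keeps the level and counter on `DataNode` and increments `metricsCnt` elsewhere in the packet path):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// sampler mirrors the shallDegrade logic: a negative level turns metrics
// off entirely, 0 (or 1) keeps full metrics, and level N keeps roughly
// one out of every N samples.
type sampler struct {
	level int64
	cnt   uint64
}

func (s *sampler) shallDegrade() bool {
	level := atomic.LoadInt64(&s.level)
	if level < 0 {
		return true
	}
	if level == 0 {
		return false
	}
	// The increment here is an assumption of this sketch; the real
	// shallDegrade only loads the counter.
	cnt := atomic.AddUint64(&s.cnt, 1)
	return cnt%uint64(level) != 0
}

func main() {
	s := &sampler{level: 3}
	kept := 0
	for i := 0; i < 9; i++ {
		if !s.shallDegrade() {
			kept++ // metrics would be collected for this packet
		}
	}
	fmt.Printf("kept %d of 9 samples at level 3\n", kept) // kept 3 of 9
}
```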
22 changes: 22 additions & 0 deletions datanode/server_handler.go
@@ -19,6 +19,7 @@ import (
"fmt"
"net/http"
"strconv"
"sync/atomic"

"github.com/chubaofs/chubaofs/proto"
"github.com/chubaofs/chubaofs/storage"
@@ -337,6 +338,27 @@ func (s *DataNode) getSmuxPoolStat() func(http.ResponseWriter, *http.Request) {
}
}

func (s *DataNode) setMetricsDegrade(w http.ResponseWriter, r *http.Request) {
if err := r.ParseForm(); err != nil {
w.Write([]byte(err.Error()))
return
}

if level := r.FormValue("level"); level != "" {
val, err := strconv.Atoi(level)
if err != nil {
w.Write([]byte("Set metrics degrade level failed\n"))
} else {
atomic.StoreInt64(&s.metricsDegrade, int64(val))
w.Write([]byte(fmt.Sprintf("Set metrics degrade level to %v successfully\n", val)))
}
}
}

func (s *DataNode) getMetricsDegrade(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(fmt.Sprintf("%v\n", atomic.LoadInt64(&s.metricsDegrade))))
}

func (s *DataNode) buildSuccessResp(w http.ResponseWriter, data interface{}) {
s.buildJSONResp(w, http.StatusOK, data, "")
}
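The two handlers above expose the degrade level over the DataNode's HTTP profile listener. A small client sketch of how they might be exercised (the address, including port 17320, is an assumption; substitute the node's real prof host:port):

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/url"
)

func main() {
	// Assumed DataNode prof address; replace with the real host:port.
	base := "http://127.0.0.1:17320"

	// Set the degrade level to 2, i.e. collect roughly half of the metrics.
	resp, err := http.PostForm(base+"/setMetricsDegrade", url.Values{"level": {"2"}})
	if err != nil {
		panic(err)
	}
	body, _ := io.ReadAll(resp.Body)
	resp.Body.Close()
	fmt.Print(string(body)) // e.g. "Set metrics degrade level to 2 successfully"

	// Read the current degrade level back.
	resp, err = http.Get(base + "/getMetricsDegrade")
	if err != nil {
		panic(err)
	}
	body, _ = io.ReadAll(resp.Body)
	resp.Body.Close()
	fmt.Print(string(body)) // e.g. "2"
}
```

Because the handler goes through `r.ParseForm`/`r.FormValue`, passing the level as a query parameter (`/setMetricsDegrade?level=2`) works as well.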
100 changes: 64 additions & 36 deletions datanode/wrap_operator.go
@@ -57,13 +57,17 @@ func (s *DataNode) getPacketTpLabels(p *repl.Packet) map[string]string {
}

func (s *DataNode) OperatePacket(p *repl.Packet, c net.Conn) (err error) {
var tpObject *exporter.TimePointCount
var (
tpLabels map[string]string
tpObject *exporter.TimePointCount
)

shallDegrade := p.ShallDegrade()
sz := p.Size
if s.metricOn {
if !shallDegrade {
tpObject = exporter.NewTPCnt(p.GetOpMsg())
tpLabels = s.getPacketTpLabels(p)
}

tpLabels := s.getPacketTpLabels(p)
start := time.Now().UnixNano()
defer func() {
resultSize := p.Size
@@ -87,7 +91,7 @@ func (s *DataNode) OperatePacket(p *repl.Packet, c net.Conn) (err error) {
}
}
p.Size = resultSize
if tpObject != nil {
if !shallDegrade {
tpObject.SetWithLabels(err, tpLabels)
}
}()
@@ -403,7 +407,12 @@ func (s *DataNode) handleBatchMarkDeletePacket(p *repl.Packet, c net.Conn) {

// Handle OpWrite packet.
func (s *DataNode) handleWritePacket(p *repl.Packet) {
var err error
var (
err error

metricPartitionIOLabels map[string]string
partitionIOMetric *exporter.TimePointCount
)
defer func() {
if err != nil {
p.PackErrorBody(ActionWrite, err.Error())
@@ -412,38 +421,40 @@ func (s *DataNode) handleWritePacket(p *repl.Packet) {
}
}()
partition := p.Object.(*DataPartition)
metricPartitionIOLabels := GetIoMetricLabels(partition, "write")
shallDegrade := p.ShallDegrade()
if !shallDegrade {
metricPartitionIOLabels = GetIoMetricLabels(partition, "write")
}
if partition.Available() <= 0 || partition.disk.Status == proto.ReadOnly || partition.IsRejectWrite() {
err = storage.NoSpaceError
return
} else if partition.disk.Status == proto.Unavailable {
err = storage.BrokenDiskError
return
}

var partitionIOMetric *exporter.TimePointCount

writeWithMetics := func(store *storage.ExtentStore, data []byte, offset, size int64, crc uint32) (err error) {
if s.metricOn {
store := partition.ExtentStore()
if p.ExtentType == proto.TinyExtentType {
if !shallDegrade {
partitionIOMetric = exporter.NewTPCnt(MetricPartitionIOName)
}
err = store.Write(p.ExtentID, p.ExtentOffset, int64(p.Size), p.Data, crc, storage.AppendWriteType, p.IsSyncWrite())
if partitionIOMetric != nil {
err = store.Write(p.ExtentID, p.ExtentOffset, int64(p.Size), p.Data, p.CRC, storage.AppendWriteType, p.IsSyncWrite())
if !shallDegrade {
s.metrics.MetricIOBytes.AddWithLabels(int64(p.Size), metricPartitionIOLabels)
partitionIOMetric.SetWithLabels(err, metricPartitionIOLabels)
}
return
}

store := partition.ExtentStore()
if p.ExtentType == proto.TinyExtentType {
err = writeWithMetics(store, p.Data, p.ExtentOffset, int64(p.Size), p.CRC)
s.incDiskErrCnt(p.PartitionID, err, WriteFlag)
return
}

if p.Size <= util.BlockSize {
err = writeWithMetics(store, p.Data, p.ExtentOffset, int64(p.Size), p.CRC)
if !shallDegrade {
partitionIOMetric = exporter.NewTPCnt(MetricPartitionIOName)
}
err = store.Write(p.ExtentID, p.ExtentOffset, int64(p.Size), p.Data, p.CRC, storage.AppendWriteType, p.IsSyncWrite())
if !shallDegrade {
s.metrics.MetricIOBytes.AddWithLabels(int64(p.Size), metricPartitionIOLabels)
partitionIOMetric.SetWithLabels(err, metricPartitionIOLabels)
}
partition.checkIsDiskError(err)
} else {
size := p.Size
@@ -454,7 +465,15 @@ func (s *DataNode) handleWritePacket(p *repl.Packet) {
}
currSize := util.Min(int(size), util.BlockSize)
data := p.Data[offset : offset+currSize]
err = writeWithMetics(store, data, p.ExtentOffset+int64(offset), int64(currSize), crc32.ChecksumIEEE(data))
crc := crc32.ChecksumIEEE(data)
if !shallDegrade {
partitionIOMetric = exporter.NewTPCnt(MetricPartitionIOName)
}
err = store.Write(p.ExtentID, p.ExtentOffset+int64(offset), int64(currSize), data, crc, storage.AppendWriteType, p.IsSyncWrite())
if !shallDegrade {
s.metrics.MetricIOBytes.AddWithLabels(int64(p.Size), metricPartitionIOLabels)
partitionIOMetric.SetWithLabels(err, metricPartitionIOLabels)
}
partition.checkIsDiskError(err)
if err != nil {
break
@@ -468,7 +487,12 @@ func (s *DataNode) handleWritePacket(p *repl.Packet) {
}

func (s *DataNode) handleRandomWritePacket(p *repl.Packet) {
var err error
var (
err error

metricPartitionIOLabels map[string]string
partitionIOMetric *exporter.TimePointCount
)
defer func() {
if err != nil {
p.PackErrorBody(ActionWrite, err.Error())
@@ -482,17 +506,16 @@ func (s *DataNode) handleRandomWritePacket(p *repl.Packet) {
err = raft.ErrNotLeader
return
}
var partitionIOMetric *exporter.TimePointCount
metricPartitionIOLabels := GetIoMetricLabels(partition, "randwrite")
if s.metricOn {
shallDegrade := p.ShallDegrade()
if !shallDegrade {
metricPartitionIOLabels = GetIoMetricLabels(partition, "randwrite")
partitionIOMetric = exporter.NewTPCnt(MetricPartitionIOName)
}
err = partition.RandomWriteSubmit(p)
if partitionIOMetric != nil {
if !shallDegrade {
s.metrics.MetricIOBytes.AddWithLabels(int64(p.Size), metricPartitionIOLabels)
partitionIOMetric.SetWithLabels(err, metricPartitionIOLabels)
}

if err != nil && strings.Contains(err.Error(), raft.ErrNotLeader.Error()) {
err = raft.ErrNotLeader
return
@@ -552,6 +575,9 @@ func (s *DataNode) handleTinyExtentRepairReadPacket(p *repl.Packet, connect net.
func (s *DataNode) extentRepairReadPacket(p *repl.Packet, connect net.Conn, isRepairRead bool) {
var (
err error

metricPartitionIOLabels map[string]string
partitionIOMetric, tpObject *exporter.TimePointCount
)
defer func() {
if err != nil {
@@ -563,7 +589,10 @@ func (s *DataNode) extentRepairReadPacket(p *repl.Packet, connect net.Conn, isRe
needReplySize := p.Size
offset := p.ExtentOffset
store := partition.ExtentStore()
metricPartitionIOLabels := GetIoMetricLabels(partition, "read")
shallDegrade := p.ShallDegrade()
if !shallDegrade {
metricPartitionIOLabels = GetIoMetricLabels(partition, "read")
}
for {
if needReplySize <= 0 {
break
@@ -577,21 +606,20 @@ func (s *DataNode) extentRepairReadPacket(p *repl.Packet, connect net.Conn, isRe
} else {
reply.Data = make([]byte, currReadSize)
}
tpObject := exporter.NewTPCnt(fmt.Sprintf("Repair_%s", p.GetOpMsg()))
if !shallDegrade {
partitionIOMetric = exporter.NewTPCnt(MetricPartitionIOName)
tpObject = exporter.NewTPCnt(fmt.Sprintf("Repair_%s", p.GetOpMsg()))
}
reply.ExtentOffset = offset
p.Size = uint32(currReadSize)
p.ExtentOffset = offset
var partitionIOMetric *exporter.TimePointCount
if s.metricOn {
partitionIOMetric = exporter.NewTPCnt(MetricPartitionIOName)
}
reply.CRC, err = store.Read(reply.ExtentID, offset, int64(currReadSize), reply.Data, isRepairRead)
if partitionIOMetric != nil {
if !shallDegrade {
s.metrics.MetricIOBytes.AddWithLabels(int64(p.Size), metricPartitionIOLabels)
partitionIOMetric.SetWithLabels(err, metricPartitionIOLabels)
tpObject.Set(err)
}
partition.checkIsDiskError(err)
tpObject.Set(err)
p.CRC = reply.CRC
if err != nil {
return
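Throughout the handlers above the same guard pattern repeats: metric labels and the TP counter are only allocated when the packet is not degraded, and only then recorded once the I/O completes, so a degraded packet pays no exporter cost on the hot path. A condensed sketch of that pattern (the `recordTP` helper is hypothetical and the exporter import path is assumed; the real code inlines the guard in each handler):

```go
package datanode

import (
	"github.com/chubaofs/chubaofs/util/exporter" // assumed import path
)

// recordTP runs op and records a TimePointCount with the given labels,
// unless the packet was marked as degraded, in which case no exporter
// objects are allocated or updated at all.
func recordTP(shallDegrade bool, labels map[string]string, op func() error) error {
	var tp *exporter.TimePointCount
	if !shallDegrade {
		tp = exporter.NewTPCnt(MetricPartitionIOName)
	}
	err := op()
	if !shallDegrade {
		tp.SetWithLabels(err, labels)
	}
	return err
}
```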
11 changes: 4 additions & 7 deletions datanode/wrap_post.go
@@ -15,9 +15,10 @@
package datanode

import (
"sync/atomic"

"github.com/chubaofs/chubaofs/repl"
"github.com/chubaofs/chubaofs/storage"
"sync/atomic"
)

func (s *DataNode) Post(p *repl.Packet) error {
@@ -28,11 +29,7 @@ func (s *DataNode) Post(p *repl.Packet) error {
p.NeedReply = false
}
s.cleanupPkt(p)

// todo:add control from master,percent be set to
if s.metricOn {
s.addMetrics(p)
}
s.addMetrics(p)
return nil
}

@@ -67,7 +64,7 @@ func (s *DataNode) releaseExtent(p *repl.Packet) {
}

func (s *DataNode) addMetrics(p *repl.Packet) {
if p.IsMasterCommand() {
if p.IsMasterCommand() || p.ShallDegrade() {
return
}
p.AfterTp()
