Skip to content

Commit

Permalink
feat: intruduce metrics degrade level to datanode
Browse files Browse the repository at this point in the history
This commit introduces metrics degrade level config to suppress monitor
metrics collection as needed.

Metrics Degrade Level
- minus value: turn off metrics collection.
- 0 or 1: full metrics.
- 2: 1/2 of the metrics will be collected.
- 3: 1/3 of the metrics will be collected.
- ...

Signed-off-by: Shuoran Liu <[email protected]>
  • Loading branch information
shuoranliu committed Jan 20, 2022
1 parent ee0f623 commit b02c9f2
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 29 deletions.
34 changes: 33 additions & 1 deletion datanode/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"errors"
Expand Down Expand Up @@ -76,6 +77,17 @@ const (
ConfigKeyRaftReplica = "raftReplica" // string
CfgTickInterval = "tickInterval" // int
CfgRaftRecvBufSize = "raftRecvBufSize" // int

/*
* Metrics Degrade Level
* minus value: turn off metrics collection.
* 0 or 1: full metrics.
* 2: 1/2 of the metrics will be collected.
* 3: 1/3 of the metrics will be collected.
* ...
*/
CfgMetricsDegrade = "metricsDegrade" // int

// smux Config
ConfigKeyEnableSmuxClient = "enableSmuxConnPool" //bool
ConfigKeySmuxPortShift = "smuxPortShift" //int
Expand Down Expand Up @@ -113,7 +125,9 @@ type DataNode struct {
getRepairConnFunc func(target string) (net.Conn, error)
putRepairConnFunc func(conn net.Conn, forceClose bool)

metrics *DataNodeMetrics
metrics *DataNodeMetrics
metricsDegrade int64
metricsCnt uint64

control common.Control
}
Expand Down Expand Up @@ -238,6 +252,7 @@ func (s *DataNode) parseConfig(cfg *config.Config) (err error) {
if s.zoneName == "" {
s.zoneName = DefaultZoneName
}
s.metricsDegrade = cfg.GetInt(CfgMetricsDegrade)

log.LogDebugf("action[parseConfig] load masterAddrs(%v).", MasterClient.Nodes())
log.LogDebugf("action[parseConfig] load port(%v).", s.port)
Expand Down Expand Up @@ -399,6 +414,8 @@ func (s *DataNode) registerHandler() {
http.HandleFunc("/getTinyDeleted", s.getTinyDeleted)
http.HandleFunc("/getNormalDeleted", s.getNormalDeleted)
http.HandleFunc("/getSmuxPoolStat", s.getSmuxPoolStat())
http.HandleFunc("/setMetricsDegrade", s.setMetricsDegrade)
http.HandleFunc("/getMetricsDegrade", s.getMetricsDegrade)
}

func (s *DataNode) startTCPService() (err error) {
Expand Down Expand Up @@ -621,6 +638,21 @@ func (s *DataNode) closeSmuxConnPool() {
return
}

func (s *DataNode) shallDegrade() bool {
level := atomic.LoadInt64(&s.metricsDegrade)
if level < 0 {
return true
}
if level == 0 {
return false
}
cnt := atomic.LoadUint64(&s.metricsCnt)
if cnt%uint64(level) == 0 {
return false
}
return true
}

func IsDiskErr(errMsg string) bool {
if strings.Contains(errMsg, syscall.EIO.Error()) || strings.Contains(errMsg, syscall.EROFS.Error()) {
return true
Expand Down
22 changes: 22 additions & 0 deletions datanode/server_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"net/http"
"strconv"
"sync/atomic"

"github.com/chubaofs/chubaofs/proto"
"github.com/chubaofs/chubaofs/storage"
Expand Down Expand Up @@ -337,6 +338,27 @@ func (s *DataNode) getSmuxPoolStat() func(http.ResponseWriter, *http.Request) {
}
}

func (s *DataNode) setMetricsDegrade(w http.ResponseWriter, r *http.Request) {
if err := r.ParseForm(); err != nil {
w.Write([]byte(err.Error()))
return
}

if level := r.FormValue("level"); level != "" {
val, err := strconv.Atoi(level)
if err != nil {
w.Write([]byte("Set metrics degrade level failed\n"))
} else {
atomic.StoreInt64(&s.metricsDegrade, int64(val))
w.Write([]byte(fmt.Sprintf("Set metrics degrade level to %v successfully\n", val)))
}
}
}

func (s *DataNode) getMetricsDegrade(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(fmt.Sprintf("%v\n", atomic.LoadInt64(&s.metricsDegrade))))
}

func (s *DataNode) buildSuccessResp(w http.ResponseWriter, data interface{}) {
s.buildJSONResp(w, http.StatusOK, data, "")
}
Expand Down
100 changes: 75 additions & 25 deletions datanode/wrap_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,17 @@ func (s *DataNode) getPacketTpLabels(p *repl.Packet) map[string]string {
}

func (s *DataNode) OperatePacket(p *repl.Packet, c net.Conn) (err error) {
var (
tpLabels map[string]string
tpObject *exporter.TimePointCount
)

shallDegrade := p.ShallDegrade()
sz := p.Size
tpObject := exporter.NewTPCnt(p.GetOpMsg())
tpLabels := s.getPacketTpLabels(p)
if !shallDegrade {
tpObject = exporter.NewTPCnt(p.GetOpMsg())
tpLabels = s.getPacketTpLabels(p)
}
start := time.Now().UnixNano()
defer func() {
resultSize := p.Size
Expand All @@ -83,7 +91,9 @@ func (s *DataNode) OperatePacket(p *repl.Packet, c net.Conn) (err error) {
}
}
p.Size = resultSize
tpObject.SetWithLabels(err, tpLabels)
if !shallDegrade {
tpObject.SetWithLabels(err, tpLabels)
}
}()
switch p.Opcode {
case proto.OpCreateExtent:
Expand Down Expand Up @@ -397,7 +407,12 @@ func (s *DataNode) handleBatchMarkDeletePacket(p *repl.Packet, c net.Conn) {

// Handle OpWrite packet.
func (s *DataNode) handleWritePacket(p *repl.Packet) {
var err error
var (
err error

metricPartitionIOLabels map[string]string
partitionIOMetric *exporter.TimePointCount
)
defer func() {
if err != nil {
p.PackErrorBody(ActionWrite, err.Error())
Expand All @@ -406,7 +421,10 @@ func (s *DataNode) handleWritePacket(p *repl.Packet) {
}
}()
partition := p.Object.(*DataPartition)
metricPartitionIOLabels := GetIoMetricLabels(partition, "write")
shallDegrade := p.ShallDegrade()
if !shallDegrade {
metricPartitionIOLabels = GetIoMetricLabels(partition, "write")
}
if partition.Available() <= 0 || partition.disk.Status == proto.ReadOnly || partition.IsRejectWrite() {
err = storage.NoSpaceError
return
Expand All @@ -416,19 +434,27 @@ func (s *DataNode) handleWritePacket(p *repl.Packet) {
}
store := partition.ExtentStore()
if p.ExtentType == proto.TinyExtentType {
partitionIOMetric := exporter.NewTPCnt(MetricPartitionIOName)
if !shallDegrade {
partitionIOMetric = exporter.NewTPCnt(MetricPartitionIOName)
}
err = store.Write(p.ExtentID, p.ExtentOffset, int64(p.Size), p.Data, p.CRC, storage.AppendWriteType, p.IsSyncWrite())
s.metrics.MetricIOBytes.AddWithLabels(int64(p.Size), metricPartitionIOLabels)
partitionIOMetric.SetWithLabels(err, metricPartitionIOLabels)
if !shallDegrade {
s.metrics.MetricIOBytes.AddWithLabels(int64(p.Size), metricPartitionIOLabels)
partitionIOMetric.SetWithLabels(err, metricPartitionIOLabels)
}
s.incDiskErrCnt(p.PartitionID, err, WriteFlag)
return
}

if p.Size <= util.BlockSize {
partitionIOMetric := exporter.NewTPCnt(MetricPartitionIOName)
if !shallDegrade {
partitionIOMetric = exporter.NewTPCnt(MetricPartitionIOName)
}
err = store.Write(p.ExtentID, p.ExtentOffset, int64(p.Size), p.Data, p.CRC, storage.AppendWriteType, p.IsSyncWrite())
s.metrics.MetricIOBytes.AddWithLabels(int64(p.Size), metricPartitionIOLabels)
partitionIOMetric.SetWithLabels(err, metricPartitionIOLabels)
if !shallDegrade {
s.metrics.MetricIOBytes.AddWithLabels(int64(p.Size), metricPartitionIOLabels)
partitionIOMetric.SetWithLabels(err, metricPartitionIOLabels)
}
partition.checkIsDiskError(err)
} else {
size := p.Size
Expand All @@ -440,10 +466,14 @@ func (s *DataNode) handleWritePacket(p *repl.Packet) {
currSize := util.Min(int(size), util.BlockSize)
data := p.Data[offset : offset+currSize]
crc := crc32.ChecksumIEEE(data)
partitionIOMetric := exporter.NewTPCnt(MetricPartitionIOName)
if !shallDegrade {
partitionIOMetric = exporter.NewTPCnt(MetricPartitionIOName)
}
err = store.Write(p.ExtentID, p.ExtentOffset+int64(offset), int64(currSize), data, crc, storage.AppendWriteType, p.IsSyncWrite())
s.metrics.MetricIOBytes.AddWithLabels(int64(p.Size), metricPartitionIOLabels)
partitionIOMetric.SetWithLabels(err, metricPartitionIOLabels)
if !shallDegrade {
s.metrics.MetricIOBytes.AddWithLabels(int64(p.Size), metricPartitionIOLabels)
partitionIOMetric.SetWithLabels(err, metricPartitionIOLabels)
}
partition.checkIsDiskError(err)
if err != nil {
break
Expand All @@ -457,7 +487,12 @@ func (s *DataNode) handleWritePacket(p *repl.Packet) {
}

func (s *DataNode) handleRandomWritePacket(p *repl.Packet) {
var err error
var (
err error

metricPartitionIOLabels map[string]string
partitionIOMetric *exporter.TimePointCount
)
defer func() {
if err != nil {
p.PackErrorBody(ActionWrite, err.Error())
Expand All @@ -471,11 +506,16 @@ func (s *DataNode) handleRandomWritePacket(p *repl.Packet) {
err = raft.ErrNotLeader
return
}
metricPartitionIOLabels := GetIoMetricLabels(partition, "randwrite")
partitionIOMetric := exporter.NewTPCnt(MetricPartitionIOName)
shallDegrade := p.ShallDegrade()
if !shallDegrade {
metricPartitionIOLabels = GetIoMetricLabels(partition, "randwrite")
partitionIOMetric = exporter.NewTPCnt(MetricPartitionIOName)
}
err = partition.RandomWriteSubmit(p)
s.metrics.MetricIOBytes.AddWithLabels(int64(p.Size), metricPartitionIOLabels)
partitionIOMetric.SetWithLabels(err, metricPartitionIOLabels)
if !shallDegrade {
s.metrics.MetricIOBytes.AddWithLabels(int64(p.Size), metricPartitionIOLabels)
partitionIOMetric.SetWithLabels(err, metricPartitionIOLabels)
}
if err != nil && strings.Contains(err.Error(), raft.ErrNotLeader.Error()) {
err = raft.ErrNotLeader
return
Expand Down Expand Up @@ -535,6 +575,9 @@ func (s *DataNode) handleTinyExtentRepairReadPacket(p *repl.Packet, connect net.
func (s *DataNode) extentRepairReadPacket(p *repl.Packet, connect net.Conn, isRepairRead bool) {
var (
err error

metricPartitionIOLabels map[string]string
partitionIOMetric, tpObject *exporter.TimePointCount
)
defer func() {
if err != nil {
Expand All @@ -546,7 +589,10 @@ func (s *DataNode) extentRepairReadPacket(p *repl.Packet, connect net.Conn, isRe
needReplySize := p.Size
offset := p.ExtentOffset
store := partition.ExtentStore()
metricPartitionIOLabels := GetIoMetricLabels(partition, "read")
shallDegrade := p.ShallDegrade()
if !shallDegrade {
metricPartitionIOLabels = GetIoMetricLabels(partition, "read")
}
for {
if needReplySize <= 0 {
break
Expand All @@ -560,16 +606,20 @@ func (s *DataNode) extentRepairReadPacket(p *repl.Packet, connect net.Conn, isRe
} else {
reply.Data = make([]byte, currReadSize)
}
tpObject := exporter.NewTPCnt(fmt.Sprintf("Repair_%s", p.GetOpMsg()))
if !shallDegrade {
partitionIOMetric = exporter.NewTPCnt(MetricPartitionIOName)
tpObject = exporter.NewTPCnt(fmt.Sprintf("Repair_%s", p.GetOpMsg()))
}
reply.ExtentOffset = offset
p.Size = uint32(currReadSize)
p.ExtentOffset = offset
partitionIOMetric := exporter.NewTPCnt(MetricPartitionIOName)
reply.CRC, err = store.Read(reply.ExtentID, offset, int64(currReadSize), reply.Data, isRepairRead)
s.metrics.MetricIOBytes.AddWithLabels(int64(p.Size), metricPartitionIOLabels)
partitionIOMetric.SetWithLabels(err, metricPartitionIOLabels)
if !shallDegrade {
s.metrics.MetricIOBytes.AddWithLabels(int64(p.Size), metricPartitionIOLabels)
partitionIOMetric.SetWithLabels(err, metricPartitionIOLabels)
tpObject.Set(err)
}
partition.checkIsDiskError(err)
tpObject.Set(err)
p.CRC = reply.CRC
if err != nil {
return
Expand Down
5 changes: 3 additions & 2 deletions datanode/wrap_post.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
package datanode

import (
"sync/atomic"

"github.com/chubaofs/chubaofs/repl"
"github.com/chubaofs/chubaofs/storage"
"sync/atomic"
)

func (s *DataNode) Post(p *repl.Packet) error {
Expand Down Expand Up @@ -63,7 +64,7 @@ func (s *DataNode) releaseExtent(p *repl.Packet) {
}

func (s *DataNode) addMetrics(p *repl.Packet) {
if p.IsMasterCommand() {
if p.IsMasterCommand() || p.ShallDegrade() {
return
}
p.AfterTp()
Expand Down
9 changes: 8 additions & 1 deletion datanode/wrap_prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"encoding/json"
"fmt"
"hash/crc32"
"sync/atomic"

"github.com/chubaofs/chubaofs/proto"
"github.com/chubaofs/chubaofs/repl"
Expand All @@ -34,7 +35,13 @@ func (s *DataNode) Prepare(p *repl.Packet) (err error) {
if p.IsMasterCommand() {
return
}
p.BeforeTp(s.clusterID)
atomic.AddUint64(&s.metricsCnt, 1)
if !s.shallDegrade() {
p.BeforeTp(s.clusterID)
p.UnsetDegrade()
} else {
p.SetDegrade()
}
err = s.checkStoreMode(p)
if err != nil {
return
Expand Down
15 changes: 15 additions & 0 deletions repl/packet.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ type Packet struct {
TpObject *exporter.TimePointCount
NeedReply bool
OrgBuffer []byte

// used locally
shallDegrade bool
}

type FollowerPacket struct {
Expand Down Expand Up @@ -424,3 +427,15 @@ func (p *Packet) IsRandomWrite() bool {
func (p *Packet) IsSyncWrite() bool {
return p.Opcode == proto.OpSyncWrite || p.Opcode == proto.OpSyncRandomWrite
}

func (p *Packet) SetDegrade() {
p.shallDegrade = true
}

func (p *Packet) UnsetDegrade() {
p.shallDegrade = false
}

func (p *Packet) ShallDegrade() bool {
return p.shallDegrade
}

0 comments on commit b02c9f2

Please sign in to comment.