Skip to content

Commit

Permalink
Merge pull request cubefs#1111 from Victor1319/refactor/vol-monitor
Browse files Browse the repository at this point in the history
enhance: add vol level & io monitor report
  • Loading branch information
shuoranliu authored Feb 26, 2021
2 parents e3e1612 + 5dd4d2b commit 2d5cf17
Show file tree
Hide file tree
Showing 18 changed files with 2,633 additions and 817 deletions.
32 changes: 24 additions & 8 deletions client/fs/dir.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ func (d *Dir) Create(ctx context.Context, req *fuse.CreateRequest, resp *fuse.Cr

var err error
metric := exporter.NewTPCnt("filecreate")
defer metric.Set(err)
defer func() {
metric.SetWithLabels(err, map[string]string{exporter.Vol: d.super.volname})
}()

info, err := d.super.mw.Create_ll(d.info.Inode, req.Name, proto.Mode(req.Mode.Perm()), req.Uid, req.Gid, nil)
if err != nil {
Expand Down Expand Up @@ -130,7 +132,9 @@ func (d *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, error

var err error
metric := exporter.NewTPCnt("mkdir")
defer metric.Set(err)
defer func() {
metric.SetWithLabels(err, map[string]string{exporter.Vol: d.super.volname})
}()

info, err := d.super.mw.Create_ll(d.info.Inode, req.Name, proto.Mode(os.ModeDir|req.Mode.Perm()), req.Uid, req.Gid, nil)
if err != nil {
Expand Down Expand Up @@ -159,7 +163,9 @@ func (d *Dir) Remove(ctx context.Context, req *fuse.RemoveRequest) error {

var err error
metric := exporter.NewTPCnt("remove")
defer metric.Set(err)
defer func() {
metric.SetWithLabels(err, map[string]string{exporter.Vol: d.super.volname})
}()

info, err := d.super.mw.Delete_ll(d.info.Inode, req.Name, req.Dir)
if err != nil {
Expand Down Expand Up @@ -234,7 +240,9 @@ func (d *Dir) ReadDirAll(ctx context.Context) ([]fuse.Dirent, error) {

var err error
metric := exporter.NewTPCnt("readdir")
defer metric.Set(err)
defer func() {
metric.SetWithLabels(err, map[string]string{exporter.Vol: d.super.volname})
}()

children, err := d.super.mw.ReadDir_ll(d.info.Inode)
if err != nil {
Expand Down Expand Up @@ -284,7 +292,9 @@ func (d *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDir fs.Nod

var err error
metric := exporter.NewTPCnt("rename")
defer metric.Set(err)
defer func() {
metric.SetWithLabels(err, map[string]string{exporter.Vol: d.super.volname})
}()

err = d.super.mw.Rename_ll(d.info.Inode, req.OldName, dstDir.info.Inode, req.NewName)
if err != nil {
Expand Down Expand Up @@ -335,7 +345,9 @@ func (d *Dir) Mknod(ctx context.Context, req *fuse.MknodRequest) (fs.Node, error

var err error
metric := exporter.NewTPCnt("mknod")
defer metric.Set(err)
defer func() {
metric.SetWithLabels(err, map[string]string{exporter.Vol: d.super.volname})
}()

info, err := d.super.mw.Create_ll(d.info.Inode, req.Name, proto.Mode(req.Mode), req.Uid, req.Gid, nil)
if err != nil {
Expand All @@ -362,7 +374,9 @@ func (d *Dir) Symlink(ctx context.Context, req *fuse.SymlinkRequest) (fs.Node, e

var err error
metric := exporter.NewTPCnt("symlink")
defer metric.Set(err)
defer func() {
metric.SetWithLabels(err, map[string]string{exporter.Vol: d.super.volname})
}()

info, err := d.super.mw.Create_ll(parentIno, req.NewName, proto.Mode(os.ModeSymlink|os.ModePerm), req.Uid, req.Gid, []byte(req.Target))
if err != nil {
Expand Down Expand Up @@ -401,7 +415,9 @@ func (d *Dir) Link(ctx context.Context, req *fuse.LinkRequest, old fs.Node) (fs.

var err error
metric := exporter.NewTPCnt("link")
defer metric.Set(err)
defer func() {
metric.SetWithLabels(err, map[string]string{exporter.Vol: d.super.volname})
}()

info, err := d.super.mw.Link(d.info.Inode, req.NewName, oldInode.Inode)
if err != nil {
Expand Down
12 changes: 9 additions & 3 deletions client/fs/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,9 @@ func (f *File) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadR
start := time.Now()

metric := exporter.NewTPCnt("fileread")
defer metric.Set(err)
defer func() {
metric.SetWithLabels(err, map[string]string{exporter.Vol: f.super.volname})
}()

size, err := f.super.ec.Read(f.info.Inode, resp.Data[fuse.OutHeaderSize:], int(req.Offset), req.Size)
if err != nil && err != io.EOF {
Expand Down Expand Up @@ -228,7 +230,9 @@ func (f *File) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.Wri
start := time.Now()

metric := exporter.NewTPCnt("filewrite")
defer metric.Set(err)
defer func() {
metric.SetWithLabels(err, map[string]string{exporter.Vol: f.super.volname})
}()

size, err := f.super.ec.Write(ino, int(req.Offset), req.Data, flags)
if err != nil {
Expand Down Expand Up @@ -265,7 +269,9 @@ func (f *File) Flush(ctx context.Context, req *fuse.FlushRequest) (err error) {
start := time.Now()

metric := exporter.NewTPCnt("filesync")
defer metric.Set(err)
defer func() {
metric.SetWithLabels(err, map[string]string{exporter.Vol: f.super.volname})
}()

err = f.super.ec.Flush(f.info.Inode)
if err != nil {
Expand Down
47 changes: 47 additions & 0 deletions datanode/metric.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2018 The Chubao Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package datanode

import (
"fmt"

"github.com/chubaofs/chubaofs/util/exporter"
)

const (
MetricPartitionIOName = "dataPartitionIO"
MetricPartitionIOBytesName = "dataPartitionIOBytes"
)

// DataNodeMetrics holds the exporter metrics registered by a DataNode
// at startup (see registerMetrics).
type DataNodeMetrics struct {
	// MetricIOBytes counts partition IO bytes; reported with per-call
	// labels (volume, IO type, disk) built by GetIoMetricLabels.
	MetricIOBytes *exporter.Counter
}

// registerMetrics initializes the DataNode's metric set, creating the
// partition-IO byte counter used by the read/write handlers.
func (d *DataNode) registerMetrics() {
	d.metrics = &DataNodeMetrics{
		MetricIOBytes: exporter.NewCounter(MetricPartitionIOBytesName),
	}
}

// GetIoMetricLabels builds the exporter label set attached to partition
// IO metrics. It always includes the volume name, the IO type tp
// (callers pass "read", "write", "randwrite"), and the disk path; the
// partition id is added only when exporter.EnablePid is enabled, which
// keeps label cardinality bounded by default.
func GetIoMetricLabels(partition *DataPartition, tp string) map[string]string {
	// Pre-size for the at-most-four labels assigned below, avoiding
	// incremental map growth on this per-IO hot path.
	labels := make(map[string]string, 4)
	labels[exporter.Vol] = partition.volumeID
	labels[exporter.Type] = tp
	labels[exporter.Disk] = partition.disk.Path
	if exporter.EnablePid {
		labels[exporter.PartId] = fmt.Sprintf("%d", partition.partitionID)
	}

	return labels
}
3 changes: 3 additions & 0 deletions datanode/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ type DataNode struct {
tcpListener net.Listener
stopC chan bool

metrics *DataNodeMetrics

control common.Control
}

Expand Down Expand Up @@ -129,6 +131,7 @@ func doStart(server common.Server, cfg *config.Config) (err error) {
}

exporter.Init(ModuleName, cfg)
s.registerMetrics()
s.register(cfg)

// init limit
Expand Down
37 changes: 34 additions & 3 deletions datanode/wrap_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,24 @@ import (
raftProto "github.com/tiglabs/raft/proto"
)

// getPacketTpLabels derives the exporter label set for a replication
// packet's TP metric. Packets bound to a data partition are labeled with
// the volume name and operation message; partition id and disk path are
// added only when exporter.EnablePid is set. Packets whose Object is not
// a *DataPartition yield an empty (but non-nil) label map.
func (s *DataNode) getPacketTpLabels(p *repl.Packet) map[string]string {
	labels := make(map[string]string)

	part, ok := p.Object.(*DataPartition)
	if !ok {
		// Not partition-bound; report without labels.
		return labels
	}

	labels[exporter.Vol] = part.volumeID
	labels[exporter.Op] = p.GetOpMsg()
	if exporter.EnablePid {
		labels[exporter.PartId] = fmt.Sprintf("%d", part.partitionID)
		labels[exporter.Disk] = part.path
	}

	return labels
}

func (s *DataNode) OperatePacket(p *repl.Packet, c *net.TCPConn) (err error) {
sz := p.Size
tpObject := exporter.NewTPCnt(p.GetOpMsg())
tpLabels := s.getPacketTpLabels(p)
start := time.Now().UnixNano()
defer func() {
resultSize := p.Size
Expand All @@ -63,7 +78,7 @@ func (s *DataNode) OperatePacket(p *repl.Packet, c *net.TCPConn) (err error) {
}
}
p.Size = resultSize
tpObject.Set(err)
tpObject.SetWithLabels(err, tpLabels)
}()
switch p.Opcode {
case proto.OpCreateExtent:
Expand Down Expand Up @@ -382,6 +397,7 @@ func (s *DataNode) handleWritePacket(p *repl.Packet) {
}
}()
partition := p.Object.(*DataPartition)
metricPartitionIOLabels := GetIoMetricLabels(partition, "write")
if partition.Available() <= 0 || partition.disk.Status == proto.ReadOnly || partition.IsRejectWrite() {
err = storage.NoSpaceError
return
Expand All @@ -391,13 +407,19 @@ func (s *DataNode) handleWritePacket(p *repl.Packet) {
}
store := partition.ExtentStore()
if p.ExtentType == proto.TinyExtentType {
partitionIOMetric := exporter.NewTPCnt(MetricPartitionIOName)
err = store.Write(p.ExtentID, p.ExtentOffset, int64(p.Size), p.Data, p.CRC, storage.AppendWriteType, p.IsSyncWrite())
s.metrics.MetricIOBytes.AddWithLabels(int64(p.Size), metricPartitionIOLabels)
partitionIOMetric.SetWithLabels(err, metricPartitionIOLabels)
s.incDiskErrCnt(p.PartitionID, err, WriteFlag)
return
}

if p.Size <= util.BlockSize {
partitionIOMetric := exporter.NewTPCnt(MetricPartitionIOName)
err = store.Write(p.ExtentID, p.ExtentOffset, int64(p.Size), p.Data, p.CRC, storage.AppendWriteType, p.IsSyncWrite())
s.metrics.MetricIOBytes.AddWithLabels(int64(p.Size), metricPartitionIOLabels)
partitionIOMetric.SetWithLabels(err, metricPartitionIOLabels)
partition.checkIsDiskError(err)
} else {
size := p.Size
Expand All @@ -409,7 +431,10 @@ func (s *DataNode) handleWritePacket(p *repl.Packet) {
currSize := util.Min(int(size), util.BlockSize)
data := p.Data[offset : offset+currSize]
crc := crc32.ChecksumIEEE(data)
partitionIOMetric := exporter.NewTPCnt(MetricPartitionIOName)
err = store.Write(p.ExtentID, p.ExtentOffset+int64(offset), int64(currSize), data, crc, storage.AppendWriteType, p.IsSyncWrite())
s.metrics.MetricIOBytes.AddWithLabels(int64(p.Size), metricPartitionIOLabels)
partitionIOMetric.SetWithLabels(err, metricPartitionIOLabels)
partition.checkIsDiskError(err)
if err != nil {
break
Expand Down Expand Up @@ -437,7 +462,11 @@ func (s *DataNode) handleRandomWritePacket(p *repl.Packet) {
err = raft.ErrNotLeader
return
}
metricPartitionIOLabels := GetIoMetricLabels(partition, "randwrite")
partitionIOMetric := exporter.NewTPCnt(MetricPartitionIOName)
err = partition.RandomWriteSubmit(p)
s.metrics.MetricIOBytes.AddWithLabels(int64(p.Size), metricPartitionIOLabels)
partitionIOMetric.SetWithLabels(err, metricPartitionIOLabels)
if err != nil && strings.Contains(err.Error(), raft.ErrNotLeader.Error()) {
err = raft.ErrNotLeader
return
Expand All @@ -447,7 +476,6 @@ func (s *DataNode) handleRandomWritePacket(p *repl.Packet) {
err = storage.TryAgainError
return
}

}

func (s *DataNode) handleStreamReadPacket(p *repl.Packet, connect net.Conn, isRepairRead bool) {
Expand Down Expand Up @@ -508,7 +536,7 @@ func (s *DataNode) extentRepairReadPacket(p *repl.Packet, connect net.Conn, isRe
needReplySize := p.Size
offset := p.ExtentOffset
store := partition.ExtentStore()

metricPartitionIOLabels := GetIoMetricLabels(partition, "read")
for {
if needReplySize <= 0 {
break
Expand All @@ -526,7 +554,10 @@ func (s *DataNode) extentRepairReadPacket(p *repl.Packet, connect net.Conn, isRe
reply.ExtentOffset = offset
p.Size = uint32(currReadSize)
p.ExtentOffset = offset
partitionIOMetric := exporter.NewTPCnt(MetricPartitionIOName)
reply.CRC, err = store.Read(reply.ExtentID, offset, int64(currReadSize), reply.Data, isRepairRead)
s.metrics.MetricIOBytes.AddWithLabels(int64(p.Size), metricPartitionIOLabels)
partitionIOMetric.SetWithLabels(err, metricPartitionIOLabels)
partition.checkIsDiskError(err)
tpObject.Set(err)
p.CRC = reply.CRC
Expand Down
Loading

0 comments on commit 2d5cf17

Please sign in to comment.