Skip to content

Commit

Permalink
master:
Browse files Browse the repository at this point in the history
  1.add ump warn packet
  2.partition has recover,but the status don't change
  3.remove redundancy calling loadMetaData method
  4.reload meta data when change leader

metanode:
   1. add totalMem in configFile
   2.change not raft leader to tryOtherAddr error

 datanode:
   1.when datapartition not exsit,tell client with proto.OptryAgain resultcode
   2.change disk retrain default space to 20gb
   3.heartbeat report add volname

 raft:
   1.use raft.ErrNotLeader replace ErrNotLeader
   2.raft became leader must apply from oldapplyid to commitid

 client:
   1.  create a dummy node instance if inode does not exist
   2. add ump alarm for read/write/fsync errors

log:
   1.   auto create subdir to logdir

 error:
   1.delete juju error packet
  • Loading branch information
awzhgw committed Apr 30, 2019
1 parent 88d2a1c commit 36d97f1
Show file tree
Hide file tree
Showing 89 changed files with 998 additions and 2,091 deletions.
4 changes: 3 additions & 1 deletion client/fs/dir.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,9 @@ func (d *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.Lo
inode, err := d.super.InodeGet(ino)
if err != nil {
log.LogErrorf("Lookup: parent(%v) name(%v) ino(%v) err(%v)", d.inode.ino, req.Name, ino, err)
return nil, ParseError(err)
dummyInode := &Inode{ino: ino}
dummyChild := NewFile(d.super, dummyInode)
return dummyChild, nil
}
mode := inode.mode

Expand Down
26 changes: 21 additions & 5 deletions client/fs/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package fs

import (
"fmt"
"io"
"syscall"
"time"
Expand Down Expand Up @@ -65,6 +66,10 @@ func (f *File) Attr(ctx context.Context, a *fuse.Attr) error {
inode, err := f.super.InodeGet(ino)
if err != nil {
log.LogErrorf("Attr: ino(%v) err(%v)", ino, err)
if err == fuse.ENOENT {
a.Inode = ino
return nil
}
return ParseError(err)
}

Expand Down Expand Up @@ -140,16 +145,22 @@ func (f *File) Release(ctx context.Context, req *fuse.ReleaseRequest) (err error
// Read handles the read request.
func (f *File) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) (err error) {
log.LogDebugf("TRACE Read enter: ino(%v) offset(%v) reqsize(%v) req(%v)", f.inode.ino, req.Offset, req.Size, req)

start := time.Now()

size, err := f.super.ec.Read(f.inode.ino, resp.Data[fuse.OutHeaderSize:], int(req.Offset), req.Size)
if err != nil && err != io.EOF {
log.LogErrorf("Read: ino(%v) req(%v) err(%v) size(%v)", f.inode.ino, req, err, size)
msg := fmt.Sprintf("Read: ino(%v) req(%v) err(%v) size(%v)", f.inode.ino, req, err, size)
f.super.handleError("Read", msg)
return fuse.EIO
}

if size > req.Size {
log.LogErrorf("Read: ino(%v) req(%v) size(%v)", f.inode.ino, req, size)
msg := fmt.Sprintf("Read: read size larger than request size, ino(%v) req(%v) size(%v)", f.inode.ino, req, size)
f.super.handleError("Read", msg)
return fuse.ERANGE
}

if size > 0 {
resp.Data = resp.Data[:size+fuse.OutHeaderSize]
} else if size <= 0 {
Expand Down Expand Up @@ -192,19 +203,23 @@ func (f *File) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.Wri
}

start := time.Now()

size, err := f.super.ec.Write(ino, int(req.Offset), req.Data, enSyncWrite)
if err != nil {
log.LogErrorf("Write: ino(%v) offset(%v) len(%v) err(%v)", ino, req.Offset, reqlen, err)
msg := fmt.Sprintf("Write: ino(%v) offset(%v) len(%v) err(%v)", ino, req.Offset, reqlen, err)
f.super.handleError("Write", msg)
return fuse.EIO
}

resp.Size = size
if size != reqlen {
log.LogErrorf("Write: ino(%v) offset(%v) len(%v) size(%v)", ino, req.Offset, reqlen, size)
}

if waitForFlush {
if err = f.super.ec.Flush(ino); err != nil {
log.LogErrorf("Write: failed to wait for flush, ino(%v) offset(%v) len(%v) err(%v) req(%v)", ino, req.Offset, reqlen, err, req)
msg := fmt.Sprintf("Write: failed to wait for flush, ino(%v) offset(%v) len(%v) err(%v) req(%v)", ino, req.Offset, reqlen, err, req)
f.super.handleError("Wrtie", msg)
return fuse.EIO
}
}
Expand All @@ -226,7 +241,8 @@ func (f *File) Fsync(ctx context.Context, req *fuse.FsyncRequest) (err error) {
start := time.Now()
err = f.super.ec.Flush(f.inode.ino)
if err != nil {
log.LogErrorf("Fsync: ino(%v) err(%v)", f.inode.ino, err)
msg := fmt.Sprintf("Fsync: ino(%v) err(%v)", f.inode.ino, err)
f.super.handleError("Fsync", msg)
return fuse.EIO
}
f.super.ic.Delete(f.inode.ino)
Expand Down
24 changes: 17 additions & 7 deletions client/fs/super.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ package fs

import (
"fmt"
"github.com/juju/errors"
"github.com/chubaofs/chubaofs/util/errors"
"golang.org/x/net/context"
"sync"
"time"
Expand All @@ -26,6 +26,7 @@ import (
"github.com/chubaofs/chubaofs/sdk/data/stream"
"github.com/chubaofs/chubaofs/sdk/meta"
"github.com/chubaofs/chubaofs/util/log"
"github.com/chubaofs/chubaofs/util/ump"
)

// Super defines the struct of a super block.
Expand Down Expand Up @@ -54,12 +55,12 @@ func NewSuper(volname, owner, master string, icacheTimeout, lookupValid, attrVal
s = new(Super)
s.mw, err = meta.NewMetaWrapper(volname, owner, master)
if err != nil {
return nil, errors.Annotate(err, "NewMetaWrapper failed!")
return nil, errors.Trace(err, "NewMetaWrapper failed!")
}

s.ec, err = stream.NewExtentClient(volname, master, s.mw.AppendExtentKey, s.mw.GetExtents, s.mw.Truncate)
if err != nil {
return nil, errors.Annotate(err, "NewExtentClient failed!")
return nil, errors.Trace(err, "NewExtentClient failed!")
}

s.volname = volname
Expand Down Expand Up @@ -107,11 +108,20 @@ func (s *Super) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.
return nil
}

func (s *Super) exporterKey(act string) string {
return fmt.Sprintf("%s_fuseclient_%s", s.cluster, act)
}

// ClusterName returns the cluster name.
func (s *Super) ClusterName() string {
return s.cluster
}

func (s *Super) exporterKey(act string) string {
return fmt.Sprintf("%v_fuseclient_%v", s.cluster, act)
}

func (s *Super) umpKey(act string) string {
return fmt.Sprintf("%v_fuseclient_%v", s.cluster, act)
}

func (s *Super) handleError(op, msg string) {
log.LogError(msg)
ump.Alarm(s.umpKey(op), msg)
}
10 changes: 6 additions & 4 deletions client/fuse.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,10 @@ package main
import (
"flag"
"fmt"
"github.com/juju/errors"
"github.com/chubaofs/chubaofs/util/errors"
"net/http"
_ "net/http/pprof"
"os"
"path"
"runtime"
"strconv"
"strings"
Expand All @@ -38,6 +37,7 @@ import (
"github.com/chubaofs/chubaofs/util/config"
"github.com/chubaofs/chubaofs/util/exporter"
"github.com/chubaofs/chubaofs/util/log"
"github.com/chubaofs/chubaofs/util/ump"
)

const (
Expand Down Expand Up @@ -94,21 +94,22 @@ func Mount(cfg *config.Config) (err error) {
attrValid := ParseConfigString(cfg, "attrValid")
enSyncWrite := ParseConfigString(cfg, "enSyncWrite")
autoInvalData := ParseConfigString(cfg, "autoInvalData")
umpDatadir := cfg.GetString("warnLogDir")

if mnt == "" || volname == "" || owner == "" || master == "" {
return errors.New(fmt.Sprintf("invalid config file: lack of mandatory fields, mountPoint(%v), volName(%v), owner(%v), masterAddr(%v)", mnt, volname, owner, master))
}

level := ParseLogLevel(loglvl)
_, err = log.InitLog(path.Join(logpath, LoggerDir), LoggerPrefix, level, nil)
_, err = log.InitLog(logpath, LoggerPrefix, level, nil)
if err != nil {
return err
}
defer log.LogFlush()

super, err := cfs.NewSuper(volname, owner, master, icacheTimeout, lookupValid, attrValid, enSyncWrite)
if err != nil {
log.LogError(errors.ErrorStack(err))
log.LogError(errors.Stack(err))
return err
}

Expand All @@ -132,6 +133,7 @@ func Mount(cfg *config.Config) (err error) {
fmt.Println(http.ListenAndServe(":"+profport, nil))
}()

ump.InitUmp(fmt.Sprintf("%v_%v", super.ClusterName(), ModuleName), umpDatadir)
exporter.Init(super.ClusterName(), ModuleName, cfg)

if err = fs.Serve(c, super); err != nil {
Expand Down
17 changes: 10 additions & 7 deletions cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"flag"
"fmt"
"github.com/chubaofs/chubaofs/util/config"
"github.com/chubaofs/chubaofs/util/ump"
"net/http"
_ "net/http/pprof"
"os"
Expand All @@ -38,11 +39,11 @@ var (
)

const (
ConfigKeyRole = "role"
ConfigKeyLogDir = "logDir"
ConfigKeyLogLevel = "logLevel"
ConfigKeyProfPort = "prof"
ConfigKeyExporterPort = "exporterPort"
ConfigKeyRole = "role"
ConfigKeyLogDir = "logDir"
ConfigKeyLogLevel = "logLevel"
ConfigKeyProfPort = "prof"
ConfigKeyWarnLogDir = "warnLogDir"
)

const (
Expand Down Expand Up @@ -118,7 +119,8 @@ func main() {

err := modifyOpenFiles()
if err != nil {
panic(err.Error())
fmt.Println(fmt.Sprintf(err.Error()))
os.Exit(1)
}

log.LogInfof("Hello, ChubaoFS Storage, Current Version: %s", Version)
Expand All @@ -127,10 +129,11 @@ func main() {
logDir := cfg.GetString(ConfigKeyLogDir)
logLevel := cfg.GetString(ConfigKeyLogLevel)
profPort := cfg.GetString(ConfigKeyProfPort)
umpDatadir := cfg.GetString(ConfigKeyWarnLogDir)

//for multi-cpu scheduling
runtime.GOMAXPROCS(runtime.NumCPU())

ump.InitUmp(role, umpDatadir)
// Init server instance with specified role configuration.
var (
server Server
Expand Down
Loading

0 comments on commit 36d97f1

Please sign in to comment.