Skip to content

Commit

Permalink
feat: support cache meta info in client
Browse files Browse the repository at this point in the history
Signed-off-by: Victor1319 <[email protected]>
  • Loading branch information
Victor1319 authored and leonrayang committed Aug 10, 2022
1 parent 3a221d0 commit a690508
Show file tree
Hide file tree
Showing 16 changed files with 256 additions and 64 deletions.
2 changes: 2 additions & 0 deletions client/fs/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ var (
LookupValidDuration = 5 * time.Second
// the expiration duration of the attributes in the FUSE cache
AttrValidDuration = 30 * time.Second

DisableMetaCache = true
)

// ParseError returns the error type.
Expand Down
16 changes: 14 additions & 2 deletions client/fs/dir.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,8 @@ func (d *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.Lo
d.super.fslock.Unlock()

resp.EntryValid = LookupValidDuration

log.LogDebugf("TRACE Lookup exit: parent(%v) req(%v) cost (%d)", d.info.Inode, req, time.Since(*bgTime).Microseconds())
return child, nil
}

Expand All @@ -317,6 +319,14 @@ func (d *Dir) ReadDir(ctx context.Context, req *fuse.ReadRequest, resp *fuse.Rea
var limit uint64 = DefaultReaddirLimit
start := time.Now()

bgTime := stat.BeginStat()
// var err error
metric := exporter.NewTPCnt("readdir")
defer func() {
stat.EndStat("ReadDirLimit", err, bgTime, 1)
metric.SetWithLabels(err, map[string]string{exporter.Vol: d.super.volname})
}()

dirCtx := d.dctx.GetCopy(req.Handle)
children, err := d.super.mw.ReadDirLimit_ll(d.info.Inode, dirCtx.Name, limit)
if err != nil {
Expand Down Expand Up @@ -354,6 +364,7 @@ func (d *Dir) ReadDir(ctx context.Context, req *fuse.ReadRequest, resp *fuse.Rea
Type: ParseType(child.Type),
Name: child.Name,
}

inodes = append(inodes, child.Inode)
dirents = append(dirents, dentry)
dcache.Put(child.Name, child.Inode)
Expand All @@ -365,7 +376,7 @@ func (d *Dir) ReadDir(ctx context.Context, req *fuse.ReadRequest, resp *fuse.Rea
}

elapsed := time.Since(start)
log.LogDebugf("TRACE ReadDir: ino(%v) (%v)ns", d.info.Inode, elapsed.Nanoseconds())
log.LogDebugf("TRACE ReadDir exit: ino(%v) (%v)ns %v", d.info.Inode, elapsed.Nanoseconds(), req)
return dirents, err
}

Expand Down Expand Up @@ -401,6 +412,7 @@ func (d *Dir) ReadDirAll(ctx context.Context) ([]fuse.Dirent, error) {
Type: ParseType(child.Type),
Name: child.Name,
}

inodes = append(inodes, child.Inode)
dirents = append(dirents, dentry)
dcache.Put(child.Name, child.Inode)
Expand All @@ -413,7 +425,7 @@ func (d *Dir) ReadDirAll(ctx context.Context) ([]fuse.Dirent, error) {
d.dcache = dcache

elapsed := time.Since(start)
log.LogDebugf("TRACE ReadDir: ino(%v) (%v)ns", d.info.Inode, elapsed.Nanoseconds())
log.LogDebugf("TRACE ReadDirAll: ino(%v) (%v)ns", d.info.Inode, elapsed.Nanoseconds())
return dirents, nil
}

Expand Down
40 changes: 27 additions & 13 deletions client/fs/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,19 +153,22 @@ func (f *File) Forget() {
stat.EndStat("Forget", err, bgTime, 1)
log.LogDebugf("TRACE Forget: ino(%v)", ino)
}()
f.super.ic.Delete(ino)

//TODO:why cannot close fwriter
//log.LogErrorf("TRACE Forget: ino(%v)", ino)
//if f.fWriter != nil {
// f.fWriter.Close()
//}
f.super.fslock.Lock()
delete(f.super.nodeCache, ino)
f.super.fslock.Unlock()

if err := f.super.ec.EvictStream(ino); err != nil {
log.LogWarnf("Forget: stream not ready to evict, ino(%v) err(%v)", ino, err)
return
if DisableMetaCache {
f.super.ic.Delete(ino)
f.super.fslock.Lock()
delete(f.super.nodeCache, ino)
f.super.fslock.Unlock()
if err := f.super.ec.EvictStream(ino); err != nil {
log.LogWarnf("Forget: stream not ready to evict, ino(%v) err(%v)", ino, err)
return
}
}

if !f.super.orphan.Evict(ino) {
Expand All @@ -185,7 +188,7 @@ func (f *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenR
}()

ino := f.info.Inode
log.LogDebugf("TRANCE open ino(%v) info(%v)", ino, f.info)
log.LogDebugf("TRACE open ino(%v) info(%v)", ino, f.info)
start := time.Now()

f.super.ec.OpenStream(ino)
Expand Down Expand Up @@ -230,18 +233,24 @@ func (f *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenR

elapsed := time.Since(start)
log.LogDebugf("TRACE Open: ino(%v) req(%v) resp(%v) (%v)ns", ino, req, resp, elapsed.Nanoseconds())

return f, nil
}

// Release handles the release request.
func (f *File) Release(ctx context.Context, req *fuse.ReleaseRequest) (err error) {

ino := f.info.Inode
bgTime := stat.BeginStat()

defer func() {
stat.EndStat("Release", err, bgTime, 1)
f.fWriter.FreeCache()
if DisableMetaCache {
f.super.ic.Delete(ino)
}
}()

ino := f.info.Inode
log.LogDebugf("TRACE Release enter: ino(%v) req(%v)", ino, req)

start := time.Now()
Expand All @@ -254,12 +263,11 @@ func (f *File) Release(ctx context.Context, req *fuse.ReleaseRequest) (err error
err = f.super.ec.CloseStream(ino)
if err != nil {
log.LogErrorf("Release: close writer failed, ino(%v) req(%v) err(%v)", ino, req, err)
f.super.ic.Delete(ino)
return ParseError(err)
}
f.super.ic.Delete(ino)
elapsed := time.Since(start)
log.LogDebugf("TRACE Release: ino(%v) req(%v) (%v)ns", ino, req, elapsed.Nanoseconds())

return nil
}

Expand Down Expand Up @@ -310,6 +318,7 @@ func (f *File) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadR

elapsed := time.Since(start)
log.LogDebugf("TRACE Read: ino(%v) offset(%v) reqsize(%v) req(%v) size(%v) (%v)ns", f.info.Inode, req.Offset, req.Size, req, size, elapsed.Nanoseconds())

return nil
}

Expand Down Expand Up @@ -438,9 +447,14 @@ func (f *File) Flush(ctx context.Context, req *fuse.FlushRequest) (err error) {
log.LogErrorf("TRACE Flush err: ino(%v) err(%v)", f.info.Inode, err)
return ParseError(err)
}
f.super.ic.Delete(f.info.Inode)

if DisableMetaCache {
f.super.ic.Delete(f.info.Inode)
}

elapsed := time.Since(start)
log.LogDebugf("TRACE Flush: ino(%v) (%v)ns", f.info.Inode, elapsed.Nanoseconds())

return nil
}

Expand Down Expand Up @@ -673,6 +687,6 @@ func (f *File) fileSizeVersion2(ino uint64) (size int, gen uint64) {
}
}

log.LogDebugf("TRANCE fileSizeVersion2: ino(%v) fileSize(%v) gen(%v) valid(%v)", ino, size, gen, valid)
log.LogDebugf("TRACE fileSizeVersion2: ino(%v) fileSize(%v) gen(%v) valid(%v)", ino, size, gen, valid)
return
}
30 changes: 18 additions & 12 deletions client/fs/icache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"github.com/cubefs/cubefs/proto"
"github.com/cubefs/cubefs/util/log"
)

const (
Expand Down Expand Up @@ -70,6 +71,7 @@ func (ic *InodeCache) Put(info *proto.InodeInfo) {
element := ic.lruList.PushFront(info)
ic.cache[info.Inode] = element
ic.Unlock()
// log.LogDebugf("InodeCache put inode: inode(%v)", info.Inode)
}

// Get returns the inode info based on the given inode number.
Expand All @@ -84,7 +86,7 @@ func (ic *InodeCache) Get(ino uint64) *proto.InodeInfo {
info := element.Value.(*proto.InodeInfo)
if inodeExpired(info) {
ic.RUnlock()
//log.LogDebugf("InodeCache GetConnect expired: now(%v) inode(%v)", time.Now().Format(LogTimeFormat), inode)
// log.LogDebugf("InodeCache GetConnect expired: now(%v) inode(%v), expired(%d)", time.Now().Format(LogTimeFormat), info.Inode, info.Expiration())
return nil
}
ic.RUnlock()
Expand All @@ -93,7 +95,7 @@ func (ic *InodeCache) Get(ino uint64) *proto.InodeInfo {

// Delete deletes the inode info based on the given inode number.
func (ic *InodeCache) Delete(ino uint64) {
//log.LogDebugf("InodeCache Delete: ino(%v)", ino)
// log.LogDebugf("InodeCache Delete: ino(%v)", ino)
ic.Lock()
element, ok := ic.cache[ino]
if ok {
Expand Down Expand Up @@ -123,6 +125,7 @@ func (ic *InodeCache) evict(foreground bool) {
return
}

// log.LogDebugf("InodeCache GetConnect expired: now(%v) inode(%v)", time.Now().Format(LogTimeFormat), info.Inode)
ic.lruList.Remove(element)
delete(ic.cache, info.Inode)
count++
Expand All @@ -142,6 +145,7 @@ func (ic *InodeCache) evict(foreground bool) {
if !inodeExpired(info) {
break
}
// log.LogDebugf("InodeCache GetConnect expired: now(%v) inode(%v)", time.Now().Format(LogTimeFormat), info.Inode)
ic.lruList.Remove(element)
delete(ic.cache, info.Inode)
count++
Expand All @@ -151,16 +155,18 @@ func (ic *InodeCache) evict(foreground bool) {
func (ic *InodeCache) backgroundEviction() {
t := time.NewTicker(BgEvictionInterval)
defer t.Stop()
for {
select {
case <-t.C:
//log.LogInfof("InodeCache: start BG evict")
//start := time.Now()
ic.Lock()
ic.evict(false)
ic.Unlock()
//elapsed := time.Since(start)
//log.LogInfof("InodeCache: done BG evict, cost (%v)ns", elapsed.Nanoseconds())

for range t.C {
log.LogInfof("InodeCache: start BG evict")
if DisableMetaCache {
log.LogInfof("InodeCache: no need to do BG evict")
continue
}
start := time.Now()
ic.Lock()
ic.evict(false)
ic.Unlock()
elapsed := time.Since(start)
log.LogInfof("InodeCache: total inode cache(%d), cost(%d)ns", ic.lruList.Len(), elapsed.Nanoseconds())
}
}
5 changes: 1 addition & 4 deletions client/fs/inode.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,7 @@ func fillAttr(info *proto.InodeInfo, attr *fuse.Attr) {
}

func inodeExpired(info *proto.InodeInfo) bool {
if time.Now().UnixNano() > info.Expiration() {
return true
}
return false
return time.Now().UnixNano() > info.Expiration()
}

func inodeSetExpiration(info *proto.InodeInfo, t time.Duration) {
Expand Down
8 changes: 8 additions & 0 deletions client/fs/super.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ func NewSuper(opt *proto.MountOptions) (s *Super, err error) {
s.sc = NewSummaryCache(DefaultSummaryExpiration, MaxSummaryCache)
}

if opt.MaxStreamerLimit > 0 {
DisableMetaCache = false
}

s.volType = opt.VolType
s.ebsEndpoint = opt.EbsEndpoint
s.CacheAction = opt.CacheAction
Expand All @@ -157,14 +161,18 @@ func NewSuper(opt *proto.MountOptions) (s *Super, err error) {
WriteRate: opt.WriteRate,
VolumeType: opt.VolType,
BcacheEnable: opt.EnableBcache,
MaxStreamerLimit: opt.MaxStreamerLimit,
OnAppendExtentKey: s.mw.AppendExtentKey,
OnGetExtents: s.mw.GetExtents,
OnTruncate: s.mw.Truncate,
OnEvictIcache: s.ic.Delete,
OnLoadBcache: s.bc.Get,
OnCacheBcache: s.bc.Put,
OnEvictBcache: s.bc.Evict,

DisableMetaCache: DisableMetaCache,
}

s.ec, err = stream.NewExtentClient(extentConfig)
if err != nil {
return nil, errors.Trace(err, "NewExtentClient failed!")
Expand Down
1 change: 1 addition & 0 deletions client/fuse.go
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,7 @@ func parseMountOption(cfg *config.Config) (*proto.MountOptions, error) {
}
opt.BuffersTotalLimit = GlobalMountOptions[proto.BuffersTotalLimit].GetInt64()
opt.MetaSendTimeout = GlobalMountOptions[proto.MetaSendTimeout].GetInt64()
opt.MaxStreamerLimit = GlobalMountOptions[proto.MaxStreamerLimit].GetInt64()

if opt.MountPoint == "" || opt.Volname == "" || opt.Owner == "" || opt.Master == "" {
return nil, errors.New(fmt.Sprintf("invalid config file: lack of mandatory fields, mountPoint(%v), volName(%v), owner(%v), masterAddr(%v)", opt.MountPoint, opt.Volname, opt.Owner, opt.Master))
Expand Down
6 changes: 6 additions & 0 deletions depends/bazil.org/fuse/fs/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cubefs/cubefs/depends/bazil.org/fuse"
"github.com/cubefs/cubefs/depends/bazil.org/fuse/fuseutil"
"github.com/cubefs/cubefs/util"
"github.com/cubefs/cubefs/util/stat"
"golang.org/x/net/context"
"golang.org/x/time/rate"
)
Expand Down Expand Up @@ -1296,6 +1297,11 @@ func (c *Server) serve(r fuse.Request) {

req := &serveRequest{Request: r, cancel: cancel}

bgTime := stat.BeginStat()
defer func() {
stat.EndStat("fuse:"+opName(r), nil, bgTime, 1)
}()

c.debug(request{
Op: opName(r),
Request: r.Hdr(),
Expand Down
4 changes: 4 additions & 0 deletions proto/mount_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ const (
WriteThreads
MetaSendTimeout
BuffersTotalLimit
MaxStreamerLimit

MaxMountOption
)

Expand Down Expand Up @@ -138,6 +140,7 @@ func InitMountOptions(opts []MountOption) {
opts[EnableSummary] = MountOption{"enableSummary", "enable content summary", "", false}
opts[MetaSendTimeout] = MountOption{"metaSendTimeout", "Meta send timeout", "", int64(600)}
opts[BuffersTotalLimit] = MountOption{"buffersTotalLimit", "Send/Receive packets memory limit", "", int64(32768)} //default 4G
opts[MaxStreamerLimit] = MountOption{"maxStreamerLimit", "The maximum number of streamers", "", int64(0)} // default 0

for i := 0; i < MaxMountOption; i++ {
flag.StringVar(&opts[i].cmdlineValue, opts[i].keyword, "", opts[i].description)
Expand Down Expand Up @@ -281,4 +284,5 @@ type MountOptions struct {
NeedRestoreFuse bool
MetaSendTimeout int64
BuffersTotalLimit int64
MaxStreamerLimit int64
}
16 changes: 8 additions & 8 deletions sdk/data/stream/extent_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,12 @@ type ExtentCache struct {
func NewExtentCache(inode uint64) *ExtentCache {
return &ExtentCache{
inode: inode,
root: btree.New(32),
discard: btree.New(32),
root: btree.NewWithSize(8, 4),
discard: btree.NewWithSize(8, 4),
}
}

// Refresh refreshes the extent cache.
func (cache *ExtentCache) Refresh(inode uint64, getExtents GetExtentsFunc) error {
if cache.root.Len() > 0 {
return nil
}
func (cache *ExtentCache) RefreshForce(inode uint64, getExtents GetExtentsFunc) error {
gen, size, extents, err := getExtents(inode)
if err != nil {
return err
Expand All @@ -81,7 +77,11 @@ func (cache *ExtentCache) Refresh(inode uint64, getExtents GetExtentsFunc) error
}

// Refresh refreshes the extent cache.
func (cache *ExtentCache) RefreshForce(inode uint64, getExtents GetExtentsFunc) error {
func (cache *ExtentCache) Refresh(inode uint64, getExtents GetExtentsFunc) error {
if cache.root.Len() > 0 {
return nil
}

gen, size, extents, err := getExtents(inode)
if err != nil {
return err
Expand Down
Loading

0 comments on commit a690508

Please sign in to comment.