Skip to content

Commit

Permalink
fuse: Add suspend/resume
Browse files Browse the repository at this point in the history
This patch addes two http handler "/suspend" and "/resume".

**suspend**
Suspend has one paremeter, which is a Unix Domain Socket used to send
fd. Suspend stops reading more request from /dev/fuse. It waits all
handling requests to finish, then saves serveNodes and serveHandles to
locale files ("/tmp/ChubaoFS-fuse-Nodes.list" and "/tmp/ChubaoFS-fuse-
Handles.list" by default). After that, cfs-client exits.

**resume**
Resume could stop Suspend before cfs-client exists.

Signed-off-by: Sheng Yong <[email protected]>
  • Loading branch information
shyodx committed Jan 14, 2022
1 parent 77a8148 commit 98ac080
Show file tree
Hide file tree
Showing 5 changed files with 409 additions and 0 deletions.
1 change: 1 addition & 0 deletions client/fs/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func (f *File) Attr(ctx context.Context, a *fuse.Attr) error {
}

fillAttr(info, a)
a.ParentIno = f.parentIno
fileSize, gen := f.fileSize(ino)
log.LogDebugf("Attr: ino(%v) fileSize(%v) gen(%v) inode.gen(%v)", ino, fileSize, gen, info.Generation)
if gen >= info.Generation {
Expand Down
93 changes: 93 additions & 0 deletions client/fs/super.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"golang.org/x/net/context"
Expand Down Expand Up @@ -55,6 +56,10 @@ type Super struct {
enableXattr bool
rootIno uint64
sc *SummaryCache

state fs.FSStatType
sockaddr string
suspendCh chan interface{}
}

// Functions that Super needs to implement
Expand Down Expand Up @@ -129,6 +134,7 @@ func NewSuper(opt *proto.MountOptions) (s *Super, err error) {
if s.rootIno, err = s.mw.GetRootIno(opt.SubDir); err != nil {
return nil, err
}
s.suspendCh = make(chan interface{})

log.LogInfof("NewSuper: cluster(%v) volname(%v) icacheExpiration(%v) LookupValidDuration(%v) AttrValidDuration(%v)", s.cluster, s.volname, inodeExpiration, LookupValidDuration, AttrValidDuration)
return s, nil
Expand Down Expand Up @@ -207,3 +213,90 @@ func (s *Super) handleError(op, msg string) {
log.LogError(msg)
ump.Alarm(s.umpKey(op), msg)
}

func replyFail(w http.ResponseWriter, r *http.Request, msg string) {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(msg))
}

func replySucc(w http.ResponseWriter, r *http.Request, msg string) {
w.WriteHeader(http.StatusOK)
w.Write([]byte(msg))
}

func (s *Super) SetSuspend(w http.ResponseWriter, r *http.Request) {
var (
err error
ret string
)

if err = r.ParseForm(); err != nil {
replyFail(w, r, err.Error())
return
}

sockaddr := r.FormValue("sock")
if sockaddr == "" {
err = fmt.Errorf("Need parameter 'sock' for IPC")
replyFail(w, r, err.Error())
return
}

s.fslock.Lock()
if s.sockaddr != "" ||
!atomic.CompareAndSwapUint32((*uint32)(&s.state), uint32(fs.FSStatResume), uint32(fs.FSStatSuspend)) {
s.fslock.Unlock()
err = fmt.Errorf("Already in suspend: sock '%s', state %v", s.sockaddr, s.state)
replyFail(w, r, err.Error())
return
}
s.sockaddr = sockaddr
s.fslock.Unlock()

// wait
msg := <-s.suspendCh
switch msg.(type) {
case error:
err = msg.(error)
case string:
ret = msg.(string)
default:
err = fmt.Errorf("Unknown return type: %v", msg)
}

if err != nil {
s.fslock.Lock()
atomic.StoreUint32((*uint32)(&s.state), uint32(fs.FSStatResume))
s.sockaddr = ""
s.fslock.Unlock()
replyFail(w, r, err.Error())
return
}

if !atomic.CompareAndSwapUint32((*uint32)(&s.state), uint32(fs.FSStatSuspend), uint32(fs.FSStatShutdown)) {
s.fslock.Lock()
atomic.StoreUint32((*uint32)(&s.state), uint32(fs.FSStatResume))
s.sockaddr = ""
s.fslock.Unlock()
err = fmt.Errorf("Invalid old state %v", s.state)
replyFail(w, r, err.Error())
return
}

replySucc(w, r, fmt.Sprintf("set suspend successfully: %s", ret))
}

func (s *Super) SetResume(w http.ResponseWriter, r *http.Request) {
atomic.StoreUint32((*uint32)(&s.state), uint32(fs.FSStatResume))
replySucc(w, r, "set resume successfully")
}

func (s *Super) State() (state fs.FSStatType, sockaddr string) {
return fs.FSStatType(atomic.LoadUint32((*uint32)(&s.state))), s.sockaddr
}

func (s *Super) Notify(stat fs.FSStatType, msg interface{}) {
if stat == fs.FSStatSuspend {
s.suspendCh <- msg
}
}
4 changes: 4 additions & 0 deletions client/fuse.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ const (
ControlCommandSetRate = "/rate/set"
ControlCommandGetRate = "/rate/get"
ControlCommandFreeOSMemory = "/debug/freeosmemory"
ControlCommandSuspend = "/suspend"
ControlCommandResume = "/resume"
Role = "Client"
)

Expand Down Expand Up @@ -323,6 +325,8 @@ func mount(opt *proto.MountOptions) (fsConn *fuse.Conn, super *cfs.Super, err er
http.HandleFunc(log.SetLogLevelPath, log.SetLogLevel)
http.HandleFunc(ControlCommandFreeOSMemory, freeOSMemory)
http.HandleFunc(log.GetLogPath, log.GetLog)
http.HandleFunc(ControlCommandSuspend, super.SetSuspend)
http.HandleFunc(ControlCommandResume, super.SetResume)

statusCh := make(chan error)
go waitListenAndServe(statusCh, ":"+opt.Profport, nil)
Expand Down
Loading

0 comments on commit 98ac080

Please sign in to comment.