From 98ac080f2a8c821bc7f2699f0c86a636ef449f10 Mon Sep 17 00:00:00 2001 From: Sheng Yong Date: Fri, 24 Dec 2021 17:24:30 +0800 Subject: [PATCH] fuse: Add suspend/resume This patch addes two http handler "/suspend" and "/resume". **suspend** Suspend has one paremeter, which is a Unix Domain Socket used to send fd. Suspend stops reading more request from /dev/fuse. It waits all handling requests to finish, then saves serveNodes and serveHandles to locale files ("/tmp/ChubaoFS-fuse-Nodes.list" and "/tmp/ChubaoFS-fuse- Handles.list" by default). After that, cfs-client exits. **resume** Resume could stop Suspend before cfs-client exists. Signed-off-by: Sheng Yong --- client/fs/file.go | 1 + client/fs/super.go | 93 +++++++++ client/fuse.go | 4 + vendor/bazil.org/fuse/fs/serve.go | 305 ++++++++++++++++++++++++++++++ vendor/bazil.org/fuse/fuse.go | 6 + 5 files changed, 409 insertions(+) diff --git a/client/fs/file.go b/client/fs/file.go index 63f66b22d4..cf8d6bcc1e 100644 --- a/client/fs/file.go +++ b/client/fs/file.go @@ -76,6 +76,7 @@ func (f *File) Attr(ctx context.Context, a *fuse.Attr) error { } fillAttr(info, a) + a.ParentIno = f.parentIno fileSize, gen := f.fileSize(ino) log.LogDebugf("Attr: ino(%v) fileSize(%v) gen(%v) inode.gen(%v)", ino, fileSize, gen, info.Generation) if gen >= info.Generation { diff --git a/client/fs/super.go b/client/fs/super.go index 9d133eb6d9..600972adb7 100644 --- a/client/fs/super.go +++ b/client/fs/super.go @@ -20,6 +20,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "golang.org/x/net/context" @@ -55,6 +56,10 @@ type Super struct { enableXattr bool rootIno uint64 sc *SummaryCache + + state fs.FSStatType + sockaddr string + suspendCh chan interface{} } // Functions that Super needs to implement @@ -129,6 +134,7 @@ func NewSuper(opt *proto.MountOptions) (s *Super, err error) { if s.rootIno, err = s.mw.GetRootIno(opt.SubDir); err != nil { return nil, err } + s.suspendCh = make(chan interface{}) log.LogInfof("NewSuper: cluster(%v) volname(%v) icacheExpiration(%v) LookupValidDuration(%v) AttrValidDuration(%v)", s.cluster, s.volname, inodeExpiration, LookupValidDuration, AttrValidDuration) return s, nil @@ -207,3 +213,90 @@ func (s *Super) handleError(op, msg string) { log.LogError(msg) ump.Alarm(s.umpKey(op), msg) } + +func replyFail(w http.ResponseWriter, r *http.Request, msg string) { + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte(msg)) +} + +func replySucc(w http.ResponseWriter, r *http.Request, msg string) { + w.WriteHeader(http.StatusOK) + w.Write([]byte(msg)) +} + +func (s *Super) SetSuspend(w http.ResponseWriter, r *http.Request) { + var ( + err error + ret string + ) + + if err = r.ParseForm(); err != nil { + replyFail(w, r, err.Error()) + return + } + + sockaddr := r.FormValue("sock") + if sockaddr == "" { + err = fmt.Errorf("Need parameter 'sock' for IPC") + replyFail(w, r, err.Error()) + return + } + + s.fslock.Lock() + if s.sockaddr != "" || + !atomic.CompareAndSwapUint32((*uint32)(&s.state), uint32(fs.FSStatResume), uint32(fs.FSStatSuspend)) { + s.fslock.Unlock() + err = fmt.Errorf("Already in suspend: sock '%s', state %v", s.sockaddr, s.state) + replyFail(w, r, err.Error()) + return + } + s.sockaddr = sockaddr + s.fslock.Unlock() + + // wait + msg := <-s.suspendCh + switch msg.(type) { + case error: + err = msg.(error) + case string: + ret = msg.(string) + default: + err = fmt.Errorf("Unknown return type: %v", msg) + } + + if err != nil { + s.fslock.Lock() + atomic.StoreUint32((*uint32)(&s.state), uint32(fs.FSStatResume)) + s.sockaddr = "" + s.fslock.Unlock() + replyFail(w, r, err.Error()) + return + } + + if !atomic.CompareAndSwapUint32((*uint32)(&s.state), uint32(fs.FSStatSuspend), uint32(fs.FSStatShutdown)) { + s.fslock.Lock() + atomic.StoreUint32((*uint32)(&s.state), uint32(fs.FSStatResume)) + s.sockaddr = "" + s.fslock.Unlock() + err = fmt.Errorf("Invalid old state %v", s.state) + replyFail(w, r, err.Error()) + return + } + + replySucc(w, r, fmt.Sprintf("set suspend successfully: %s", ret)) +} + +func (s *Super) SetResume(w http.ResponseWriter, r *http.Request) { + atomic.StoreUint32((*uint32)(&s.state), uint32(fs.FSStatResume)) + replySucc(w, r, "set resume successfully") +} + +func (s *Super) State() (state fs.FSStatType, sockaddr string) { + return fs.FSStatType(atomic.LoadUint32((*uint32)(&s.state))), s.sockaddr +} + +func (s *Super) Notify(stat fs.FSStatType, msg interface{}) { + if stat == fs.FSStatSuspend { + s.suspendCh <- msg + } +} diff --git a/client/fuse.go b/client/fuse.go index ad6c5f5684..43fc62a3c1 100644 --- a/client/fuse.go +++ b/client/fuse.go @@ -70,6 +70,8 @@ const ( ControlCommandSetRate = "/rate/set" ControlCommandGetRate = "/rate/get" ControlCommandFreeOSMemory = "/debug/freeosmemory" + ControlCommandSuspend = "/suspend" + ControlCommandResume = "/resume" Role = "Client" ) @@ -323,6 +325,8 @@ func mount(opt *proto.MountOptions) (fsConn *fuse.Conn, super *cfs.Super, err er http.HandleFunc(log.SetLogLevelPath, log.SetLogLevel) http.HandleFunc(ControlCommandFreeOSMemory, freeOSMemory) http.HandleFunc(log.GetLogPath, log.GetLog) + http.HandleFunc(ControlCommandSuspend, super.SetSuspend) + http.HandleFunc(ControlCommandResume, super.SetResume) statusCh := make(chan error) go waitListenAndServe(statusCh, ":"+opt.Profport, nil) diff --git a/vendor/bazil.org/fuse/fs/serve.go b/vendor/bazil.org/fuse/fs/serve.go index 7dae8aedfd..1784711abf 100644 --- a/vendor/bazil.org/fuse/fs/serve.go +++ b/vendor/bazil.org/fuse/fs/serve.go @@ -8,16 +8,20 @@ import ( "hash/fnv" "io" "log" + "net" + "os" "reflect" "runtime" "strings" "sync" "time" + "unsafe" "bytes" "bazil.org/fuse" "bazil.org/fuse/fuseutil" + "github.com/chubaofs/chubaofs/util" "golang.org/x/net/context" "golang.org/x/time/rate" ) @@ -36,6 +40,14 @@ var ForgetServeLimit *rate.Limiter = rate.NewLimiter(defaultForgetServeLimit, de // TODO: FINISH DOCS +type FSStatType uint32 + +const ( + FSStatResume FSStatType = iota + FSStatSuspend + FSStatShutdown +) + // An FS is the interface required of a file system. // // Other FUSE requests can be handled by implementing methods from the @@ -43,6 +55,8 @@ var ForgetServeLimit *rate.Limiter = rate.NewLimiter(defaultForgetServeLimit, de type FS interface { // Root is called to obtain the Node for the file system root. Root() (Node, error) + State() (FSStatType, string) + Notify(stat FSStatType, msg interface{}) } type FSStatfser interface { @@ -393,6 +407,293 @@ type Server struct { wg sync.WaitGroup } +const ( + ContextNodeVersionV1 uint32 = 1 + ContextHandleVersionV1 uint32 = 1 + ContextNodeVersion uint32 = ContextNodeVersionV1 + ContextHandleVersion uint32 = ContextHandleVersionV1 + NodeListFileName string = "/tmp/ChubaoFS-fuse-Nodes.list" + HandleListFileName string = "/tmp/ChubaoFS-fuse-Handles.list" +) + +func WriteVersion(file *os.File, version uint32) error { + data := make([]byte, 4) + binary.BigEndian.PutUint32(data, version) + _, err := file.Write(data) + return err +} + +type ContextNode struct { + Inode uint64 + ParentIno uint64 + Generation uint64 + Refs uint64 + NodeID uint64 + Mode uint32 + Rsvd uint32 +} + +func (cn *ContextNode) String() string { + return fmt.Sprintf("nodeid:%v inode:%v parent:%v gen:%v refs:%v mode:%o", + cn.NodeID, cn.Inode, cn.ParentIno, cn.Generation, cn.Refs, cn.Mode) +} + +func ContextNodeToBytes(cn *ContextNode) []byte { + var buf []byte = make([]byte, unsafe.Sizeof(ContextNode{})) + binary.BigEndian.PutUint64(buf[0:8], cn.Inode) + binary.BigEndian.PutUint64(buf[8:16], cn.ParentIno) + binary.BigEndian.PutUint64(buf[16:24], cn.Generation) + binary.BigEndian.PutUint64(buf[24:32], cn.Refs) + binary.BigEndian.PutUint64(buf[32:40], cn.NodeID) + binary.BigEndian.PutUint32(buf[40:44], cn.Mode) + return buf +} + +func ContextNodeFromBytes(buf []byte) *ContextNode { + cn := &ContextNode{} + cn.Inode = binary.BigEndian.Uint64(buf[0:8]) + cn.ParentIno = binary.BigEndian.Uint64(buf[8:16]) + cn.Generation = binary.BigEndian.Uint64(buf[16:24]) + cn.Refs = binary.BigEndian.Uint64(buf[24:32]) + cn.NodeID = binary.BigEndian.Uint64(buf[32:40]) + cn.Mode = binary.BigEndian.Uint32(buf[40:44]) + return cn +} + +type ContextHandle struct { + HandleID uint64 + NodeID uint64 +} + +func (ch *ContextHandle) String() string { + return fmt.Sprintf("handleid:%v nodeid:%v", ch.HandleID, ch.NodeID) +} + +func ContextHandleToBytes(ch *ContextHandle) []byte { + var buf []byte = make([]byte, unsafe.Sizeof(ContextHandle{})) + binary.BigEndian.PutUint64(buf[0:8], ch.HandleID) + binary.BigEndian.PutUint64(buf[8:16], ch.NodeID) + return buf +} + +func ContextHandleFromBytes(buf []byte) *ContextHandle { + ch := &ContextHandle{} + ch.HandleID = binary.BigEndian.Uint64(buf[0:8]) + ch.NodeID = binary.BigEndian.Uint64(buf[8:16]) + return ch +} + +func (s *Server) TrySuspend(fs FS) bool { + var err error + var msg string + var ret bool + + stat, sockaddr := fs.State() + if stat == FSStatSuspend { + if msg, err = s.SaveFuseContext(fs); err != nil { + s.CleanupFuseContext() + fs.Notify(stat, err) + goto out + } + if err = s.SaveFuseDevFd(sockaddr); err != nil { + s.CleanupFuseContext() + fs.Notify(stat, err) + goto out + } + + fs.Notify(stat, msg) + + out: + for { + stat, _ = fs.State() + if stat == FSStatShutdown { + ret = true + break + } else if stat == FSStatResume { + s.CleanupFuseContext() + ret = false + break + } else { + runtime.Gosched() + } + } + } + + return ret +} + +func (s *Server) CleanupFuseContext() { + os.Remove(NodeListFileName) + os.Remove(HandleListFileName) +} + +func (s *Server) SaveFuseContext(fs FS) (msg string, err error) { + var ( + nodeListFile *os.File + handleListFile *os.File + ncount int + hcount int + skip uint64 + ) + // Wait all received requests to finish + // FIXME: add a timeout to avoid waiting forever + s.wg.Wait() + + if nodeListFile, err = os.OpenFile(NodeListFileName, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644); err != nil { + err = fmt.Errorf("SaveFuseContext: failed to create nodes list file: %v", err) + return + } + defer nodeListFile.Close() + if handleListFile, err = os.OpenFile(HandleListFileName, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644); err != nil { + err = fmt.Errorf("SaveFuseContext: failed to create s list file: %v", err) + return + } + defer handleListFile.Close() + + if err = WriteVersion(nodeListFile, ContextNodeVersion); err != nil { + err = fmt.Errorf("SaveFuseContext: failed to write nodes list file: %v", err) + return + } + if err = WriteVersion(handleListFile, ContextHandleVersion); err != nil { + err = fmt.Errorf("SaveFuseContext: failed to write handles list file: %v", err) + return + } + + s.meta.Lock() + // s.node[0] is nil and s.node[1] is root. + // No need to save root since it is created everytime fuse is mounted. + skip = 2 + for i, sn := range s.node[skip:] { + var ( + attr fuse.Attr = fuse.Attr{} + nodeid uint64 = skip + uint64(i) + n int + ) + + if sn == nil { + continue + } + + sn.wg.Wait() + + if err = sn.node.Attr(nil, &attr); err != nil { + s.meta.Unlock() + err = fmt.Errorf("SaveFuseContext: failed to get mode of node %v: %v", sn.inode, err) + return + } + cn := &ContextNode{sn.inode, attr.ParentIno, sn.generation, sn.refs, nodeid, uint32(attr.Mode), 0} + data := ContextNodeToBytes(cn) + if n, err = nodeListFile.Write(data); n != len(data) || err != nil { + s.meta.Unlock() + err = fmt.Errorf("SaveFuseContext: failed to write nodes list file: %v", err) + return + } + + ncount++ + // check if need stop + if ncount%20 == 0 { + stat, _ := fs.State() + if stat != FSStatSuspend { + s.meta.Unlock() + err = fmt.Errorf("SaveFuseContext: detect state changed to %v", stat) + return + } + } + } + + skip = 1 + for i, sh := range s.handle[skip:] { + var ( + handleid uint64 = skip + uint64(i) + n int + ) + + if sh == nil { + continue + } + + if hdl, ok := sh.handle.(HandleFlusher); ok { + if err = hdl.Flush(nil, nil); err != nil { + s.meta.Unlock() + err = fmt.Errorf("SaveFuseContext: flush handle %v: %v\n", + s.node[sh.nodeID].inode, err) + return + } + } + ch := &ContextHandle{handleid, uint64(sh.nodeID)} + data := ContextHandleToBytes(ch) + if n, err = handleListFile.Write(data); n != len(data) || err != nil { + s.meta.Unlock() + err = fmt.Errorf("SaveFuseContext: failed to write handles list file: %v", err) + return + } + + hcount++ + // check if need stop + if hcount%20 == 0 { + stat, _ := fs.State() + if stat != FSStatSuspend { + s.meta.Unlock() + err = fmt.Errorf("SaveFuseContext: detect state changed to %v", stat) + return + } + } + } + s.meta.Unlock() + + if err = nodeListFile.Sync(); err != nil { + err = fmt.Errorf("SaveFuseContext: failed to sync nodes list file: %v", err) + return + } + + if err = handleListFile.Sync(); err != nil { + err = fmt.Errorf("SaveFuseContext: failed to sync handles list file: %v", err) + return + } + + msg = fmt.Sprintf("Node count: %d Handle count: %d", ncount, hcount) + return +} + +func (s *Server) SaveFuseDevFd(sockaddr string) (err error) { + var addr *net.UnixAddr + var conn *net.UnixConn + var fud *os.File + var socket *os.File + + defer func() { + if socket != nil { + socket.Close() + } + if conn != nil { + conn.Close() + } + }() + + if addr, err = net.ResolveUnixAddr("unix", sockaddr); err != nil { + return fmt.Errorf("SaveFuseDevFd: failed to create unix addr: %v", err) + } + + if conn, err = net.DialUnix("unix", nil, addr); err != nil { + return fmt.Errorf("SaveFuseDevFd: failed to connect unix socket: %v", err) + } + + if socket, err = conn.File(); err != nil { + return fmt.Errorf("SaveFuseDevFd: failed to get socket file: %v", err) + } + + fud = s.conn.GetFuseDevFile() + if fud == nil { + return fmt.Errorf("SaveFuseDevFd: fuse dev not exist") + } + + if err = util.SendFd(socket, fud.Name(), fud.Fd()); err != nil { + return fmt.Errorf("SaveFuseDevFd: failed to send fuse dev file: %v", err) + } + + return nil +} + // Serve serves the FUSE connection by making calls to the methods // of fs and the Nodes and Handles it makes available. It returns only // when the connection has been closed or an unexpected error occurs. @@ -420,6 +721,10 @@ func (s *Server) Serve(fs FS) error { s.handle = append(s.handle, nil) for { + if s.TrySuspend(fs) { + break + } + req, err := s.conn.ReadRequest() if err != nil { if err == io.EOF { diff --git a/vendor/bazil.org/fuse/fuse.go b/vendor/bazil.org/fuse/fuse.go index 7413200f26..fbd8bcbf3d 100644 --- a/vendor/bazil.org/fuse/fuse.go +++ b/vendor/bazil.org/fuse/fuse.go @@ -132,6 +132,10 @@ type Conn struct { proto Protocol } +func (c *Conn) GetFuseDevFile() *os.File { + return c.dev +} + // MountpointDoesNotExistError is an error returned when the // mountpoint does not exist. type MountpointDoesNotExistError struct { @@ -1331,6 +1335,8 @@ type Attr struct { Rdev uint32 // device numbers Flags uint32 // chflags(2) flags (OS X only) BlockSize uint32 // preferred blocksize for filesystem I/O + + ParentIno uint64 // for chubaofs's file only } func (a Attr) String() string {