Skip to content

Commit

Permalink
fix(libsdk,meta): support dir lock
Browse files Browse the repository at this point in the history
Signed-off-by: baihailong <[email protected]>
  • Loading branch information
baihailong authored and longerfly committed Jul 18, 2024
1 parent 8ba6d75 commit 3d7c51d
Show file tree
Hide file tree
Showing 12 changed files with 344 additions and 1 deletion.
111 changes: 111 additions & 0 deletions libsdk/libsdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,98 @@ type client struct {
mu sync.Mutex
}

//export cfs_get_dir_lock
func cfs_get_dir_lock(id C.int64_t, path *C.char, lock_id *C.int64_t, valid_time **C.char) C.int {
c, exist := getClient(int64(id))
if !exist {
return statusEINVAL
}
c.mu.Lock()
defer c.mu.Unlock()

log.LogDebugf("cfs_get_dir_lock begin path(%s)\n", c.absPath(C.GoString(path)))

info, err := c.lookupPath(c.absPath(C.GoString(path)))
if err != nil {
return errorToStatus(err)
}

ino := info.Inode
dir_lock, err := c.getDirLock(ino)
if err != nil {
log.LogErrorf("getDirLock failed, path(%s) ino(%v) err(%v)", c.absPath(C.GoString(path)), ino, err)
return errorToStatus(err)
}
if len(dir_lock) == 0 {
log.LogDebugf("dir(%s) is not locked\n", c.absPath(C.GoString(path)))
return errorToStatus(syscall.ENOENT)
}

parts := strings.Split(string(dir_lock), "|")
lockIdStr := parts[0]
lease := parts[1]
lockId, _ := strconv.Atoi(lockIdStr)
*lock_id = C.int64_t(lockId)
*valid_time = C.CString(lease)

log.LogDebugf("cfs_get_dir_lock end path(%s) lock_id(%d) lease(%s)\n", c.absPath(C.GoString(path)), lockId, lease)
return statusOK
}

//export cfs_lock_dir
func cfs_lock_dir(id C.int64_t, path *C.char, lease C.uint64_t, lockId C.int64_t) C.int64_t {
c, exist := getClient(int64(id))
if !exist {
return C.int64_t(statusEINVAL)
}
c.mu.Lock()
defer c.mu.Unlock()

dirpath := c.absPath(C.GoString(path))

log.LogDebugf("cfs_lock_dir path(%s) lease(%d) lockId(%d)\n", dirpath, lease, lockId)

info, err := c.lookupPath(dirpath)
if err != nil {
log.LogErrorf("cfs_lock_dir lookupPath failed, err%v\n", err)
return C.int64_t(errorToStatus(err))
}

ino := info.Inode
retLockId, err := c.lockDir(ino, uint64(lease), int64(lockId))
if err != nil {
log.LogErrorf("cfs_lock_dir failed, dir(%s) ino(%v) err(%v)", dirpath, ino, err)
return C.int64_t(errorToStatus(err))
}

log.LogDebugf("cfs_lock_dir success dir(%s) ino(%v) retLockId(%d)\n", dirpath, ino, retLockId)
return C.int64_t(retLockId)
}

//export cfs_unlock_dir
func cfs_unlock_dir(id C.int64_t, path *C.char) C.int {
c, exist := getClient(int64(id))
if !exist {
return statusEINVAL
}

c.mu.Lock()
defer c.mu.Unlock()

info, err := c.lookupPath(c.absPath(C.GoString(path)))
if err != nil {
return errorToStatus(err)
}

ino := info.Inode
if err = c.unlockDir(ino); err != nil {
log.LogErrorf("unlockDir failed, ino(%v) err(%v)", ino, err)
return errorToStatus(err)
}

return statusOK
}

//export cfs_new_client
func cfs_new_client() C.int64_t {
c := newClient()
Expand Down Expand Up @@ -1392,6 +1484,25 @@ func (c *client) lookupPath(path string) (*proto.InodeInfo, error) {
return info, nil
}

func (c *client) lockDir(ino uint64, lease uint64, lockId int64) (retLockId int64, err error) {
return c.mw.LockDir(ino, lease, lockId)
}

func (c *client) unlockDir(ino uint64) error {
return c.mw.XAttrDel_ll(ino, "dir_lock")
}

func (c *client) getDirLock(ino uint64) ([]byte, error) {
info, err := c.mw.XAttrGet_ll(ino, "dir_lock")
if err != nil {
log.LogErrorf("getDirLock failed, ino(%v) err(%v)", ino, err)
return []byte(""), err
}
value := info.Get("dir_lock")
log.LogDebugf("getDirLock success, ino(%v) value(%s)", ino, string(value))
return value, nil
}

func (c *client) setattr(info *proto.InodeInfo, valid uint32, mode, uid, gid uint32, atime, mtime int64) error {
// Only rwx mode bit can be set
if valid&proto.AttrMode != 0 {
Expand Down
4 changes: 3 additions & 1 deletion metanode/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,10 @@ const (
opFSMUniqCheckerEvict = 65
opFSMUnlinkInodeOnce = 66
opFSMCreateLinkInodeOnce = 67
// dir lock
opFSMLockDir = 68

opFSMVersionOp = 68
opFSMVersionOp = 74
opFSMExtentSplit = 69
opFSMDelVer = 70

Expand Down
3 changes: 3 additions & 0 deletions metanode/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,9 @@ func (m *metadataManager) HandleMetadataOperation(conn net.Conn, p *Packet, remo
err = m.opMetaListXAttr(conn, p, remoteAddr)
case proto.OpMetaUpdateXAttr:
err = m.opMetaUpdateXAttr(conn, p, remoteAddr)
// operation for dir lock
case proto.OpMetaLockDir:
err = m.opMetaLockDir(conn, p, remoteAddr)
// operations for multipart session
case proto.OpCreateMultipart:
err = m.opCreateMultipart(conn, p, remoteAddr)
Expand Down
25 changes: 25 additions & 0 deletions metanode/manager_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -1863,6 +1863,31 @@ func (m *metadataManager) opMetaGetXAttr(conn net.Conn, p *Packet, remoteAddr st
return
}

func (m *metadataManager) opMetaLockDir(conn net.Conn, p *Packet, remoteAddr string) (err error) {
req := &proto.LockDirRequest{}
if err = json.Unmarshal(p.Data, req); err != nil {
p.PacketErrorWithBody(proto.OpErr, ([]byte)(err.Error()))
m.respondToClient(conn, p)
err = errors.NewErrorf("[%v] req: %v, resp: %v", p.GetOpMsgWithReqAndResult(), req, err.Error())
return
}
mp, err := m.getPartition(req.PartitionId)
if err != nil {
p.PacketErrorWithBody(proto.OpErr, ([]byte)(err.Error()))
m.respondToClient(conn, p)
err = errors.NewErrorf("[%v] req: %v, resp: %v", p.GetOpMsgWithReqAndResult(), req, err.Error())
return
}
if !mp.IsFollowerRead() && !m.serveProxy(conn, mp, p) {
return
}
err = mp.LockDir(req, p)
_ = m.respondToClient(conn, p)
log.LogDebugf("%s [opMetaLockDir] req: %d - %v, resp: %v, body: %s",
remoteAddr, p.GetReqID(), req, p.GetResultMsg(), p.Data)
return
}

func (m *metadataManager) opMetaGetAllXAttr(conn net.Conn, p *Packet, remoteAddr string) (err error) {
req := &proto.GetAllXAttrRequest{}
if err = json.Unmarshal(p.Data, req); err != nil {
Expand Down
1 change: 1 addition & 0 deletions metanode/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ type OpExtend interface {
RemoveXAttr(req *proto.RemoveXAttrRequest, p *Packet) (err error)
ListXAttr(req *proto.ListXAttrRequest, p *Packet) (err error)
UpdateXAttr(req *proto.UpdateXAttrRequest, p *Packet) (err error)
LockDir(req *proto.LockDirRequest, p *Packet) (err error)
}

// OpDentry defines the interface for the dentry operations.
Expand Down
6 changes: 6 additions & 0 deletions metanode/partition_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,12 @@ func (mp *metaPartition) Apply(command []byte, index uint64) (resp interface{},
return
}
err = mp.fsmSetXAttr(extend)
case opFSMLockDir:
var req = &proto.LockDirRequest{}
if err = json.Unmarshal(msg.V, req); err != nil {
return
}
resp = mp.fsmLockDir(req)
case opFSMCreateMultipart:
var multipart *Multipart
multipart = MultipartFromBytes(msg.V)
Expand Down
87 changes: 87 additions & 0 deletions metanode/partition_fsmop_extend.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,104 @@
package metanode

import (
"strconv"
"strings"
"time"
"fmt"
"math"

"github.com/cubefs/cubefs/proto"
"github.com/cubefs/cubefs/util/log"
"github.com/google/uuid"
)

type ExtendOpResult struct {
Status uint8
Extend *Extend
}

func (mp *metaPartition) fsmLockDir(req *proto.LockDirRequest) (resp *proto.LockDirResponse) {
mp.xattrLock.Lock()
defer mp.xattrLock.Unlock()
resp = &proto.LockDirResponse{}

ino := req.Inode
lockId := req.LockId
submitTime := req.SubmitTime
lease := req.Lease

log.LogDebugf("fsmLockDir ino=%v, lockId=%d, submitTime=%v, lease=%d\n", ino, lockId, submitTime, lease)

var newExtend = NewExtend(ino)
treeItem := mp.extendTree.CopyGet(newExtend)

var firstLock bool = false
var oldValue []byte
var existExtend *Extend

if treeItem == nil {
firstLock = true
} else {
existExtend = treeItem.(*Extend)
oldValue, _ = existExtend.Get([]byte("dir_lock"))
if oldValue == nil {
firstLock = true
}
}

lockIdStr := strconv.Itoa(int(lockId))
validTime := submitTime.Add(time.Duration(int(lease)) * time.Second)
validTimeStr := validTime.Format("2006-01-02 15:04:05")
value := lockIdStr + "|" + validTimeStr

if firstLock {
// first time lock dir
log.LogDebugf("fsmLockDir first time\n")
uu_id, _ := uuid.NewRandom()
lockId = int64(uu_id.ID())
lockIdStr = strconv.Itoa(int(lockId))
value = lockIdStr + "|" + validTimeStr
newExtend.Put([]byte("dir_lock"), []byte(value))
mp.extendTree.ReplaceOrInsert(newExtend, true)
} else {
log.LogDebugf("fsmLockDir oldValue=%s\n", oldValue)
renewDirLock := false
lockExpired := false
parts := strings.Split(string(oldValue), "|")
oldLockIdStr := parts[0]
oldValidTimeStr := parts[1]
oldValidTime, _ := time.Parse("2006-01-02 15:04:05", oldValidTimeStr)
renewDirLock = (oldLockIdStr == lockIdStr)

// convert time before compare (CST/UTC)
submit_time, _ := time.Parse("2006-01-02 15:04:05", submitTime.Format("2006-01-02 15:04:05"))
lockExpired = submit_time.After(oldValidTime)

if !renewDirLock && !lockExpired {
resp.Status = proto.OpExistErr
log.LogDebugf("fsmLockDir failed, dir has been locked by others and in lease\n")
return
}

if lockExpired {
// if lock expired, use new lockId
uu_id, _ := uuid.NewRandom()
lockId = int64(uu_id.ID())
lockIdStr = strconv.Itoa(int(lockId))
value = lockIdStr + "|" + validTimeStr
}

existExtend.Remove([]byte("dir_lock"))
newExtend.Put([]byte("dir_lock"), []byte(value))
existExtend.Merge(newExtend, true)
}

resp.Status = proto.OpOk
resp.LockId = lockId
log.LogDebugf("fsmLockDir success lockId=%d\n", lockId)
return
}

func (mp *metaPartition) fsmSetXAttr(extend *Extend) (err error) {
extend.verSeq = mp.GetVerSeq()
treeItem := mp.extendTree.CopyGet(extend)
Expand Down
27 changes: 27 additions & 0 deletions metanode/partition_op_extend.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"encoding/json"
"strconv"
"strings"
"time"

"github.com/cubefs/cubefs/proto"
"github.com/cubefs/cubefs/util/errors"
Expand Down Expand Up @@ -250,3 +251,29 @@ func (mp *metaPartition) putExtend(op uint32, extend *Extend) (resp interface{},
resp, err = mp.submit(op, marshaled)
return
}

func (mp *metaPartition) LockDir(req *proto.LockDirRequest, p *Packet) (err error) {
req.SubmitTime = time.Now()
val, err := json.Marshal(req)
if err != nil {
p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
return err
}

r, err := mp.submit(opFSMLockDir, val)
if err != nil {
p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
return err
}

resp := r.(*proto.LockDirResponse)
status := resp.Status
var reply []byte
reply, err = json.Marshal(resp)
if err != nil {
status = proto.OpErr
reply = []byte(err.Error())
}
p.PacketErrorWithBody(status, reply)
return
}
14 changes: 14 additions & 0 deletions proto/fs_proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -1026,3 +1026,17 @@ type GetUniqIDRequest struct {
type GetUniqIDResponse struct {
Start uint64 `json:"start"`
}

type LockDirRequest struct {
VolName string `json:"vol"`
PartitionId uint64 `json:"pid"`
Inode uint64 `json:"ino"`
LockId int64 `json:"lockId"`
Lease uint64 `json:"lease"`
SubmitTime time.Time `json:"submitTime"`
}

type LockDirResponse struct {
LockId int64 `json:"lockId"`
Status uint8 `json:"status"`
}
1 change: 1 addition & 0 deletions proto/packet.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ const (
OpMetaBatchGetXAttr uint8 = 0x39
OpMetaExtentAddWithCheck uint8 = 0x3A // Append extent key with discard extents check
OpMetaReadDirLimit uint8 = 0x3D
OpMetaLockDir uint8 = 0x3E

// Operations: Master -> MetaNode
OpCreateMetaPartition uint8 = 0x40
Expand Down
12 changes: 12 additions & 0 deletions sdk/meta/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2652,3 +2652,15 @@ func (mw *MetaWrapper) DisableTrashByClient(flag bool) {
func (mw *MetaWrapper) QueryTrashDisableByClient() bool {
return mw.disableTrashByClient
}

func (mw *MetaWrapper) LockDir(ino uint64, lease uint64, lockId int64) (retLockId int64, err error) {
mp := mw.getPartitionByInode(ino)
if mp == nil {
log.LogErrorf("LockDir: no such partition, ino(%v)", ino)
err = syscall.ENOENT
return
}

retLockId, err = mw.lockDir(mp, ino, lease, lockId)
return
}
Loading

0 comments on commit 3d7c51d

Please sign in to comment.