Skip to content

Commit

Permalink
enhance: Increase the quota in the transaction process
Browse files Browse the repository at this point in the history
Signed-off-by: baijiaruo <[email protected]>
  • Loading branch information
baijiaruo authored and leonrayang committed May 18, 2023
1 parent bf75c8e commit 517df2d
Show file tree
Hide file tree
Showing 14 changed files with 271 additions and 48 deletions.
1 change: 1 addition & 0 deletions metanode/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ const (
// transaction
opFSMSyncTxID
opFSMTxCreateInode
opFSMTxCreateInodeQuota
opFSMTxCreateDentry
opFSMTxCommit
opFSMTxInodeCommit
Expand Down
2 changes: 1 addition & 1 deletion metanode/manager_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -2084,7 +2084,7 @@ func (m *metadataManager) opMetaBatchDeleteInodeQuota(conn net.Conn, p *Packet,
p.PacketOkWithBody(reply)
_ = m.respondToClient(conn, p)

log.LogInfof("[opMetaBatchSetInodeQuota] req [%v] resp [%v] success.", req, resp)
log.LogInfof("[opMetaBatchDeleteInodeQuota] req [%v] resp [%v] success.", req, resp)
return err
}

Expand Down
59 changes: 59 additions & 0 deletions metanode/meta_quota_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ type MetaQuotaInode struct {
quotaIds []uint32
}

type TxMetaQuotaInode struct {
txinode *TxInode
quotaIds []uint32
}

func NewQuotaManager(volName string, mpId uint64) (mqMgr *MetaQuotaManager) {
mqMgr = &MetaQuotaManager{
statisticTemp: new(sync.Map),
Expand Down Expand Up @@ -108,6 +113,60 @@ func (qInode *MetaQuotaInode) Unmarshal(raw []byte) (err error) {
return
}

func (qInode *TxMetaQuotaInode) Marshal() (result []byte, err error) {
var (
inodeBytes []byte
)
quotaBytes := bytes.NewBuffer(make([]byte, 0, 128))
buff := bytes.NewBuffer(make([]byte, 0, 128))
inodeBytes, err = qInode.txinode.Marshal()
if err != nil {
return
}
inodeLen := uint32(len(inodeBytes))
if err = binary.Write(buff, binary.BigEndian, inodeLen); err != nil {
return
}
buff.Write(inodeBytes)
for _, quotaId := range qInode.quotaIds {
if err = binary.Write(quotaBytes, binary.BigEndian, quotaId); err != nil {
return
}
}
buff.Write(quotaBytes.Bytes())
result = buff.Bytes()
log.LogDebugf("TxMetaQuotaInode Marshal inode [%v] inodeLen [%v] size [%v]", qInode.txinode.Inode.Inode, inodeLen, len(result))
return
}

func (qInode *TxMetaQuotaInode) Unmarshal(raw []byte) (err error) {
var inodeLen uint32
var quotaId uint32
buff := bytes.NewBuffer(raw)
if err = binary.Read(buff, binary.BigEndian, &inodeLen); err != nil {
return
}
inodeBytes := make([]byte, inodeLen)
if _, err = buff.Read(inodeBytes); err != nil {
return
}
log.LogDebugf("TxMetaQuotaInode Unmarshal inodeLen [%v] size [%v]", inodeBytes, len(raw))
qInode.txinode = NewTxInode("", 0, 0, 0, nil)
if err = qInode.txinode.Unmarshal(inodeBytes); err != nil {
return
}
for {
if buff.Len() == 0 {
break
}
if err = binary.Read(buff, binary.BigEndian, &quotaId); err != nil {
return
}
qInode.quotaIds = append(qInode.quotaIds, quotaId)
}
return
}

func (mqMgr *MetaQuotaManager) setQuotaHbInfo(infos []*proto.QuotaHeartBeatInfo) {
mqMgr.rwlock.Lock()
defer mqMgr.rwlock.Unlock()
Expand Down
26 changes: 23 additions & 3 deletions metanode/partition_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (mp *metaPartition) Apply(command []byte, index uint64) (resp interface{},
mp.setInodeQuota(qinode.quotaIds, ino.Inode)
}
resp = mp.fsmCreateInode(ino)
if resp != proto.OpExistErr {
if resp == proto.OpOk {
for _, quotaId := range qinode.quotaIds {
mp.mqMgr.updateUsedInfo(0, 1, quotaId)
}
Expand Down Expand Up @@ -269,7 +269,27 @@ func (mp *metaPartition) Apply(command []byte, index uint64) (resp interface{},
if mp.config.Cursor < txIno.Inode.Inode {
mp.config.Cursor = txIno.Inode.Inode
}
resp = mp.fsmTxCreateInode(txIno)
resp = mp.fsmTxCreateInode(txIno, 0)
case opFSMTxCreateInodeQuota:
qinode := &TxMetaQuotaInode{}
if err = qinode.Unmarshal(msg.V); err != nil {
return
}
txIno := qinode.txinode
if mp.config.Cursor < txIno.Inode.Inode {
mp.config.Cursor = txIno.Inode.Inode
}
var quotaId uint32
if len(qinode.quotaIds) > 0 {
mp.setInodeQuota(qinode.quotaIds, txIno.Inode.Inode)
quotaId = qinode.quotaIds[0]
}
resp = mp.fsmTxCreateInode(txIno, quotaId)
if resp == proto.OpOk {
for _, quotaId := range qinode.quotaIds {
mp.mqMgr.updateUsedInfo(0, 1, quotaId)
}
}
case opFSMTxCreateDentry:
txDen := NewTxDentry(0, "", 0, 0, nil)
if err = txDen.Unmarshal(msg.V); err != nil {
Expand Down Expand Up @@ -520,7 +540,7 @@ func (mp *metaPartition) ApplySnapshot(peers []raftproto.Peer, iter raftproto.Sn
txTree.ReplaceOrInsert(txInfo, true)
log.LogDebugf("ApplySnapshot: create transaction: partitionID(%v) txInfo(%v)", mp.config.PartitionId, txInfo)
case opFSMTxRbInodeSnapshot:
txRbInode := NewTxRollbackInode(nil, nil, 0)
txRbInode := NewTxRollbackInode(nil, 0, nil, 0)
txRbInode.Unmarshal(snap.V)
//txRollbackInodes[txRbInode.inode.Inode] = txRbInode
txRbInodeTree.ReplaceOrInsert(txRbInode, true)
Expand Down
13 changes: 9 additions & 4 deletions metanode/partition_fsmop_inode.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func NewInodeResponse() *InodeResponse {
}

// Create and inode and attach it to the inode tree.
func (mp *metaPartition) fsmTxCreateInode(txIno *TxInode) (status uint8) {
func (mp *metaPartition) fsmTxCreateInode(txIno *TxInode, quotaId uint32) (status uint8) {
status = proto.OpOk
//1.if mpID == -1, register transaction in transaction manager
//if txIno.TxInfo.TxID != "" && txIno.TxInfo.TmID == -1 {
Expand All @@ -49,7 +49,7 @@ func (mp *metaPartition) fsmTxCreateInode(txIno *TxInode) (status uint8) {
status = proto.OpTxInodeInfoNotExistErr
return
}
rbInode := NewTxRollbackInode(txIno.Inode, inodeInfo, TxDelete)
rbInode := NewTxRollbackInode(txIno.Inode, quotaId, inodeInfo, TxDelete)
if status = mp.txProcessor.txResource.addTxRollbackInode(rbInode); status != proto.OpOk {
//status = proto.OpTxConflictErr
return
Expand Down Expand Up @@ -89,7 +89,7 @@ func (mp *metaPartition) fsmTxCreateLinkInode(txIno *TxInode) (resp *InodeRespon
return
}

rbInode := NewTxRollbackInode(txIno.Inode, inodeInfo, TxUpdate)
rbInode := NewTxRollbackInode(txIno.Inode, 0, inodeInfo, TxUpdate)
if resp.Status = mp.txProcessor.txResource.addTxRollbackInode(rbInode); resp.Status != proto.OpOk {
//resp.Status = proto.OpTxConflictErr
return
Expand Down Expand Up @@ -182,7 +182,12 @@ func (mp *metaPartition) fsmTxUnlinkInode(txIno *TxInode) (resp *InodeResponse)
resp.Status = proto.OpTxInodeInfoNotExistErr
return
}
rbInode := NewTxRollbackInode(txIno.Inode, inodeInfo, TxAdd)
var quotaId uint32
quotaIds, isFind := mp.isExistQuota(txIno.Inode.Inode)
if isFind {
quotaId = quotaIds[0]
}
rbInode := NewTxRollbackInode(txIno.Inode, quotaId, inodeInfo, TxAdd)
if resp.Status = mp.txProcessor.txResource.addTxRollbackInode(rbInode); resp.Status != proto.OpOk {
//resp.Status = proto.OpTxConflictErr
return
Expand Down
10 changes: 10 additions & 0 deletions metanode/partition_op_dentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,16 @@ func (mp *metaPartition) TxCreateDentry(req *proto.TxCreateDentryRequest, p *Pac
return
}

for _, quotaId := range req.QuotaIds {
status := mp.mqMgr.IsOverQuota(false, true, quotaId)
if status != 0 {
err = errors.New("create dentry is over quota")
reply := []byte(err.Error())
p.PacketErrorWithBody(status, reply)
return
}
}

txInfo := req.TxInfo.GetCopy()
/*if req.TxInfo.TxID == "" && req.TxInfo.TmID == -1 {
txInfo.TxID = mp.txProcessor.txManager.nextTxID()
Expand Down
58 changes: 43 additions & 15 deletions metanode/partition_op_inode.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,11 @@ func (mp *metaPartition) ClearInodeCache(req *proto.ClearInodeCacheRequest, p *P

// TxCreateInode returns a new inode.
func (mp *metaPartition) TxCreateInode(req *proto.TxCreateInodeRequest, p *Packet) (err error) {
var (
status = proto.OpNotExistErr
reply []byte
resp interface{}
)
inoID, err := mp.nextInodeID()
if err != nil {
p.PacketErrorWithBody(proto.OpInodeFullErr, []byte(err.Error()))
Expand Down Expand Up @@ -664,20 +669,43 @@ func (mp *metaPartition) TxCreateInode(req *proto.TxCreateInodeRequest, p *Packe
txIno.Inode.Gid = req.Gid
txIno.Inode.LinkTarget = req.Target

val, err := txIno.Marshal()
if err != nil {
p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
return
}
status, err := mp.submit(opFSMTxCreateInode, val)
if err != nil {
p.PacketErrorWithBody(proto.OpAgain, []byte(err.Error()))
return
if defaultQuotaSwitch {
for _, quotaId := range req.QuotaIds {
status = mp.mqMgr.IsOverQuota(false, true, quotaId)
if status != 0 {
err = errors.New("tx create inode is over quota")
reply = []byte(err.Error())
p.PacketErrorWithBody(status, reply)
return
}
}

qinode := &TxMetaQuotaInode{
txinode: txIno,
quotaIds: req.QuotaIds,
}
val, err := qinode.Marshal()
if err != nil {
p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
return err
}
resp, err = mp.submit(opFSMTxCreateInodeQuota, val)
if err != nil {
p.PacketErrorWithBody(proto.OpAgain, []byte(err.Error()))
return err
}
} else {
val, err := txIno.Marshal()
if err != nil {
p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
return err
}
resp, err = mp.submit(opFSMTxCreateInode, val)
if err != nil {
p.PacketErrorWithBody(proto.OpAgain, []byte(err.Error()))
return err
}
}
var (
//status = proto.OpNotExistErr
reply []byte
)

var rstTxInfo *proto.TransactionInfo
if req.TxInfo.TxID == "" && req.TxInfo.TmID == -1 {
Expand All @@ -688,7 +716,7 @@ func (mp *metaPartition) TxCreateInode(req *proto.TxCreateInodeRequest, p *Packe
rstTxInfo = req.TxInfo
}

if status.(uint8) == proto.OpOk {
if resp == proto.OpOk {
/*resp := &proto.TxCreateInodeResponse{
Info: &proto.InodeInfo{},
TxInfo: rstTxInfo,
Expand All @@ -709,6 +737,6 @@ func (mp *metaPartition) TxCreateInode(req *proto.TxCreateInodeRequest, p *Packe
reply = []byte(err.Error())
}
}
p.PacketErrorWithBody(status.(uint8), reply)
p.PacketErrorWithBody(status, reply)
return
}
2 changes: 1 addition & 1 deletion metanode/partition_op_quota.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (mp *metaPartition) batchDeleteInodeQuota(req *proto.BatchDeleteMetaserverQ
quotaInfo.Status = proto.QuotaDeleting
}
} else {
log.LogErrorf("batchDeleteInodeQuota QuotaInfoMap can not find quota [%v]", req.QuotaId)
log.LogErrorf("batchDeleteInodeQuota QuotaInfoMap can not find inode [%v] quota [%v]", ino, req.QuotaId)
continue
}
} else {
Expand Down
2 changes: 1 addition & 1 deletion metanode/partition_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ func (mp *metaPartition) loadTxRbInode(rootDir string, crc uint32) (err error) {
return
}

txRbInode := NewTxRollbackInode(nil, nil, 0)
txRbInode := NewTxRollbackInode(nil, 0, nil, 0)
if err = txRbInode.Unmarshal(txBuf); err != nil {
err = errors.NewErrorf("[loadTxRbInode] Unmarshal: %s", err.Error())
return
Expand Down
Loading

0 comments on commit 517df2d

Please sign in to comment.