Skip to content

Commit

Permalink
fix&enhance(meta&master): 1.Added power off to restart the metanode t…
Browse files Browse the repository at this point in the history
…o ensure that the quota can be set correctly

        2.Increase the flow control of set and delete quota on metanode
        3.Fix after setting the quota, the rootinode of the created file is wrong
        4.Fix version upgrade old createInode request apply failed
Signed-off-by: baijiaruo <[email protected]>
  • Loading branch information
baijiaruo authored and leonrayang committed May 18, 2023
1 parent 40965e0 commit 1b5739a
Show file tree
Hide file tree
Showing 18 changed files with 316 additions and 104 deletions.
5 changes: 1 addition & 4 deletions master/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -2761,10 +2761,7 @@ func (c *Cluster) createVol(req *createVolReq) (vol *Vol, err error) {

vol.aclMgr.init(c, vol)
vol.initUidSpaceManager(c)

if err = vol.initQuotaManager(c); err != nil {
goto errHandler
}
vol.initQuotaManager(c)

if err = vol.initMetaPartitions(c, req.mpCount); err != nil {

Expand Down
7 changes: 7 additions & 0 deletions master/master_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ func (m *Server) loadMetadata() {
if err = m.cluster.loadDataPartitions(); err != nil {
panic(err)
}

if err = m.cluster.startDecommissionListTraverse(); err != nil {
panic(err)
}
Expand Down Expand Up @@ -196,6 +197,12 @@ func (m *Server) loadMetadata() {
panic(err)
}
log.LogInfo("action[loadApiLimiterInfo] end")

log.LogInfo("action[loadQuota] begin")
if err = m.cluster.loadQuota(); err != nil {
panic(err)
}
log.LogInfo("action[loadQuota] end")
}

func (m *Server) clearMetadata() {
Expand Down
2 changes: 1 addition & 1 deletion master/master_quota_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (mqMgr *MasterQuotaManager) deleteQuota(quotaId uint32) (err error) {
return
}

var inodes = make([]uint64, 1)
var inodes = make([]uint64, 0)
inodes = append(inodes, quotaInfo.RootInode)
request := &proto.BatchDeleteMetaserverQuotaReuqest{
PartitionId: quotaInfo.PartitionId,
Expand Down
16 changes: 12 additions & 4 deletions master/metadata_fsm_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -1272,10 +1272,6 @@ func (c *Cluster) loadVols() (err error) {
continue
}

if err = vol.initQuotaManager(c); err != nil {
log.LogErrorf("loadVols initQuotaManager fail err [%v]", err.Error())
return err
}
c.putVol(vol)
log.LogInfof("action[loadVols],vol[%v]", vol.Name)
}
Expand Down Expand Up @@ -1363,6 +1359,18 @@ func (c *Cluster) loadDataPartitions() (err error) {
return
}

func (c *Cluster) loadQuota() (err error) {
c.volMutex.RLock()
defer c.volMutex.RUnlock()
for name, vol := range c.vols {
if err = vol.loadQuotaManager(c); err != nil {
log.LogErrorf("loadQuota loadQuotaManager vol [%v] fail err [%v]", name, err.Error())
return err
}
}
return
}

func (c *Cluster) addBadDataParitionIdMap(dp *DataPartition) {
if !dp.isRecover {
return
Expand Down
42 changes: 38 additions & 4 deletions master/vol.go
Original file line number Diff line number Diff line change
Expand Up @@ -1253,7 +1253,17 @@ func getVolVarargs(vol *Vol) *VolVarargs {
}
}

func (vol *Vol) initQuotaManager(c *Cluster) (err error) {
func (vol *Vol) initQuotaManager(c *Cluster) {
vol.quotaManager = &MasterQuotaManager{
MpQuotaInfoMap: make(map[uint64][]*proto.QuotaReportInfo),
IdQuotaInfoMap: make(map[uint32]*proto.QuotaInfo),
FullPathQuotaInfoMap: make(map[string]*proto.QuotaInfo),
c: c,
vol: vol,
}
}

func (vol *Vol) loadQuotaManager(c *Cluster) (err error) {
vol.quotaManager = &MasterQuotaManager{
MpQuotaInfoMap: make(map[uint64][]*proto.QuotaReportInfo),
IdQuotaInfoMap: make(map[uint32]*proto.QuotaInfo),
Expand All @@ -1264,22 +1274,46 @@ func (vol *Vol) initQuotaManager(c *Cluster) (err error) {

result, err := c.fsm.store.SeekForPrefix([]byte(quotaPrefix + strconv.FormatUint(vol.ID, 10)))
if err != nil {
err = fmt.Errorf("initQuotaManager get quota failed, err [%v]", err)
err = fmt.Errorf("loadQuotaManager get quota failed, err [%v]", err)
return err
}

for _, value := range result {
var quotaInfo = &proto.QuotaInfo{}

if err = json.Unmarshal(value, quotaInfo); err != nil {
log.LogErrorf("initQuotaManager Unmarshal fail err [%v]", err)
log.LogErrorf("loadQuotaManager Unmarshal fail err [%v]", err)
return err
}
log.LogDebugf("loadQuota info [%v]", quotaInfo)
log.LogDebugf("loadQuotaManager info [%v]", quotaInfo)
if vol.Name != quotaInfo.VolName {
panic("vol name do not match")
}

// if quotaInfo.Status != proto.QuotaComplete {
// var inodes = make([]uint64, 0)
// inodes = append(inodes, quotaInfo.RootInode)
// if quotaInfo.Status == proto.QuotaInit {
// request := &proto.BatchSetMetaserverQuotaReuqest{
// PartitionId: quotaInfo.PartitionId,
// Inodes: inodes,
// QuotaId: quotaInfo.QuotaId,
// }
// if err = vol.quotaManager.setQuotaToMetaNode(request); err != nil {
// log.LogErrorf("set quota [%v] to metanode fail [%v].", quotaInfo, err)
// }
// } else if quotaInfo.Status == proto.QuotaDeleting {
// request := &proto.BatchDeleteMetaserverQuotaReuqest{
// PartitionId: quotaInfo.PartitionId,
// Inodes: inodes,
// QuotaId: quotaInfo.QuotaId,
// }
// if err = vol.quotaManager.DeleteQuotaToMetaNode(request); err != nil {
// log.LogErrorf("delete quota [%v] to metanode fail [%v].", quotaInfo, err)
// }
// }
// }

vol.quotaManager.IdQuotaInfoMap[quotaInfo.QuotaId] = quotaInfo
vol.quotaManager.FullPathQuotaInfoMap[quotaInfo.FullPath] = quotaInfo
}
Expand Down
7 changes: 6 additions & 1 deletion metanode/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,9 @@ const (
opFSMTxSnapshot
opFSMTxRbInodeSnapshot
opFSMTxRbDentrySnapshot

//quota
opFSMCreateInodeQuota
)

var (
Expand Down Expand Up @@ -198,7 +201,9 @@ const (
intervalToPersistData = time.Minute * 5
intervalToSyncCursor = time.Minute * 1

defaultDelExtentsCnt = 100000
defaultDelExtentsCnt = 100000
defaultMaxQuotaGoroutine = 5
defaultQuotaSwitch = true
)

const (
Expand Down
49 changes: 32 additions & 17 deletions metanode/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,19 @@ type MetadataManagerConfig struct {
}

type metadataManager struct {
nodeId uint64
zoneName string
rootDir string
raftStore raftstore.RaftStore
connPool *util.ConnectPool
state uint32
mu sync.RWMutex
partitions map[uint64]MetaPartition // Key: metaRangeId, Val: metaPartition
metaNode *MetaNode
flDeleteBatchCount atomic.Value
fileStatsEnable bool
nodeId uint64
zoneName string
rootDir string
raftStore raftstore.RaftStore
connPool *util.ConnectPool
state uint32
mu sync.RWMutex
partitions map[uint64]MetaPartition // Key: metaRangeId, Val: metaPartition
metaNode *MetaNode
flDeleteBatchCount atomic.Value
fileStatsEnable bool
curQuotaGoroutineNum int32
maxQuotaGoroutineNum int32
}

func (m *metadataManager) getPacketLabels(p *Packet) (labels map[string]string) {
Expand Down Expand Up @@ -548,15 +550,28 @@ func (m *metadataManager) MarshalJSON() (data []byte, err error) {
return json.Marshal(m.partitions)
}

func (m *metadataManager) QuotaGoroutineIsOver() (lsOver bool) {
log.LogInfof("hytemp QuotaGoroutineIsOver cur [%v] max [%v]", m.curQuotaGoroutineNum, m.maxQuotaGoroutineNum)
if atomic.LoadInt32(&m.curQuotaGoroutineNum) >= m.maxQuotaGoroutineNum {
return true
}
return false
}

func (m *metadataManager) QuotaGoroutineInc(num int32) {
atomic.AddInt32(&m.curQuotaGoroutineNum, num)
}

// NewMetadataManager returns a new metadata manager.
func NewMetadataManager(conf MetadataManagerConfig, metaNode *MetaNode) MetadataManager {
return &metadataManager{
nodeId: conf.NodeID,
zoneName: conf.ZoneName,
rootDir: conf.RootDir,
raftStore: conf.RaftStore,
partitions: make(map[uint64]MetaPartition),
metaNode: metaNode,
nodeId: conf.NodeID,
zoneName: conf.ZoneName,
rootDir: conf.RootDir,
raftStore: conf.RaftStore,
partitions: make(map[uint64]MetaPartition),
metaNode: metaNode,
maxQuotaGoroutineNum: defaultMaxQuotaGoroutine,
}
}

Expand Down
3 changes: 1 addition & 2 deletions metanode/manager_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -1942,8 +1942,7 @@ func (m *metadataManager) OpMasterSetInodeQuota(conn net.Conn, p *Packet, remote
err = errors.NewErrorf("[OpMasterSetInodeQuota] req: %v, resp: %v", req, err.Error())
return
}
// leaderAddr, ok := mp.IsLeader()
// log.LogInfof("[opMetaBatchSetInodeQuota] mp [%v] isLeader [%v] leader[%v]", mp, ok, leaderAddr)

if !m.serveProxy(conn, mp, p) {
return
}
Expand Down
2 changes: 1 addition & 1 deletion metanode/metanode.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func doStart(s common.Server, cfg *config.Config) (err error) {
exporter.Warning(err.Error())
return
}

go m.startUpdateInodeQuota()
exporter.RegistConsul(m.clusterId, cfg.GetString("role"), cfg)
return
}
Expand Down
16 changes: 15 additions & 1 deletion metanode/nodeinfo.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package metanode

import (
"github.com/cubefs/cubefs/proto"
"sync/atomic"
"time"

"github.com/cubefs/cubefs/proto"

"github.com/cubefs/cubefs/util/log"
)

Expand Down Expand Up @@ -90,3 +91,16 @@ func (m *MetaNode) updateNodeInfo() {

//updateDirChildrenNumLimit(clusterInfo.DirChildrenNumLimit)
}

func (m *MetaNode) startUpdateInodeQuota() {
//time.Sleep(100 * time.Second)
if manager, ok := m.metadataManager.(*metadataManager); ok {
manager.mu.RLock()
for _, p := range manager.partitions {
if mp, ok := p.(*metaPartition); ok {
mp.UpdateInodeQuota()
}
}
manager.mu.RUnlock()
}
}
9 changes: 9 additions & 0 deletions metanode/partition_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,15 @@ func (mp *metaPartition) Apply(command []byte, index uint64) (resp interface{},

switch msg.Op {
case opFSMCreateInode:
ino := NewInode(0, 0)
if err = ino.Unmarshal(msg.V); err != nil {
return
}
if mp.config.Cursor < ino.Inode {
mp.config.Cursor = ino.Inode
}
resp = mp.fsmCreateInode(ino)
case opFSMCreateInodeQuota:
qinode := &MetaQuotaInode{}
if err = qinode.Unmarshal(msg.V); err != nil {
return
Expand Down
20 changes: 10 additions & 10 deletions metanode/partition_fsmop.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,19 @@ func (mp *metaPartition) initInode(ino *Inode) {
if !mp.raftPartition.IsRaftLeader() {
continue
}
qinode := &MetaQuotaInode{
inode: ino,
quotaIds: make([]uint32, 0, 0),
}
data, err := qinode.Marshal()
// qinode := &MetaQuotaInode{
// inode: ino,
// quotaIds: make([]uint32, 0, 0),
// }
// data, err := qinode.Marshal()
// if err != nil {
// log.LogFatalf("[initInode] marshal: %s", err.Error())
// }

data, err := ino.Marshal()
if err != nil {
log.LogFatalf("[initInode] marshal: %s", err.Error())
}
/*
data, err := ino.Marshal()
if err != nil {
log.LogFatalf("[initInode] marshal: %s", err.Error())
}*/
// put first root inode
resp, err := mp.submit(opFSMCreateInode, data)
if err != nil {
Expand Down
59 changes: 37 additions & 22 deletions metanode/partition_op_inode.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,38 +101,53 @@ func (mp *metaPartition) CreateInode(req *CreateInoReq, p *Packet) (err error) {
var (
status = proto.OpNotExistErr
reply []byte
resp interface{}
qinode *MetaQuotaInode
)
inoID, err := mp.nextInodeID()
if err != nil {
p.PacketErrorWithBody(proto.OpInodeFullErr, []byte(err.Error()))
return
}
for _, quotaId := range req.QuotaIds {
status = mp.mqMgr.IsOverQuota(false, true, quotaId)
if status != 0 {
err = errors.New("create inode is over quota")
reply = []byte(err.Error())
p.PacketErrorWithBody(status, reply)
return
}
}
ino := NewInode(inoID, req.Mode)
ino.Uid = req.Uid
ino.Gid = req.Gid
ino.LinkTarget = req.Target
qinode := &MetaQuotaInode{
inode: ino,
quotaIds: req.QuotaIds,
}
val, err := qinode.Marshal()
if err != nil {
p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
return
}
resp, err := mp.submit(opFSMCreateInode, val)
if err != nil {
p.PacketErrorWithBody(proto.OpAgain, []byte(err.Error()))
return
if defaultQuotaSwitch {
for _, quotaId := range req.QuotaIds {
status = mp.mqMgr.IsOverQuota(false, true, quotaId)
if status != 0 {
err = errors.New("create inode is over quota")
reply = []byte(err.Error())
p.PacketErrorWithBody(status, reply)
return
}
}
qinode = &MetaQuotaInode{
inode: ino,
quotaIds: req.QuotaIds,
}
val, err := qinode.Marshal()
if err != nil {
p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
return err
}
resp, err = mp.submit(opFSMCreateInodeQuota, val)
if err != nil {
p.PacketErrorWithBody(proto.OpAgain, []byte(err.Error()))
return err
}
} else {
val, err := ino.Marshal()
if err != nil {
p.PacketErrorWithBody(proto.OpErr, []byte(err.Error()))
return err
}
resp, err = mp.submit(opFSMCreateInode, val)
if err != nil {
p.PacketErrorWithBody(proto.OpAgain, []byte(err.Error()))
return err
}
}

if resp.(uint8) == proto.OpOk {
Expand Down
Loading

0 comments on commit 1b5739a

Please sign in to comment.