
Commit

update: qos update client log output
Signed-off-by: leonrayang <[email protected]>
leonrayang committed Aug 19, 2022
1 parent 02388eb commit eb3dda7
Showing 1 changed file with 33 additions and 28 deletions.
61 changes: 33 additions & 28 deletions sdk/data/manager/limiter.go
@@ -79,8 +79,8 @@ func (factor *LimitFactor) getNeedByMagnify(allocCnt uint32, magnify uint32) uin
 }

 func (factor *LimitFactor) alloc(allocCnt uint32) (ret uint8, future *util.Future) {
-    log.LogDebugf("action[alloc] type [%v] alloc [%v], tmp factor waitlist [%v] hitlimtcnt [%v] len [%v]", proto.QosTypeString(factor.factorType),
-        allocCnt, factor.waitList.Len(), factor.gidHitLimitCnt, factor.gridList.Len())
+    //log.LogDebugf("action[alloc] type [%v] alloc [%v], tmp factor waitlist [%v] hitlimtcnt [%v] len [%v]", proto.QosTypeString(factor.factorType),
+    //    allocCnt, factor.waitList.Len(), factor.gidHitLimitCnt, factor.gridList.Len())
     atomic.AddUint64(&factor.valAllocApply, uint64(allocCnt))
     if !factor.mgr.enable {
         // used not accurate also fine, the purpose is get master's info
@@ -214,7 +214,7 @@ func (factor *LimitFactor) TryReleaseWaitList() {
         value := factor.waitList.Front()
         ele := value.Value.(*AllocElement)

-        log.LogDebugf("action[TryReleaseWaitList] type [%v] ele used [%v]", proto.QosTypeString(factor.factorType), ele.used)
+        // log.LogDebugf("action[TryReleaseWaitList] type [%v] ele used [%v]", proto.QosTypeString(factor.factorType), ele.used)
         for atomic.LoadUint64(&tGrid.used)+uint64(ele.used) > tGrid.limit+tGrid.buffer {

             log.LogWarnf("action[TryReleaseWaitList] type [%v] new gird be used up.alloc in waitlist left cnt [%v],"+
@@ -267,21 +267,28 @@ func (factor *LimitFactor) CheckGrid() {
         log.LogWarnf("action[CheckGrid]. qos recv no command from master in long time, last time %v, grid time %v",
             factor.mgr.lastTimeOfSetLimit, newGrid.time)
     }
-    log.LogDebugf("action[CheckGrid] factor type:[%v] gridlistLen:[%v] waitlistLen:[%v] hitlimitcnt:[%v] "+
-        "add new grid info girdid[%v] used:[%v] limit:[%v] buffer:[%v] time:[%v]",
-        proto.QosTypeString(factor.factorType), factor.gridList.Len(), factor.waitList.Len(), factor.gidHitLimitCnt,
-        newGrid.ID, newGrid.used, newGrid.limit, newGrid.buffer, newGrid.time)
+    if factor.mgr.enable {
+        log.LogDebugf("action[CheckGrid] factor type:[%v] gridlistLen:[%v] waitlistLen:[%v] hitlimitcnt:[%v] "+
+            "add new grid info girdid[%v] used:[%v] limit:[%v] buffer:[%v] time:[%v]",
+            proto.QosTypeString(factor.factorType), factor.gridList.Len(), factor.waitList.Len(), factor.gidHitLimitCnt,
+            newGrid.ID, newGrid.used, newGrid.limit, newGrid.buffer, newGrid.time)
+    }

+
     factor.gridList.PushBack(newGrid)
     for factor.gridList.Len() > gridWindowTimeScope*girdCntOneSecond {
         firstGrid := factor.gridList.Front().Value.(*GridElement)
         if firstGrid.hitLimit {
             factor.gidHitLimitCnt--
-            log.LogDebugf("action[CheckGrid] factor [%v] after minus gidHitLimitCnt:[%v]",
-                proto.QosTypeString(factor.factorType), factor.gidHitLimitCnt)
+            if factor.mgr.enable {
+                log.LogDebugf("action[CheckGrid] factor [%v] after minus gidHitLimitCnt:[%v]",
+                    proto.QosTypeString(factor.factorType), factor.gidHitLimitCnt)
+            }
         }
+        if factor.mgr.enable {
+            log.LogDebugf("action[CheckGrid] type:[%v] remove oldest grid id[%v] info buffer:[%v] limit:[%v] used[%v] from gridlist",
+                proto.QosTypeString(factor.factorType), firstGrid.ID, firstGrid.buffer, firstGrid.limit, firstGrid.used)
+        }
-        log.LogDebugf("action[CheckGrid] type:[%v] remove oldest grid id[%v] info buffer:[%v] limit:[%v] used[%v] from gridlist",
-            proto.QosTypeString(factor.factorType), firstGrid.ID, firstGrid.buffer, firstGrid.limit, firstGrid.used)
         factor.gridList.Remove(factor.gridList.Front())
     }
     factor.TryReleaseWaitList()
@@ -368,7 +375,6 @@ func (limitManager *LimitManager) CalcNeedByPow(limitFactor *LimitFactor, used u
 }

 func (limitManager *LimitManager) GetFlowInfo() (*proto.ClientReportLimitInfo, bool) {
-    log.LogDebugf("action[LimitManager.GetFlowInfo]")
     info := &proto.ClientReportLimitInfo{
         FactorMap: make(map[uint32]*proto.ClientLimitInfo, 0),
     }
@@ -393,15 +399,15 @@ func (limitManager *LimitManager) GetFlowInfo() (*proto.ClientReportLimitInfo, b
             buffer += grid.Value.(*GridElement).buffer
             griCnt++

-            log.LogDebugf("action[GetFlowInfo] type [%v] grid id[%v] used %v limit %v buffer %v time %v sum_used %v sum_limit %v,len %v",
-                proto.QosTypeString(factorType),
-                grid.Value.(*GridElement).ID,
-                grid.Value.(*GridElement).used,
-                grid.Value.(*GridElement).limit,
-                grid.Value.(*GridElement).buffer,
-                grid.Value.(*GridElement).time,
-                reqUsed,
-                limit, limitFactor.gridList.Len())
+            //log.LogDebugf("action[GetFlowInfo] type [%v] grid id[%v] used %v limit %v buffer %v time %v sum_used %v sum_limit %v,len %v",
+            //    proto.QosTypeString(factorType),
+            //    grid.Value.(*GridElement).ID,
+            //    grid.Value.(*GridElement).used,
+            //    grid.Value.(*GridElement).limit,
+            //    grid.Value.(*GridElement).buffer,
+            //    grid.Value.(*GridElement).time,
+            //    reqUsed,
+            //    limit, limitFactor.gridList.Len())
             if grid.Prev() == nil || griCnt >= girdCntOneSecond {
                 log.LogDebugf("action[[GetFlowInfo] type [%v] grid count %v reqused %v list len %v",
                     proto.QosTypeString(factorType), griCnt, reqUsed, limitFactor.gridList.Len())
@@ -477,11 +483,11 @@ func (limitManager *LimitManager) ScheduleCheckGrid() {
                 return
             case <-ticker.C:
                 cnt++
-                for factorType, limitFactor := range limitManager.limitMap {
+                for _, limitFactor := range limitManager.limitMap {
                     limitFactor.CheckGrid()
                     if cnt%girdCntOneSecond == 0 {
-                        log.LogDebugf("action[ScheduleCheckGrid] type [%v] factor apply val:[%v] commit val:[%v]",
-                            proto.QosTypeString(factorType), atomic.LoadUint64(&limitFactor.valAllocApply), atomic.LoadUint64(&limitFactor.valAllocCommit))
+                        //log.LogDebugf("action[ScheduleCheckGrid] type [%v] factor apply val:[%v] commit val:[%v]",
+                        //    proto.QosTypeString(factorType), atomic.LoadUint64(&limitFactor.valAllocApply), atomic.LoadUint64(&limitFactor.valAllocCommit))
                         limitFactor.valAllocLastApply = atomic.LoadUint64(&limitFactor.valAllocLastApply)
                         limitFactor.valAllocLastCommit = atomic.LoadUint64(&limitFactor.valAllocCommit)
                         atomic.StoreUint64(&limitFactor.valAllocApply, 0)
@@ -498,7 +504,7 @@ func (limitManager *LimitManager) SetClientLimit(limit *proto.LimitRsp2Client) {
         log.LogErrorf("action[SetClientLimit] limit info is nil")
         return
     }
-    log.LogDebugf("action[SetClientLimit] limit enable %v", limit.Enable)
+
     if limitManager.enable != limit.Enable {
         log.LogWarnf("action[SetClientLimit] enable [%v]", limit.Enable)
     }
@@ -540,7 +546,7 @@ func (limitManager *LimitManager) WaitN(ctx context.Context, lim *LimitFactor, n
     var ret uint8
     if ret, fut = lim.alloc(uint32(n)); ret == runNow {
         atomic.AddUint64(&lim.valAllocCommit, uint64(n))
-        log.LogDebugf("action[WaitN] type [%v] return now waitlistlen [%v]", proto.QosTypeString(lim.factorType), lim.waitList.Len())
+        // log.LogDebugf("action[WaitN] type [%v] return now waitlistlen [%v]", proto.QosTypeString(lim.factorType), lim.waitList.Len())
         return nil
     }

@@ -555,14 +561,13 @@ func (limitManager *LimitManager) WaitN(ctx context.Context, lim *LimitFactor, n
         return
     case <-respCh:
         atomic.AddUint64(&lim.valAllocCommit, uint64(n))
-        log.LogDebugf("action[WaitN] type [%v] return waitlistlen [%v]", proto.QosTypeString(lim.factorType), lim.waitList.Len())
+        // log.LogDebugf("action[WaitN] type [%v] return waitlistlen [%v]", proto.QosTypeString(lim.factorType), lim.waitList.Len())
         return nil
     //default:
     }
 }

 func (limitManager *LimitManager) UpdateFlowInfo(limit *proto.LimitRsp2Client) {
-    log.LogDebugf("action[LimitManager.UpdateFlowInfo]")
     limitManager.SetClientLimit(limit)
     return
 }
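
Note on the pattern above: this commit either comments out per-request debug logging on the allocation hot path (alloc, WaitN, GetFlowInfo, ScheduleCheckGrid) or wraps the remaining per-grid debug lines in a check of the manager's enable flag (CheckGrid), so the client only produces this output while the master has QoS switched on. Below is a minimal, self-contained sketch of that guard pattern; qosEnabled and debugf are hypothetical stand-ins for the manager's enable flag and log.LogDebugf, not the repository's actual API.

package main

import (
    "fmt"
    "sync/atomic"
)

// qosEnabled stands in for the manager's enable flag, toggled when the
// master tells the client to turn QoS on or off (hypothetical name;
// atomic.Bool requires Go 1.19+).
var qosEnabled atomic.Bool

// debugf stands in for log.LogDebugf (hypothetical helper).
func debugf(format string, args ...interface{}) {
    fmt.Printf(format+"\n", args...)
}

// reportGrid mirrors the guarded logging added in CheckGrid: the debug
// line is only built and emitted while QoS is enabled, so the disabled
// path pays no formatting cost.
func reportGrid(gridID, used, limit uint64) {
    if qosEnabled.Load() {
        debugf("action[CheckGrid] grid id[%v] used:[%v] limit:[%v]", gridID, used, limit)
    }
}

func main() {
    reportGrid(1, 10, 100) // QoS disabled: nothing logged
    qosEnabled.Store(true)
    reportGrid(2, 20, 100) // QoS enabled: debug line is emitted
}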
