Skip to content

Commit

Permalink
Merge pull request kubernetes#105412 from MikeSpreitzer/define-seat-s…
Browse files Browse the repository at this point in the history
…econds

Define datatype for seat-seconds and prevent overflow
  • Loading branch information
k8s-ci-robot authored Oct 5, 2021
2 parents efa9029 + 4b5e139 commit 6f942a0
Show file tree
Hide file tree
Showing 4 changed files with 216 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ type queueSetCompleter struct {
// not end in "Locked" either acquires the lock or does not care about
// locking.
type queueSet struct {
clock eventclock.Interface
estimatedServiceSeconds float64
obsPair metrics.TimedObserverPair
clock eventclock.Interface
estimatedServiceDuration time.Duration
obsPair metrics.TimedObserverPair

promiseFactory promiseFactory

Expand All @@ -102,9 +102,9 @@ type queueSet struct {
// queues are still draining.
queues []*queue

// virtualTime is the amount of seat-seconds allocated per queue since process startup.
// currentR is the amount of seat-seconds allocated per queue since process startup.
// This is our generalization of the progress meter named R in the original fair queuing work.
virtualTime float64
currentR SeatSeconds

// lastRealTime is what `clock.Now()` yielded when `virtualTime` was last updated
lastRealTime time.Time
Expand Down Expand Up @@ -173,12 +173,12 @@ func (qsc *queueSetCompleter) Complete(dCfg fq.DispatchingConfig) fq.QueueSet {
qs := qsc.theSet
if qs == nil {
qs = &queueSet{
clock: qsc.factory.clock,
estimatedServiceSeconds: 0.003,
obsPair: qsc.obsPair,
qCfg: qsc.qCfg,
virtualTime: 0,
lastRealTime: qsc.factory.clock.Now(),
clock: qsc.factory.clock,
estimatedServiceDuration: 3 * time.Millisecond,
obsPair: qsc.obsPair,
qCfg: qsc.qCfg,
currentR: 0,
lastRealTime: qsc.factory.clock.Now(),
}
qs.promiseFactory = qsc.factory.promiseFactoryFactory(qs)
}
Expand Down Expand Up @@ -407,10 +407,14 @@ func (qs *queueSet) lockAndSyncTime() {
// lock and before modifying the state of any queue.
func (qs *queueSet) syncTimeLocked() {
realNow := qs.clock.Now()
timeSinceLast := realNow.Sub(qs.lastRealTime).Seconds()
timeSinceLast := realNow.Sub(qs.lastRealTime)
qs.lastRealTime = realNow
qs.virtualTime += timeSinceLast * qs.getVirtualTimeRatioLocked()
metrics.SetCurrentR(qs.qCfg.Name, qs.virtualTime)
prevR := qs.currentR
qs.currentR += SeatsTimesDuration(qs.getVirtualTimeRatioLocked(), timeSinceLast)
if qs.currentR < prevR {
klog.ErrorS(errors.New("progress meter wrapped around"), "Wrap", "QS", qs.qCfg.Name, "prevR", prevR, "currentR", qs.currentR)
}
metrics.SetCurrentR(qs.qCfg.Name, qs.currentR.ToFloat())
}

// getVirtualTimeRatio calculates the rate at which virtual time has
Expand Down Expand Up @@ -460,7 +464,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte
ctx: ctx,
decision: qs.promiseFactory(nil, ctx.Done(), decisionCancel),
arrivalTime: qs.clock.Now(),
arrivalR: qs.virtualTime,
arrivalR: qs.currentR,
queue: queue,
descr1: descr1,
descr2: descr2,
Expand Down Expand Up @@ -495,15 +499,15 @@ func (qs *queueSet) shuffleShardLocked(hashValue uint64, descr1, descr2 interfac
// Ideally, this should be based on projected completion time in the
// virtual world of the youngest request in the queue.
thisSeatsSum := waitingSeats + queue.seatsInUse
klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of %d seats waiting and %d executing, virtualStart=%vss", qs.qCfg.Name, descr1, descr2, queueIdx, waitingSeats, queue.seatsInUse, queue.virtualStart)
klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of %d seats waiting and %d executing, nextDispatchR=%v", qs.qCfg.Name, descr1, descr2, queueIdx, waitingSeats, queue.seatsInUse, queue.nextDispatchR)
if thisSeatsSum < bestQueueSeatsSum {
bestQueueIdx, bestQueueSeatsSum = queueIdx, thisSeatsSum
}

}
if klog.V(6).Enabled() {
chosenQueue := qs.queues[bestQueueIdx]
klog.V(6).Infof("QS(%s) at r=%s v=%.9fss: For request %#+v %#+v chose queue %d, had seatSum %d & %d requests executing & virtualStart=%vss", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, descr1, descr2, bestQueueIdx, chosenQueue.requests.SeatsSum(), chosenQueue.requestsExecuting, chosenQueue.virtualStart)
klog.V(6).Infof("QS(%s) at t=%s R=%v: For request %#+v %#+v chose queue %d, had seatSum %d & %d requests executing & virtualStart=%v", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.currentR, descr1, descr2, bestQueueIdx, chosenQueue.requests.SeatsSum(), chosenQueue.requestsExecuting, chosenQueue.nextDispatchR)
}
return bestQueueIdx
}
Expand Down Expand Up @@ -571,9 +575,9 @@ func (qs *queueSet) enqueueLocked(request *request) {
now := qs.clock.Now()
if queue.requests.Length() == 0 && queue.requestsExecuting == 0 {
// the queue’s start R is set to the virtual time.
queue.virtualStart = qs.virtualTime
queue.nextDispatchR = qs.currentR
if klog.V(6).Enabled() {
klog.Infof("QS(%s) at r=%s v=%.9fss: initialized queue %d start R due to request %#+v %#+v", qs.qCfg.Name, now.Format(nsTimeFmt), queue.virtualStart, queue.index, request.descr1, request.descr2)
klog.Infof("QS(%s) at t=%s R=%v: initialized queue %d start R due to request %#+v %#+v", qs.qCfg.Name, now.Format(nsTimeFmt), queue.nextDispatchR, queue.index, request.descr1, request.descr2)
}
}
queue.Enqueue(request)
Expand Down Expand Up @@ -609,7 +613,7 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, workEstimate *f
startTime: now,
decision: qs.promiseFactory(decisionExecute, ctx.Done(), decisionCancel),
arrivalTime: now,
arrivalR: qs.virtualTime,
arrivalR: qs.currentR,
descr1: descr1,
descr2: descr2,
workEstimate: *workEstimate,
Expand All @@ -620,7 +624,7 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, workEstimate *f
metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, fsName, req.Seats())
qs.obsPair.RequestsExecuting.Add(1)
if klog.V(5).Enabled() {
klog.Infof("QS(%s) at r=%s v=%.9fss: immediate dispatch of request %q %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, fsName, descr1, descr2, qs.totRequestsExecuting)
klog.Infof("QS(%s) at t=%s R=%v: immediate dispatch of request %q %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, fsName, descr1, descr2, qs.totRequestsExecuting)
}
return req
}
Expand Down Expand Up @@ -656,12 +660,12 @@ func (qs *queueSet) dispatchLocked() bool {
qs.obsPair.RequestsWaiting.Add(-1)
qs.obsPair.RequestsExecuting.Add(1)
if klog.V(6).Enabled() {
klog.Infof("QS(%s) at r=%s v=%.9fss: dispatching request %#+v %#+v work %v from queue %d with start R %.9fss, queue will have %d waiting & %d requests occupying %d seats, set will have %d seats occupied",
qs.qCfg.Name, request.startTime.Format(nsTimeFmt), qs.virtualTime, request.descr1, request.descr2,
request.workEstimate, queue.index, queue.virtualStart, queue.requests.Length(), queue.requestsExecuting, queue.seatsInUse, qs.totSeatsInUse)
klog.Infof("QS(%s) at t=%s R=%v: dispatching request %#+v %#+v work %v from queue %d with start R %v, queue will have %d waiting & %d requests occupying %d seats, set will have %d seats occupied",
qs.qCfg.Name, request.startTime.Format(nsTimeFmt), qs.currentR, request.descr1, request.descr2,
request.workEstimate, queue.index, queue.nextDispatchR, queue.requests.Length(), queue.requestsExecuting, queue.seatsInUse, qs.totSeatsInUse)
}
// When a request is dequeued for service -> qs.virtualStart += G * width
queue.virtualStart += qs.estimatedServiceSeconds * float64(request.Seats())
queue.nextDispatchR += SeatsTimesDuration(float64(request.Seats()), qs.estimatedServiceDuration)
qs.boundNextDispatch(queue)
request.decision.Set(decisionExecute)
return ok
Expand Down Expand Up @@ -694,11 +698,11 @@ func (qs *queueSet) canAccommodateSeatsLocked(seats int) bool {
// returns the first one of those for which the virtual finish time of
// the oldest waiting request is minimal.
func (qs *queueSet) findDispatchQueueLocked() *queue {
minVirtualFinish := math.Inf(1)
sMin := math.Inf(1)
dsMin := math.Inf(1)
sMax := math.Inf(-1)
dsMax := math.Inf(-1)
minVirtualFinish := MaxSeatSeconds
sMin := MaxSeatSeconds
dsMin := MaxSeatSeconds
sMax := MinSeatSeconds
dsMax := MinSeatSeconds
var minQueue *queue
var minIndex int
nq := len(qs.queues)
Expand All @@ -707,12 +711,12 @@ func (qs *queueSet) findDispatchQueueLocked() *queue {
queue := qs.queues[qs.robinIndex]
oldestWaiting, _ := queue.requests.Peek()
if oldestWaiting != nil {
sMin = math.Min(sMin, queue.virtualStart)
sMax = math.Max(sMax, queue.virtualStart)
estimatedWorkInProgress := qs.estimatedServiceSeconds * float64(queue.seatsInUse)
dsMin = math.Min(dsMin, queue.virtualStart-estimatedWorkInProgress)
dsMax = math.Max(dsMax, queue.virtualStart-estimatedWorkInProgress)
currentVirtualFinish := queue.virtualStart + qs.estimatedServiceSeconds*float64(oldestWaiting.Seats())
sMin = ssMin(sMin, queue.nextDispatchR)
sMax = ssMax(sMax, queue.nextDispatchR)
estimatedWorkInProgress := SeatsTimesDuration(float64(queue.seatsInUse), qs.estimatedServiceDuration)
dsMin = ssMin(dsMin, queue.nextDispatchR-estimatedWorkInProgress)
dsMax = ssMax(dsMax, queue.nextDispatchR-estimatedWorkInProgress)
currentVirtualFinish := queue.nextDispatchR + SeatsTimesDuration(float64(oldestWaiting.Seats()), qs.estimatedServiceDuration)
klog.V(11).InfoS("Considering queue to dispatch", "queueSet", qs.qCfg.Name, "queue", qs.robinIndex, "finishR", currentVirtualFinish)
if currentVirtualFinish < minVirtualFinish {
minVirtualFinish = currentVirtualFinish
Expand Down Expand Up @@ -743,13 +747,27 @@ func (qs *queueSet) findDispatchQueueLocked() *queue {
// win in the case that the virtual finish times are the same
qs.robinIndex = minIndex

if minQueue.virtualStart < oldestReqFromMinQueue.arrivalR {
klog.ErrorS(errors.New("dispatch before arrival"), "Inconceivable!", "QS", qs.qCfg.Name, "queue", minQueue.index, "dispatchR", minQueue.virtualStart, "request", oldestReqFromMinQueue)
if minQueue.nextDispatchR < oldestReqFromMinQueue.arrivalR {
klog.ErrorS(errors.New("dispatch before arrival"), "Inconceivable!", "QS", qs.qCfg.Name, "queue", minQueue.index, "dispatchR", minQueue.nextDispatchR, "request", oldestReqFromMinQueue)
}
metrics.SetDispatchMetrics(qs.qCfg.Name, qs.virtualTime, minQueue.virtualStart, sMin, sMax, dsMin, dsMax)
metrics.SetDispatchMetrics(qs.qCfg.Name, qs.currentR.ToFloat(), minQueue.nextDispatchR.ToFloat(), sMin.ToFloat(), sMax.ToFloat(), dsMin.ToFloat(), dsMax.ToFloat())
return minQueue
}

func ssMin(a, b SeatSeconds) SeatSeconds {
if a > b {
return b
}
return a
}

func ssMax(a, b SeatSeconds) SeatSeconds {
if a < b {
return b
}
return a
}

// finishRequestAndDispatchAsMuchAsPossible is a convenience method
// which calls finishRequest for a given request and then dispatches
// as many requests as possible. This is all of what needs to be done
Expand All @@ -773,7 +791,7 @@ func (qs *queueSet) finishRequestLocked(r *request) {
metrics.AddRequestsExecuting(r.ctx, qs.qCfg.Name, r.fsName, -1)
qs.obsPair.RequestsExecuting.Add(-1)

S := now.Sub(r.startTime).Seconds()
actualServiceDuration := now.Sub(r.startTime)

// TODO: for now we keep the logic localized so it is easier to see
// how the counters are tracked for queueset and queue, in future we
Expand All @@ -794,23 +812,23 @@ func (qs *queueSet) finishRequestLocked(r *request) {
releaseSeatsLocked()
if !klog.V(6).Enabled() {
} else if r.queue != nil {
klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished all use of %d seats, adjusted queue %d start R to %.9fss due to service time %.9fs, queue will have %d requests, %d seats waiting & %d requests occupying %d seats",
qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.InitialSeats, r.queue.index,
r.queue.virtualStart, S, r.queue.requests.Length(), r.queue.requests.SeatsSum(), r.queue.requestsExecuting, r.queue.seatsInUse)
klog.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished all use of %d seats, adjusted queue %d start R to %v due to service time %.9fs, queue will have %d requests, %d seats waiting & %d requests occupying %d seats",
qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.InitialSeats, r.queue.index,
r.queue.nextDispatchR, actualServiceDuration.Seconds(), r.queue.requests.Length(), r.queue.requests.SeatsSum(), r.queue.requestsExecuting, r.queue.seatsInUse)
} else {
klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished all use of %d seats, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.InitialSeats, qs.totRequestsExecuting, qs.totSeatsInUse)
klog.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished all use of %d seats, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.InitialSeats, qs.totRequestsExecuting, qs.totSeatsInUse)
}
return
}

additionalLatency := r.workEstimate.AdditionalLatency
if !klog.V(6).Enabled() {
} else if r.queue != nil {
klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished main use but lingering on %d seats for %v seconds, adjusted queue %d start R to %.9fss due to service time %.9fs, queue will have %d requests waiting & %d executing, still has %d seats waiting & %d executing",
qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.InitialSeats, additionalLatency.Seconds(), r.queue.index,
r.queue.virtualStart, S, r.queue.requests.Length(), r.queue.requestsExecuting, r.queue.requests.SeatsSum(), r.queue.seatsInUse)
klog.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished main use but lingering on %d seats for %v seconds, adjusted queue %d start R to %v due to service time %.9fs, queue will have %d requests waiting & %d executing, still has %d seats waiting & %d executing",
qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.InitialSeats, additionalLatency.Seconds(), r.queue.index,
r.queue.nextDispatchR, actualServiceDuration.Seconds(), r.queue.requests.Length(), r.queue.requestsExecuting, r.queue.requests.SeatsSum(), r.queue.seatsInUse)
} else {
klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished main use but lingering on %d seats for %v seconds, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.InitialSeats, additionalLatency.Seconds(), qs.totRequestsExecuting, qs.totSeatsInUse)
klog.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished main use but lingering on %d seats for %v seconds, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.InitialSeats, additionalLatency.Seconds(), qs.totRequestsExecuting, qs.totSeatsInUse)
}
// EventAfterDuration will execute the event func in a new goroutine,
// so the seats allocated to this request will be released after
Expand All @@ -823,11 +841,11 @@ func (qs *queueSet) finishRequestLocked(r *request) {
releaseSeatsLocked()
if !klog.V(6).Enabled() {
} else if r.queue != nil {
klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished lingering on %d seats, queue %d will have %d requests, %d seats waiting & %d requests occupying %d seats",
qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.InitialSeats, r.queue.index,
klog.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished lingering on %d seats, queue %d will have %d requests, %d seats waiting & %d requests occupying %d seats",
qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.InitialSeats, r.queue.index,
r.queue.requests.Length(), r.queue.requests.SeatsSum(), r.queue.requestsExecuting, r.queue.seatsInUse)
} else {
klog.Infof("QS(%s) at r=%s v=%.9fss: request %#+v %#+v finished lingering on %d seats, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.workEstimate.InitialSeats, qs.totRequestsExecuting, qs.totSeatsInUse)
klog.Infof("QS(%s) at t=%s R=%v: request %#+v %#+v finished lingering on %d seats, qs will have %d requests occupying %d seats", qs.qCfg.Name, now.Format(nsTimeFmt), qs.currentR, r.descr1, r.descr2, r.workEstimate.InitialSeats, qs.totRequestsExecuting, qs.totSeatsInUse)
}
qs.dispatchAsMuchAsPossibleLocked()
}, additionalLatency)
Expand All @@ -839,7 +857,7 @@ func (qs *queueSet) finishRequestLocked(r *request) {

// When a request finishes being served, and the actual service time was S,
// the queue’s start R is decremented by (G - S)*width.
r.queue.virtualStart -= (qs.estimatedServiceSeconds - S) * float64(r.Seats())
r.queue.nextDispatchR -= SeatsTimesDuration(float64(r.Seats()), qs.estimatedServiceDuration-actualServiceDuration)
qs.boundNextDispatch(r.queue)
}
}
Expand All @@ -857,11 +875,11 @@ func (qs *queueSet) boundNextDispatch(queue *queue) {
return
}
var virtualStartBound = oldestReqFromMinQueue.arrivalR
if queue.virtualStart < virtualStartBound {
if queue.nextDispatchR < virtualStartBound {
if klog.V(4).Enabled() {
klog.InfoS("AntiWindup tweaked queue", "QS", qs.qCfg.Name, "queue", queue.index, "time", qs.clock.Now().Format(nsTimeFmt), "requestDescr1", oldestReqFromMinQueue.descr1, "requestDescr2", oldestReqFromMinQueue.descr2, "newVirtualStart", virtualStartBound, "deltaVirtualStart", (virtualStartBound - queue.virtualStart))
klog.InfoS("AntiWindup tweaked queue", "QS", qs.qCfg.Name, "queue", queue.index, "time", qs.clock.Now().Format(nsTimeFmt), "requestDescr1", oldestReqFromMinQueue.descr1, "requestDescr2", oldestReqFromMinQueue.descr2, "newVirtualStart", virtualStartBound, "deltaVirtualStart", (virtualStartBound - queue.nextDispatchR))
}
queue.virtualStart = virtualStartBound
queue.nextDispatchR = virtualStartBound
}
}

Expand Down
Loading

0 comments on commit 6f942a0

Please sign in to comment.