Skip to content

Commit

Permalink
Adds some TODOs and small fixes to pkg/util/workqueue
Browse files Browse the repository at this point in the history
Adds a new unit test for queue.
  • Loading branch information
jbeda committed Nov 4, 2016
1 parent e0c6bf1 commit 16b3485
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 5 deletions.
10 changes: 8 additions & 2 deletions pkg/util/workqueue/delaying_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
)

// DelayingInterface is an Interface that can Add an item at a later time. This makes it easier to
// DelayingInterface is an Interface that can Add an item at a later time. This makes it easier to
// requeue items after failures without ending up in a hot-loop.
type DelayingInterface interface {
Interface
Expand Down Expand Up @@ -68,6 +68,9 @@ type delayingType struct {
stopCh chan struct{}

// heartbeat ensures we wait no more than maxWait before firing
//
// TODO: replace with Ticker (and add to clock) so this can be cleaned up.
// clock.Tick will leak.
heartbeat <-chan time.Time

// waitingForAdd is an ordered slice of items to be added to the contained work queue
Expand Down Expand Up @@ -115,7 +118,7 @@ func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
}
}

// maxWait keeps a max bound on the wait time. It's just insurance against weird things happening.
// maxWait keeps a max bound on the wait time. It's just insurance against weird things happening.
// Checking the queue every 10 seconds isn't expensive and we know that we'll never end up with an
// expired item sitting for more than 10 seconds.
const maxWait = 10 * time.Second
Expand Down Expand Up @@ -192,6 +195,9 @@ func (q *delayingType) waitingLoop() {
// inserts the given entry into the sorted entries list
// same semantics as append()... the given slice may be modified,
// and the returned value should be used
//
// TODO: This should probably be converted to use container/heap to improve
// running time for a large number of items.
func insert(entries []waitFor, knownEntries map[t]time.Time, entry waitFor) []waitFor {
// if the entry is already in our retry list and the existing time is before the new one, just skip it
existingTime, exists := knownEntries[entry.data]
Expand Down
4 changes: 4 additions & 0 deletions pkg/util/workqueue/parallelizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ func Parallelize(workers, pieces int, doWorkPiece DoWorkPieceFunc) {
}
close(toProcess)

if pieces < workers {
workers = pieces
}

wg := sync.WaitGroup{}
wg.Add(workers)
for i := 0; i < workers; i++ {
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/workqueue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (q *Type) Done(item interface{}) {
}
}

// Shutdown will cause q to ignore all new items added to it. As soon as the
// ShutDown will cause q to ignore all new items added to it. As soon as the
// worker goroutines have drained the existing items in the queue, they will be
// instructed to exit.
func (q *Type) ShutDown() {
Expand Down
30 changes: 30 additions & 0 deletions pkg/util/workqueue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,33 @@ func TestLen(t *testing.T) {
t.Errorf("Expected %v, got %v", e, a)
}
}

func TestReinsert(t *testing.T) {
q := workqueue.New()
q.Add("foo")

// Start processing
i, _ := q.Get()
if i != "foo" {
t.Errorf("Expected %v, got %v", "foo", i)
}

// Add it back while processing
q.Add(i)

// Finish it up
q.Done(i)

// It should be back on the queue
i, _ = q.Get()
if i != "foo" {
t.Errorf("Expected %v, got %v", "foo", i)
}

// Finish that one up
q.Done(i)

if a := q.Len(); a != 0 {
t.Errorf("Expected queue to be empty. Has %v items", a)
}
}
5 changes: 3 additions & 2 deletions pkg/util/workqueue/rate_limitting_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,18 @@ limitations under the License.

package workqueue

// RateLimitingInterface is an Interface that can Add an item at a later time. This makes it easier to
// requeue items after failures without ending up in a hot-loop.
// RateLimitingInterface is an interface that rate limits items being added to the queue.
type RateLimitingInterface interface {
DelayingInterface

// AddRateLimited adds an item to the workqueue after the rate limiter says its ok
AddRateLimited(item interface{})

// Forget indicates that an item is finished being retried. Doesn't matter whether its for perm failing
// or for success, we'll stop the rate limiter from tracking it. This only clears the `rateLimiter`, you
// still have to call `Done` on the queue.
Forget(item interface{})

// NumRequeues returns back how many times the item was requeued
NumRequeues(item interface{}) int
}
Expand Down

0 comments on commit 16b3485

Please sign in to comment.