Skip to content

Commit

Permalink
Fix bug of delivering random parts of events
Browse files Browse the repository at this point in the history
  • Loading branch information
wojtek-t committed Dec 22, 2016
1 parent 89a506a commit d31ff83
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 1 deletion.
20 changes: 19 additions & 1 deletion pkg/storage/cacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -859,9 +859,27 @@ func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
case !curObjPasses && oldObjPasses:
watchEvent = watch.Event{Type: watch.Deleted, Object: object}
}

// We need to ensure that if we put event X to the c.result, all
// previous events were already put into it before, no matter whether
// c.done is close or not.
// Thus we cannot simply select from c.done and c.result and this
// would give us non-determinism.
// At the same time, we don't want to block infinitely on putting
// to c.result, when c.done is already closed.

// This ensures that with c.done already close, we at most once go
// into the next select after this. With that, no matter which
// statement we choose there, we will deliver only consecutive
// events.
select {
case <-c.done:
return
default:
}

select {
case c.result <- watchEvent:
// don't block on c.result if c.done is closed
case <-c.done:
}
}
Expand Down
41 changes: 41 additions & 0 deletions pkg/storage/cacher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,3 +490,44 @@ func TestStartingResourceVersion(t *testing.T) {
t.Errorf("timed out waiting for event")
}
}

func TestRandomWatchDeliver(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
defer server.Terminate(t)
cacher := newTestCacher(etcdStorage, 10)
defer cacher.Stop()

fooCreated := updatePod(t, etcdStorage, makeTestPod("foo"), nil)
rv, err := storage.ParseWatchResourceVersion(fooCreated.ResourceVersion)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
startVersion := strconv.Itoa(int(rv))

watcher, err := cacher.WatchList(context.TODO(), "pods/ns", startVersion, storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// Now we can create exactly 21 events that should be delivered
// to the watcher, before it will completely block cacher and as
// a result will be dropped.
for i := 0; i < 21; i++ {
updatePod(t, etcdStorage, makeTestPod(fmt.Sprintf("foo-%d", i)), nil)
}

// Now stop the watcher and check if the consecutive events are being delivered.
watcher.Stop()

watched := 0
for {
event, ok := <-watcher.ResultChan()
if !ok {
break
}
if a, e := event.Object.(*api.Pod).Name, fmt.Sprintf("foo-%d", watched); e != a {
t.Errorf("Unexpected object watched: %s, expected %s", a, e)
}
watched++
}
}

0 comments on commit d31ff83

Please sign in to comment.