Skip to content

Commit

Permalink
Merge pull request openshift#8755 from deads2k/fix-watch-0
Browse files Browse the repository at this point in the history
Merged by openshift-bot
  • Loading branch information
OpenShift Bot committed May 5, 2016
2 parents 8cf2c7e + d37235b commit d48ee4a
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 71 deletions.
2 changes: 1 addition & 1 deletion pkg/project/auth/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func (ac *AuthorizationCache) RemoveWatcher(watcher CacheWatcher) {
// if we're not the last element, shift
copy(ac.watchers[i:], ac.watchers[i+1:])
}
ac.watchers = ac.watchers[:lastIndex-1]
ac.watchers = ac.watchers[:lastIndex]
break
}
}
Expand Down
71 changes: 53 additions & 18 deletions pkg/project/auth/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ type userProjectWatcher struct {
projectCache *projectcache.ProjectCache
authCache WatchableCache

knownProjects sets.String
initialProjects []kapi.Namespace
// knownProjects maps name to resourceVersion
knownProjects map[string]string
}

var (
Expand All @@ -67,12 +69,18 @@ var (
watchChannelHWM etcd.HighWaterMark
)

func NewUserProjectWatcher(username string, groups []string, projectCache *projectcache.ProjectCache, authCache WatchableCache) *userProjectWatcher {
func NewUserProjectWatcher(username string, groups []string, projectCache *projectcache.ProjectCache, authCache WatchableCache, includeAllExistingProjects bool) *userProjectWatcher {
userInfo := &user.DefaultInfo{Name: username, Groups: groups}
namespaces, _ := authCache.List(userInfo)
knownProjects := sets.String{}
knownProjects := map[string]string{}
for _, namespace := range namespaces.Items {
knownProjects.Insert(namespace.Name)
knownProjects[namespace.Name] = namespace.ResourceVersion
}

// this is optional. If they don't request it, don't include it.
initialProjects := []kapi.Namespace{}
if includeAllExistingProjects {
initialProjects = append(initialProjects, namespaces.Items...)
}

w := &userProjectWatcher{
Expand All @@ -84,9 +92,10 @@ func NewUserProjectWatcher(username string, groups []string, projectCache *proje
outgoing: make(chan watch.Event),
userStop: make(chan struct{}),

projectCache: projectCache,
authCache: authCache,
knownProjects: knownProjects,
projectCache: projectCache,
authCache: authCache,
initialProjects: initialProjects,
knownProjects: knownProjects,
}
w.emit = func(e watch.Event) {
select {
Expand All @@ -103,10 +112,10 @@ func (w *userProjectWatcher) GroupMembershipChanged(namespaceName string, latest

switch {
case removed:
if !w.knownProjects.Has(namespaceName) {
if _, known := w.knownProjects[namespaceName]; !known {
return
}
w.knownProjects.Delete(namespaceName)
delete(w.knownProjects, namespaceName)

select {
case w.cacheIncoming <- watch.Event{
Expand All @@ -125,16 +134,22 @@ func (w *userProjectWatcher) GroupMembershipChanged(namespaceName string, latest
utilruntime.HandleError(err)
return
}

event := watch.Event{
Type: watch.Added,
Object: projectutil.ConvertNamespace(namespace),
}

// if we already have this in our list, then we're getting notified because the object changed
if w.knownProjects.Has(namespaceName) {
if lastResourceVersion, known := w.knownProjects[namespaceName]; known {
event.Type = watch.Modified

// if we've already notified for this particular resourceVersion, there's no work to do
if lastResourceVersion == namespace.ResourceVersion {
return
}
}
w.knownProjects.Insert(namespace.Name)
w.knownProjects[namespaceName] = namespace.ResourceVersion

select {
case w.cacheIncoming <- event:
Expand All @@ -158,16 +173,26 @@ func (w *userProjectWatcher) Watch() {
}()
defer utilruntime.HandleCrash()

// start by emitting all the `initialProjects`
for i := range w.initialProjects {
// keep this check here to sure we don't keep this open in the case of failures
select {
case err := <-w.cacheError:
w.emit(makeErrorEvent(err))
return
default:
}

w.emit(watch.Event{
Type: watch.Added,
Object: projectutil.ConvertNamespace(&w.initialProjects[i]),
})
}

for {
select {
case err := <-w.cacheError:
w.emit(watch.Event{
Type: watch.Error,
Object: &unversioned.Status{
Status: unversioned.StatusFailure,
Message: err.Error(),
},
})
w.emit(makeErrorEvent(err))
return

case <-w.userStop:
Expand All @@ -184,6 +209,16 @@ func (w *userProjectWatcher) Watch() {
}
}

func makeErrorEvent(err error) watch.Event {
return watch.Event{
Type: watch.Error,
Object: &unversioned.Status{
Status: unversioned.StatusFailure,
Message: err.Error(),
},
}
}

// ResultChan implements watch.Interface.
func (w *userProjectWatcher) ResultChan() <-chan watch.Event {
return w.outgoing
Expand Down
20 changes: 5 additions & 15 deletions pkg/project/auth/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func newTestWatcher(user string, groups []string, namespaces ...*kapi.Namespace)
projectCache.Run()
fakeAuthCache := &fakeAuthCache{}

return NewUserProjectWatcher(user, groups, projectCache, fakeAuthCache), fakeAuthCache
return NewUserProjectWatcher(user, groups, projectCache, fakeAuthCache, false), fakeAuthCache
}

type fakeAuthCache struct {
Expand Down Expand Up @@ -116,17 +116,12 @@ func TestAddModifyDeleteEventsByUser(t *testing.T) {
t.Fatalf("timeout")
}

// the object didn't change, we shouldn't observe it
watcher.GroupMembershipChanged("ns-01", sets.NewString("bob"), sets.String{}, sets.String{}, sets.String{}, sets.String{}, sets.String{})
select {
case event := <-watcher.ResultChan():
if event.Type != watch.Modified {
t.Errorf("expected Modified, got %v", event)
}
if event.Object.(*projectapi.Project).Name != "ns-01" {
t.Errorf("expected %v, got %#v", "ns-01", event.Object)
}
t.Fatalf("unexpected event %v", event)
case <-time.After(3 * time.Second):
t.Fatalf("timeout")
}

watcher.GroupMembershipChanged("ns-01", sets.NewString("alice"), sets.String{}, sets.NewString("bob"), sets.String{}, sets.String{}, sets.String{})
Expand Down Expand Up @@ -160,17 +155,12 @@ func TestAddModifyDeleteEventsByGroup(t *testing.T) {
t.Fatalf("timeout")
}

// the object didn't change, we shouldn't observe it
watcher.GroupMembershipChanged("ns-01", sets.String{}, sets.NewString("group-one"), sets.String{}, sets.String{}, sets.String{}, sets.String{})
select {
case event := <-watcher.ResultChan():
if event.Type != watch.Modified {
t.Errorf("expected Modified, got %v", event)
}
if event.Object.(*projectapi.Project).Name != "ns-01" {
t.Errorf("expected %v, got %#v", "ns-01", event.Object)
}
t.Fatalf("unexpected event %v", event)
case <-time.After(3 * time.Second):
t.Fatalf("timeout")
}

watcher.GroupMembershipChanged("ns-01", sets.String{}, sets.NewString("group-two"), sets.String{}, sets.NewString("group-one"), sets.String{}, sets.String{})
Expand Down
5 changes: 4 additions & 1 deletion pkg/project/registry/project/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,10 @@ func (s *REST) Watch(ctx kapi.Context, options *kapi.ListOptions) (watch.Interfa
if !exists {
return nil, fmt.Errorf("no user")
}
watcher := projectauth.NewUserProjectWatcher(userInfo.GetName(), userInfo.GetGroups(), s.projectCache, s.authCache)

includeAllExistingProjects := (options != nil) && options.ResourceVersion == "0"

watcher := projectauth.NewUserProjectWatcher(userInfo.GetName(), userInfo.GetGroups(), s.projectCache, s.authCache, includeAllExistingProjects)
s.authCache.AddWatcher(watcher)

go watcher.Watch()
Expand Down
66 changes: 30 additions & 36 deletions test/integration/project_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,42 +194,6 @@ func TestProjectWatch(t *testing.T) {
t.Fatalf("timeout")
}

select {
case event := <-w.ResultChan():
if event.Type != watch.Modified {
t.Errorf("expected Modified, got %v", event)
}
project := event.Object.(*projectapi.Project)
if project.Name != "ns-01" {
t.Fatalf("expected %v, got %#v", "ns-01", project)
}
project.Annotations[projectapi.ProjectDisplayName] = "whatever"
_, err := bobClient.Projects().Update(project)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

case <-time.After(3 * time.Second):
t.Fatalf("timeout")
}

select {
case event := <-w.ResultChan():
if event.Type != watch.Modified {
t.Errorf("expected Modified, got %v", event)
}
project := event.Object.(*projectapi.Project)
if project.Name != "ns-01" {
t.Errorf("expected %v, got %#v", "ns-01", project)
}
if project.Annotations[projectapi.ProjectDisplayName] != "whatever" {
t.Errorf("expected %v, got %#v", "whatever", project)
}

case <-time.After(3 * time.Second):
t.Fatalf("timeout")
}

joeClient, err := testserver.CreateNewProject(clusterAdminClient, *clusterAdminClientConfig, "ns-02", "joe")
if err != nil {
t.Fatalf("unexpected error: %v", err)
Expand Down Expand Up @@ -277,4 +241,34 @@ func TestProjectWatch(t *testing.T) {
}
}

// test the "start from beginning watch"
beginningWatch, err := bobClient.Projects().Watch(kapi.ListOptions{ResourceVersion: "0"})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
select {
case event := <-beginningWatch.ResultChan():
if event.Type != watch.Added {
t.Errorf("expected added, got %v", event)
}
project := event.Object.(*projectapi.Project)
if project.Name != "ns-01" {
t.Fatalf("expected %v, got %#v", "ns-01", project)
}

case <-time.After(3 * time.Second):
t.Fatalf("timeout")
}

fromNowWatch, err := bobClient.Projects().Watch(kapi.ListOptions{})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
select {
case event := <-fromNowWatch.ResultChan():
t.Fatalf("unexpected event %v", event)

case <-time.After(3 * time.Second):
}

}

0 comments on commit d48ee4a

Please sign in to comment.