Skip to content

Commit

Permalink
Add more parameters to Watch
Browse files Browse the repository at this point in the history
* Add labels selector (same as List)
* Add fields selector
 * Plan to let you select pods by Host and/or Status
* Add resourceVersion to let you resume a watch where you left off.
  • Loading branch information
lavalamp committed Aug 8, 2014
1 parent d524921 commit 283fdba
Show file tree
Hide file tree
Showing 11 changed files with 182 additions and 46 deletions.
37 changes: 23 additions & 14 deletions pkg/apiserver/apiserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ type SimpleRESTStorage struct {
// Set if WatchSingle is called
requestedID string

// Set if WatchAll is called
requestedLabelSelector labels.Selector
requestedFieldSelector labels.Selector
requestedResourceVersion uint64

// If non-nil, called inside the WorkFunc when answering update, delete, create.
// obj receives the original input to the update, delete, or create call.
injectedFunction func(obj interface{}) (returnObj interface{}, err error)
Expand All @@ -95,7 +100,7 @@ func (storage *SimpleRESTStorage) Delete(id string) (<-chan interface{}, error)
if storage.injectedFunction != nil {
return storage.injectedFunction(id)
}
return api.Status{Status: api.StatusSuccess}, nil
return &api.Status{Status: api.StatusSuccess}, nil
}), nil
}

Expand Down Expand Up @@ -130,7 +135,10 @@ func (storage *SimpleRESTStorage) Update(obj interface{}) (<-chan interface{}, e
}

// Implement ResourceWatcher.
func (storage *SimpleRESTStorage) WatchAll() (watch.Interface, error) {
func (storage *SimpleRESTStorage) WatchAll(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
storage.requestedLabelSelector = label
storage.requestedFieldSelector = field
storage.requestedResourceVersion = resourceVersion
if err := storage.errors["watchAll"]; err != nil {
return nil, err
}
Expand All @@ -139,8 +147,9 @@ func (storage *SimpleRESTStorage) WatchAll() (watch.Interface, error) {
}

// Implement ResourceWatcher.
func (storage *SimpleRESTStorage) WatchSingle(id string) (watch.Interface, error) {
func (storage *SimpleRESTStorage) WatchSingle(id string, resourceVersion uint64) (watch.Interface, error) {
storage.requestedID = id
storage.requestedResourceVersion = resourceVersion
if err := storage.errors["watchSingle"]; err != nil {
return nil, err
}
Expand All @@ -164,17 +173,17 @@ func TestNotFound(t *testing.T) {
Path string
}
cases := map[string]T{
"PATCH method": T{"PATCH", "/prefix/version/foo"},
"GET long prefix": T{"GET", "/prefix/"},
"GET missing storage": T{"GET", "/prefix/version/blah"},
"GET with extra segment": T{"GET", "/prefix/version/foo/bar/baz"},
"POST with extra segment": T{"POST", "/prefix/version/foo/bar"},
"DELETE without extra segment": T{"DELETE", "/prefix/version/foo"},
"DELETE with extra segment": T{"DELETE", "/prefix/version/foo/bar/baz"},
"PUT without extra segment": T{"PUT", "/prefix/version/foo"},
"PUT with extra segment": T{"PUT", "/prefix/version/foo/bar/baz"},
"watch missing storage": T{"GET", "/prefix/version/watch/"},
"watch with bad method": T{"POST", "/prefix/version/watch/foo/bar"},
"PATCH method": {"PATCH", "/prefix/version/foo"},
"GET long prefix": {"GET", "/prefix/"},
"GET missing storage": {"GET", "/prefix/version/blah"},
"GET with extra segment": {"GET", "/prefix/version/foo/bar/baz"},
"POST with extra segment": {"POST", "/prefix/version/foo/bar"},
"DELETE without extra segment": {"DELETE", "/prefix/version/foo"},
"DELETE with extra segment": {"DELETE", "/prefix/version/foo/bar/baz"},
"PUT without extra segment": {"PUT", "/prefix/version/foo"},
"PUT with extra segment": {"PUT", "/prefix/version/foo/bar/baz"},
"watch missing storage": {"GET", "/prefix/version/watch/"},
"watch with bad method": {"POST", "/prefix/version/watch/foo/bar"},
}
handler := New(map[string]RESTStorage{
"foo": &SimpleRESTStorage{},
Expand Down
10 changes: 7 additions & 3 deletions pkg/apiserver/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type RESTStorage interface {
New() interface{}

// List selects resources in the storage which match to the selector.
// TODO: add field selector in addition to label selector.
List(labels.Selector) (interface{}, error)

// Get finds a resource in the storage by id and returns it.
Expand All @@ -46,7 +47,10 @@ type RESTStorage interface {
// ResourceWatcher should be implemented by all RESTStorage objects that
// want to offer the ability to watch for changes through the watch api.
type ResourceWatcher interface {
// TODO: take a query, like List, to filter out unwanted events.
WatchAll() (watch.Interface, error)
WatchSingle(id string) (watch.Interface, error)
// label selects on labels; field selects on the objects fields. Not all fields
// are supported; an error will be returned if you try to select for a field that
// isn't supported.
WatchAll(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error)
// TODO: Decide if we need to keep WatchSingle?
WatchSingle(id string, resourceVersion uint64) (watch.Interface, error)
}
28 changes: 25 additions & 3 deletions pkg/apiserver/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,38 @@ package apiserver
import (
"encoding/json"
"net/http"
"net/url"
"strconv"

"code.google.com/p/go.net/websocket"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/httplog"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)

type WatchHandler struct {
storage map[string]RESTStorage
}

func (s *APIServer) getWatchParams(query url.Values) (id string, label, field labels.Selector, resourceVersion uint64) {
id = query.Get("id")
if s, err := labels.ParseSelector(query.Get("labels")); err != nil {
label = labels.Everything()
} else {
label = s
}
if s, err := labels.ParseSelector(query.Get("fields")); err != nil {
field = labels.Everything()
} else {
field = s
}
if rv, err := strconv.ParseUint(query.Get("resourceVersion"), 10, 64); err == nil {
resourceVersion = rv
}
return id, label, field, resourceVersion
}

// handleWatch processes a watch request
func (h *WatchHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
parts := splitPath(req.URL.Path)
Expand All @@ -43,10 +64,11 @@ func (h *WatchHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if watcher, ok := storage.(ResourceWatcher); ok {
var watching watch.Interface
var err error
if id := req.URL.Query().Get("id"); id != "" {
watching, err = watcher.WatchSingle(id)
id, label, field, resourceVersion := s.getWatchParams(req.URL.Query())
if id != "" {
watching, err = watcher.WatchSingle(id, resourceVersion)
} else {
watching, err = watcher.WatchAll()
watching, err = watcher.WatchAll(label, field, resourceVersion)
}
if err != nil {
internalError(err, w)
Expand Down
67 changes: 67 additions & 0 deletions pkg/apiserver/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,3 +148,70 @@ func TestWatchHTTP(t *testing.T) {
t.Errorf("Unexpected non-error")
}
}

func TestWatchParamParsing(t *testing.T) {
simpleStorage := &SimpleRESTStorage{}
handler := New(map[string]RESTStorage{
"foo": simpleStorage,
}, "/prefix/version")
server := httptest.NewServer(handler)

dest, _ := url.Parse(server.URL)
dest.Path = "/prefix/version/watch/foo"

table := []struct {
rawQuery string
resourceVersion uint64
labelSelector string
fieldSelector string
id string
}{
{
rawQuery: "id=myID&resourceVersion=1234",
resourceVersion: 1234,
labelSelector: "",
fieldSelector: "",
id: "myID",
}, {
rawQuery: "resourceVersion=314159&fields=Host%3D&labels=name%3Dfoo",
resourceVersion: 314159,
labelSelector: "name=foo",
fieldSelector: "Host=",
id: "",
}, {
rawQuery: "",
resourceVersion: 0,
labelSelector: "",
fieldSelector: "",
id: "",
},
}

for _, item := range table {
simpleStorage.requestedLabelSelector = nil
simpleStorage.requestedFieldSelector = nil
simpleStorage.requestedResourceVersion = 5 // Prove this is set in all cases
simpleStorage.requestedID = ""
dest.RawQuery = item.rawQuery
resp, err := http.Get(dest.String())
if err != nil {
t.Errorf("%v: unexpected error: %v", item.rawQuery, err)
continue
}
resp.Body.Close()
if e, a := item.id, simpleStorage.requestedID; e != a {
t.Errorf("%v: expected %v, got %v", item.rawQuery, e, a)
}
if e, a := item.resourceVersion, simpleStorage.requestedResourceVersion; e != a {
t.Errorf("%v: expected %v, got %v", item.rawQuery, e, a)
}
if simpleStorage.requestedID == "" {
if e, a := item.labelSelector, simpleStorage.requestedLabelSelector.String(); e != a {
t.Errorf("%v: expected %v, got %v", item.rawQuery, e, a)
}
if e, a := item.fieldSelector, simpleStorage.requestedFieldSelector.String(); e != a {
t.Errorf("%v: expected %v, got %v", item.rawQuery, e, a)
}
}
}
}
6 changes: 3 additions & 3 deletions pkg/registry/controllerstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,12 @@ func (storage *ControllerRegistryStorage) waitForController(ctrl api.Replication

// WatchAll returns ReplicationController events via a watch.Interface, implementing
// apiserver.ResourceWatcher.
func (storage *ControllerRegistryStorage) WatchAll() (watch.Interface, error) {
return storage.registry.WatchControllers()
func (storage *ControllerRegistryStorage) WatchAll(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
return storage.registry.WatchControllers(label, field, resourceVersion)
}

// WatchSingle returns events for a single ReplicationController via a watch.Interface,
// implementing apiserver.ResourceWatcher.
func (storage *ControllerRegistryStorage) WatchSingle(id string) (watch.Interface, error) {
func (storage *ControllerRegistryStorage) WatchSingle(id string, resourceVersion uint64) (watch.Interface, error) {
return nil, errors.New("unimplemented")
}
5 changes: 4 additions & 1 deletion pkg/registry/controllerstorage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)

// TODO: Why do we have this AND MemoryRegistry?
type MockControllerRegistry struct {
err error
controllers []api.ReplicationController
Expand All @@ -49,10 +50,12 @@ func (registry *MockControllerRegistry) CreateController(controller api.Replicat
func (registry *MockControllerRegistry) UpdateController(controller api.ReplicationController) error {
return registry.err
}

func (registry *MockControllerRegistry) DeleteController(ID string) error {
return registry.err
}
func (registry *MockControllerRegistry) WatchControllers() (watch.Interface, error) {

func (registry *MockControllerRegistry) WatchControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
return nil, registry.err
}

Expand Down
10 changes: 7 additions & 3 deletions pkg/registry/etcdregistry.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,9 +205,13 @@ func (registry *EtcdRegistry) ListControllers() ([]api.ReplicationController, er
}

// WatchControllers begins watching for new, changed, or deleted controllers.
// TODO: Add id/selector parameters?
func (registry *EtcdRegistry) WatchControllers() (watch.Interface, error) {
return registry.helper.WatchList("/registry/controllers", tools.Everything)
func (registry *EtcdRegistry) WatchControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
if field.String() != "" {
return nil, fmt.Errorf("no field selector implemented for controllers")
}
return registry.helper.WatchList("/registry/controllers", resourceVersion, func(obj interface{}) bool {
return label.Matches(labels.Set(obj.(*api.ReplicationController).Labels))
})
}

func makeControllerKey(id string) string {
Expand Down
2 changes: 1 addition & 1 deletion pkg/registry/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type PodRegistry interface {
// ControllerRegistry is an interface for things that know how to store ReplicationControllers.
type ControllerRegistry interface {
ListControllers() ([]api.ReplicationController, error)
WatchControllers() (watch.Interface, error)
WatchControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error)
GetController(controllerID string) (*api.ReplicationController, error)
CreateController(controller api.ReplicationController) error
UpdateController(controller api.ReplicationController) error
Expand Down
2 changes: 1 addition & 1 deletion pkg/registry/memory_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (registry *MemoryRegistry) ListControllers() ([]api.ReplicationController,
return result, nil
}

func (registry *MemoryRegistry) WatchControllers() (watch.Interface, error) {
func (registry *MemoryRegistry) WatchControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
return nil, errors.New("unimplemented")
}

Expand Down
29 changes: 16 additions & 13 deletions pkg/tools/etcd_tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,18 +296,19 @@ func Everything(interface{}) bool {

// WatchList begins watching the specified key's items. Items are decoded into
// API objects, and any items passing 'filter' are sent down the returned
// watch.Interface.
func (h *EtcdHelper) WatchList(key string, filter FilterFunc) (watch.Interface, error) {
// watch.Interface. resourceVersion may be used to specify what version to begin
// watching (e.g., for reconnecting without missing any updateds).
func (h *EtcdHelper) WatchList(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) {
w := newEtcdWatcher(true, filter, h.Codec)
go w.etcdWatch(h.Client, key)
go w.etcdWatch(h.Client, key, resourceVersion)
return w, nil
}

// Watch begins watching the specified key. Events are decoded into
// API objects and sent down the returned watch.Interface.
func (h *EtcdHelper) Watch(key string) (watch.Interface, error) {
func (h *EtcdHelper) Watch(key string, resourceVersion uint64) (watch.Interface, error) {
w := newEtcdWatcher(false, nil, h.Codec)
go w.etcdWatch(h.Client, key)
go w.etcdWatch(h.Client, key, resourceVersion)
return w, nil
}

Expand Down Expand Up @@ -350,10 +351,10 @@ func newEtcdWatcher(list bool, filter FilterFunc, encoding Codec) *etcdWatcher {

// etcdWatch calls etcd's Watch function, and handles any errors. Meant to be called
// as a goroutine.
func (w *etcdWatcher) etcdWatch(client EtcdGetSet, key string) {
func (w *etcdWatcher) etcdWatch(client EtcdGetSet, key string, resourceVersion uint64) {
defer util.HandleCrash()
defer close(w.etcdCallEnded)
_, err := client.Watch(key, 0, w.list, w.etcdIncoming, w.etcdStop)
_, err := client.Watch(key, resourceVersion, w.list, w.etcdIncoming, w.etcdStop)
if err != etcd.ErrWatchStoppedByUser {
glog.Errorf("etcd.Watch stopped unexpectedly: %v (%#v)", err, err)
}
Expand Down Expand Up @@ -385,18 +386,20 @@ func (w *etcdWatcher) sendResult(res *etcd.Response) {
var action watch.EventType
var data []byte
switch res.Action {
case "create", "set":
case "create":
if res.Node == nil {
glog.Errorf("unexpected nil node: %#v", res)
return
}
data = []byte(res.Node.Value)
// TODO: Is this conditional correct?
if res.EtcdIndex > 0 {
action = watch.Modified
} else {
action = watch.Added
action = watch.Added
case "set":
if res.Node == nil {
glog.Errorf("unexpected nil node: %#v", res)
return
}
data = []byte(res.Node.Value)
action = watch.Modified
case "delete":
if res.PrevNode == nil {
glog.Errorf("unexpected nil prev node: %#v", res)
Expand Down
Loading

0 comments on commit 283fdba

Please sign in to comment.