Skip to content

Commit

Permalink
Fix etcd in proxy for namespace awareness
Browse files Browse the repository at this point in the history
  • Loading branch information
derekwaynecarr authored and thockin committed Oct 16, 2014
1 parent 332a03b commit d4e1076
Showing 1 changed file with 51 additions and 30 deletions.
81 changes: 51 additions & 30 deletions pkg/proxy/config/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,50 @@ func (s ConfigSourceEtcd) Run() {
}
}

// decodeServices recurses from the root of the service storage directory into each namespace to get each service and endpoint object
func (s ConfigSourceEtcd) decodeServices(node *etcd.Node, retServices []api.Service, retEndpoints []api.Endpoints) ([]api.Service, []api.Endpoints, error) {
// TODO this needs to go against API server desperately, so much redundant error prone code here
// we hit a namespace boundary, recurse until we find actual nodes
if node.Dir == true {
for _, n := range node.Nodes {
var err error // Don't shadow the ret* variables.
retServices, retEndpoints, err = s.decodeServices(n, retServices, retEndpoints)
if err != nil {
return retServices, retEndpoints, err
}
}
return retServices, retEndpoints, nil
}

// we have an actual service node
var svc api.Service
err := latest.Codec.DecodeInto([]byte(node.Value), &svc)
if err != nil {
glog.Errorf("Failed to load Service: %s (%#v)", node.Value, err)
} else {
// so we got a service we can handle, and now get endpoints
retServices = append(retServices, svc)
// get the endpoints
endpoints, err := s.GetEndpoints(svc.Namespace, svc.ID)
if err != nil {
if tools.IsEtcdNotFound(err) {
glog.V(4).Infof("Unable to get endpoints for %s %s : %v", svc.Namespace, svc.ID, err)
}
glog.Errorf("Couldn't get endpoints for %s %s : %v skipping", svc.Namespace, svc.ID, err)
endpoints = api.Endpoints{}
} else {
glog.V(3).Infof("Got service: %s %s on localport %d mapping to: %s", svc.Namespace, svc.ID, svc.Port, endpoints)
}
retEndpoints = append(retEndpoints, endpoints)
}
return retServices, retEndpoints, nil
}

// GetServices finds the list of services and their endpoints from etcd.
// This operation is akin to a set a known good at regular intervals.
func (s ConfigSourceEtcd) GetServices() ([]api.Service, []api.Endpoints, error) {
response, err := s.client.Get(registryRoot+"/specs", true, false)
// this is a recursive query now that services are namespaced under "/registry/services/specs/<ns>/<name>"
response, err := s.client.Get(registryRoot+"/specs", false, true)
if err != nil {
if tools.IsEtcdNotFound(err) {
glog.V(4).Infof("Failed to get the key %s: %v", registryRoot, err)
Expand All @@ -128,40 +168,18 @@ func (s ConfigSourceEtcd) GetServices() ([]api.Service, []api.Endpoints, error)
}
return []api.Service{}, []api.Endpoints{}, err
}
// this code needs to go through the API server in the future, this is one big hack
if response.Node.Dir == true {
retServices := make([]api.Service, len(response.Node.Nodes))
retEndpoints := make([]api.Endpoints, len(response.Node.Nodes))
// Ok, so we have directories, this list should be the list
// of services. Find the local port to listen on and remote endpoints
// and create a Service entry for it.
for i, node := range response.Node.Nodes {
var svc api.Service
err = latest.Codec.DecodeInto([]byte(node.Value), &svc)
if err != nil {
glog.Errorf("Failed to load Service: %s (%#v)", node.Value, err)
continue
}
retServices[i] = svc
endpoints, err := s.GetEndpoints(svc.ID)
if err != nil {
if tools.IsEtcdNotFound(err) {
glog.V(4).Infof("Unable to get endpoints for %s : %v", svc.ID, err)
}
glog.Errorf("Couldn't get endpoints for %s : %v skipping", svc.ID, err)
endpoints = api.Endpoints{}
} else {
glog.V(3).Infof("Got service: %s on localport %d mapping to: %s", svc.ID, svc.Port, endpoints)
}
retEndpoints[i] = endpoints
}
return retServices, retEndpoints, err
retServices := []api.Service{}
retEndpoints := []api.Endpoints{}
return s.decodeServices(response.Node, retServices, retEndpoints)
}
return nil, nil, fmt.Errorf("did not get the root of the registry %s", registryRoot)
}

// GetEndpoints finds the list of endpoints of the service from etcd.
func (s ConfigSourceEtcd) GetEndpoints(service string) (api.Endpoints, error) {
key := path.Join(registryRoot, "endpoints", service)
func (s ConfigSourceEtcd) GetEndpoints(namespace, service string) (api.Endpoints, error) {
key := path.Join(registryRoot, "endpoints", namespace, service)
response, err := s.client.Get(key, true, false)
if err != nil {
glog.Errorf("Failed to get the key: %s %v", key, err)
Expand Down Expand Up @@ -195,7 +213,10 @@ func (s ConfigSourceEtcd) WatchForChanges() {
if !ok {
break
}
s.ProcessChange(watchResponse)
// only listen for non directory changes
if watchResponse.Node.Dir == false {
s.ProcessChange(watchResponse)
}
}
}

Expand Down

0 comments on commit d4e1076

Please sign in to comment.