Skip to content

Commit

Permalink
Refactor pkg/proxy/config's ServiceConfigHandler and EndpointsConfigH…
Browse files Browse the repository at this point in the history
…andler to have different update methods.

Refactor `pkg/proxy/config`’s ServiceConfigHandler.OnUpdate and
EndpointsConfigHandler.OnUpdate to different method names as they have
different signatures.

This will let the new proxy
(kubernetes#3760)
implement both interfaces.

Since we won’t need a separate loadbalancer structure (load balancing
is handled in the proxy rules), we will simply handle both event types
from the same object.
  • Loading branch information
BenTheElder committed Aug 8, 2015
1 parent f6d257c commit 6bbf2aa
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 62 deletions.
12 changes: 6 additions & 6 deletions pkg/proxy/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,17 @@ type EndpointsUpdate struct {

// ServiceConfigHandler is an abstract interface of objects which receive update notifications for the set of services.
type ServiceConfigHandler interface {
// OnUpdate gets called when a configuration has been changed by one of the sources.
// OnServiceUpdate gets called when a configuration has been changed by one of the sources.
// This is the union of all the configuration sources.
OnUpdate(services []api.Service)
OnServiceUpdate(services []api.Service)
}

// EndpointsConfigHandler is an abstract interface of objects which receive update notifications for the set of endpoints.
type EndpointsConfigHandler interface {
// OnUpdate gets called when endpoints configuration is changed for a given
// OnEndpointsUpdate gets called when endpoints configuration is changed for a given
// service on any of the configuration sources. An example is when a new
// service comes up, or when containers come up or down for an existing service.
OnUpdate(endpoints []api.Endpoints)
OnEndpointsUpdate(endpoints []api.Endpoints)
}

// EndpointsConfig tracks a set of endpoints configurations.
Expand All @@ -91,7 +91,7 @@ func NewEndpointsConfig() *EndpointsConfig {

func (c *EndpointsConfig) RegisterHandler(handler EndpointsConfigHandler) {
c.bcaster.Add(config.ListenerFunc(func(instance interface{}) {
handler.OnUpdate(instance.([]api.Endpoints))
handler.OnEndpointsUpdate(instance.([]api.Endpoints))
}))
}

Expand Down Expand Up @@ -189,7 +189,7 @@ func NewServiceConfig() *ServiceConfig {

func (c *ServiceConfig) RegisterHandler(handler ServiceConfigHandler) {
c.bcaster.Add(config.ListenerFunc(func(instance interface{}) {
handler.OnUpdate(instance.([]api.Service))
handler.OnServiceUpdate(instance.([]api.Service))
}))
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/proxy/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func NewServiceHandlerMock() *ServiceHandlerMock {
return &ServiceHandlerMock{services: make([]api.Service, 0)}
}

func (h *ServiceHandlerMock) OnUpdate(services []api.Service) {
func (h *ServiceHandlerMock) OnServiceUpdate(services []api.Service) {
sort.Sort(sortedServices(services))
h.services = services
h.updated.Done()
Expand Down Expand Up @@ -95,7 +95,7 @@ func NewEndpointsHandlerMock() *EndpointsHandlerMock {
return &EndpointsHandlerMock{endpoints: make([]api.Endpoints, 0)}
}

func (h *EndpointsHandlerMock) OnUpdate(endpoints []api.Endpoints) {
func (h *EndpointsHandlerMock) OnEndpointsUpdate(endpoints []api.Endpoints) {
sort.Sort(sortedEndpoints(endpoints))
h.endpoints = endpoints
h.updated.Done()
Expand Down
4 changes: 2 additions & 2 deletions pkg/proxy/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ import (

// ProxyProvider is the interface provided by proxier implementations.
type ProxyProvider interface {
// OnUpdate manages the active set of service proxies.
// OnServiceUpdate manages the active set of service proxies.
// Active service proxies are reinitialized if found in the update set or
// removed if missing from the update set.
OnUpdate(services []api.Service)
OnServiceUpdate(services []api.Service)
// SyncLoop runs periodic work.
// This is expected to run as a goroutine or as the main loop of the app.
// It does not return.
Expand Down
2 changes: 1 addition & 1 deletion pkg/proxy/userspace/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ const udpIdleTimeout = 1 * time.Second
// OnUpdate manages the active set of service proxies.
// Active service proxies are reinitialized if found in the update set or
// shutdown if missing from the update set.
func (proxier *Proxier) OnUpdate(services []api.Service) {
func (proxier *Proxier) OnServiceUpdate(services []api.Service) {
glog.V(4).Infof("Received update notice: %+v", services)
activeServices := make(map[proxy.ServicePortName]bool) // use a map as a set
for i := range services {
Expand Down
54 changes: 27 additions & 27 deletions pkg/proxy/userspace/proxier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func waitForNumProxyLoops(t *testing.T, p *Proxier, want int32) {
func TestTCPProxy(t *testing.T) {
lb := NewLoadBalancerRR()
service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"}
lb.OnUpdate([]api.Endpoints{
lb.OnEndpointsUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Subsets: []api.EndpointSubset{{
Expand All @@ -240,7 +240,7 @@ func TestTCPProxy(t *testing.T) {
func TestUDPProxy(t *testing.T) {
lb := NewLoadBalancerRR()
service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"}
lb.OnUpdate([]api.Endpoints{
lb.OnEndpointsUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Subsets: []api.EndpointSubset{{
Expand Down Expand Up @@ -268,7 +268,7 @@ func TestMultiPortProxy(t *testing.T) {
lb := NewLoadBalancerRR()
serviceP := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo-p"}, "p"}
serviceQ := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo-q"}, "q"}
lb.OnUpdate([]api.Endpoints{{
lb.OnEndpointsUpdate([]api.Endpoints{{
ObjectMeta: api.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
Expand Down Expand Up @@ -303,7 +303,7 @@ func TestMultiPortProxy(t *testing.T) {
waitForNumProxyLoops(t, p, 2)
}

func TestMultiPortOnUpdate(t *testing.T) {
func TestMultiPortOnServiceUpdate(t *testing.T) {
lb := NewLoadBalancerRR()
serviceP := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"}
serviceQ := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "q"}
Expand All @@ -315,7 +315,7 @@ func TestMultiPortOnUpdate(t *testing.T) {
}
waitForNumProxyLoops(t, p, 0)

p.OnUpdate([]api.Service{{
p.OnServiceUpdate([]api.Service{{
ObjectMeta: api.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{
Name: "p",
Expand Down Expand Up @@ -362,7 +362,7 @@ func stopProxyByName(proxier *Proxier, service proxy.ServicePortName) error {
func TestTCPProxyStop(t *testing.T) {
lb := NewLoadBalancerRR()
service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"}
lb.OnUpdate([]api.Endpoints{
lb.OnEndpointsUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
Subsets: []api.EndpointSubset{{
Expand Down Expand Up @@ -400,7 +400,7 @@ func TestTCPProxyStop(t *testing.T) {
func TestUDPProxyStop(t *testing.T) {
lb := NewLoadBalancerRR()
service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"}
lb.OnUpdate([]api.Endpoints{
lb.OnEndpointsUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
Subsets: []api.EndpointSubset{{
Expand Down Expand Up @@ -438,7 +438,7 @@ func TestUDPProxyStop(t *testing.T) {
func TestTCPProxyUpdateDelete(t *testing.T) {
lb := NewLoadBalancerRR()
service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"}
lb.OnUpdate([]api.Endpoints{
lb.OnEndpointsUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
Subsets: []api.EndpointSubset{{
Expand All @@ -465,7 +465,7 @@ func TestTCPProxyUpdateDelete(t *testing.T) {
conn.Close()
waitForNumProxyLoops(t, p, 1)

p.OnUpdate([]api.Service{})
p.OnServiceUpdate([]api.Service{})
if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
t.Fatalf(err.Error())
}
Expand All @@ -475,7 +475,7 @@ func TestTCPProxyUpdateDelete(t *testing.T) {
func TestUDPProxyUpdateDelete(t *testing.T) {
lb := NewLoadBalancerRR()
service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"}
lb.OnUpdate([]api.Endpoints{
lb.OnEndpointsUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
Subsets: []api.EndpointSubset{{
Expand All @@ -502,7 +502,7 @@ func TestUDPProxyUpdateDelete(t *testing.T) {
conn.Close()
waitForNumProxyLoops(t, p, 1)

p.OnUpdate([]api.Service{})
p.OnServiceUpdate([]api.Service{})
if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil {
t.Fatalf(err.Error())
}
Expand All @@ -512,7 +512,7 @@ func TestUDPProxyUpdateDelete(t *testing.T) {
func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
lb := NewLoadBalancerRR()
service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"}
lb.OnUpdate([]api.Endpoints{
lb.OnEndpointsUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Subsets: []api.EndpointSubset{{
Expand All @@ -539,13 +539,13 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
conn.Close()
waitForNumProxyLoops(t, p, 1)

p.OnUpdate([]api.Service{})
p.OnServiceUpdate([]api.Service{})
if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
t.Fatalf(err.Error())
}
waitForNumProxyLoops(t, p, 0)

p.OnUpdate([]api.Service{{
p.OnServiceUpdate([]api.Service{{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{
Name: "p",
Expand All @@ -564,7 +564,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
lb := NewLoadBalancerRR()
service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"}
lb.OnUpdate([]api.Endpoints{
lb.OnEndpointsUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Subsets: []api.EndpointSubset{{
Expand All @@ -591,13 +591,13 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
conn.Close()
waitForNumProxyLoops(t, p, 1)

p.OnUpdate([]api.Service{})
p.OnServiceUpdate([]api.Service{})
if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil {
t.Fatalf(err.Error())
}
waitForNumProxyLoops(t, p, 0)

p.OnUpdate([]api.Service{{
p.OnServiceUpdate([]api.Service{{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{
Name: "p",
Expand All @@ -616,7 +616,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
func TestTCPProxyUpdatePort(t *testing.T) {
lb := NewLoadBalancerRR()
service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"}
lb.OnUpdate([]api.Endpoints{
lb.OnEndpointsUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Subsets: []api.EndpointSubset{{
Expand All @@ -639,7 +639,7 @@ func TestTCPProxyUpdatePort(t *testing.T) {
testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
waitForNumProxyLoops(t, p, 1)

p.OnUpdate([]api.Service{{
p.OnServiceUpdate([]api.Service{{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{
Name: "p",
Expand All @@ -664,7 +664,7 @@ func TestTCPProxyUpdatePort(t *testing.T) {
func TestUDPProxyUpdatePort(t *testing.T) {
lb := NewLoadBalancerRR()
service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"}
lb.OnUpdate([]api.Endpoints{
lb.OnEndpointsUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Subsets: []api.EndpointSubset{{
Expand All @@ -686,7 +686,7 @@ func TestUDPProxyUpdatePort(t *testing.T) {
}
waitForNumProxyLoops(t, p, 1)

p.OnUpdate([]api.Service{{
p.OnServiceUpdate([]api.Service{{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{
Name: "p",
Expand All @@ -709,7 +709,7 @@ func TestUDPProxyUpdatePort(t *testing.T) {
func TestProxyUpdatePublicIPs(t *testing.T) {
lb := NewLoadBalancerRR()
service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"}
lb.OnUpdate([]api.Endpoints{
lb.OnEndpointsUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Subsets: []api.EndpointSubset{{
Expand All @@ -732,7 +732,7 @@ func TestProxyUpdatePublicIPs(t *testing.T) {
testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
waitForNumProxyLoops(t, p, 1)

p.OnUpdate([]api.Service{{
p.OnServiceUpdate([]api.Service{{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{
Ports: []api.ServicePort{{
Expand Down Expand Up @@ -761,7 +761,7 @@ func TestProxyUpdatePublicIPs(t *testing.T) {
func TestProxyUpdatePortal(t *testing.T) {
lb := NewLoadBalancerRR()
service := proxy.ServicePortName{types.NamespacedName{"testnamespace", "echo"}, "p"}
lb.OnUpdate([]api.Endpoints{
lb.OnEndpointsUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Subsets: []api.EndpointSubset{{
Expand All @@ -784,7 +784,7 @@ func TestProxyUpdatePortal(t *testing.T) {
testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
waitForNumProxyLoops(t, p, 1)

p.OnUpdate([]api.Service{{
p.OnServiceUpdate([]api.Service{{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{ClusterIP: "", Ports: []api.ServicePort{{
Name: "p",
Expand All @@ -797,7 +797,7 @@ func TestProxyUpdatePortal(t *testing.T) {
t.Fatalf("service with empty ClusterIP should not be included in the proxy")
}

p.OnUpdate([]api.Service{{
p.OnServiceUpdate([]api.Service{{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{ClusterIP: "None", Ports: []api.ServicePort{{
Name: "p",
Expand All @@ -810,7 +810,7 @@ func TestProxyUpdatePortal(t *testing.T) {
t.Fatalf("service with 'None' as ClusterIP should not be included in the proxy")
}

p.OnUpdate([]api.Service{{
p.OnServiceUpdate([]api.Service{{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{
Name: "p",
Expand Down
6 changes: 3 additions & 3 deletions pkg/proxy/userspace/roundrobin.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,10 +223,10 @@ func (lb *LoadBalancerRR) updateAffinityMap(svcPort proxy.ServicePortName, newEn
}
}

// OnUpdate manages the registered service endpoints.
// OnEndpointsUpdate manages the registered service endpoints.
// Registered endpoints are updated if found in the update set or
// unregistered if missing from the update set.
func (lb *LoadBalancerRR) OnUpdate(allEndpoints []api.Endpoints) {
func (lb *LoadBalancerRR) OnEndpointsUpdate(allEndpoints []api.Endpoints) {
registeredEndpoints := make(map[proxy.ServicePortName]bool)
lb.lock.Lock()
defer lb.lock.Unlock()
Expand Down Expand Up @@ -262,7 +262,7 @@ func (lb *LoadBalancerRR) OnUpdate(allEndpoints []api.Endpoints) {
if !exists || state == nil || len(curEndpoints) != len(newEndpoints) || !slicesEquiv(slice.CopyStrings(curEndpoints), newEndpoints) {
glog.V(1).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", svcPort, newEndpoints)
lb.updateAffinityMap(svcPort, newEndpoints)
// OnUpdate can be called without NewService being called externally.
// OnEndpointsUpdate can be called without NewService being called externally.
// To be safe we will call it here. A new service will only be created
// if one does not already exist. The affinity will be updated
// later, once NewService is called.
Expand Down
Loading

0 comments on commit 6bbf2aa

Please sign in to comment.