Skip to content

Commit

Permalink
Dynamically enable controllers based on what resources the server has.
Browse files Browse the repository at this point in the history
Dynamically delete namespaces based on what resources the server has.
  • Loading branch information
brendandburns committed Oct 14, 2015
1 parent 5afbf57 commit 947a558
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 48 deletions.
77 changes: 61 additions & 16 deletions cmd/kube-controller-manager/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"strconv"
"time"

"k8s.io/kubernetes/pkg/api"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
clientcmdapi "k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api"
Expand Down Expand Up @@ -91,11 +92,10 @@ type CMServer struct {
ServiceAccountKeyFile string
RootCAFile string

ClusterName string
ClusterCIDR net.IPNet
AllocateNodeCIDRs bool
EnableProfiling bool
EnableExperimental bool
ClusterName string
ClusterCIDR net.IPNet
AllocateNodeCIDRs bool
EnableProfiling bool

Master string
Kubeconfig string
Expand Down Expand Up @@ -196,7 +196,6 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&s.Master, "master", s.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)")
fs.StringVar(&s.Kubeconfig, "kubeconfig", s.Kubeconfig, "Path to kubeconfig file with authorization and master location information.")
fs.StringVar(&s.RootCAFile, "root-ca-file", s.RootCAFile, "If set, this root certificate authority will be included in service account's token secret. This must be a valid PEM-encoded CA bundle.")
fs.BoolVar(&s.EnableExperimental, "enable-experimental", s.EnableExperimental, "Enables experimental controllers (requires enabling experimental API on apiserver).")
fs.Float32Var(&s.KubeApiQps, "kube-api-qps", s.KubeApiQps, "QPS to use while talking with kubernetes apiserver")
fs.IntVar(&s.KubeApiBurst, "kube-api-burst", s.KubeApiBurst, "Burst to use while talking with kubernetes apiserver")
}
Expand Down Expand Up @@ -287,20 +286,47 @@ func (s *CMServer) Run(_ []string) error {

resourcequotacontroller.NewResourceQuotaController(kubeClient).Run(s.ResourceQuotaSyncPeriod)

namespacecontroller.NewNamespaceController(kubeClient, s.EnableExperimental, s.NamespaceSyncPeriod).Run()
versionStrings, err := client.ServerAPIVersions(kubeconfig)
if err != nil {
glog.Fatalf("Failed to get api versions from server: %v", err)
}
versions := &api.APIVersions{Versions: versionStrings}

if s.EnableExperimental {
go daemon.NewDaemonSetsController(kubeClient, s.resyncPeriod).
Run(s.ConcurrentDSCSyncs, util.NeverStop)
resourceMap, err := kubeClient.SupportedResources()
if err != nil {
glog.Fatalf("Failed to get supported resources from server: %v", err)
}

go job.NewJobController(kubeClient, s.resyncPeriod).
Run(s.ConcurrentJobSyncs, util.NeverStop)
namespacecontroller.NewNamespaceController(kubeClient, versions, s.NamespaceSyncPeriod).Run()

groupVersion := "extensions/v1beta1"
resources, found := resourceMap[groupVersion]
// TODO: this needs to be dynamic so users don't have to restart their controller manager if they change the apiserver
if containsVersion(versions, groupVersion) && found {
glog.Infof("Starting %s apis", groupVersion)
if containsResource(resources, "horizontalpodautoscalers") {
glog.Infof("Starting horizontal pod controller.")
podautoscaler.NewHorizontalController(kubeClient, metrics.NewHeapsterMetricsClient(kubeClient)).
Run(s.HorizontalPodAutoscalerSyncPeriod)
}

podautoscaler.NewHorizontalController(kubeClient, metrics.NewHeapsterMetricsClient(kubeClient)).
Run(s.HorizontalPodAutoscalerSyncPeriod)
if containsResource(resources, "daemonsets") {
glog.Infof("Starting daemon set controller")
go daemon.NewDaemonSetsController(kubeClient, s.resyncPeriod).
Run(s.ConcurrentDSCSyncs, util.NeverStop)
}

if containsResource(resources, "jobs") {
glog.Infof("Starting job controller")
go job.NewJobController(kubeClient, s.resyncPeriod).
Run(s.ConcurrentJobSyncs, util.NeverStop)
}

deployment.New(kubeClient).
Run(s.DeploymentControllerSyncPeriod)
if containsResource(resources, "deployments") {
glog.Infof("Starting deployment controller")
deployment.New(kubeClient).
Run(s.DeploymentControllerSyncPeriod)
}
}

pvclaimBinder := persistentvolumecontroller.NewPersistentVolumeClaimBinder(kubeClient, s.PVClaimBinderSyncPeriod)
Expand Down Expand Up @@ -348,3 +374,22 @@ func (s *CMServer) Run(_ []string) error {

select {}
}

func containsVersion(versions *api.APIVersions, version string) bool {
for ix := range versions.Versions {
if versions.Versions[ix] == version {
return true
}
}
return false
}

func containsResource(resources *api.APIResourceList, resourceName string) bool {
for ix := range resources.APIResources {
resource := resources.APIResources[ix]
if resource.Name == resourceName {
return true
}
}
return false
}
1 change: 0 additions & 1 deletion hack/verify-flags/known-flags.txt
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ duration-sec
e2e-verify-service-account
e2e-output-dir
enable-debugging-handlers
enable-experimental
enable-server
etcd-config
etcd-prefix
Expand Down
79 changes: 59 additions & 20 deletions pkg/controller/namespace/namespace_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type NamespaceController struct {
}

// NewNamespaceController creates a new NamespaceController
func NewNamespaceController(kubeClient client.Interface, experimentalMode bool, resyncPeriod time.Duration) *NamespaceController {
func NewNamespaceController(kubeClient client.Interface, versions *api.APIVersions, resyncPeriod time.Duration) *NamespaceController {
var controller *framework.Controller
_, controller = framework.NewInformer(
&cache.ListWatch{
Expand All @@ -60,7 +60,7 @@ func NewNamespaceController(kubeClient client.Interface, experimentalMode bool,
framework.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
namespace := obj.(*api.Namespace)
if err := syncNamespace(kubeClient, experimentalMode, namespace); err != nil {
if err := syncNamespace(kubeClient, versions, namespace); err != nil {
if estimate, ok := err.(*contentRemainingError); ok {
go func() {
// Estimate is the aggregate total of TerminationGracePeriodSeconds, which defaults to 30s
Expand All @@ -82,7 +82,7 @@ func NewNamespaceController(kubeClient client.Interface, experimentalMode bool,
},
UpdateFunc: func(oldObj, newObj interface{}) {
namespace := newObj.(*api.Namespace)
if err := syncNamespace(kubeClient, experimentalMode, namespace); err != nil {
if err := syncNamespace(kubeClient, versions, namespace); err != nil {
if estimate, ok := err.(*contentRemainingError); ok {
go func() {
t := estimate.Estimate/2 + 1
Expand Down Expand Up @@ -155,7 +155,7 @@ func (e *contentRemainingError) Error() string {
// deleteAllContent will delete all content known to the system in a namespace. It returns an estimate
// of the time remaining before the remaining resources are deleted. If estimate > 0 not all resources
// are guaranteed to be gone.
func deleteAllContent(kubeClient client.Interface, experimentalMode bool, namespace string, before unversioned.Time) (estimate int64, err error) {
func deleteAllContent(kubeClient client.Interface, versions *api.APIVersions, namespace string, before unversioned.Time) (estimate int64, err error) {
err = deleteServiceAccounts(kubeClient, namespace)
if err != nil {
return estimate, err
Expand Down Expand Up @@ -193,26 +193,41 @@ func deleteAllContent(kubeClient client.Interface, experimentalMode bool, namesp
return estimate, err
}
// If experimental mode, delete all experimental resources for the namespace.
if experimentalMode {
err = deleteHorizontalPodAutoscalers(kubeClient.Extensions(), namespace)
if containsVersion(versions, "extensions/v1beta1") {
resources, err := kubeClient.SupportedResourcesForGroupVersion("extensions/v1beta1")
glog.Errorf("%v", resources)
if err != nil {
return estimate, err
}
err = deleteDaemonSets(kubeClient.Extensions(), namespace)
if err != nil {
return estimate, err
if containsResource(resources, "horizontalpodautoscalers") {
err = deleteHorizontalPodAutoscalers(kubeClient.Extensions(), namespace)
if err != nil {
return estimate, err
}
}
err = deleteJobs(kubeClient.Extensions(), namespace)
if err != nil {
return estimate, err
if containsResource(resources, "ingress") {
err = deleteIngress(kubeClient.Extensions(), namespace)
if err != nil {
return estimate, err
}
}
err = deleteDeployments(kubeClient.Extensions(), namespace)
if err != nil {
return estimate, err
if containsResource(resources, "daemonsets") {
err = deleteDaemonSets(kubeClient.Extensions(), namespace)
if err != nil {
return estimate, err
}
}
err = deleteIngress(kubeClient.Extensions(), namespace)
if err != nil {
return estimate, err
if containsResource(resources, "jobs") {
err = deleteJobs(kubeClient.Extensions(), namespace)
if err != nil {
return estimate, err
}
}
if containsResource(resources, "deployments") {
err = deleteDeployments(kubeClient.Extensions(), namespace)
if err != nil {
return estimate, err
}
}
}
return estimate, nil
Expand Down Expand Up @@ -254,7 +269,7 @@ func updateNamespaceStatusFunc(kubeClient client.Interface, namespace *api.Names
}

// syncNamespace orchestrates deletion of a Namespace and its associated content.
func syncNamespace(kubeClient client.Interface, experimentalMode bool, namespace *api.Namespace) (err error) {
func syncNamespace(kubeClient client.Interface, versions *api.APIVersions, namespace *api.Namespace) (err error) {
if namespace.DeletionTimestamp == nil {
return nil
}
Expand All @@ -280,7 +295,7 @@ func syncNamespace(kubeClient client.Interface, experimentalMode bool, namespace
}

// there may still be content for us to remove
estimate, err := deleteAllContent(kubeClient, experimentalMode, namespace.Name, *namespace.DeletionTimestamp)
estimate, err := deleteAllContent(kubeClient, versions, namespace.Name, *namespace.DeletionTimestamp)
if err != nil {
return err
}
Expand Down Expand Up @@ -515,3 +530,27 @@ func deleteIngress(expClient client.ExtensionsInterface, ns string) error {
}
return nil
}

// TODO: this is duplicated logic. Move it somewhere central?
func containsVersion(versions *api.APIVersions, version string) bool {
for ix := range versions.Versions {
if versions.Versions[ix] == version {
return true
}
}
return false
}

// TODO: this is duplicated logic. Move it somewhere central?
func containsResource(resources *api.APIResourceList, resourceName string) bool {
if resources == nil {
return false
}
for ix := range resources.APIResources {
resource := resources.APIResources[ix]
if resource.Name == resourceName {
return true
}
}
return false
}
38 changes: 27 additions & 11 deletions pkg/controller/namespace/namespace_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func TestFinalizeNamespaceFunc(t *testing.T) {
}
}

func testSyncNamespaceThatIsTerminating(t *testing.T, experimentalMode bool) {
func testSyncNamespaceThatIsTerminating(t *testing.T, versions *api.APIVersions) {
mockClient := &testclient.Fake{}
now := unversioned.Now()
testNamespace := &api.Namespace{
Expand All @@ -89,7 +89,21 @@ func testSyncNamespaceThatIsTerminating(t *testing.T, experimentalMode bool) {
Phase: api.NamespaceTerminating,
},
}
err := syncNamespace(mockClient, experimentalMode, testNamespace)

if containsVersion(versions, "extensions/v1beta1") {
resources := []api.APIResource{}
for _, resource := range []string{"daemonsets", "deployments", "jobs", "horizontalpodautoscalers", "ingress"} {
resources = append(resources, api.APIResource{Name: resource})
}
mockClient.Resources = []api.APIResourceList{
{
GroupVersion: "extensions/v1beta1",
APIResources: resources,
},
}
}

err := syncNamespace(mockClient, versions, testNamespace)
if err != nil {
t.Errorf("Unexpected error when synching namespace %v", err)
}
Expand All @@ -108,13 +122,14 @@ func testSyncNamespaceThatIsTerminating(t *testing.T, experimentalMode bool) {
strings.Join([]string{"delete", "namespaces", ""}, "-"),
)

if experimentalMode {
if containsVersion(versions, "extensions/v1beta1") {
expectedActionSet.Insert(
strings.Join([]string{"list", "horizontalpodautoscalers", ""}, "-"),
strings.Join([]string{"list", "daemonsets", ""}, "-"),
strings.Join([]string{"list", "deployments", ""}, "-"),
strings.Join([]string{"list", "jobs", ""}, "-"),
strings.Join([]string{"list", "horizontalpodautoscalers", ""}, "-"),
strings.Join([]string{"list", "ingress", ""}, "-"),
strings.Join([]string{"get", "resource", ""}, "-"),
)
}

Expand All @@ -123,10 +138,10 @@ func testSyncNamespaceThatIsTerminating(t *testing.T, experimentalMode bool) {
actionSet.Insert(strings.Join([]string{action.GetVerb(), action.GetResource(), action.GetSubresource()}, "-"))
}
if !actionSet.HasAll(expectedActionSet.List()...) {
t.Errorf("Expected actions: %v, but got: %v", expectedActionSet, actionSet)
t.Errorf("Expected actions:\n%v\n but got:\n%v\nDifference:\n%v", expectedActionSet, actionSet, expectedActionSet.Difference(actionSet))
}
if !expectedActionSet.HasAll(actionSet.List()...) {
t.Errorf("Expected actions: %v, but got: %v", expectedActionSet, actionSet)
t.Errorf("Expected actions:\n%v\n but got:\n%v\nDifference:\n%v", expectedActionSet, actionSet, actionSet.Difference(expectedActionSet))
}
}

Expand All @@ -151,11 +166,11 @@ func TestRetryOnConflictError(t *testing.T) {
}

func TestSyncNamespaceThatIsTerminatingNonExperimental(t *testing.T) {
testSyncNamespaceThatIsTerminating(t, false)
testSyncNamespaceThatIsTerminating(t, &api.APIVersions{})
}

func TestSyncNamespaceThatIsTerminatingExperimental(t *testing.T) {
testSyncNamespaceThatIsTerminating(t, true)
func TestSyncNamespaceThatIsTerminatingV1Beta1(t *testing.T) {
testSyncNamespaceThatIsTerminating(t, &api.APIVersions{Versions: []string{"extensions/v1beta1"}})
}

func TestSyncNamespaceThatIsActive(t *testing.T) {
Expand All @@ -172,7 +187,7 @@ func TestSyncNamespaceThatIsActive(t *testing.T) {
Phase: api.NamespaceActive,
},
}
err := syncNamespace(mockClient, false, testNamespace)
err := syncNamespace(mockClient, &api.APIVersions{}, testNamespace)
if err != nil {
t.Errorf("Unexpected error when synching namespace %v", err)
}
Expand All @@ -183,7 +198,8 @@ func TestSyncNamespaceThatIsActive(t *testing.T) {

func TestRunStop(t *testing.T) {
mockClient := &testclient.Fake{}
nsController := NewNamespaceController(mockClient, false, 1*time.Second)

nsController := NewNamespaceController(mockClient, &api.APIVersions{}, 1*time.Second)

if nsController.StopEverything != nil {
t.Errorf("Non-running manager should not have a stop channel. Got %v", nsController.StopEverything)
Expand Down

0 comments on commit 947a558

Please sign in to comment.