Skip to content

Commit

Permalink
Clean shutdown of resourcequota integration tests
Browse files Browse the repository at this point in the history
  • Loading branch information
wojtek-t committed Jun 14, 2022
1 parent 32cbd77 commit 8a87681
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 122 deletions.
1 change: 1 addition & 0 deletions pkg/controller/resourcequota/resource_quota_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ func (rq *Controller) worker(ctx context.Context, queue workqueue.RateLimitingIn
func (rq *Controller) Run(ctx context.Context, workers int) {
defer utilruntime.HandleCrash()
defer rq.queue.ShutDown()
defer rq.missingUsageQueue.ShutDown()

klog.Infof("Starting resource quota controller")
defer klog.Infof("Shutting down resource quota controller")
Expand Down
11 changes: 11 additions & 0 deletions pkg/controller/resourcequota/resource_quota_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,8 @@ func (qm *QuotaMonitor) IsSynced() bool {
// Run sets the stop channel and starts monitor execution until stopCh is
// closed. Any running monitors will be stopped before Run returns.
func (qm *QuotaMonitor) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()

klog.Infof("QuotaMonitor running")
defer klog.Infof("QuotaMonitor stopping")

Expand All @@ -317,6 +319,15 @@ func (qm *QuotaMonitor) Run(stopCh <-chan struct{}) {
// Start monitors and begin change processing until the stop channel is
// closed.
qm.StartMonitors()

// The following workers are hanging forever until the queue is
// shutted down, so we need to shut it down in a separate goroutine.
go func() {
defer utilruntime.HandleCrash()
defer qm.resourceChanges.ShutDown()

<-stopCh
}()
wait.Until(qm.runProcessResourceChanges, 1*time.Second, stopCh)

// Stop any running monitors.
Expand Down
200 changes: 78 additions & 122 deletions test/integration/quota/quota_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ import (
"context"
"errors"
"fmt"
"net/http"
"net/http/httptest"
"os"
"testing"
"time"

Expand All @@ -31,23 +30,17 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/admission"
genericadmissioninitializer "k8s.io/apiserver/pkg/admission/initializer"
"k8s.io/apiserver/pkg/admission/plugin/resourcequota"
resourcequotaapi "k8s.io/apiserver/pkg/admission/plugin/resourcequota/apis/resourcequota"
"k8s.io/apiserver/pkg/quota/v1/generic"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
watchtools "k8s.io/client-go/tools/watch"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
"k8s.io/kubernetes/pkg/controller"
replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
resourcequotacontroller "k8s.io/kubernetes/pkg/controller/resourcequota"
kubeapiserveradmission "k8s.io/kubernetes/pkg/kubeapiserver/admission"
quotainstall "k8s.io/kubernetes/pkg/quota/v1/install"
"k8s.io/kubernetes/test/integration/framework"
)
Expand All @@ -64,41 +57,20 @@ const (
// quota_test.go:115: Took 12.021640372s to scale up with quota
func TestQuota(t *testing.T) {
// Set up a API server
h := &framework.APIServerHolder{Initialized: make(chan struct{})}
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
<-h.Initialized
h.M.GenericAPIServer.Handler.ServeHTTP(w, req)
}))

admissionCh := make(chan struct{})
defer close(admissionCh)
clientset := clientset.NewForConfigOrDie(&restclient.Config{QPS: -1, Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
config := &resourcequotaapi.Configuration{}
admissionControl, err := resourcequota.NewResourceQuota(config, 5)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
internalInformers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc())
qca := quotainstall.NewQuotaConfigurationForAdmission()

initializers := admission.PluginInitializers{
genericadmissioninitializer.New(clientset, internalInformers, nil, nil, admissionCh),
kubeapiserveradmission.NewPluginInitializer(nil, nil, qca),
}
initializers.Initialize(admissionControl)
if err := admission.ValidateInitialization(admissionControl); err != nil {
t.Fatalf("couldn't initialize resource quota: %v", err)
}
_, kubeConfig, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{
ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
opts.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount"}
},
})
defer tearDownFn()

controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig()
controlPlaneConfig.GenericConfig.AdmissionControl = admissionControl
_, _, closeFn := framework.RunAnAPIServerUsingServer(controlPlaneConfig, s, h)
defer closeFn()
clientset := clientset.NewForConfigOrDie(kubeConfig)

ns := framework.CreateTestingNamespace("quotaed", t)
defer framework.DeleteTestingNamespace(ns, t)
ns2 := framework.CreateTestingNamespace("non-quotaed", t)
defer framework.DeleteTestingNamespace(ns2, t)
ns := framework.CreateNamespaceOrDie(clientset, "quotaed", t)
defer framework.DeleteNamespaceOrDie(clientset, ns, t)
ns2 := framework.CreateNamespaceOrDie(clientset, "non-quotaed", t)
defer framework.DeleteNamespaceOrDie(clientset, ns2, t)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -136,7 +108,6 @@ func TestQuota(t *testing.T) {
// Periodically the quota controller to detect new resource types
go resourceQuotaController.Sync(discoveryFunc, 30*time.Second, ctx.Done())

internalInformers.Start(ctx.Done())
informers.Start(ctx.Done())
close(informersStarted)

Expand Down Expand Up @@ -292,49 +263,43 @@ func scale(t *testing.T, namespace string, clientset *clientset.Clientset) {
}

func TestQuotaLimitedResourceDenial(t *testing.T) {
// Set up an API server
h := &framework.APIServerHolder{Initialized: make(chan struct{})}
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
<-h.Initialized
h.M.GenericAPIServer.Handler.ServeHTTP(w, req)
}))

admissionCh := make(chan struct{})
defer close(admissionCh)
clientset := clientset.NewForConfigOrDie(&restclient.Config{QPS: -1, Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})

// stop creation of a pod resource unless there is a quota
config := &resourcequotaapi.Configuration{
LimitedResources: []resourcequotaapi.LimitedResource{
{
Resource: "pods",
MatchContains: []string{"pods"},
},
},
}
qca := quotainstall.NewQuotaConfigurationForAdmission()
admissionControl, err := resourcequota.NewResourceQuota(config, 5)
// Create admission configuration with ResourceQuota configuration.
admissionConfigFile, err := os.CreateTemp("", "admission-config.yaml")
if err != nil {
t.Fatalf("unexpected error: %v", err)
t.Fatal(err)
}
defer os.Remove(admissionConfigFile.Name())
if err := os.WriteFile(admissionConfigFile.Name(), []byte(`
apiVersion: apiserver.k8s.io/v1alpha1
kind: AdmissionConfiguration
plugins:
- name: ResourceQuota
configuration:
apiVersion: apiserver.config.k8s.io/v1
kind: ResourceQuotaConfiguration
limitedResources:
- resource: pods
matchContains:
- pods
`), os.FileMode(0644)); err != nil {
t.Fatal(err)
}
externalInformers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc())

initializers := admission.PluginInitializers{
genericadmissioninitializer.New(clientset, externalInformers, nil, nil, admissionCh),
kubeapiserveradmission.NewPluginInitializer(nil, nil, qca),
}
initializers.Initialize(admissionControl)
if err := admission.ValidateInitialization(admissionControl); err != nil {
t.Fatalf("couldn't initialize resource quota: %v", err)
}
// Set up an API server
_, kubeConfig, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{
ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
opts.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount"}
opts.Admission.GenericAdmission.ConfigFile = admissionConfigFile.Name()

controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig()
controlPlaneConfig.GenericConfig.AdmissionControl = admissionControl
_, _, closeFn := framework.RunAnAPIServerUsingServer(controlPlaneConfig, s, h)
defer closeFn()
},
})
defer tearDownFn()

ns := framework.CreateTestingNamespace("quota", t)
defer framework.DeleteTestingNamespace(ns, t)
clientset := clientset.NewForConfigOrDie(kubeConfig)

ns := framework.CreateNamespaceOrDie(clientset, "quota", t)
defer framework.DeleteNamespaceOrDie(clientset, ns, t)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -372,7 +337,6 @@ func TestQuotaLimitedResourceDenial(t *testing.T) {
// Periodically the quota controller to detect new resource types
go resourceQuotaController.Sync(discoveryFunc, 30*time.Second, ctx.Done())

externalInformers.Start(ctx.Done())
informers.Start(ctx.Done())
close(informersStarted)

Expand Down Expand Up @@ -425,50 +389,43 @@ func TestQuotaLimitedResourceDenial(t *testing.T) {
}

func TestQuotaLimitService(t *testing.T) {

// Set up an API server
h := &framework.APIServerHolder{Initialized: make(chan struct{})}
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
<-h.Initialized
h.M.GenericAPIServer.Handler.ServeHTTP(w, req)
}))

admissionCh := make(chan struct{})
defer close(admissionCh)
clientset := clientset.NewForConfigOrDie(&restclient.Config{QPS: -1, Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})

// stop creation of a pod resource unless there is a quota
config := &resourcequotaapi.Configuration{
LimitedResources: []resourcequotaapi.LimitedResource{
{
Resource: "pods",
MatchContains: []string{"pods"},
},
},
}
qca := quotainstall.NewQuotaConfigurationForAdmission()
admissionControl, err := resourcequota.NewResourceQuota(config, 5)
// Create admission configuration with ResourceQuota configuration.
admissionConfigFile, err := os.CreateTemp("", "admission-config.yaml")
if err != nil {
t.Fatalf("unexpected error: %v", err)
t.Fatal(err)
}
defer os.Remove(admissionConfigFile.Name())
if err := os.WriteFile(admissionConfigFile.Name(), []byte(`
apiVersion: apiserver.k8s.io/v1alpha1
kind: AdmissionConfiguration
plugins:
- name: ResourceQuota
configuration:
apiVersion: apiserver.config.k8s.io/v1
kind: ResourceQuotaConfiguration
limitedResources:
- resource: pods
matchContains:
- pods
`), os.FileMode(0644)); err != nil {
t.Fatal(err)
}
externalInformers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc())

initializers := admission.PluginInitializers{
genericadmissioninitializer.New(clientset, externalInformers, nil, nil, admissionCh),
kubeapiserveradmission.NewPluginInitializer(nil, nil, qca),
}
initializers.Initialize(admissionControl)
if err := admission.ValidateInitialization(admissionControl); err != nil {
t.Fatalf("couldn't initialize resource quota: %v", err)
}
// Set up an API server
_, kubeConfig, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{
ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
opts.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount"}
opts.Admission.GenericAdmission.ConfigFile = admissionConfigFile.Name()

},
})
defer tearDownFn()

controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig()
controlPlaneConfig.GenericConfig.AdmissionControl = admissionControl
_, _, closeFn := framework.RunAnAPIServerUsingServer(controlPlaneConfig, s, h)
defer closeFn()
clientset := clientset.NewForConfigOrDie(kubeConfig)

ns := framework.CreateTestingNamespace("quota", t)
defer framework.DeleteTestingNamespace(ns, t)
ns := framework.CreateNamespaceOrDie(clientset, "quota", t)
defer framework.DeleteNamespaceOrDie(clientset, ns, t)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -506,7 +463,6 @@ func TestQuotaLimitService(t *testing.T) {
// Periodically the quota controller to detect new resource types
go resourceQuotaController.Sync(discoveryFunc, 30*time.Second, ctx.Done())

externalInformers.Start(ctx.Done())
informers.Start(ctx.Done())
close(informersStarted)

Expand Down

0 comments on commit 8a87681

Please sign in to comment.