From 82247ffeb9e83b73955e1d2e14d94a6800e510b9 Mon Sep 17 00:00:00 2001 From: Roman Mohr Date: Fri, 12 May 2017 14:38:58 +0200 Subject: [PATCH] Refactor migration controller Only use one controller loop and feed this loop with all events of involved subresources. --- cmd/virt-controller/virt-controller.go | 16 +- pkg/api/v1/types.go | 6 +- pkg/kubecli/kubecli.go | 15 ++ pkg/virt-controller/services/vm_test.go | 2 +- pkg/virt-controller/watch/job.go | 86 ------- pkg/virt-controller/watch/job_test.go | 148 ----------- pkg/virt-controller/watch/migration.go | 250 ++++++++---------- pkg/virt-controller/watch/migration_test.go | 271 +++++++------------- tests/vm_migration_test.go | 5 +- 9 files changed, 218 insertions(+), 581 deletions(-) delete mode 100644 pkg/virt-controller/watch/job.go delete mode 100644 pkg/virt-controller/watch/job_test.go diff --git a/cmd/virt-controller/virt-controller.go b/cmd/virt-controller/virt-controller.go index 51cdd78f0619..612bb0bee222 100644 --- a/cmd/virt-controller/virt-controller.go +++ b/cmd/virt-controller/virt-controller.go @@ -83,27 +83,15 @@ func main() { // Wait until VM cache has warmed up before we start watching pods vmController.WaitForSync(stop) - //TODO order the parameters consistantly in the factories, or use an object. - _, migrationController, migrationQueue := watch.NewMigrationController(vmService, nil, restClient, clientSet) - migrationController.StartInformer(stop) + //FIXME when we have more than one worker, we need a lock on the VM + migrationController := watch.NewMigrationController(vmService, restClient, clientSet) go migrationController.Run(1, stop) - migrationController.WaitForSync(stop) // Start watching pods _, podController := watch.NewPodController(vmCache, nil, clientSet, restClient, vmService) podController.StartInformer(stop) go podController.Run(1, stop) - _, migrationPodController := watch.NewMigrationPodController(vmCache, nil, clientSet, restClient, vmService, *migrationQueue) - migrationPodController.StartInformer(stop) - //FIXME when we have more than one worker, we need a lock on the VM - go migrationPodController.Run(1, stop) - - _, jobController := watch.NewJobController(vmService, nil, clientSet, restClient, *migrationQueue) - jobController.StartInformer(stop) - go jobController.Run(1, stop) - jobController.WaitForSync(stop) - httpLogger := logger.With("service", "http") httpLogger.Info().Log("action", "listening", "interface", *host, "port", *port) diff --git a/pkg/api/v1/types.go b/pkg/api/v1/types.go index 9c752f93df39..1979d906dc42 100644 --- a/pkg/api/v1/types.go +++ b/pkg/api/v1/types.go @@ -366,9 +366,6 @@ const ( // Create Migration has been called but nothing has been done with it MigrationUnknown MigrationPhase = "" - // Migration is actively progressing - MigrationScheduled MigrationPhase = "Scheduled" - // Migration has been scheduled but no update on the status has been recorded MigrationRunning MigrationPhase = "Running" @@ -383,7 +380,8 @@ const ( // MigrationStatus is the last reported status of a VM Migratrion. Status may trail the actual // state of a migration. 
type MigrationStatus struct { - Phase MigrationPhase `json:"phase,omitempty"` + Phase MigrationPhase `json:"phase,omitempty"` + Instance *types.UID `json:"instance,omitempty"` } // Required to satisfy ObjectMetaAccessor interface diff --git a/pkg/kubecli/kubecli.go b/pkg/kubecli/kubecli.go index b29417c2c79e..8034ed5b4e63 100644 --- a/pkg/kubecli/kubecli.go +++ b/pkg/kubecli/kubecli.go @@ -136,6 +136,21 @@ func NewResourceEventHandlerFuncsForWorkqueue(queue workqueue.RateLimitingInterf } } +func NewResourceEventHandlerFuncsForFunc(f func(interface{})) cache.ResourceEventHandlerFuncs { + return cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + f(obj) + }, + UpdateFunc: func(old interface{}, new interface{}) { + f(new) + + }, + DeleteFunc: func(obj interface{}) { + f(obj) + }, + } +} + type Controller struct { indexer cache.Store queue workqueue.RateLimitingInterface diff --git a/pkg/virt-controller/services/vm_test.go b/pkg/virt-controller/services/vm_test.go index 3a236e871867..8d9b48809d17 100644 --- a/pkg/virt-controller/services/vm_test.go +++ b/pkg/virt-controller/services/vm_test.go @@ -54,7 +54,7 @@ var _ = Describe("VM", func() { migration = v1.NewMinimalMigration(vm.ObjectMeta.Name+"-migration", vm.ObjectMeta.Name) expected_migration = &v1.Migration{} *expected_migration = *migration - expected_migration.Status.Phase = v1.MigrationScheduled + expected_migration.Status.Phase = v1.MigrationRunning vm.ObjectMeta.UID = "testUID" vm.ObjectMeta.SetUID(uuid.NewUUID()) diff --git a/pkg/virt-controller/watch/job.go b/pkg/virt-controller/watch/job.go deleted file mode 100644 index 7a7a71a6ca45..000000000000 --- a/pkg/virt-controller/watch/job.go +++ /dev/null @@ -1,86 +0,0 @@ -package watch - -import ( - "k8s.io/client-go/kubernetes" - kubeapi "k8s.io/client-go/pkg/api" - "k8s.io/client-go/pkg/api/v1" - "k8s.io/client-go/pkg/fields" - "k8s.io/client-go/pkg/labels" - "k8s.io/client-go/pkg/util/workqueue" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/cache" - "k8s.io/client-go/tools/record" - - kvirtv1 "kubevirt.io/kubevirt/pkg/api/v1" - "kubevirt.io/kubevirt/pkg/kubecli" - "kubevirt.io/kubevirt/pkg/logging" - "kubevirt.io/kubevirt/pkg/virt-controller/services" -) - -func migrationJobSelector() kubeapi.ListOptions { - fieldSelector := fields.ParseSelectorOrDie( - "status.phase!=" + string(v1.PodPending) + - ",status.phase!=" + string(v1.PodRunning) + - ",status.phase!=" + string(v1.PodUnknown)) - labelSelector, err := labels.Parse(kvirtv1.AppLabel + "=migration," + kvirtv1.DomainLabel + "," + kvirtv1.MigrationLabel) - if err != nil { - panic(err) - } - return kubeapi.ListOptions{FieldSelector: fieldSelector, LabelSelector: labelSelector} -} - -func NewJobController(vmService services.VMService, recorder record.EventRecorder, clientSet *kubernetes.Clientset, restClient *rest.RESTClient, migrationQueue workqueue.RateLimitingInterface) (cache.Store, *kubecli.Controller) { - selector := migrationJobSelector() - lw := kubecli.NewListWatchFromClient(clientSet.CoreV1().RESTClient(), "pods", kubeapi.NamespaceDefault, selector.FieldSelector, selector.LabelSelector) - queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) - return kubecli.NewController(lw, queue, &v1.Pod{}, NewJobControllerDispatch(vmService, restClient, migrationQueue)) -} - -func NewJobControllerDispatch(vmService services.VMService, restClient *rest.RESTClient, migrationQueue workqueue.RateLimitingInterface) kubecli.ControllerDispatch { - dispatch := JobDispatch{ - restClient: 
restClient, - vmService: vmService, - migrationQueue: migrationQueue, - } - var vmd kubecli.ControllerDispatch = &dispatch - return vmd -} - -type JobDispatch struct { - restClient *rest.RESTClient - vmService services.VMService - migrationQueue workqueue.RateLimitingInterface -} - -func (jd *JobDispatch) Execute(store cache.Store, queue workqueue.RateLimitingInterface, key interface{}) { - obj, exists, err := store.GetByKey(key.(string)) - if err != nil { - queue.AddRateLimited(key) - return - } - if exists { - job := obj.(*v1.Pod) - - //TODO Use the namespace from the Job and stop referencing the migration object - migrationLabel := job.ObjectMeta.Labels[kvirtv1.MigrationLabel] - migration, migrationExists, err := jd.vmService.FetchMigration(migrationLabel) - if err != nil { - queue.AddRateLimited(key) - return - } - if !migrationExists { - //REstart where the Migration has gone away. - queue.Forget(key) - return - } - migrationKey, err := cache.MetaNamespaceKeyFunc(migration) - if err == nil { - jd.migrationQueue.Add(migrationKey) - } else { - logger := logging.DefaultLogger().Object(migration) - logger.Error().Reason(err).Msgf("Updating migration queue with %s failed.", migrationLabel) - queue.AddRateLimited(key) - return - } - } -} diff --git a/pkg/virt-controller/watch/job_test.go b/pkg/virt-controller/watch/job_test.go deleted file mode 100644 index 0384b9bee905..000000000000 --- a/pkg/virt-controller/watch/job_test.go +++ /dev/null @@ -1,148 +0,0 @@ -package watch - -import ( - "net/http" - - "github.com/facebookgo/inject" - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - "github.com/onsi/gomega/ghttp" - "k8s.io/client-go/kubernetes" - kubeapi "k8s.io/client-go/pkg/api" - corev1 "k8s.io/client-go/pkg/api/v1" - "k8s.io/client-go/pkg/fields" - "k8s.io/client-go/pkg/labels" - "k8s.io/client-go/pkg/util/workqueue" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/cache" - - kvirtv1 "kubevirt.io/kubevirt/pkg/api/v1" - "kubevirt.io/kubevirt/pkg/kubecli" - "kubevirt.io/kubevirt/pkg/logging" - "kubevirt.io/kubevirt/pkg/virt-controller/services" -) - -var _ = Describe("Migration", func() { - var server *ghttp.Server - var jobCache cache.Store - var vmService services.VMService - var restClient *rest.RESTClient - var vm *kvirtv1.VM - var dispatch kubecli.ControllerDispatch - var migration *kvirtv1.Migration - var job *corev1.Pod - var listOptions kubeapi.ListOptions = migrationJobSelector() - var jobQueue workqueue.RateLimitingInterface - var migrationQueue workqueue.RateLimitingInterface - - var jobKey interface{} - - doExecute := func() { - // Tell the controller function that there is a new Job - jobKey, _ = cache.MetaNamespaceKeyFunc(job) - jobCache.Add(job) - jobQueue.Add(jobKey) - dispatch.Execute(jobCache, jobQueue, jobKey) - } - - logging.DefaultLogger().SetIOWriter(GinkgoWriter) - - BeforeEach(func() { - var g inject.Graph - vmService = services.NewVMService() - server = ghttp.NewServer() - config := rest.Config{} - config.Host = server.URL() - clientSet, _ := kubernetes.NewForConfig(&config) - templateService, _ := services.NewTemplateService("kubevirt/virt-launcher") - restClient, _ = kubecli.GetRESTClientFromFlags(server.URL(), "") - - g.Provide( - &inject.Object{Value: restClient}, - &inject.Object{Value: clientSet}, - &inject.Object{Value: vmService}, - &inject.Object{Value: templateService}, - ) - g.Populate() - - jobCache = cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, nil) - jobQueue = 
workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) - migrationQueue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) - vm = kvirtv1.NewMinimalVM("test-vm") - vm.Status.Phase = kvirtv1.Migrating - vm.GetObjectMeta().SetLabels(map[string]string{"a": "b"}) - - dispatch = NewJobControllerDispatch(vmService, restClient, migrationQueue) - migration = kvirtv1.NewMinimalMigration("test-migration", "test-vm") - job = &corev1.Pod{ - ObjectMeta: corev1.ObjectMeta{ - Labels: map[string]string{ - kvirtv1.AppLabel: "migration", - kvirtv1.DomainLabel: "test-vm", - kvirtv1.MigrationLabel: migration.ObjectMeta.Name, - }, - }, - Status: corev1.PodStatus{ - Phase: corev1.PodSucceeded, - }, - } - - }) - - Context("Running job", func() { - It("Success should update the VM to Running", func(done Done) { - // Register the expected REST call - server.AppendHandlers( - handlerToFetchTestMigration(migration), - ) - doExecute() - Expect(len(server.ReceivedRequests())).To(Equal(1)) - Expect(jobQueue.NumRequeues(jobKey)).Should(Equal(0)) - Expect(migrationQueue.Len()).Should(Equal(1)) - close(done) - }, 10) - - It("should have valid listSelector", func() { - - f := fields.Set{"Status.Phase": string(corev1.PodSucceeded)} - matches := listOptions.FieldSelector.Matches(f) - Expect(matches).Should(BeTrue()) - - s := labels.Set(job.Labels) - matches = listOptions.LabelSelector.Matches(s) - Expect(matches).Should(BeTrue()) - Expect(jobQueue.NumRequeues(jobKey)).Should(Equal(0)) - }, 100) - - It("Error Fetching Migration should requeue", func(done Done) { - // Register the expected REST call - server.AppendHandlers( - handlerToFetchTestMigrationAuthError(migration), - ) - - doExecute() - - Expect(len(server.ReceivedRequests())).To(Equal(1)) - Expect(jobQueue.NumRequeues(jobKey)).Should(Equal(1)) - close(done) - }, 10) - }) - - AfterEach(func() { - server.Close() - }) -}) - -func handlerToFetchTestMigration(migration *kvirtv1.Migration) http.HandlerFunc { - return ghttp.CombineHandlers( - ghttp.VerifyRequest("GET", "/apis/kubevirt.io/v1alpha1/namespaces/default/migrations/"+migration.ObjectMeta.Name), - ghttp.RespondWithJSONEncoded(http.StatusOK, migration), - ) -} - -func handlerToFetchTestMigrationAuthError(migration *kvirtv1.Migration) http.HandlerFunc { - return ghttp.CombineHandlers( - ghttp.VerifyRequest("GET", "/apis/kubevirt.io/v1alpha1/namespaces/default/migrations/"+migration.ObjectMeta.Name), - ghttp.RespondWithJSONEncoded(http.StatusForbidden, migration), - ) -} diff --git a/pkg/virt-controller/watch/migration.go b/pkg/virt-controller/watch/migration.go index 2a465a0a5c16..872b37f71078 100644 --- a/pkg/virt-controller/watch/migration.go +++ b/pkg/virt-controller/watch/migration.go @@ -2,6 +2,7 @@ package watch import ( "fmt" + "time" "k8s.io/client-go/kubernetes" kubeapi "k8s.io/client-go/pkg/api" @@ -9,11 +10,11 @@ import ( metav1 "k8s.io/client-go/pkg/apis/meta/v1" "k8s.io/client-go/pkg/fields" "k8s.io/client-go/pkg/labels" - "k8s.io/client-go/pkg/types" + "k8s.io/client-go/pkg/runtime" + "k8s.io/client-go/pkg/util/wait" "k8s.io/client-go/pkg/util/workqueue" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" - "k8s.io/client-go/tools/record" kubev1 "kubevirt.io/kubevirt/pkg/api/v1" "kubevirt.io/kubevirt/pkg/kubecli" @@ -21,40 +22,73 @@ import ( "kubevirt.io/kubevirt/pkg/virt-controller/services" ) -func NewMigrationController(migrationService services.VMService, recorder record.EventRecorder, restClient *rest.RESTClient, clientset *kubernetes.Clientset) 
(cache.Store, *kubecli.Controller, *workqueue.RateLimitingInterface) { +func NewMigrationController(migrationService services.VMService, restClient *rest.RESTClient, clientset *kubernetes.Clientset) *MigrationController { lw := cache.NewListWatchFromClient(restClient, "migrations", k8sv1.NamespaceDefault, fields.Everything()) queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) - store, controller := kubecli.NewController(lw, queue, &kubev1.Migration{}, NewMigrationControllerDispatch(migrationService, restClient, clientset)) - return store, controller, &queue -} - -func NewMigrationControllerDispatch(vmService services.VMService, restClient *rest.RESTClient, clientset *kubernetes.Clientset) kubecli.ControllerDispatch { - - dispatch := MigrationDispatch{ + store, informer := cache.NewIndexerInformer(lw, &kubev1.Migration{}, 0, kubecli.NewResourceEventHandlerFuncsForWorkqueue(queue), cache.Indexers{}) + return &MigrationController{ restClient: restClient, - vmService: vmService, + vmService: migrationService, clientset: clientset, + queue: queue, + store: store, + informer: informer, } - return &dispatch } -type MigrationDispatch struct { +type MigrationController struct { restClient *rest.RESTClient vmService services.VMService clientset *kubernetes.Clientset + queue workqueue.RateLimitingInterface + store cache.Store + informer cache.ControllerInterface } -func (md *MigrationDispatch) Execute(store cache.Store, queue workqueue.RateLimitingInterface, key interface{}) { - if err := md.execute(store, key.(string)); err != nil { +func (c *MigrationController) Run(threadiness int, stopCh chan struct{}) { + defer kubecli.HandlePanic() + defer c.queue.ShutDown() + logging.DefaultLogger().Info().Msg("Starting controller.") + + // Start all informers and wait for the cache sync + _, jobInformer := NewMigrationJobInformer(c.clientset, c.queue) + go jobInformer.Run(stopCh) + _, podInformer := NewMigrationPodInformer(c.clientset, c.queue) + go podInformer.Run(stopCh) + go c.informer.Run(stopCh) + cache.WaitForCacheSync(stopCh, c.informer.HasSynced, jobInformer.HasSynced, podInformer.HasSynced) + + // Start the actual work + for i := 0; i < threadiness; i++ { + go wait.Until(c.runWorker, time.Second, stopCh) + } + + <-stopCh + logging.DefaultLogger().Info().Msg("Stopping controller.") +} + +func (c *MigrationController) runWorker() { + for c.Execute() { + } +} + +func (md *MigrationController) Execute() bool { + key, quit := md.queue.Get() + if quit { + return false + } + defer md.queue.Done(key) + if err := md.execute(key.(string)); err != nil { logging.DefaultLogger().Info().Reason(err).Msgf("reenqueuing migration %v", key) - queue.AddRateLimited(key) + md.queue.AddRateLimited(key) } else { logging.DefaultLogger().Info().V(4).Msgf("processed migration %v", key) - queue.Forget(key) + md.queue.Forget(key) } + return true } -func (md *MigrationDispatch) execute(store cache.Store, key string) error { +func (md *MigrationController) execute(key string) error { setMigrationPhase := func(migration *kubev1.Migration, phase kubev1.MigrationPhase) error { @@ -64,16 +98,9 @@ func (md *MigrationDispatch) execute(store cache.Store, key string) error { logger := logging.DefaultLogger().Object(migration) - // Copy migration for future modifications - migrationCopy, err := copy(migration) - if err != nil { - logger.Error().Reason(err).Msg("could not copy migration object") - return err - } - - migrationCopy.Status.Phase = phase + migration.Status.Phase = phase // TODO indicate why it was 
set to failed - err = md.vmService.UpdateMigration(migrationCopy) + err := md.vmService.UpdateMigration(migration) if err != nil { logger.Error().Reason(err).Msgf("updating migration state failed: %v ", err) return err @@ -85,7 +112,7 @@ func (md *MigrationDispatch) execute(store cache.Store, key string) error { return setMigrationPhase(mig, kubev1.MigrationFailed) } - obj, exists, err := store.GetByKey(key) + obj, exists, err := md.store.GetByKey(key) if err != nil { return err } @@ -93,8 +120,13 @@ func (md *MigrationDispatch) execute(store cache.Store, key string) error { return nil } - var migration *kubev1.Migration = obj.(*kubev1.Migration) - logger := logging.DefaultLogger().Object(migration) + logger := logging.DefaultLogger().Object(obj.(*kubev1.Migration)) + // Copy migration for future modifications + if obj, err = kubeapi.Scheme.Copy(obj.(runtime.Object)); err != nil { + logger.Error().Reason(err).Msg("could not copy migration object") + return err + } + migration := obj.(*kubev1.Migration) vm, exists, err := md.vmService.FetchVM(migration.Spec.Selector.Name) if err != nil { @@ -104,20 +136,14 @@ func (md *MigrationDispatch) execute(store cache.Store, key string) error { if !exists { logger.Info().Msgf("VM with name %s does not exist, marking migration as failed", migration.Spec.Selector.Name) - if err = setMigrationFailed(migration); err != nil { - return err - } - return nil + return setMigrationFailed(migration) } switch migration.Status.Phase { case kubev1.MigrationUnknown: if vm.Status.Phase != kubev1.Running { logger.Error().Msgf("VM with name %s is in state %s, no migration possible. Marking migration as failed", vm.GetObjectMeta().GetName(), vm.Status.Phase) - if err = setMigrationFailed(migration); err != nil { - return err - } - return nil + return setMigrationFailed(migration) } if err := mergeConstraints(migration, vm); err != nil { @@ -131,7 +157,7 @@ func (md *MigrationDispatch) execute(store cache.Store, key string) error { } //FIXME when we have more than one worker, we need a lock on the VM - numOfPods, targetPod, err := investigateTargetPodSituation(migration, podList, store) + numOfPods, targetPod, err := investigateTargetPodSituation(migration, podList, md.store) if err != nil { logger.Error().Reason(err).Msg("could not investigate pods") return err @@ -157,73 +183,34 @@ func (md *MigrationDispatch) execute(store cache.Store, key string) error { } else { if targetPod.Status.Phase == k8sv1.PodFailed { logger.Error().Msg("migration target pod is in failed state") - if err = setMigrationFailed(migration); err != nil { - return err - } - return nil + return setMigrationFailed(migration) } // Unlikely to hit this case, but prevents erroring out // if we re-enter this loop - logger.Info().Msgf("migration appears to be set up, but was not set to %s", kubev1.MigrationScheduled) + logger.Info().Msgf("migration appears to be set up, but was not set to %s", kubev1.MigrationRunning) } - err = setMigrationPhase(migration, kubev1.MigrationScheduled) - if err != nil { - return err - } - return nil - case kubev1.MigrationScheduled: - podList, err := md.vmService.GetRunningVMPods(vm) - if err != nil { - logger.Error().Reason(err).Msg("could not fetch a list of running VM target pods") - return err - } - - _, targetPod, err := investigateTargetPodSituation(migration, podList, store) - if err != nil { - logger.Error().Reason(err).Msg("could not investigate pods") - return err - } - - if targetPod == nil { - logger.Error().Msg("migration target pod does not exist or is an end 
state") - if err = setMigrationFailed(migration); err != nil { - return err - } - return nil - } - // Migration has been scheduled but no update on the status has been recorded - err = setMigrationPhase(migration, kubev1.MigrationRunning) - if err != nil { - return err - } - return nil + return setMigrationPhase(migration, kubev1.MigrationRunning) case kubev1.MigrationRunning: podList, err := md.vmService.GetRunningVMPods(vm) if err != nil { logger.Error().Reason(err).Msg("could not fetch a list of running VM target pods") return err } - _, targetPod, err := investigateTargetPodSituation(migration, podList, store) + _, targetPod, err := investigateTargetPodSituation(migration, podList, md.store) if err != nil { logger.Error().Reason(err).Msg("could not investigate pods") return err } if targetPod == nil { logger.Error().Msg("migration target pod does not exist or is in an end state") - if err = setMigrationFailed(migration); err != nil { - return err - } - return nil + return setMigrationFailed(migration) } switch targetPod.Status.Phase { case k8sv1.PodRunning: break case k8sv1.PodSucceeded, k8sv1.PodFailed: logger.Error().Msgf("migration target pod is in end state %s", targetPod.Status.Phase) - if err = setMigrationFailed(migration); err != nil { - return err - } - return nil + return setMigrationFailed(migration) default: //Not requeuing, just not far enough along to proceed logger.Info().V(3).Msg("target Pod not running yet") @@ -274,9 +261,7 @@ func (md *MigrationDispatch) execute(store cache.Store, key string) error { if _, err = md.vmService.PutVm(vm); err != nil { return err } - if err = setMigrationFailed(migration); err != nil { - return err - } + return setMigrationFailed(migration) case k8sv1.PodSucceeded: vm.Status.NodeName = targetPod.Spec.NodeName vm.Status.MigrationNodeName = "" @@ -289,22 +274,12 @@ func (md *MigrationDispatch) execute(store cache.Store, key string) error { logger.Error().Reason(err).Msg("updating the VM failed.") return err } - if err = setMigrationPhase(migration, kubev1.MigrationSucceeded); err != nil { - return err - } + return setMigrationPhase(migration, kubev1.MigrationSucceeded) } } return nil } -func copy(migration *kubev1.Migration) (*kubev1.Migration, error) { - obj, err := kubeapi.Scheme.Copy(migration) - if err != nil { - return nil, err - } - return obj.(*kubev1.Migration), nil -} - // Returns the number of running pods and if a pod for exactly that migration is currently running func investigateTargetPodSituation(migration *kubev1.Migration, podList *k8sv1.PodList, migrationStore cache.Store) (int, *k8sv1.Pod, error) { var targetPod *k8sv1.Pod = nil @@ -373,62 +348,39 @@ func migrationVMPodSelector() kubeapi.ListOptions { return kubeapi.ListOptions{FieldSelector: fieldSelector, LabelSelector: labelSelector} } -func NewMigrationPodController(vmCache cache.Store, recorder record.EventRecorder, clientset *kubernetes.Clientset, restClient *rest.RESTClient, vmService services.VMService, migrationQueue workqueue.RateLimitingInterface) (cache.Store, *kubecli.Controller) { - - selector := migrationVMPodSelector() - lw := kubecli.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", kubeapi.NamespaceDefault, selector.FieldSelector, selector.LabelSelector) - queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) - return kubecli.NewController(lw, queue, &k8sv1.Pod{}, NewMigrationPodControllerDispatch(vmCache, restClient, vmService, clientset, migrationQueue)) +func migrationJobSelector() kubeapi.ListOptions { + 
fieldSelector := fields.ParseSelectorOrDie( + "status.phase!=" + string(k8sv1.PodPending) + + ",status.phase!=" + string(k8sv1.PodRunning) + + ",status.phase!=" + string(k8sv1.PodUnknown)) + labelSelector, err := labels.Parse(kubev1.AppLabel + "=migration," + kubev1.DomainLabel + "," + kubev1.MigrationLabel) + if err != nil { + panic(err) + } + return kubeapi.ListOptions{FieldSelector: fieldSelector, LabelSelector: labelSelector} } -type migrationPodDispatch struct { - vmCache cache.Store - restClient *rest.RESTClient - vmService services.VMService - clientset *kubernetes.Clientset - migrationQueue workqueue.RateLimitingInterface +// Informer, which checks for Jobs, orchestrating the migrations done by libvirt +func NewMigrationJobInformer(clientSet *kubernetes.Clientset, migrationQueue workqueue.RateLimitingInterface) (cache.Store, cache.ControllerInterface) { + selector := migrationJobSelector() + lw := kubecli.NewListWatchFromClient(clientSet.CoreV1().RESTClient(), "pods", kubeapi.NamespaceDefault, selector.FieldSelector, selector.LabelSelector) + return cache.NewIndexerInformer(lw, &k8sv1.Pod{}, 0, + kubecli.NewResourceEventHandlerFuncsForFunc(migrationLabelHandler(migrationQueue)), + cache.Indexers{}) } -func NewMigrationPodControllerDispatch(vmCache cache.Store, restClient *rest.RESTClient, vmService services.VMService, clientset *kubernetes.Clientset, migrationQueue workqueue.RateLimitingInterface) kubecli.ControllerDispatch { - dispatch := migrationPodDispatch{ - vmCache: vmCache, - restClient: restClient, - vmService: vmService, - clientset: clientset, - migrationQueue: migrationQueue, - } - return &dispatch +// Informer, which checks for potential migration target Pods +func NewMigrationPodInformer(clientSet *kubernetes.Clientset, migrationQueue workqueue.RateLimitingInterface) (cache.Store, cache.ControllerInterface) { + selector := migrationVMPodSelector() + lw := kubecli.NewListWatchFromClient(clientSet.CoreV1().RESTClient(), "pods", kubeapi.NamespaceDefault, selector.FieldSelector, selector.LabelSelector) + return cache.NewIndexerInformer(lw, &k8sv1.Pod{}, 0, + kubecli.NewResourceEventHandlerFuncsForFunc(migrationLabelHandler(migrationQueue)), + cache.Indexers{}) } -func (pd *migrationPodDispatch) Execute(podStore cache.Store, podQueue workqueue.RateLimitingInterface, key interface{}) { - // Fetch the latest Vm state from cache - obj, exists, err := podStore.GetByKey(key.(string)) - - if err != nil { - podQueue.AddRateLimited(key) - return - } - - if !exists { - // Do nothing - return - } - pod := obj.(*k8sv1.Pod) - - vmObj, exists, err := pd.vmCache.GetByKey(kubeapi.NamespaceDefault + "/" + pod.GetLabels()[kubev1.DomainLabel]) - if err != nil { - podQueue.AddRateLimited(key) - return - } - if !exists { - // Do nothing, the pod will timeout. 
- return - } - vm := vmObj.(*kubev1.VM) - if vm.GetObjectMeta().GetUID() != types.UID(pod.GetLabels()[kubev1.VMUIDLabel]) { - // Obviously the pod of an outdated VM object, do nothing - return +func migrationLabelHandler(migrationQueue workqueue.RateLimitingInterface) func(obj interface{}) { + return func(obj interface{}) { + migrationLabel := obj.(*k8sv1.Pod).ObjectMeta.Labels[kubev1.MigrationLabel] + migrationQueue.Add(k8sv1.NamespaceDefault + "/" + migrationLabel) } - pd.migrationQueue.Add(k8sv1.NamespaceDefault + "/" + pod.Labels[kubev1.MigrationLabel]) - return } diff --git a/pkg/virt-controller/watch/migration_test.go b/pkg/virt-controller/watch/migration_test.go index ead166084ff0..f81f81cca5e1 100644 --- a/pkg/virt-controller/watch/migration_test.go +++ b/pkg/virt-controller/watch/migration_test.go @@ -27,26 +27,24 @@ import ( var _ = Describe("Migration", func() { var ( - server *ghttp.Server - migrationCache cache.Store - podCache cache.Store - vmCache cache.Store - vmService services.VMService - restClient *rest.RESTClient - dispatch kubecli.ControllerDispatch - migrationPodDispatch kubecli.ControllerDispatch - migrationQueue workqueue.RateLimitingInterface - migration *v1.Migration - vm *v1.VM - pod *clientv1.Pod - podList clientv1.PodList - migrationKey interface{} - srcIp clientv1.NodeAddress - destIp kubev1.NodeAddress - srcNodeWithIp kubev1.Node - destNodeWithIp kubev1.Node - srcNode kubev1.Node - destNode kubev1.Node + server *ghttp.Server + migrationCache cache.Store + vmCache cache.Store + vmService services.VMService + restClient *rest.RESTClient + migrationController *MigrationController + migrationQueue workqueue.RateLimitingInterface + migration *v1.Migration + vm *v1.VM + pod *clientv1.Pod + podList clientv1.PodList + migrationKey interface{} + srcIp clientv1.NodeAddress + destIp kubev1.NodeAddress + srcNodeWithIp kubev1.Node + destNodeWithIp kubev1.Node + srcNode kubev1.Node + destNode kubev1.Node ) logging.DefaultLogger().SetIOWriter(GinkgoWriter) @@ -63,7 +61,6 @@ var _ = Describe("Migration", func() { templateService, _ := services.NewTemplateService("kubevirt/virt-launcher") restClient, _ = kubecli.GetRESTClientFromFlags(server.URL(), "") - podCache = cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, nil) vmCache = cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, nil) g.Provide( &inject.Object{Value: restClient}, @@ -73,9 +70,16 @@ var _ = Describe("Migration", func() { ) g.Populate() migrationCache = cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, nil) - dispatch = NewMigrationControllerDispatch(vmService, restClient, clientSet) migrationQueue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) + migrationController = &MigrationController{ + restClient: restClient, + vmService: vmService, + clientset: clientSet, + queue: migrationQueue, + store: migrationCache, + informer: nil, + } // Create a VM which is being scheduled vm = v1.NewMinimalVM("testvm") @@ -85,7 +89,6 @@ var _ = Describe("Migration", func() { migration = v1.NewMinimalMigration(vm.ObjectMeta.Name+"-migration", vm.ObjectMeta.Name) migration.ObjectMeta.SetUID(uuid.NewUUID()) migration.Spec.NodeSelector = map[string]string{"beta.kubernetes.io/arch": "amd64"} - migrationPodDispatch = NewMigrationPodControllerDispatch(vmCache, restClient, vmService, clientSet, migrationQueue) // Create a target Pod for the VM templateService, err := services.NewTemplateService("whatever") @@ -135,12 +138,10 @@ var _ = Describe("Migration", func() { Addresses: 
[]clientv1.NodeAddress{destIp, srcIp}, }, } - }) + migrationKey, err = cache.DeletionHandlingMetaNamespaceKeyFunc(migration) + Expect(err).ToNot(HaveOccurred()) - doExecute := func() { - migrationKey, _ = cache.MetaNamespaceKeyFunc(migration) - dispatch.Execute(migrationCache, migrationQueue, migrationKey) - } + }) buildExpectedVM := func(phase v1.VMPhase) *v1.VM { @@ -172,67 +173,70 @@ var _ = Describe("Migration", func() { } Context("Running Migration target Pod for a running VM given", func() { - It("should update the VM with the migration target node of the running Pod", func(done Done) { + It("should update the VM with the migration target node of the running Pod", func() { // Register the expected REST call server.AppendHandlers( handleGetTestVM(buildExpectedVM(v1.Running)), handleGetPodList(podList), handleCreatePod(pod), - handlePutMigration(migration, v1.MigrationScheduled), + handlePutMigration(migration, v1.MigrationRunning), ) + migrationQueue.Add(migrationKey) migrationCache.Add(migration) - - doExecute() + migrationController.Execute() Expect(len(server.ReceivedRequests())).To(Equal(4)) Expect(migrationQueue.NumRequeues(migrationKey)).Should(Equal(0)) + }) - close(done) - }, 10) - - It("failed GET oF VM should requeue", func(done Done) { + It("failed GET oF VM should requeue", func() { // Register the expected REST call server.AppendHandlers( handleGetTestVMAuthError(buildExpectedVM(v1.Running)), ) + + migrationQueue.Add(migrationKey) migrationCache.Add(migration) - doExecute() + migrationController.Execute() + Expect(len(server.ReceivedRequests())).To(Equal(1)) Expect(migrationQueue.NumRequeues(migrationKey)).Should(Equal(1)) - close(done) - }, 10) + }) - It("failed GET oF Pod List should requeue", func(done Done) { + It("failed GET oF Pod List should requeue", func() { // Register the expected REST call server.AppendHandlers( handleGetTestVM(buildExpectedVM(v1.Running)), handleGetPodListAuthError(podList), ) + + migrationQueue.Add(migrationKey) migrationCache.Add(migration) - doExecute() + migrationController.Execute() + Expect(len(server.ReceivedRequests())).To(Equal(2)) Expect(migrationQueue.NumRequeues(migrationKey)).Should(Equal(1)) - close(done) - }, 10) + }) - It("Should Mark Migration as failed if VM Not found.", func(done Done) { + It("Should Mark Migration as failed if VM Not found.", func() { // Register the expected REST call server.AppendHandlers( handleGetTestVMNotFound(), handlePutMigration(migration, v1.MigrationFailed), ) + migrationQueue.Add(migrationKey) migrationCache.Add(migration) - doExecute() + migrationController.Execute() + Expect(len(server.ReceivedRequests())).To(Equal(2)) Expect(migrationQueue.NumRequeues(migrationKey)).Should(Equal(0)) - close(done) - }, 10) + }) - It("should requeue if VM Not found and Migration update error.", func(done Done) { + It("should requeue if VM Not found and Migration update error.", func() { // Register the expected REST call server.AppendHandlers( @@ -240,39 +244,39 @@ var _ = Describe("Migration", func() { handlePutMigrationAuthError(), ) migrationCache.Add(migration) - doExecute() + migrationQueue.Add(migrationKey) + migrationController.Execute() Expect(len(server.ReceivedRequests())).To(Equal(2)) Expect(migrationQueue.NumRequeues(migrationKey)).Should(Equal(1)) - close(done) - }, 10) + }) - It("Should mark Migration failed if VM not running ", func(done Done) { + It("Should mark Migration failed if VM not running ", func() { // Register the expected REST call server.AppendHandlers( 
handleGetTestVM(buildExpectedVM(v1.Pending)), handlePutMigration(migration, v1.MigrationFailed), ) migrationCache.Add(migration) - doExecute() + migrationQueue.Add(migrationKey) + migrationController.Execute() Expect(len(server.ReceivedRequests())).To(Equal(2)) Expect(migrationQueue.NumRequeues(migrationKey)).Should(Equal(0)) - close(done) - }, 10) + }) - It("Should Requeue if VM not running and updateMigratio0n Failure", func(done Done) { + It("Should Requeue if VM not running and updateMigratio0n Failure", func() { // Register the expected REST call server.AppendHandlers( handleGetTestVM(buildExpectedVM(v1.Pending)), handlePutMigrationAuthError(), ) migrationCache.Add(migration) - doExecute() + migrationQueue.Add(migrationKey) + migrationController.Execute() Expect(len(server.ReceivedRequests())).To(Equal(2)) Expect(migrationQueue.NumRequeues(migrationKey)).Should(Equal(1)) - close(done) - }, 10) + }) - It("should requeue if Migration update fails", func(done Done) { + It("should requeue if Migration update fails", func() { // Register the expected REST call server.AppendHandlers( @@ -283,14 +287,14 @@ var _ = Describe("Migration", func() { ) migrationCache.Add(migration) - doExecute() + migrationQueue.Add(migrationKey) + migrationController.Execute() Expect(len(server.ReceivedRequests())).To(Equal(4)) Expect(migrationQueue.NumRequeues(migrationKey)).Should(Equal(1)) - close(done) - }, 10) + }) - It("should fail if conflicting VM and Migration have conflicting Node Selectors", func(done Done) { + It("should fail if conflicting VM and Migration have conflicting Node Selectors", func() { vm := buildExpectedVM(v1.Running) vm.Spec.NodeSelector = map[string]string{"beta.kubernetes.io/arch": "i386"} @@ -300,14 +304,14 @@ var _ = Describe("Migration", func() { ) migrationCache.Add(migration) - doExecute() + migrationQueue.Add(migrationKey) + migrationController.Execute() Expect(len(server.ReceivedRequests())).To(Equal(1)) Expect(migrationQueue.NumRequeues(migrationKey)).Should(Equal(1)) - close(done) - }, 10) + }) - It("should requeue if create of the Target Pod fails ", func(done Done) { + It("should requeue if create of the Target Pod fails ", func() { // Register the expected REST call server.AppendHandlers( @@ -318,12 +322,12 @@ var _ = Describe("Migration", func() { ) migrationCache.Add(migration) - doExecute() + migrationQueue.Add(migrationKey) + migrationController.Execute() Expect(len(server.ReceivedRequests())).To(Equal(3)) Expect(migrationQueue.NumRequeues(migrationKey)).Should(Equal(1)) - close(done) - }, 10) + }) It("should fail if another migration is in process.", func(done Done) { @@ -341,7 +345,8 @@ var _ = Describe("Migration", func() { handlePutMigration(migration, v1.MigrationFailed), ) migrationCache.Add(migration) - doExecute() + migrationQueue.Add(migrationKey) + migrationController.Execute() Expect(len(server.ReceivedRequests())).To(Equal(3)) Expect(migrationQueue.NumRequeues(migrationKey)).Should(Equal(0)) @@ -364,7 +369,8 @@ var _ = Describe("Migration", func() { handlePutMigrationAuthError(), ) migrationCache.Add(migration) - doExecute() + migrationQueue.Add(migrationKey) + migrationController.Execute() Expect(len(server.ReceivedRequests())).To(Equal(3)) Expect(migrationQueue.NumRequeues(migrationKey)).Should(Equal(1)) @@ -394,10 +400,11 @@ var _ = Describe("Migration", func() { server.AppendHandlers( handleGetTestVM(expectedVM0), handleGetPodList(unmatchedPodList), - handlePutMigration(migration, v1.MigrationScheduled), + handlePutMigration(migration, 
v1.MigrationRunning), ) migrationCache.Add(migration) - doExecute() + migrationQueue.Add(migrationKey) + migrationController.Execute() Expect(len(server.ReceivedRequests())).To(Equal(3)) Expect(migrationQueue.NumRequeues(migrationKey)).Should(Equal(0)) @@ -422,10 +429,6 @@ var _ = Describe("Migration", func() { // Register the expected REST call expectedVM0 := buildExpectedVM(v1.Running) - expectedVM1 := buildExpectedVM(v1.Migrating) - expectedVM1.Status.MigrationNodeName = destinationNodeName - expectedVM2 := buildExpectedVM(v1.Running) - expectedVM2.Status.MigrationNodeName = destinationNodeName migrationPodList := clientv1.PodList{} migrationPodList.Items = []clientv1.Pod{ @@ -435,10 +438,11 @@ var _ = Describe("Migration", func() { server.AppendHandlers( handleGetTestVM(expectedVM0), handleGetPodList(unmatchedPodList), - handlePutMigration(migration, v1.MigrationScheduled), + handlePutMigration(migration, v1.MigrationRunning), ) migrationCache.Add(migration) - doExecute() + migrationQueue.Add(migrationKey) + migrationController.Execute() Expect(len(server.ReceivedRequests())).To(Equal(3)) Expect(migrationQueue.NumRequeues(migrationKey)).Should(Equal(0)) @@ -451,6 +455,7 @@ var _ = Describe("Migration", func() { unmatchedPodList := clientv1.PodList{} migrationLabel := string(migration.GetObjectMeta().GetUID()) + migration.Status.Phase = v1.MigrationRunning targetPod := mockPod(3, migrationLabel) targetPod.Spec = clientv1.PodSpec{ @@ -465,8 +470,6 @@ var _ = Describe("Migration", func() { expectedVM0 := buildExpectedVM(v1.Running) expectedVM0.Status.MigrationNodeName = "" - expectedVM1 := buildExpectedVM(v1.Migrating) - expectedVM2 := buildExpectedVM(v1.Migrating) expectedVM2.Status.MigrationNodeName = destinationNodeName @@ -478,13 +481,14 @@ var _ = Describe("Migration", func() { server.AppendHandlers( handleGetTestVM(expectedVM0), handleGetPodList(unmatchedPodList), - handlePutMigration(migration, v1.MigrationScheduled), - handlePutVM(expectedVM1), + handlePutVM(expectedVM2), + handleGetPodList(migrationPodList), ) migrationCache.Add(migration) - doExecute() + migrationQueue.Add(migrationKey) + migrationController.Execute() - Expect(len(server.ReceivedRequests())).To(Equal(3)) + Expect(len(server.ReceivedRequests())).To(Equal(4)) Expect(migrationQueue.NumRequeues(migrationKey)).Should(Equal(0)) close(done) @@ -533,7 +537,8 @@ var _ = Describe("Migration", func() { handlePutMigration(migration, v1.MigrationSucceeded), ) migrationCache.Add(migration) - doExecute() + migrationQueue.Add(migrationKey) + migrationController.Execute() Expect(len(server.ReceivedRequests())).To(Equal(5)) Expect(migrationQueue.NumRequeues(migrationKey)).Should(Equal(0)) @@ -581,7 +586,8 @@ var _ = Describe("Migration", func() { handlePutMigration(migration, v1.MigrationFailed), ) migrationCache.Add(migration) - doExecute() + migrationQueue.Add(migrationKey) + migrationController.Execute() Expect(len(server.ReceivedRequests())).To(Equal(5)) Expect(migrationQueue.NumRequeues(migrationKey)).Should(Equal(0)) @@ -591,93 +597,6 @@ var _ = Describe("Migration", func() { }) - Context("Running Migration target Pod for a running VM given", func() { - var ( - srcIp = kubev1.NodeAddress{} - destIp = kubev1.NodeAddress{} - srcNodeIp = kubev1.Node{} - destNodeIp = kubev1.Node{} - srcNode kubev1.Node - targetNode kubev1.Node - ) - - BeforeEach(func() { - srcIp = kubev1.NodeAddress{ - Type: kubev1.NodeInternalIP, - Address: "127.0.0.2", - } - destIp = kubev1.NodeAddress{ - Type: kubev1.NodeInternalIP, - Address: "127.0.0.3", - 
} - srcNodeIp = kubev1.Node{ - Status: kubev1.NodeStatus{ - Addresses: []kubev1.NodeAddress{srcIp}, - }, - } - destNodeIp = kubev1.Node{ - Status: kubev1.NodeStatus{ - Addresses: []kubev1.NodeAddress{destIp}, - }, - } - srcNode = kubev1.Node{ - ObjectMeta: kubev1.ObjectMeta{ - Name: "sourceNode", - }, - Status: kubev1.NodeStatus{ - Addresses: []kubev1.NodeAddress{srcIp, destIp}, - }, - } - targetNode = kubev1.Node{ - ObjectMeta: kubev1.ObjectMeta{ - Name: "targetNode", - }, - Status: kubev1.NodeStatus{ - Addresses: []kubev1.NodeAddress{destIp, srcIp}, - }, - } - }) - - It("should update the VM Phase and migration target node of the running Pod", func(done Done) { - - // Create a VM which is being scheduled - vm := v1.NewMinimalVM("testvm") - vm.Status.Phase = v1.Running - vm.ObjectMeta.SetUID(uuid.NewUUID()) - vm.Status.NodeName = "sourceNode" - - // Add the VM to the cache - vmCache.Add(vm) - - // Create a target Pod for the VM - pod := mockMigrationPod(vm) - - // Create the expected VM after the update - obj, err := conversion.NewCloner().DeepCopy(vm) - Expect(err).ToNot(HaveOccurred()) - - vmWithMigrationNodeName := obj.(*v1.VM) - vmWithMigrationNodeName.Status.MigrationNodeName = pod.Spec.NodeName - - obj, err = conversion.NewCloner().DeepCopy(vmWithMigrationNodeName) - Expect(err).ToNot(HaveOccurred()) - - vmInMigrationState := obj.(*v1.VM) - vmInMigrationState.Status.Phase = v1.Migrating - migration := v1.NewMinimalMigration("testvm-migration", "testvm") - migration.Status.Phase = v1.MigrationScheduled - - queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) - key, _ := cache.MetaNamespaceKeyFunc(pod) - podCache.Add(pod) - queue.Add(key) - migrationPodDispatch.Execute(podCache, queue, key) - - Expect(migrationQueue.Len()).To(Equal(1)) - close(done) - }, 10) - }) - Context("Pod Investigation", func() { var ( podList kubev1.PodList diff --git a/tests/vm_migration_test.go b/tests/vm_migration_test.go index 42addc11c14a..4aef03eee41d 100644 --- a/tests/vm_migration_test.go +++ b/tests/vm_migration_test.go @@ -52,7 +52,7 @@ var _ = Describe("VmMigration", func() { }, TIMEOUT, POLLING_INTERVAL).Should(Equal(v1.MigrationFailed)) }) - It("Should go to MigrationScheduled state if the VM exists", func(done Done) { + It("Should go to MigrationRunning state if the VM exists", func(done Done) { vm, err := restClient.Post().Resource("vms").Namespace(k8sv1.NamespaceDefault).Body(sourceVM).Do().Get() Expect(err).ToNot(HaveOccurred()) tests.WaitForSuccessfulVMStart(vm) @@ -66,7 +66,7 @@ var _ = Describe("VmMigration", func() { Expect(err).ToNot(HaveOccurred()) var m *v1.Migration = obj.(*v1.Migration) return m.Status.Phase - }, TIMEOUT, POLLING_INTERVAL).Should(Equal(v1.MigrationScheduled)) + }, TIMEOUT, POLLING_INTERVAL).Should(Equal(v1.MigrationRunning)) close(done) }, 30) @@ -119,7 +119,6 @@ var _ = Describe("VmMigration", func() { migratedVM := obj.(*v1.VM) Expect(migratedVM.Status.Phase).To(Equal(v1.Running)) Expect(migratedVM.Status.NodeName).ToNot(Equal(sourceNode)) - } close(done) }, 180)
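
The controller shape this commit converges on — several informers (migrations, migration jobs, target pods) all feeding one rate-limited work queue that a single worker loop drains by migration key — condenses to roughly the sketch below. It only reuses the client-go primitives the patch itself imports (workqueue, cache informers, wait.Until); the names exampleController, process and processNextItem are illustrative placeholders, not code from this repository.

package example

import (
	"time"

	"k8s.io/client-go/pkg/util/wait"
	"k8s.io/client-go/pkg/util/workqueue"
	"k8s.io/client-go/tools/cache"
)

// exampleController condenses the pattern: one rate-limited queue, one object
// store, and several informers that all push migration keys onto the queue.
type exampleController struct {
	queue     workqueue.RateLimitingInterface
	store     cache.Store              // the reconcile function reads objects from here
	informers []cache.ControllerInterface
	process   func(key string) error   // reconciles the migration behind one cache key
}

func (c *exampleController) Run(threadiness int, stopCh chan struct{}) {
	defer c.queue.ShutDown()

	// Start every informer and wait until all caches are warm before working.
	var synced []cache.InformerSynced
	for _, informer := range c.informers {
		go informer.Run(stopCh)
		synced = append(synced, informer.HasSynced)
	}
	cache.WaitForCacheSync(stopCh, synced...)

	// Drain the shared queue; the patch uses threadiness == 1 because
	// concurrent workers would need a lock on the VM.
	for i := 0; i < threadiness; i++ {
		go wait.Until(func() {
			for c.processNextItem() {
			}
		}, time.Second, stopCh)
	}
	<-stopCh
}

func (c *exampleController) processNextItem() bool {
	key, quit := c.queue.Get()
	if quit {
		return false
	}
	defer c.queue.Done(key)
	if err := c.process(key.(string)); err != nil {
		c.queue.AddRateLimited(key) // retry later with backoff
	} else {
		c.queue.Forget(key) // success: reset the per-key rate limiter
	}
	return true
}

The point of the single queue is that pod and job events are translated into the key of the owning migration (migrationLabelHandler in the patch), so all reconciliation happens in one loop and the separate per-controller WaitForSync ordering from the old wiring disappears.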