diff --git a/pkg/virt-handler/vm.go b/pkg/virt-handler/vm.go index f2401af002f8..32ab64c0b56e 100644 --- a/pkg/virt-handler/vm.go +++ b/pkg/virt-handler/vm.go @@ -21,7 +21,6 @@ package virthandler import ( "fmt" - "net/http" "strings" "github.com/jeevatkm/go-model" @@ -69,6 +68,8 @@ type VMHandlerDispatch struct { func (d *VMHandlerDispatch) Execute(store cache.Store, queue workqueue.RateLimitingInterface, key interface{}) { + shouldDeleteVm := false + // Fetch the latest Vm state from cache obj, exists, err := store.GetByKey(key.(string)) @@ -87,66 +88,32 @@ func (d *VMHandlerDispatch) Execute(store cache.Store, queue workqueue.RateLimit return } vm = v1.NewVMReferenceFromName(name) - - // If we don't have the VM in the cache, it could be that it is currently migrating to us - result := d.restClient.Get().Name(vm.GetObjectMeta().GetName()).Resource("vms").Namespace(kubeapi.NamespaceDefault).Do() - if result.Error() == nil { - // So the VM still seems to exist - fetchedVM, err := result.Get() - if err != nil { - // Since there was no fetch error, this should have worked, let's back off - queue.AddRateLimited(key) - return - } - if fetchedVM.(*v1.VM).Status.MigrationNodeName == d.host { - // OK, this VM is migrating to us, don't interrupt it - queue.Forget(key) - return - } - } else if result.Error().(*errors.StatusError).Status().Code != int32(http.StatusNotFound) { - // Something went wrong, let's try again later - queue.AddRateLimited(key) - return - } - // The VM is deleted on the cluster, let's go on with the deletion on the host } else { vm = obj.(*v1.VM) } - logging.DefaultLogger().V(3).Info().Object(vm).Msg("Processing VM update.") - // Process the VM + // Check For Migration before processing vm not in our cache if !exists { - // Since the VM was not in the cache, we delete it - err = d.domainManager.KillVM(vm) - } else if isWorthSyncing(vm) { - // Synchronize the VM state - vm, err = MapPersistentVolumes(vm, d.clientset.CoreV1().RESTClient(), kubeapi.NamespaceDefault) - - if err == nil { - // TODO check if found VM has the same UID like the domain, if not, delete the Domain first - - // Only sync if the VM is not marked as migrating. Everything except shutting down the VM is not permitted when it is migrating. - // TODO MigrationNodeName should be a pointer - if vm.Status.MigrationNodeName == "" { - err = d.domainManager.SyncVM(vm) - } else { - queue.Forget(key) - return - } + // If we don't have the VM in the cache, it could be that it is currently migrating to us + isDestination, err := d.isMigrationDestination(vm.GetObjectMeta().GetName()) + if err != nil { + // unable to determine migration status, we'll try again later. + queue.AddRateLimited(key) + return } - // Update VM status to running - if err == nil && vm.Status.Phase != v1.Running { - obj, err = kubeapi.Scheme.Copy(vm) - if err == nil { - vm = obj.(*v1.VM) - vm.Status.Phase = v1.Running - err = d.restClient.Put().Resource("vms").Body(vm). - Name(vm.ObjectMeta.Name).Namespace(kubeapi.NamespaceDefault).Do().Error() - } + if isDestination { + // OK, this VM is migrating to us, don't interrupt it. + queue.Forget(key) + return } + // The VM is deleted on the cluster, continue with processing the deletion on the host. + shouldDeleteVm = true } + logging.DefaultLogger().V(3).Info().Object(vm).Msg("Processing VM update.") + // Process the VM + err = d.processVmUpdate(vm, shouldDeleteVm) if err != nil { // Something went wrong, reenqueue the item with a delay logging.DefaultLogger().Error().Object(vm).Reason(err).Msg("Synchronizing the VM failed.") @@ -226,6 +193,77 @@ func MapPersistentVolumes(vm *v1.VM, restClient cache.Getter, namespace string) return vmCopy, nil } +func (d *VMHandlerDispatch) processVmUpdate(vm *v1.VM, shouldDeleteVm bool) error { + + if shouldDeleteVm { + // Since the VM was not in the cache, we delete it + return d.domainManager.KillVM(vm) + } else if isWorthSyncing(vm) == false { + // nothing to do here. + return nil + } + + // Synchronize the VM state + vm, err := MapPersistentVolumes(vm, d.clientset.CoreV1().RESTClient(), kubeapi.NamespaceDefault) + if err != nil { + return err + } + + // TODO MigrationNodeName should be a pointer + if vm.Status.MigrationNodeName != "" { + // Only sync if the VM is not marked as migrating. + // Everything except shutting down the VM is not + // permitted when it is migrating. + return nil + } + + // TODO check if found VM has the same UID like the domain, + // if not, delete the Domain first + err = d.domainManager.SyncVM(vm) + if err != nil { + return err + } + + // Update VM status to running + if vm.Status.Phase != v1.Running { + obj, err := kubeapi.Scheme.Copy(vm) + if err != nil { + return err + } + vm = obj.(*v1.VM) + vm.Status.Phase = v1.Running + err = d.restClient.Put().Resource("vms").Body(vm). + Name(vm.ObjectMeta.Name).Namespace(kubeapi.NamespaceDefault).Do().Error() + if err != nil { + return err + } + } + + return nil +} + +func (d *VMHandlerDispatch) isMigrationDestination(vmName string) (bool, error) { + + // If we don't have the VM in the cache, it could be that it is currently migrating to us + result := d.restClient.Get().Name(vmName).Resource("vms").Namespace(kubeapi.NamespaceDefault).Do() + if result.Error() == nil { + // So the VM still seems to exist + fetchedVM, err := result.Get() + if err != nil { + return false, err + } + if fetchedVM.(*v1.VM).Status.MigrationNodeName == d.host { + return true, nil + } + } else if !errors.IsNotFound(result.Error()) { + // Something went wrong, let's try again later + return false, result.Error() + } + + // VM object was not found. + return false, nil +} + func isWorthSyncing(vm *v1.VM) bool { return vm.Status.Phase != v1.Succeeded && vm.Status.Phase != v1.Failed }