Skip to content

Commit

Permalink
Simplify vm key update code path
Browse files Browse the repository at this point in the history
  • Loading branch information
davidvossel committed Jun 7, 2017
1 parent b349eb9 commit 3abe722
Showing 1 changed file with 89 additions and 51 deletions.
140 changes: 89 additions & 51 deletions pkg/virt-handler/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ package virthandler

import (
"fmt"
"net/http"
"strings"

"github.com/jeevatkm/go-model"
Expand Down Expand Up @@ -69,6 +68,8 @@ type VMHandlerDispatch struct {

func (d *VMHandlerDispatch) Execute(store cache.Store, queue workqueue.RateLimitingInterface, key interface{}) {

shouldDeleteVm := false

// Fetch the latest Vm state from cache
obj, exists, err := store.GetByKey(key.(string))

Expand All @@ -87,66 +88,32 @@ func (d *VMHandlerDispatch) Execute(store cache.Store, queue workqueue.RateLimit
return
}
vm = v1.NewVMReferenceFromName(name)

// If we don't have the VM in the cache, it could be that it is currently migrating to us
result := d.restClient.Get().Name(vm.GetObjectMeta().GetName()).Resource("vms").Namespace(kubeapi.NamespaceDefault).Do()
if result.Error() == nil {
// So the VM still seems to exist
fetchedVM, err := result.Get()
if err != nil {
// Since there was no fetch error, this should have worked, let's back off
queue.AddRateLimited(key)
return
}
if fetchedVM.(*v1.VM).Status.MigrationNodeName == d.host {
// OK, this VM is migrating to us, don't interrupt it
queue.Forget(key)
return
}
} else if result.Error().(*errors.StatusError).Status().Code != int32(http.StatusNotFound) {
// Something went wrong, let's try again later
queue.AddRateLimited(key)
return
}
// The VM is deleted on the cluster, let's go on with the deletion on the host
} else {
vm = obj.(*v1.VM)
}
logging.DefaultLogger().V(3).Info().Object(vm).Msg("Processing VM update.")

// Process the VM
// Check For Migration before processing vm not in our cache
if !exists {
// Since the VM was not in the cache, we delete it
err = d.domainManager.KillVM(vm)
} else if isWorthSyncing(vm) {
// Synchronize the VM state
vm, err = MapPersistentVolumes(vm, d.clientset.CoreV1().RESTClient(), kubeapi.NamespaceDefault)

if err == nil {
// TODO check if found VM has the same UID like the domain, if not, delete the Domain first

// Only sync if the VM is not marked as migrating. Everything except shutting down the VM is not permitted when it is migrating.
// TODO MigrationNodeName should be a pointer
if vm.Status.MigrationNodeName == "" {
err = d.domainManager.SyncVM(vm)
} else {
queue.Forget(key)
return
}
// If we don't have the VM in the cache, it could be that it is currently migrating to us
isDestination, err := d.isMigrationDestination(vm.GetObjectMeta().GetName())
if err != nil {
// unable to determine migration status, we'll try again later.
queue.AddRateLimited(key)
return
}

// Update VM status to running
if err == nil && vm.Status.Phase != v1.Running {
obj, err = kubeapi.Scheme.Copy(vm)
if err == nil {
vm = obj.(*v1.VM)
vm.Status.Phase = v1.Running
err = d.restClient.Put().Resource("vms").Body(vm).
Name(vm.ObjectMeta.Name).Namespace(kubeapi.NamespaceDefault).Do().Error()
}
if isDestination {
// OK, this VM is migrating to us, don't interrupt it.
queue.Forget(key)
return
}
// The VM is deleted on the cluster, continue with processing the deletion on the host.
shouldDeleteVm = true
}
logging.DefaultLogger().V(3).Info().Object(vm).Msg("Processing VM update.")

// Process the VM
err = d.processVmUpdate(vm, shouldDeleteVm)
if err != nil {
// Something went wrong, reenqueue the item with a delay
logging.DefaultLogger().Error().Object(vm).Reason(err).Msg("Synchronizing the VM failed.")
Expand Down Expand Up @@ -226,6 +193,77 @@ func MapPersistentVolumes(vm *v1.VM, restClient cache.Getter, namespace string)
return vmCopy, nil
}

func (d *VMHandlerDispatch) processVmUpdate(vm *v1.VM, shouldDeleteVm bool) error {

if shouldDeleteVm {
// Since the VM was not in the cache, we delete it
return d.domainManager.KillVM(vm)
} else if isWorthSyncing(vm) == false {
// nothing to do here.
return nil
}

// Synchronize the VM state
vm, err := MapPersistentVolumes(vm, d.clientset.CoreV1().RESTClient(), kubeapi.NamespaceDefault)
if err != nil {
return err
}

// TODO MigrationNodeName should be a pointer
if vm.Status.MigrationNodeName != "" {
// Only sync if the VM is not marked as migrating.
// Everything except shutting down the VM is not
// permitted when it is migrating.
return nil
}

// TODO check if found VM has the same UID like the domain,
// if not, delete the Domain first
err = d.domainManager.SyncVM(vm)
if err != nil {
return err
}

// Update VM status to running
if vm.Status.Phase != v1.Running {
obj, err := kubeapi.Scheme.Copy(vm)
if err != nil {
return err
}
vm = obj.(*v1.VM)
vm.Status.Phase = v1.Running
err = d.restClient.Put().Resource("vms").Body(vm).
Name(vm.ObjectMeta.Name).Namespace(kubeapi.NamespaceDefault).Do().Error()
if err != nil {
return err
}
}

return nil
}

func (d *VMHandlerDispatch) isMigrationDestination(vmName string) (bool, error) {

// If we don't have the VM in the cache, it could be that it is currently migrating to us
result := d.restClient.Get().Name(vmName).Resource("vms").Namespace(kubeapi.NamespaceDefault).Do()
if result.Error() == nil {
// So the VM still seems to exist
fetchedVM, err := result.Get()
if err != nil {
return false, err
}
if fetchedVM.(*v1.VM).Status.MigrationNodeName == d.host {
return true, nil
}
} else if !errors.IsNotFound(result.Error()) {
// Something went wrong, let's try again later
return false, result.Error()
}

// VM object was not found.
return false, nil
}

func isWorthSyncing(vm *v1.VM) bool {
return vm.Status.Phase != v1.Succeeded && vm.Status.Phase != v1.Failed
}

0 comments on commit 3abe722

Please sign in to comment.