Implement error handling for migration control loop
This is not a solid final implementation; it can still have races. We
need to write the controller in a different way to make it completely
resilient.
rmohr committed Mar 14, 2017
1 parent 907a739 commit 2805a13
Showing 9 changed files with 119 additions and 120 deletions.
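
The commit message above describes the pattern applied throughout the rewritten migration control loop: every error path requeues the key with rate limiting so the work is retried with back-off, and the key is only forgotten once it has been handled. As a rough orientation for the diff below, here is a minimal sketch of that pattern; it is not code from this commit, processNextKey and the reconcile callback are hypothetical names, and the import paths assume the client-go vintage this repository used at the time.

package watch

import (
	"k8s.io/client-go/pkg/util/workqueue"
	"k8s.io/client-go/tools/cache"
)

// processNextKey is a hypothetical helper showing the requeue-on-error pattern:
// transient failures put the key back with rate limiting (retried with back-off),
// successful handling forgets the key so its back-off counter is reset.
func processNextKey(store cache.Store, queue workqueue.RateLimitingInterface,
	reconcile func(obj interface{}) error) bool {

	key, quit := queue.Get()
	if quit {
		return false
	}
	defer queue.Done(key)

	obj, exists, err := store.GetByKey(key.(string))
	if err != nil {
		// Lookup failed, most likely a transient problem: retry later with back-off.
		queue.AddRateLimited(key)
		return true
	}
	if exists {
		if err := reconcile(obj); err != nil {
			// Reconciliation failed: requeue and let the rate limiter space out retries.
			queue.AddRateLimited(key)
			return true
		}
	}
	// Done (or the object is gone): clear the back-off state for this key.
	queue.Forget(key)
	return true
}

In the actual controller below, the role of reconcile is played by the callback passed to kubecli.NewController in migration.go.
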
2 changes: 1 addition & 1 deletion cluster/migration.yaml
@@ -1,7 +1,7 @@
apiVersion: kubevirt.io/v1alpha1
kind: Migration
metadata:
name: testvm-migration
generateName: testvm-migration
spec:
selector:
name: testvm
10 changes: 0 additions & 10 deletions pkg/virt-controller/services/vm.go
@@ -140,16 +140,6 @@ func (v *vmService) SetupMigration(migration *corev1.Migration, vm *corev1.VM) e
if err == nil {
_, err = v.KubeCli.CoreV1().Pods(v1.NamespaceDefault).Create(pod)
}
if err == nil {
migration.Status.Phase = corev1.MigrationInProgress
} else {
migration.Status.Phase = corev1.MigrationFailed
}

err2 := v.UpdateMigration(migration)
if err2 != nil {
err = err2
}
return err
}

14 changes: 1 addition & 13 deletions pkg/virt-controller/services/vm_test.go
@@ -64,22 +64,10 @@ var _ = Describe("VM", func() {
ghttp.VerifyRequest("POST", "/api/v1/namespaces/default/pods"),
ghttp.RespondWithJSONEncoded(http.StatusOK, pod),
),

ghttp.CombineHandlers(
ghttp.VerifyRequest("PUT", "/apis/kubevirt.io/v1alpha1/namespaces/default/migrations/test-vm-migration"),
ghttp.VerifyJSONRepresenting(expected_migration),
ghttp.RespondWithJSONEncoded(http.StatusOK, migration),
),

/*ghttp.CombineHandlers(
ghttp.VerifyRequest("PUT", "/apis/kubevirt.io/v1alpha1/namespaces/default/vms/testvm"),
ghttp.VerifyJSONRepresenting(vm),
ghttp.RespondWithJSONEncoded(http.StatusOK, ""),
),*/
)
err := vmService.SetupMigration(migration, vm)
Expect(err).ToNot(HaveOccurred())
Expect(len(server.ReceivedRequests())).To(Equal(2))
Expect(len(server.ReceivedRequests())).To(Equal(1))

})
})
1 change: 1 addition & 0 deletions pkg/virt-controller/watch/job.go
@@ -65,6 +65,7 @@ func NewJobControllerWithListWatch(vmService services.VMService, _ record.EventR
if vmExists && vm.Status.Phase == kvirtv1.Migrating {
vm.Status.Phase = kvirtv1.Running
if job.Status.Phase == v1.PodSucceeded {
vm.ObjectMeta.Labels[kvirtv1.NodeNameLabel] = vm.Status.MigrationNodeName
vm.Status.NodeName = vm.Status.MigrationNodeName
}
vm.Status.MigrationNodeName = ""
1 change: 1 addition & 0 deletions pkg/virt-controller/watch/job_test.go
@@ -59,6 +59,7 @@ var _ = Describe("Migration", func() {

vm = kvirtv1.NewMinimalVM("test-vm")
vm.Status.Phase = kvirtv1.Migrating
vm.GetObjectMeta().SetLabels(map[string]string{"a": "b"})

// Start the controller
jobController.StartInformer(stopChan)
200 changes: 106 additions & 94 deletions pkg/virt-controller/watch/migration.go
@@ -2,8 +2,8 @@ package watch

import (
"fmt"
"github.com/jeevatkm/go-model"
kubeapi "k8s.io/client-go/pkg/api"
k8sv1 "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/pkg/fields"
"k8s.io/client-go/pkg/util/workqueue"
"k8s.io/client-go/rest"
@@ -12,17 +12,15 @@ import (
"kubevirt.io/kubevirt/pkg/api/v1"
"kubevirt.io/kubevirt/pkg/kubecli"
"kubevirt.io/kubevirt/pkg/logging"
"kubevirt.io/kubevirt/pkg/middleware"
"kubevirt.io/kubevirt/pkg/precond"
"kubevirt.io/kubevirt/pkg/virt-controller/services"
)

func NewMigrationController(migrationService services.VMService, recorder record.EventRecorder, restClient *rest.RESTClient) (cache.Store, *kubecli.Controller) {
lw := cache.NewListWatchFromClient(restClient, "migrations", kubeapi.NamespaceDefault, fields.Everything())
return NewMigrationControllerWithListWatch(migrationService, recorder, lw, restClient)
lw := cache.NewListWatchFromClient(restClient, "migrations", k8sv1.NamespaceDefault, fields.Everything())
return NewMigrationControllerWithListWatch(migrationService, recorder, lw)
}

func NewMigrationControllerWithListWatch(migrationService services.VMService, _ record.EventRecorder, lw cache.ListerWatcher, restClient *rest.RESTClient) (cache.Store, *kubecli.Controller) {
func NewMigrationControllerWithListWatch(migrationService services.VMService, _ record.EventRecorder, lw cache.ListerWatcher) (cache.Store, *kubecli.Controller) {

queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
return kubecli.NewController(lw, queue, &v1.Migration{}, func(store cache.Store, queue workqueue.RateLimitingInterface) bool {
@@ -41,104 +41,122 @@ func NewMigrationControllerWithListWatch(migrationService services.VMService, _
}
if exists {
var migration *v1.Migration = obj.(*v1.Migration)
logger := logging.DefaultLogger().Object(migration)
if migration.Status.Phase == v1.MigrationUnknown {
migrationCopy := copyMigration(migration)
logger := logging.DefaultLogger().Object(&migrationCopy)
if err := StartMigrationTargetPod(migrationService, &migrationCopy); err != nil {
handleStartMigrationError(logger, err, migrationService, migrationCopy)
// Copy migration for future modifications
migrationCopy, err := copy(migration)
if err != nil {
logger.Error().Reason(err).Msg("could not copy migration object")
queue.AddRateLimited(key)
return true
}
// Fetch vm which we want to migrate
vm, exists, err := migrationService.FetchVM(migration.Spec.Selector.Name)
if err != nil {
logger.Error().Reason(err).Msgf("fetching the vm %s failed", migration.Spec.Selector.Name)
queue.AddRateLimited(key)
return true
}
if !exists {
logger.Info().Msgf("VM with name %s does not exist, marking migration as failed", migration.Spec.Selector.Name)
migrationCopy.Status.Phase = v1.MigrationFailed
// TODO indicate why it was set to failed
err := migrationService.UpdateMigration(migrationCopy)
if err != nil {
logger.Error().Reason(err).Msg("updating migration state failed")
queue.AddRateLimited(key)
return true
}
queue.Forget(key)
return true
}
if vm.Status.Phase != v1.Running {
logger.Error().Msgf("VM with name %s is in state %s, no migration possible. Marking migration as failed", vm.GetObjectMeta().GetName(), vm.Status.Phase)
migrationCopy.Status.Phase = v1.MigrationFailed
// TODO indicate why it was set to failed
err := migrationService.UpdateMigration(migrationCopy)
if err != nil {
logger.Error().Reason(err).Msg("updating migration state failed")
queue.AddRateLimited(key)
return true
}
queue.Forget(key)
return true
}

if err := mergeConstraints(migration, vm); err != nil {
logger.Error().Reason(err).Msg("merging Migration and VM placement constraints failed.")
queue.AddRateLimited(key)
return true
}
podList, err := migrationService.GetRunningVMPods(vm)
if err != nil {
logger.Error().Reason(err).Msg("could not fetch a list of running VM target pods")
queue.AddRateLimited(key)
return true
}

numOfPods, migrationPodExists := investigateTargetPodSituation(migration, podList)
if numOfPods > 1 && !migrationPodExists {
// Another migration is currently going on
logger.Error().Msg("another migration seems to be in progress, marking Migration as failed")
migrationCopy.Status.Phase = v1.MigrationFailed
// TODO indicate why it was set to failed
err := migrationService.UpdateMigration(migrationCopy)
if err != nil {
logger.Error().Reason(err).Msg("updating migration state failed")
queue.AddRateLimited(key)
return true
}
queue.Forget(key)
return true
} else if numOfPods == 1 && !migrationPodExists {
// We need to start a migration target pod
// TODO, this detection is not optimal, it can lead to strange situations
err := migrationService.SetupMigration(migration, vm)
if err != nil {
logger.Error().Reason(err).Msg("creating am migration target node failed")
queue.AddRateLimited(key)
return true
}
}
logger.Error().Msg("another migration seems to be in progress, marking Migration as failed")
migrationCopy.Status.Phase = v1.MigrationInProgress
// TODO indicate when this has happened
err = migrationService.UpdateMigration(migrationCopy)
if err != nil {
logger.Error().Reason(err).Msg("updating migration state failed")
queue.AddRateLimited(key)
return true
}
}
} else {
cleanupOldMigration(key, queue, migrationService)
}

queue.Forget(key)
return true
})
}
func cleanupOldMigration(key interface{}, queue workqueue.RateLimitingInterface, migrationService services.VMService) {
var migration *v1.Migration
_, name, err := cache.SplitMetaNamespaceKey(key.(string))
if err != nil {
// TODO do something more smart here
queue.AddRateLimited(key)
} else {
migration = v1.NewMigrationReferenceFromName(name)
err = migrationService.DeleteMigrationTargetPods(migration)
logger := logging.DefaultLogger().Object(migration)

if err != nil {
logger.Error().Reason(err).Msg("Deleting VM target Pod failed.")
return
}
logger.Info().Msg("Deleting VM target Pod succeeded.")
}
}

func handleStartMigrationError(logger *logging.FilteredLogger, err error, migrationService services.VMService, migrationCopy v1.Migration) {
logger.Error().Reason(err).Msg("Defining a target pod for the Migration.")
pl, err := migrationService.GetRunningMigrationPods(&migrationCopy)
func copy(migration *v1.Migration) (*v1.Migration, error) {
obj, err := kubeapi.Scheme.Copy(migration)
if err != nil {
logger.Error().Reason(err).Msg("Getting running Pod for the Migration failed.")
return
}
for _, p := range pl.Items {
if p.GetObjectMeta().GetLabels()["kubevirt.io/vmUID"] == string(migrationCopy.GetObjectMeta().GetUID()) {
// Pod from incomplete initialization detected, cleaning up
logger.Error().Msgf("Found orphan pod with name '%s' for Migration.", p.GetName())
err = migrationService.DeleteMigrationTargetPods(&migrationCopy)
if err != nil {
logger.Critical().Reason(err).Msgf("Deleting orphaned pod with name '%s' for Migration failed.", p.GetName())
break
}
} else {
// TODO virt-api should make sure this does not happen. For now don't ask and clean up.
// Pod from old VM object detected,
logger.Error().Msgf("Found orphan pod with name '%s' for deleted VM.", p.GetName())

err = migrationService.DeleteMigrationTargetPods(&migrationCopy)
if err != nil {
logger.Critical().Reason(err).Msgf("Deleting orphaned pod with name '%s' for Migration failed.", p.GetName())
break
}
}
return nil, err
}

}
func copyMigration(migration *v1.Migration) v1.Migration {
migrationCopy := v1.Migration{}
model.Copy(&migrationCopy, migration)
return migrationCopy
return obj.(*v1.Migration), nil
}

func StartMigrationTargetPod(v services.VMService, migration *v1.Migration) error {
precond.MustNotBeNil(migration)
precond.MustNotBeEmpty(migration.ObjectMeta.Name)
precond.MustNotBeEmpty(string(migration.ObjectMeta.UID))

vm, exists, err := v.FetchVM(migration.Spec.Selector.Name)
if err != nil || !exists {
migration.Status.Phase = v1.MigrationFailed
err2 := v.UpdateMigration(migration)
if err2 != nil {
return err2
// Returns the number of running pods and whether a pod for exactly that migration is currently running
func investigateTargetPodSituation(migration *v1.Migration, podList *k8sv1.PodList) (int, bool) {
podExists := false
for _, pod := range podList.Items {
if pod.Labels[v1.MigrationUIDLabel] == string(migration.GetObjectMeta().GetUID()) {
podExists = true
}
// Report the error with the migration in the controller log
return err
}

podList, err := v.GetRunningVMPods(vm)
if err != nil {
return err
}

if len(podList.Items) < 1 {
return middleware.NewResourceConflictError(fmt.Sprintf("VM %s Pod does not exist", vm.GetObjectMeta().GetName()))
}

// If there are more than one pod in other states than Succeeded or Failed we can't go on
if len(podList.Items) > 1 {
return middleware.NewResourceConflictError(fmt.Sprintf("VM %s Pod is already migrating", vm.GetObjectMeta().GetName()))
}
return len(podList.Items), podExists
}

//TODO: detect collisions
func mergeConstraints(migration *v1.Migration, vm *v1.VM) error {
conflicts := []string{}
for k, v := range migration.Spec.NodeSelector {
if _, exists := vm.Spec.NodeSelector[k]; exists {
@@ -150,9 +166,5 @@ func StartMigrationTargetPod(v services.VMService, migration *v1.Migration) erro
if len(conflicts) > 0 {
return fmt.Errorf("Conflicting node selectors: %v", conflicts)
}

err = v.SetupMigration(migration, vm)

// Report the result of the `Create` call
return err
return nil
}
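
A note on the copy() helper introduced above: the controller deep-copies the Migration through the API scheme before changing Status.Phase, so the object shared with the informer cache is never mutated, and the key is only forgotten once UpdateMigration has succeeded. The condensed helper below is hypothetical (there is no markFailed in this commit), but the calls mirror the ones visible in the diff.

package watch

import (
	kubeapi "k8s.io/client-go/pkg/api"
	"k8s.io/client-go/pkg/util/workqueue"

	"kubevirt.io/kubevirt/pkg/api/v1"
	"kubevirt.io/kubevirt/pkg/virt-controller/services"
)

// markFailed is an illustrative helper bundling the copy -> mutate -> update ->
// requeue-or-forget sequence the control loop repeats for every terminal decision.
func markFailed(migration *v1.Migration, key interface{},
	queue workqueue.RateLimitingInterface, migrationService services.VMService) {

	// Deep copy via the scheme; the shared cache object stays untouched.
	obj, err := kubeapi.Scheme.Copy(migration)
	if err != nil {
		queue.AddRateLimited(key)
		return
	}
	migrationCopy := obj.(*v1.Migration)
	migrationCopy.Status.Phase = v1.MigrationFailed

	// Persist the new phase; if the update fails, requeue so the whole loop runs again.
	if err := migrationService.UpdateMigration(migrationCopy); err != nil {
		queue.AddRateLimited(key)
		return
	}
	queue.Forget(key)
}
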
2 changes: 1 addition & 1 deletion pkg/virt-controller/watch/migration_test.go
@@ -54,7 +54,7 @@ var _ = Describe("Migration", func() {
lw = framework.NewFakeControllerSource()
migrationCache = cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, nil)

_, migrationController = NewMigrationControllerWithListWatch(vmService, nil, lw, restClient)
_, migrationController = NewMigrationControllerWithListWatch(vmService, nil, lw)

// Start the controller
migrationController.StartInformer(stopChan)
5 changes: 5 additions & 0 deletions pkg/virt-controller/watch/pod.go
@@ -105,6 +105,11 @@ func NewPodControllerWithListWatch(vmCache cache.Store, _ record.EventRecorder,
return true
}
migration := obj.(*corev1.Migration)
if migration.Status.Phase == corev1.MigrationUnknown {
logger.Info().Msg("migration not yet in right state, backing off")
queue.AddRateLimited(key)
return true
}

obj, err = kubeapi.Scheme.Copy(vm)
if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion pkg/virt-controller/watch/pod_test.go
@@ -168,12 +168,14 @@ var _ = Describe("Pod", func() {

vmInMigrationState := obj.(*v1.VM)
vmInMigrationState.Status.Phase = v1.Migrating
migration := v1.NewMinimalMigration("testvm-migration", "testvm")
migration.Status.Phase = v1.MigrationInProgress

// Register the expected REST call
server.AppendHandlers(
ghttp.CombineHandlers(
ghttp.VerifyRequest("GET", "/apis/kubevirt.io/v1alpha1/namespaces/default/migrations/testvm-migration"),
ghttp.RespondWithJSONEncoded(http.StatusOK, v1.NewMinimalMigration("testvm-migration", "testvm")),
ghttp.RespondWithJSONEncoded(http.StatusOK, migration),
),
ghttp.CombineHandlers(
ghttp.VerifyRequest("PUT", "/apis/kubevirt.io/v1alpha1/namespaces/default/vms/testvm"),
