diff --git a/cluster/migration.yaml b/cluster/migration.yaml index 95a446a52608..e397f160a481 100644 --- a/cluster/migration.yaml +++ b/cluster/migration.yaml @@ -1,6 +1,7 @@ apiVersion: kubevirt.io/v1alpha1 kind: Migration metadata: - name: testvm-migration + generateName: testvm-migration spec: - migratingVMName: testvm + selector: + name: testvm diff --git a/cluster/vagrant/setup_kubernetes_common.sh b/cluster/vagrant/setup_kubernetes_common.sh index e5098e00cd27..87294e1dc5b3 100755 --- a/cluster/vagrant/setup_kubernetes_common.sh +++ b/cluster/vagrant/setup_kubernetes_common.sh @@ -50,7 +50,7 @@ repo_gpgcheck=1 gpgkey=https://packages.cloud.google.com/yum/doc/yum-key.gpg https://packages.cloud.google.com/yum/doc/rpm-package-key.gpg EOF -yum install --nogpgcheck -y docker kubelet kubeadm kubectl kubernetes-cni +yum install -y docker kubelet kubeadm kubectl kubernetes-cni # To get the qemu user and libvirt yum install -y qemu-common qemu-kvm qemu-system-x86 libcgroup-tools libvirt || : diff --git a/cluster/vm.json b/cluster/vm.json index 12b16b70e8f1..58c60e55c690 100644 --- a/cluster/vm.json +++ b/cluster/vm.json @@ -5,9 +5,6 @@ "apiVersion": "kubevirt.io/v1alpha1", "kind": "VM", "spec": { - "nodeSelector": { - "kubernetes.io/hostname": "master" - }, "domain": { "devices": { "disks": [ @@ -15,7 +12,9 @@ "device": "disk", "driver": { "name": "qemu", - "type": "raw" + "type": "raw", + "cache": "none" + }, "snapshot": "external", "source": { diff --git a/cluster/vm.yaml b/cluster/vm.yaml index f7a4e1898245..17ef41c64086 100644 --- a/cluster/vm.yaml +++ b/cluster/vm.yaml @@ -53,5 +53,3 @@ spec: type: os: hvm type: qemu - nodeSelector: - kubernetes.io/hostname: master diff --git a/cmd/virt-controller/virt-controller.go b/cmd/virt-controller/virt-controller.go index bf0dc9d4abcc..988b01fcf620 100644 --- a/cmd/virt-controller/virt-controller.go +++ b/cmd/virt-controller/virt-controller.go @@ -86,7 +86,7 @@ func main() { go migrationController.Run(1, stop) migrationController.WaitForSync(stop) - _, jobController := watch.NewJobController(vmService, nil, restClient) + _, jobController := watch.NewJobController(vmService, nil, clientSet, restClient) jobController.StartInformer(stop) go jobController.Run(1, stop) jobController.WaitForSync(stop) diff --git a/pkg/api/v1/types.go b/pkg/api/v1/types.go index 4a6d9822e236..6f73f5a1593e 100644 --- a/pkg/api/v1/types.go +++ b/pkg/api/v1/types.go @@ -202,10 +202,12 @@ const ( ) const ( - AppLabel string = "kubevirt.io/app" - DomainLabel string = "kubevirt.io/domain" - UIDLabel string = "kubevirt.io/vmUID" - NodeNameLabel string = "kubevirt.io/nodeName" + AppLabel string = "kubevirt.io/app" + DomainLabel string = "kubevirt.io/domain" + VMUIDLabel string = "kubevirt.io/vmUID" + NodeNameLabel string = "kubevirt.io/nodeName" + MigrationUIDLabel string = "kubevirt.io/migrationUID" + MigrationLabel string = "kubevirt.io/migration" ) func NewVM(name string, uid types.UID) *VM { @@ -294,7 +296,7 @@ func NewSpice(vmName string) *Spice { func NewMinimalMigration(name string, vmName string) *Migration { migration := NewMigrationReferenceFromName(name) migration.Spec = MigrationSpec{ - MigratingVMName: vmName, + Selector: VMSelector{vmName}, } return migration } @@ -326,10 +328,14 @@ type Migration struct { } // MigrationSpec is a description of a VM Migration +// For example, a migration with the selector name "testvm" will migrate the VM called "testvm" in the namespace "default" type MigrationSpec struct { - // The Kubernetes name of the Virtual Machine object to select for one
migration. - // For example "destinationNodeName": "testvm" will migrate a VM called "testvm" in the namespace "default" - MigratingVMName string `json:"migratingVMName,omitempty"` + // Criteria for selecting the VM to migrate. + // For example + // selector: + // name: testvm + // will select the VM `testvm` for migration + Selector VMSelector `json:"selector"` // Criteria to use when selecting the destination for the migration // for example, to select by the hostname, specify `kubernetes.io/hostname: master` // other possible choices include the hardware required to run the vm or // or lableing of the nodes to indicate their roles in larger applications. // examples: // disktype: ssd, // randomGenerator: /dev/random, // randomGenerator: superfastdevice, // app: mysql, // licensedForServiceX: true - DestinationNodeSelector map[string]string `json:"destinationNodeSelector,omitempty"` + // Note that these selectors are additions to the node selectors on the VM itself and must not already exist on the VM. + // If they conflict with the VM's node selectors, no migration will be started. + NodeSelector map[string]string `json:"nodeSelector,omitempty"` +} + +type VMSelector struct { + // Name of the VM to migrate + Name string `json:"name" valid:"required"` } type MigrationPhase string @@ -353,7 +366,7 @@ const ( MigrationPending MigrationPhase = "Pending" // Migration is actively progressing - MigrationInProgress MigrationPhase = "In Progress" + MigrationInProgress MigrationPhase = "InProgress" // Migration has completed successfully MigrationSucceeded MigrationPhase = "Succeeded" diff --git a/pkg/api/v1/types_swagger_generated.go b/pkg/api/v1/types_swagger_generated.go index 556b252d25b2..5c89da934944 100644 --- a/pkg/api/v1/types_swagger_generated.go +++ b/pkg/api/v1/types_swagger_generated.go @@ -45,9 +45,15 @@ func (Migration) SwaggerDoc() map[string]string { func (MigrationSpec) SwaggerDoc() map[string]string { return map[string]string{ - "": "MigrationSpec is a description of a VM Migration", - "migratingVMName": "The Kubernetes name of the Virtual Machine object to select for one migration.\nFor example \"destinationNodeName\": \"testvm\" will migrate a VM called \"testvm\" in the namespace \"default\"", - "destinationNodeSelector": "Criteria to use when selecting the destination for the migration\nfor example, to select by the hostname, specify `kubernetes.io/hostname: master`\nother possible choices include the hardware required to run the vm or\nor lableing of the nodes to indicate their roles in larger applications.\nexamples:\ndisktype: ssd,\nrandomGenerator: /dev/random,\nrandomGenerator: superfastdevice,\napp: mysql,\nlicensedForServiceX: true", + "": "MigrationSpec is a description of a VM Migration\nFor example, a migration with the selector name \"testvm\" will migrate the VM called \"testvm\" in the namespace \"default\"", + "selector": "Criteria for selecting the VM to migrate.\nFor example\nselector:\n name: testvm\nwill select the VM `testvm` for migration", + "nodeSelector": "Criteria to use when selecting the destination for the migration\nfor example, to select by the hostname, specify `kubernetes.io/hostname: master`\nother possible choices include the hardware required to run the vm or\nor lableing of the nodes to indicate their roles in larger applications.\nexamples:\ndisktype: ssd,\nrandomGenerator: /dev/random,\nrandomGenerator: superfastdevice,\napp: mysql,\nlicensedForServiceX: true\nNote that these selectors are additions to the node selectors on the VM itself and must not already exist on the VM.\nIf they conflict with the VM's node selectors, no migration will be started.", + } } +
+func (VMSelector) SwaggerDoc() map[string]string { + return map[string]string{ + "name": "Name of the VM to migrate", } } diff --git a/pkg/virt-controller/services/generated_mock_vm.go b/pkg/virt-controller/services/generated_mock_vm.go index 3b5fb53509ec..1ad2ac1d4663 100644 --- a/pkg/virt-controller/services/generated_mock_vm.go +++ b/pkg/virt-controller/services/generated_mock_vm.go @@ -6,8 +6,7 @@ package services import ( gomock "github.com/golang/mock/gomock" v1 "k8s.io/client-go/pkg/api/v1" - v10 "k8s.io/client-go/pkg/apis/batch/v1" - v11 "kubevirt.io/kubevirt/pkg/api/v1" + v10 "kubevirt.io/kubevirt/pkg/api/v1" ) // Mock of VMService interface @@ -31,7 +30,7 @@ func (_m *MockVMService) EXPECT() *_MockVMServiceRecorder { return _m.recorder } -func (_m *MockVMService) StartVMPod(_param0 *v11.VM) error { +func (_m *MockVMService) StartVMPod(_param0 *v10.VM) error { ret := _m.ctrl.Call(_m, "StartVMPod", _param0) ret0, _ := ret[0].(error) return ret0 @@ -41,7 +40,7 @@ func (_mr *_MockVMServiceRecorder) StartVMPod(arg0 interface{}) *gomock.Call { return _mr.mock.ctrl.RecordCall(_mr.mock, "StartVMPod", arg0) } -func (_m *MockVMService) DeleteVMPod(_param0 *v11.VM) error { +func (_m *MockVMService) DeleteVMPod(_param0 *v10.VM) error { ret := _m.ctrl.Call(_m, "DeleteVMPod", _param0) ret0, _ := ret[0].(error) return ret0 @@ -51,7 +50,7 @@ func (_mr *_MockVMServiceRecorder) DeleteVMPod(arg0 interface{}) *gomock.Call { return _mr.mock.ctrl.RecordCall(_mr.mock, "DeleteVMPod", arg0) } -func (_m *MockVMService) GetRunningVMPods(_param0 *v11.VM) (*v1.PodList, error) { +func (_m *MockVMService) GetRunningVMPods(_param0 *v10.VM) (*v1.PodList, error) { ret := _m.ctrl.Call(_m, "GetRunningVMPods", _param0) ret0, _ := ret[0].(*v1.PodList) ret1, _ := ret[1].(error) @@ -62,17 +61,17 @@ func (_mr *_MockVMServiceRecorder) GetRunningVMPods(arg0 interface{}) *gomock.Ca return _mr.mock.ctrl.RecordCall(_mr.mock, "GetRunningVMPods", arg0) } -func (_m *MockVMService) DeleteMigration(_param0 *v11.Migration) error { - ret := _m.ctrl.Call(_m, "DeleteMigration", _param0) +func (_m *MockVMService) DeleteMigrationTargetPods(_param0 *v10.Migration) error { + ret := _m.ctrl.Call(_m, "DeleteMigrationTargetPods", _param0) ret0, _ := ret[0].(error) return ret0 } -func (_mr *_MockVMServiceRecorder) DeleteMigration(arg0 interface{}) *gomock.Call { - return _mr.mock.ctrl.RecordCall(_mr.mock, "DeleteMigration", arg0) +func (_mr *_MockVMServiceRecorder) DeleteMigrationTargetPods(arg0 interface{}) *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "DeleteMigrationTargetPods", arg0) } -func (_m *MockVMService) GetRunningMigrationPods(_param0 *v11.Migration) (*v1.PodList, error) { +func (_m *MockVMService) GetRunningMigrationPods(_param0 *v10.Migration) (*v1.PodList, error) { ret := _m.ctrl.Call(_m, "GetRunningMigrationPods", _param0) ret0, _ := ret[0].(*v1.PodList) ret1, _ := ret[1].(error) @@ -83,7 +82,7 @@ func (_mr *_MockVMServiceRecorder) GetRunningMigrationPods(arg0 interface{}) *go return _mr.mock.ctrl.RecordCall(_mr.mock, "GetRunningMigrationPods", arg0) } -func (_m *MockVMService) SetupMigration(migration *v11.Migration, vm *v11.VM) error { +func (_m *MockVMService) SetupMigration(migration *v10.Migration, vm *v10.VM) error { ret := _m.ctrl.Call(_m, "SetupMigration", migration, vm) ret0, _ := ret[0].(error) return ret0 @@ -93,7 +92,7 @@ func (_mr *_MockVMServiceRecorder) SetupMigration(arg0, arg1 interface{}) *gomoc return _mr.mock.ctrl.RecordCall(_mr.mock, "SetupMigration", arg0, arg1) } -func (_m *MockVMService) 
UpdateMigration(migration *v11.Migration) error { +func (_m *MockVMService) UpdateMigration(migration *v10.Migration) error { ret := _m.ctrl.Call(_m, "UpdateMigration", migration) ret0, _ := ret[0].(error) return ret0 @@ -103,30 +102,43 @@ func (_mr *_MockVMServiceRecorder) UpdateMigration(arg0 interface{}) *gomock.Cal return _mr.mock.ctrl.RecordCall(_mr.mock, "UpdateMigration", arg0) } -func (_m *MockVMService) FetchVM(vmName string) (*v11.VM, error) { +func (_m *MockVMService) FetchVM(vmName string) (*v10.VM, bool, error) { ret := _m.ctrl.Call(_m, "FetchVM", vmName) - ret0, _ := ret[0].(*v11.VM) - ret1, _ := ret[1].(error) - return ret0, ret1 + ret0, _ := ret[0].(*v10.VM) + ret1, _ := ret[1].(bool) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 } func (_mr *_MockVMServiceRecorder) FetchVM(arg0 interface{}) *gomock.Call { return _mr.mock.ctrl.RecordCall(_mr.mock, "FetchVM", arg0) } -func (_m *MockVMService) StartMigration(vm *v11.VM, sourceNode *v1.Node, targetNode *v1.Node) error { - ret := _m.ctrl.Call(_m, "StartMigration", vm, sourceNode, targetNode) +func (_m *MockVMService) FetchMigration(migrationName string) (*v10.Migration, bool, error) { + ret := _m.ctrl.Call(_m, "FetchMigration", migrationName) + ret0, _ := ret[0].(*v10.Migration) + ret1, _ := ret[1].(bool) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +func (_mr *_MockVMServiceRecorder) FetchMigration(arg0 interface{}) *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "FetchMigration", arg0) +} + +func (_m *MockVMService) StartMigration(migration *v10.Migration, vm *v10.VM, sourceNode *v1.Node, targetNode *v1.Node) error { + ret := _m.ctrl.Call(_m, "StartMigration", migration, vm, sourceNode, targetNode) ret0, _ := ret[0].(error) return ret0 } -func (_mr *_MockVMServiceRecorder) StartMigration(arg0, arg1, arg2 interface{}) *gomock.Call { - return _mr.mock.ctrl.RecordCall(_mr.mock, "StartMigration", arg0, arg1, arg2) +func (_mr *_MockVMServiceRecorder) StartMigration(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "StartMigration", arg0, arg1, arg2, arg3) } -func (_m *MockVMService) GetMigrationJob(vm *v11.VM) (*v10.Job, bool, error) { - ret := _m.ctrl.Call(_m, "GetMigrationJob", vm) - ret0, _ := ret[0].(*v10.Job) +func (_m *MockVMService) GetMigrationJob(migration *v10.Migration) (*v1.Pod, bool, error) { + ret := _m.ctrl.Call(_m, "GetMigrationJob", migration) + ret0, _ := ret[0].(*v1.Pod) ret1, _ := ret[1].(bool) ret2, _ := ret[2].(error) return ret0, ret1, ret2 diff --git a/pkg/virt-controller/services/template.go b/pkg/virt-controller/services/template.go index c6d6366393b8..97ff5411b9ef 100644 --- a/pkg/virt-controller/services/template.go +++ b/pkg/virt-controller/services/template.go @@ -5,10 +5,7 @@ import ( "strconv" "strings" - metav1 "k8s.io/client-go/pkg/apis/meta/v1" - kubev1 "k8s.io/client-go/pkg/api/v1" - batchv1 "k8s.io/client-go/pkg/apis/batch/v1" "kubevirt.io/kubevirt/pkg/api/v1" "kubevirt.io/kubevirt/pkg/logging" "kubevirt.io/kubevirt/pkg/precond" @@ -16,7 +13,7 @@ import ( type TemplateService interface { RenderLaunchManifest(*v1.VM) (*kubev1.Pod, error) - RenderMigrationJob(*v1.VM, *kubev1.Node, *kubev1.Node) (*batchv1.Job, error) + RenderMigrationJob(*v1.VM, *kubev1.Node, *kubev1.Node) (*kubev1.Pod, error) } type templateService struct { @@ -56,7 +53,7 @@ func (t *templateService) RenderLaunchManifest(vm *v1.VM) (*kubev1.Pod, error) { Labels: map[string]string{ v1.AppLabel: "virt-launcher", v1.DomainLabel: domain, - v1.UIDLabel: uid, 
+ v1.VMUIDLabel: uid, }, }, Spec: kubev1.PodSpec{ @@ -69,7 +66,7 @@ func (t *templateService) RenderLaunchManifest(vm *v1.VM) (*kubev1.Pod, error) { return &pod, nil } -func (t *templateService) RenderMigrationJob(vm *v1.VM, sourceNode *kubev1.Node, targetNode *kubev1.Node) (*batchv1.Job, error) { +func (t *templateService) RenderMigrationJob(vm *v1.VM, sourceNode *kubev1.Node, targetNode *kubev1.Node) (*kubev1.Pod, error) { srcAddr := "" dstAddr := "" for _, addr := range sourceNode.Status.Addresses { @@ -98,29 +95,22 @@ func (t *templateService) RenderMigrationJob(vm *v1.VM, sourceNode *kubev1.Node, } destUri := fmt.Sprintf("qemu+tcp://%s/system", dstAddr) - job := batchv1.Job{ + job := kubev1.Pod{ ObjectMeta: kubev1.ObjectMeta{ GenerateName: "virt-migration", Labels: map[string]string{ v1.DomainLabel: vm.GetObjectMeta().GetName(), + v1.AppLabel: "migration", }, }, - TypeMeta: metav1.TypeMeta{ - Kind: "Job", - APIVersion: "batch/v1", - }, - Spec: batchv1.JobSpec{ - Template: kubev1.PodTemplateSpec{ - Spec: kubev1.PodSpec{ - RestartPolicy: kubev1.RestartPolicyNever, - Containers: []kubev1.Container{ - { - Name: "virt-migration", - Image: "kubevirt/virt-handler:devel", - Command: []string{ - "virsh", "-c", srcUri, "migrate", "--tunnelled", "--p2p", vm.Spec.Domain.Name, destUri, - }, - }, + Spec: kubev1.PodSpec{ + RestartPolicy: kubev1.RestartPolicyNever, + Containers: []kubev1.Container{ + { + Name: "virt-migration", + Image: "kubevirt/virt-handler:devel", + Command: []string{ + "virsh", "-c", srcUri, "migrate", "--tunnelled", "--p2p", vm.Spec.Domain.Name, destUri, }, }, }, diff --git a/pkg/virt-controller/services/template_test.go b/pkg/virt-controller/services/template_test.go index 68cef8211678..7049d7072523 100644 --- a/pkg/virt-controller/services/template_test.go +++ b/pkg/virt-controller/services/template_test.go @@ -27,7 +27,7 @@ var _ = Describe("Template", func() { Expect(pod.ObjectMeta.Labels).To(Equal(map[string]string{ v1.AppLabel: "virt-launcher", v1.DomainLabel: "testvm", - v1.UIDLabel: "1234", + v1.VMUIDLabel: "1234", })) Expect(pod.ObjectMeta.GenerateName).To(Equal("virt-launcher-testvm-----")) Expect(pod.Spec.NodeSelector).To(BeEmpty()) @@ -49,7 +49,7 @@ var _ = Describe("Template", func() { Expect(pod.ObjectMeta.Labels).To(Equal(map[string]string{ v1.AppLabel: "virt-launcher", v1.DomainLabel: "testvm", - v1.UIDLabel: "1234", + v1.VMUIDLabel: "1234", })) Expect(pod.ObjectMeta.GenerateName).To(Equal("virt-launcher-testvm-----")) Expect(pod.Spec.NodeSelector).To(Equal(map[string]string{ @@ -105,7 +105,7 @@ var _ = Describe("Template", func() { job, err := svc.RenderMigrationJob(vm, &srcNodeIp, &destNodeIp) Expect(err).ToNot(HaveOccurred()) - Expect(job.Spec.Template.Spec.RestartPolicy).To(Equal(kubev1.RestartPolicyNever)) + Expect(job.Spec.RestartPolicy).To(Equal(kubev1.RestartPolicyNever)) }) It("should use the first ip it finds", func() { vm := v1.NewMinimalVM("testvm") @@ -114,7 +114,7 @@ var _ = Describe("Template", func() { refCommand := []string{ "virsh", "-c", "qemu+tcp://127.0.0.2/system", "migrate", "--tunnelled", "--p2p", "testvm", "qemu+tcp://127.0.0.3/system"} - Expect(job.Spec.Template.Spec.Containers[0].Command).To(Equal(refCommand)) + Expect(job.Spec.Containers[0].Command).To(Equal(refCommand)) }) }) Context("migration template with incorrect parameters", func() { diff --git a/pkg/virt-controller/services/vm.go b/pkg/virt-controller/services/vm.go index d6a741f01bfa..2871966e47b7 100644 --- a/pkg/virt-controller/services/vm.go +++ 
b/pkg/virt-controller/services/vm.go @@ -3,8 +3,9 @@ package services import ( "fmt" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/pkg/api/errors" "k8s.io/client-go/pkg/api/v1" - batchv1 "k8s.io/client-go/pkg/apis/batch/v1" + metav1 "k8s.io/client-go/pkg/apis/meta/v1" "k8s.io/client-go/pkg/fields" "k8s.io/client-go/pkg/labels" "k8s.io/client-go/rest" @@ -23,13 +24,14 @@ type VMService interface { StartVMPod(*corev1.VM) error DeleteVMPod(*corev1.VM) error GetRunningVMPods(*corev1.VM) (*v1.PodList, error) - DeleteMigration(*corev1.Migration) error + DeleteMigrationTargetPods(*corev1.Migration) error GetRunningMigrationPods(*corev1.Migration) (*v1.PodList, error) SetupMigration(migration *corev1.Migration, vm *corev1.VM) error UpdateMigration(migration *corev1.Migration) error - FetchVM(vmName string) (*corev1.VM, error) - StartMigration(vm *corev1.VM, sourceNode *v1.Node, targetNode *v1.Node) error - GetMigrationJob(vm *corev1.VM) (*batchv1.Job, bool, error) + FetchVM(vmName string) (*corev1.VM, bool, error) + FetchMigration(migrationName string) (*corev1.Migration, bool, error) + StartMigration(migration *corev1.Migration, vm *corev1.VM, sourceNode *v1.Node, targetNode *v1.Node) error + GetMigrationJob(migration *corev1.Migration) (*v1.Pod, bool, error) } type vmService struct { @@ -90,13 +92,28 @@ func (v *vmService) UpdateMigration(migration *corev1.Migration) error { return err } -func (v *vmService) FetchVM(vmName string) (*corev1.VM, error) { +func (v *vmService) FetchVM(vmName string) (*corev1.VM, bool, error) { resp, err := v.RestClient.Get().Namespace(v1.NamespaceDefault).Resource("vms").Name(vmName).Do().Get() if err != nil { - return nil, err + if doesNotExist(err) { + return nil, false, nil + } + return nil, false, err } vm := resp.(*corev1.VM) - return vm, nil + return vm, true, nil +} + +func (v *vmService) FetchMigration(migrationName string) (*corev1.Migration, bool, error) { + resp, err := v.RestClient.Get().Namespace(v1.NamespaceDefault).Resource("migrations").Name(migrationName).Do().Get() + if err != nil { + if doesNotExist(err) { + return nil, false, nil + } + return nil, false, err + } + migration := resp.(*corev1.Migration) + return migration, true, nil } func NewVMService() VMService { @@ -108,7 +125,7 @@ func UnfinishedVMPodSelector(vm *corev1.VM) v1.ListOptions { fieldSelector := fields.ParseSelectorOrDie( "status.phase!=" + string(v1.PodFailed) + ",status.phase!=" + string(v1.PodSucceeded)) - labelSelector, err := labels.Parse(fmt.Sprintf(corev1.DomainLabel+" in (%s)", vm.GetObjectMeta().GetName())) + labelSelector, err := labels.Parse(fmt.Sprintf(corev1.AppLabel+"=virt-launcher,"+corev1.DomainLabel+" in (%s)", vm.GetObjectMeta().GetName())) if err != nil { panic(err) } @@ -117,73 +134,84 @@ func UnfinishedVMPodSelector(vm *corev1.VM) v1.ListOptions { func (v *vmService) SetupMigration(migration *corev1.Migration, vm *corev1.VM) error { pod, err := v.TemplateService.RenderLaunchManifest(vm) + pod.ObjectMeta.Labels[corev1.MigrationLabel] = migration.GetObjectMeta().GetName() + pod.ObjectMeta.Labels[corev1.MigrationUIDLabel] = string(migration.GetObjectMeta().GetUID()) corev1.SetAntiAffinityToPod(pod, corev1.AntiAffinityFromVMNode(vm)) if err == nil { _, err = v.KubeCli.CoreV1().Pods(v1.NamespaceDefault).Create(pod) } - if err == nil { - migration.Status.Phase = corev1.MigrationInProgress - } else { - migration.Status.Phase = corev1.MigrationFailed - } - - err2 := v.UpdateMigration(migration) - if err2 != nil { - err = err2 - } return err } -func (v *vmService) 
DeleteMigration(migration *corev1.Migration) error { +func (v *vmService) DeleteMigrationTargetPods(migration *corev1.Migration) error { precond.MustNotBeNil(migration) precond.MustNotBeEmpty(migration.GetObjectMeta().GetName()) - if err := v.KubeCli.CoreV1().Pods(v1.NamespaceDefault).DeleteCollection(nil, unfinishedMigrationPodSelector(migration)); err != nil { + if err := v.KubeCli.CoreV1().Pods(v1.NamespaceDefault).DeleteCollection(nil, unfinishedMigrationTargetPodSelector(migration)); err != nil { return err } return nil } func (v *vmService) GetRunningMigrationPods(migration *corev1.Migration) (*v1.PodList, error) { - podList, err := v.KubeCli.CoreV1().Pods(v1.NamespaceDefault).List(unfinishedMigrationPodSelector(migration)) + podList, err := v.KubeCli.CoreV1().Pods(v1.NamespaceDefault).List(unfinishedMigrationTargetPodSelector(migration)) if err != nil { return nil, err } return podList, nil } -func (v *vmService) StartMigration(vm *corev1.VM, sourceNode *v1.Node, targetNode *v1.Node) error { +func (v *vmService) StartMigration(migration *corev1.Migration, vm *corev1.VM, sourceNode *v1.Node, targetNode *v1.Node) error { job, err := v.TemplateService.RenderMigrationJob(vm, sourceNode, targetNode) + job.ObjectMeta.Labels[corev1.MigrationLabel] = migration.GetObjectMeta().GetName() + job.ObjectMeta.Labels[corev1.MigrationUIDLabel] = string(migration.GetObjectMeta().GetUID()) if err != nil { return err } - return v.KubeCli.CoreV1().RESTClient().Post().AbsPath("/apis/batch/v1/namespaces/default/jobs").Body(job).Do().Error() + _, err = v.KubeCli.CoreV1().Pods(v1.NamespaceDefault).Create(job) + return err } -func (v *vmService) GetMigrationJob(vm *corev1.VM) (*batchv1.Job, bool, error) { - selector, err := labels.Parse(corev1.DomainLabel) +func (v *vmService) GetMigrationJob(migration *corev1.Migration) (*v1.Pod, bool, error) { + selector := migrationJobSelector(migration) + podList, err := v.KubeCli.CoreV1().Pods(v1.NamespaceDefault).List(selector) if err != nil { return nil, false, err } - jobList, err := v.KubeCli.CoreV1().RESTClient().Get().AbsPath("/apis/batch/v1/namespaces/default/jobs").LabelsSelectorParam(selector).Do().Get() - if err != nil { - return nil, false, err - } - for _, job := range jobList.(*batchv1.JobList).Items { - if job.Status.CompletionTime != nil { - return &job, true, nil - } + if len(podList.Items) == 0 { + return nil, false, nil } - return nil, false, nil + + return &podList.Items[0], true, nil } -func unfinishedMigrationPodSelector(migration *corev1.Migration) v1.ListOptions { +func unfinishedMigrationTargetPodSelector(migration *corev1.Migration) v1.ListOptions { fieldSelector := fields.ParseSelectorOrDie( "status.phase!=" + string(v1.PodFailed) + ",status.phase!=" + string(v1.PodSucceeded)) - labelSelector, err := labels.Parse(fmt.Sprintf(corev1.DomainLabel+" in (%s)", migration.GetObjectMeta().GetName())) + labelSelector, err := labels.Parse( + fmt.Sprintf(corev1.AppLabel+"=virt-launcher,"+corev1.DomainLabel+","+corev1.MigrationUIDLabel+" in (%s)", migration.GetObjectMeta().GetUID())) if err != nil { panic(err) } return v1.ListOptions{FieldSelector: fieldSelector.String(), LabelSelector: labelSelector.String()} } + +func migrationJobSelector(migration *corev1.Migration) v1.ListOptions { + labelSelector, err := labels.Parse(corev1.DomainLabel + "," + corev1.AppLabel + "=migration" + + "," + corev1.MigrationUIDLabel + "=" + string(migration.GetObjectMeta().GetUID()), + ) + if err != nil { + panic(err) + } + return v1.ListOptions{LabelSelector: 
labelSelector.String()} +} + +func doesNotExist(err error) bool { + if e, ok := err.(*errors.StatusError); ok { + if e.Status().Reason == metav1.StatusReasonNotFound { + return true + } + } + return false +} diff --git a/pkg/virt-controller/services/vm_test.go b/pkg/virt-controller/services/vm_test.go index a53a8c96c5c4..dd3744f20714 100644 --- a/pkg/virt-controller/services/vm_test.go +++ b/pkg/virt-controller/services/vm_test.go @@ -64,22 +64,10 @@ var _ = Describe("VM", func() { ghttp.VerifyRequest("POST", "/api/v1/namespaces/default/pods"), ghttp.RespondWithJSONEncoded(http.StatusOK, pod), ), - - ghttp.CombineHandlers( - ghttp.VerifyRequest("PUT", "/apis/kubevirt.io/v1alpha1/namespaces/default/migrations/test-vm-migration"), - ghttp.VerifyJSONRepresenting(expected_migration), - ghttp.RespondWithJSONEncoded(http.StatusOK, migration), - ), - - /*ghttp.CombineHandlers( - ghttp.VerifyRequest("PUT", "/apis/kubevirt.io/v1alpha1/namespaces/default/vms/testvm"), - ghttp.VerifyJSONRepresenting(vm), - ghttp.RespondWithJSONEncoded(http.StatusOK, ""), - ),*/ ) err := vmService.SetupMigration(migration, vm) Expect(err).ToNot(HaveOccurred()) - Expect(len(server.ReceivedRequests())).To(Equal(2)) + Expect(len(server.ReceivedRequests())).To(Equal(1)) }) }) diff --git a/pkg/virt-controller/watch/job.go b/pkg/virt-controller/watch/job.go index 0ee44e88f2b3..42289302fe53 100644 --- a/pkg/virt-controller/watch/job.go +++ b/pkg/virt-controller/watch/job.go @@ -1,8 +1,9 @@ package watch import ( + "k8s.io/client-go/kubernetes" kubeapi "k8s.io/client-go/pkg/api" - batchv1 "k8s.io/client-go/pkg/apis/batch/v1" + "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/pkg/fields" "k8s.io/client-go/pkg/labels" "k8s.io/client-go/pkg/util/workqueue" @@ -15,24 +16,27 @@ import ( ) func migrationJobSelector() kubeapi.ListOptions { - fieldSelector := fields.Everything() - labelSelector, err := labels.Parse(kvirtv1.DomainLabel) + fieldSelector := fields.ParseSelectorOrDie( + "status.phase!=" + string(v1.PodPending) + + ",status.phase!=" + string(v1.PodRunning) + + ",status.phase!=" + string(v1.PodUnknown)) + labelSelector, err := labels.Parse(kvirtv1.AppLabel + "=migration," + kvirtv1.DomainLabel + "," + kvirtv1.MigrationLabel) if err != nil { panic(err) } return kubeapi.ListOptions{FieldSelector: fieldSelector, LabelSelector: labelSelector} } -func NewJobController(vmService services.VMService, recorder record.EventRecorder, restClient *rest.RESTClient) (cache.Store, *kubecli.Controller) { +func NewJobController(vmService services.VMService, recorder record.EventRecorder, clientSet *kubernetes.Clientset, restClient *rest.RESTClient) (cache.Store, *kubecli.Controller) { selector := migrationJobSelector() - lw := kubecli.NewListWatchFromClient(restClient, "jobs", kubeapi.NamespaceDefault, selector.FieldSelector, selector.LabelSelector) + lw := kubecli.NewListWatchFromClient(clientSet.CoreV1().RESTClient(), "pods", kubeapi.NamespaceDefault, selector.FieldSelector, selector.LabelSelector) return NewJobControllerWithListWatch(vmService, recorder, lw, restClient) } func NewJobControllerWithListWatch(vmService services.VMService, _ record.EventRecorder, lw cache.ListerWatcher, restClient *rest.RESTClient) (cache.Store, *kubecli.Controller) { queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) - return kubecli.NewController(lw, queue, &batchv1.Job{}, func(store cache.Store, queue workqueue.RateLimitingInterface) bool { + return kubecli.NewController(lw, queue, &v1.Pod{}, func(store cache.Store, queue 
workqueue.RateLimitingInterface) bool { key, quit := queue.Get() if quit { return false @@ -47,22 +51,51 @@ func NewJobControllerWithListWatch(vmService services.VMService, _ record.EventR return true } if exists { - var job *batchv1.Job = obj.(*batchv1.Job) + job := obj.(*v1.Pod) - if job.Status.Succeeded < 1 { - //Job did not succeed, do not update the vm + name := job.ObjectMeta.Labels[kvirtv1.DomainLabel] + vm, vmExists, err := vmService.FetchVM(name) + if err != nil { + queue.AddRateLimited(key) return true } - name := job.ObjectMeta.Labels["vmname"] - vm, err := vmService.FetchVM(name) + // TODO at the end, only virt-handler can decide for all migration types if a VM successfully migrated to it (think about p2p2 migrations) + // For now we use a managed migration + if vmExists && vm.Status.Phase == kvirtv1.Migrating { + vm.Status.Phase = kvirtv1.Running + if job.Status.Phase == v1.PodSucceeded { + vm.ObjectMeta.Labels[kvirtv1.NodeNameLabel] = vm.Status.MigrationNodeName + vm.Status.NodeName = vm.Status.MigrationNodeName + } + vm.Status.MigrationNodeName = "" + _, err := putVm(vm, restClient, nil) + if err != nil { + queue.AddRateLimited(key) + return true + } + } + + migration, migrationExists, err := vmService.FetchMigration(job.ObjectMeta.Labels[kvirtv1.MigrationLabel]) if err != nil { - //TODO proper error handling queue.AddRateLimited(key) return true } - vm.Status.Phase = kvirtv1.Running - putVm(vm, restClient, nil) + + if migrationExists { + if migration.Status.Phase != kvirtv1.MigrationSucceeded && migration.Status.Phase != kvirtv1.MigrationFailed { + if job.Status.Phase == v1.PodSucceeded { + migration.Status.Phase = kvirtv1.MigrationSucceeded + } else { + migration.Status.Phase = kvirtv1.MigrationFailed + } + err := vmService.UpdateMigration(migration) + if err != nil { + queue.AddRateLimited(key) + return true + } + } + } } return true }) diff --git a/pkg/virt-controller/watch/job_test.go b/pkg/virt-controller/watch/job_test.go index 111f84bac99a..077c3f42ca9c 100644 --- a/pkg/virt-controller/watch/job_test.go +++ b/pkg/virt-controller/watch/job_test.go @@ -17,7 +17,6 @@ import ( "kubevirt.io/kubevirt/pkg/virt-controller/services" corev1 "k8s.io/client-go/pkg/api/v1" - batchv1 "k8s.io/client-go/pkg/apis/batch/v1" kvirtv1 "kubevirt.io/kubevirt/pkg/api/v1" ) @@ -59,6 +58,8 @@ var _ = Describe("Migration", func() { _, jobController = NewJobControllerWithListWatch(vmService, nil, lw, restClient) vm = kvirtv1.NewMinimalVM("test-vm") + vm.Status.Phase = kvirtv1.Migrating + vm.GetObjectMeta().SetLabels(map[string]string{"a": "b"}) // Start the controller jobController.StartInformer(stopChan) @@ -68,7 +69,7 @@ var _ = Describe("Migration", func() { Context("Running job with out migration labels", func() { It("should not attempt to update the VM", func(done Done) { - job := &batchv1.Job{} + job := &corev1.Pod{} // Register the expected REST call //server.AppendHandlers() @@ -84,46 +85,19 @@ var _ = Describe("Migration", func() { }, 10) }) - Context("Running job with migration labels but no success", func() { - It("should ignore the the VM ", func(done Done) { - - job := &batchv1.Job{ - ObjectMeta: corev1.ObjectMeta{ - Labels: map[string]string{ - kvirtv1.DomainLabel: "something", - "vmname": vm.ObjectMeta.Name, - }, - }, - } - - // No registered REST calls - //server.AppendHandlers() - - // Tell the controller that there is a new Job - lw.Add(job) - - // Wait until we have processed the added item - finishController(jobController, stopChan) - - 
Expect(len(server.ReceivedRequests())).To(Equal(0)) - close(done) - }, 10) - }) - Context("Running job with migration labels and one success", func() { It("should update the VM to Running", func(done Done) { - job := &batchv1.Job{ + migration := kvirtv1.NewMinimalMigration("test-migration", "test-vm") + job := &corev1.Pod{ ObjectMeta: corev1.ObjectMeta{ Labels: map[string]string{ - kvirtv1.DomainLabel: "something", - "vmname": vm.ObjectMeta.Name, + kvirtv1.DomainLabel: "test-vm", + kvirtv1.MigrationLabel: migration.ObjectMeta.Name, }, }, - Status: batchv1.JobStatus{ - Succeeded: 1, - Failed: 0, - Active: 0, + Status: corev1.PodStatus{ + Phase: corev1.PodSucceeded, }, } @@ -131,12 +105,14 @@ var _ = Describe("Migration", func() { server.AppendHandlers( handlerToFetchTestVM(vm), handlerToUpdateTestVM(vm), + handlerToFetchTestMigration(migration), + handlerToUpdateTestMigration(migration), ) // Tell the controller that there is a new Job lw.Add(job) finishController(jobController, stopChan) - Expect(len(server.ReceivedRequests())).To(Equal(2)) + Expect(len(server.ReceivedRequests())).To(Equal(4)) close(done) }, 10) }) @@ -154,6 +130,20 @@ func handlerToFetchTestVM(vm *kvirtv1.VM) http.HandlerFunc { ) } +func handlerToFetchTestMigration(migration *kvirtv1.Migration) http.HandlerFunc { + return ghttp.CombineHandlers( + ghttp.VerifyRequest("GET", "/apis/kubevirt.io/v1alpha1/namespaces/default/migrations/"+migration.ObjectMeta.Name), + ghttp.RespondWithJSONEncoded(http.StatusOK, migration), + ) +} + +func handlerToUpdateTestMigration(migration *kvirtv1.Migration) http.HandlerFunc { + return ghttp.CombineHandlers( + ghttp.VerifyRequest("PUT", "/apis/kubevirt.io/v1alpha1/namespaces/default/migrations/"+migration.ObjectMeta.Name), + ghttp.RespondWithJSONEncoded(http.StatusOK, migration), + ) +} + func handlerToUpdateTestVM(vm *kvirtv1.VM) http.HandlerFunc { return ghttp.CombineHandlers( ghttp.VerifyRequest("PUT", "/apis/kubevirt.io/v1alpha1/namespaces/default/vms/"+vm.ObjectMeta.Name), diff --git a/pkg/virt-controller/watch/migration.go b/pkg/virt-controller/watch/migration.go index e4abeb4a7206..5dcc7a4f57d5 100644 --- a/pkg/virt-controller/watch/migration.go +++ b/pkg/virt-controller/watch/migration.go @@ -2,8 +2,8 @@ package watch import ( "fmt" - "github.com/jeevatkm/go-model" kubeapi "k8s.io/client-go/pkg/api" + k8sv1 "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/pkg/fields" "k8s.io/client-go/pkg/util/workqueue" "k8s.io/client-go/rest" @@ -12,17 +12,15 @@ import ( "kubevirt.io/kubevirt/pkg/api/v1" "kubevirt.io/kubevirt/pkg/kubecli" "kubevirt.io/kubevirt/pkg/logging" - "kubevirt.io/kubevirt/pkg/middleware" - "kubevirt.io/kubevirt/pkg/precond" "kubevirt.io/kubevirt/pkg/virt-controller/services" ) func NewMigrationController(migrationService services.VMService, recorder record.EventRecorder, restClient *rest.RESTClient) (cache.Store, *kubecli.Controller) { - lw := cache.NewListWatchFromClient(restClient, "migrations", kubeapi.NamespaceDefault, fields.Everything()) - return NewMigrationControllerWithListWatch(migrationService, recorder, lw, restClient) + lw := cache.NewListWatchFromClient(restClient, "migrations", k8sv1.NamespaceDefault, fields.Everything()) + return NewMigrationControllerWithListWatch(migrationService, recorder, lw) } -func NewMigrationControllerWithListWatch(migrationService services.VMService, _ record.EventRecorder, lw cache.ListerWatcher, restClient *rest.RESTClient) (cache.Store, *kubecli.Controller) { +func NewMigrationControllerWithListWatch(migrationService 
services.VMService, _ record.EventRecorder, lw cache.ListerWatcher) (cache.Store, *kubecli.Controller) { queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) return kubecli.NewController(lw, queue, &v1.Migration{}, func(store cache.Store, queue workqueue.RateLimitingInterface) bool { @@ -41,110 +39,132 @@ func NewMigrationControllerWithListWatch(migrationService services.VMService, _ } if exists { var migration *v1.Migration = obj.(*v1.Migration) + logger := logging.DefaultLogger().Object(migration) if migration.Status.Phase == v1.MigrationUnknown { - migrationCopy := copyMigration(migration) - logger := logging.DefaultLogger().Object(&migrationCopy) - if err := StartMigrationTargetPod(migrationService, &migrationCopy); err != nil { - handleStartMigrationError(logger, err, migrationService, migrationCopy) + // Copy migration for future modifications + migrationCopy, err := copy(migration) + if err != nil { + logger.Error().Reason(err).Msg("could not copy migration object") + queue.AddRateLimited(key) + return true + } + // Fetch vm which we want to migrate + vm, exists, err := migrationService.FetchVM(migration.Spec.Selector.Name) + if err != nil { + logger.Error().Reason(err).Msgf("fetching the vm %s failed", migration.Spec.Selector.Name) + queue.AddRateLimited(key) + return true + } + if !exists { + logger.Info().Msgf("VM with name %s does not exist, marking migration as failed", migration.Spec.Selector.Name) + migrationCopy.Status.Phase = v1.MigrationFailed + // TODO indicate why it was set to failed + err := migrationService.UpdateMigration(migrationCopy) + if err != nil { + logger.Error().Reason(err).Msg("updating migration state failed") + queue.AddRateLimited(key) + return true + } + queue.Forget(key) + return true + } + if vm.Status.Phase != v1.Running { + logger.Error().Msgf("VM with name %s is in state %s, no migration possible. 
Marking migration as failed", vm.GetObjectMeta().GetName(), vm.Status.Phase) + migrationCopy.Status.Phase = v1.MigrationFailed + // TODO indicate why it was set to failed + err := migrationService.UpdateMigration(migrationCopy) + if err != nil { + logger.Error().Reason(err).Msg("updating migration state failed") + queue.AddRateLimited(key) + return true + } + queue.Forget(key) + return true + } + + if err := mergeConstraints(migration, vm); err != nil { + logger.Error().Reason(err).Msg("merging Migration and VM placement constraints failed.") + queue.AddRateLimited(key) + return true + } + podList, err := migrationService.GetRunningVMPods(vm) + if err != nil { + logger.Error().Reason(err).Msg("could not fetch a list of running VM target pods") + queue.AddRateLimited(key) + return true + } + + numOfPods, migrationPodExists := investigateTargetPodSituation(migration, podList) + if numOfPods > 1 && !migrationPodExists { + // Another migration is currently going on + logger.Error().Msg("another migration seems to be in progress, marking Migration as failed") + migrationCopy.Status.Phase = v1.MigrationFailed + // TODO indicate why it was set to failed + err := migrationService.UpdateMigration(migrationCopy) + if err != nil { + logger.Error().Reason(err).Msg("updating migration state failed") + queue.AddRateLimited(key) + return true + } + queue.Forget(key) + return true + } else if numOfPods == 1 && !migrationPodExists { + // We need to start a migration target pod + // TODO, this detection is not optimal, it can lead to strange situations + err := migrationService.SetupMigration(migration, vm) + if err != nil { + logger.Error().Reason(err).Msg("creating am migration target node failed") + queue.AddRateLimited(key) + return true + } + } + logger.Error().Msg("another migration seems to be in progress, marking Migration as failed") + migrationCopy.Status.Phase = v1.MigrationInProgress + // TODO indicate when this has happened + err = migrationService.UpdateMigration(migrationCopy) + if err != nil { + logger.Error().Reason(err).Msg("updating migration state failed") + queue.AddRateLimited(key) + return true } } - } else { - cleanupOldMigration(key, queue, migrationService) } + + queue.Forget(key) return true }) } -func cleanupOldMigration(key interface{}, queue workqueue.RateLimitingInterface, migrationService services.VMService) { - var migration *v1.Migration - _, name, err := cache.SplitMetaNamespaceKey(key.(string)) - if err != nil { - // TODO do something more smart here - queue.AddRateLimited(key) - } else { - migration = v1.NewMigrationReferenceFromName(name) - err = migrationService.DeleteMigration(migration) - logger := logging.DefaultLogger().Object(migration) - if err != nil { - logger.Error().Reason(err).Msg("Deleting VM target Pod failed.") - } - logger.Info().Msg("Deleting VM target Pod succeeded.") - } -} - -func handleStartMigrationError(logger *logging.FilteredLogger, err error, migrationService services.VMService, migrationCopy v1.Migration) { - logger.Error().Reason(err).Msg("Defining a target pod for the Migration.") - pl, err := migrationService.GetRunningMigrationPods(&migrationCopy) +func copy(migration *v1.Migration) (*v1.Migration, error) { + obj, err := kubeapi.Scheme.Copy(migration) if err != nil { - logger.Error().Reason(err).Msg("Getting running Pod for the Migration failed.") - return + return nil, err } - for _, p := range pl.Items { - if p.GetObjectMeta().GetLabels()["kubevirt.io/vmUID"] == string(migrationCopy.GetObjectMeta().GetUID()) { - // Pod from incomplete 
initialization detected, cleaning up - logger.Error().Msgf("Found orphan pod with name '%s' for Migration.", p.GetName()) - err = migrationService.DeleteMigration(&migrationCopy) - if err != nil { - logger.Critical().Reason(err).Msgf("Deleting orphaned pod with name '%s' for Migration failed.", p.GetName()) - break - } - } else { - // TODO virt-api should make sure this does not happen. For now don't ask and clean up. - // Pod from old VM object detected, - logger.Error().Msgf("Found orphan pod with name '%s' for deleted VM.", p.GetName()) + return obj.(*v1.Migration), nil +} - err = migrationService.DeleteMigration(&migrationCopy) - if err != nil { - logger.Critical().Reason(err).Msgf("Deleting orphaned pod with name '%s' for Migration failed.", p.GetName()) - break - } +// Returns the number of running pods and if a pod for exactly that migration is currently running +func investigateTargetPodSituation(migration *v1.Migration, podList *k8sv1.PodList) (int, bool) { + podExists := false + for _, pod := range podList.Items { + if pod.Labels[v1.MigrationUIDLabel] == string(migration.GetObjectMeta().GetUID()) { + podExists = true } } - -} -func copyMigration(migration *v1.Migration) v1.Migration { - migrationCopy := v1.Migration{} - model.Copy(&migrationCopy, migration) - return migrationCopy + return len(podList.Items), podExists } -func StartMigrationTargetPod(v services.VMService, migration *v1.Migration) error { - precond.MustNotBeNil(migration) - precond.MustNotBeEmpty(migration.ObjectMeta.Name) - precond.MustNotBeEmpty(string(migration.ObjectMeta.UID)) - - vm, err := v.FetchVM(migration.Spec.MigratingVMName) - if err != nil { - migration.Status.Phase = v1.MigrationFailed - err2 := v.UpdateMigration(migration) - if err2 != nil { - return err2 +func mergeConstraints(migration *v1.Migration, vm *v1.VM) error { + conflicts := []string{} + for k, v := range migration.Spec.NodeSelector { + if _, exists := vm.Spec.NodeSelector[k]; exists { + conflicts = append(conflicts, k) + } else { + vm.Spec.NodeSelector[k] = v } - // Report the error with the migration in the controller log - return err - } - - podList, err := v.GetRunningVMPods(vm) - if err != nil { - return err } - - if len(podList.Items) < 1 { - return middleware.NewResourceConflictError(fmt.Sprintf("VM %s Pod does not exist", vm.GetObjectMeta().GetName())) - } - - // If there are more than one pod in other states than Succeeded or Failed we can't go on - if len(podList.Items) > 1 { - return middleware.NewResourceConflictError(fmt.Sprintf("VM %s Pod is already migrating", vm.GetObjectMeta().GetName())) + if len(conflicts) > 0 { + return fmt.Errorf("Conflicting node selectors: %v", conflicts) } - - //TODO: detect collisions - for k, v := range migration.Spec.DestinationNodeSelector { - vm.Spec.NodeSelector[k] = v - } - - err = v.SetupMigration(migration, vm) - - // Report the result of the `Create` call - return err + return nil } diff --git a/pkg/virt-controller/watch/migration_test.go b/pkg/virt-controller/watch/migration_test.go index f1289390acc2..1f3dc27d4d75 100644 --- a/pkg/virt-controller/watch/migration_test.go +++ b/pkg/virt-controller/watch/migration_test.go @@ -54,7 +54,7 @@ var _ = Describe("Migration", func() { lw = framework.NewFakeControllerSource() migrationCache = cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, nil) - _, migrationController = NewMigrationControllerWithListWatch(vmService, nil, lw, restClient) + _, migrationController = NewMigrationControllerWithListWatch(vmService, nil, lw) // Start the 
controller migrationController.StartInformer(stopChan) diff --git a/pkg/virt-controller/watch/pod.go b/pkg/virt-controller/watch/pod.go index 6c6e9b345b9b..7b76967b4f20 100644 --- a/pkg/virt-controller/watch/pod.go +++ b/pkg/virt-controller/watch/pod.go @@ -69,7 +69,7 @@ func NewPodControllerWithListWatch(vmCache cache.Store, _ record.EventRecorder, return true } vm := vmObj.(*corev1.VM) - if vm.GetObjectMeta().GetUID() != types.UID(pod.GetLabels()[corev1.UIDLabel]) { + if vm.GetObjectMeta().GetUID() != types.UID(pod.GetLabels()[corev1.VMUIDLabel]) { // Obviously the pod of an outdated VM object, do nothing return true } @@ -94,9 +94,24 @@ func NewPodControllerWithListWatch(vmCache cache.Store, _ record.EventRecorder, return true } logger.Info().Msgf("VM successfully scheduled to %s.", vmCopy.Status.NodeName) - } else if vm.Status.Phase == corev1.Running { + } else if _, isMigrationPod := pod.Labels[corev1.MigrationLabel]; vm.Status.Phase == corev1.Running && isMigrationPod { logger := logging.DefaultLogger() - obj, err := kubeapi.Scheme.Copy(vm) + + // Get associated migration + obj, err := restClient.Get().Resource("migrations").Namespace(v1.NamespaceDefault).Name(pod.Labels[corev1.MigrationLabel]).Do().Get() + if err != nil { + logger.Error().Reason(err).Msgf("Fetching migration %s failed.", pod.Labels[corev1.MigrationLabel]) + queue.AddRateLimited(key) + return true + } + migration := obj.(*corev1.Migration) + if migration.Status.Phase == corev1.MigrationUnknown { + logger.Info().Msg("migration not yet in right state, backing off") + queue.AddRateLimited(key) + return true + } + + obj, err = kubeapi.Scheme.Copy(vm) if err != nil { logger.Error().Reason(err).Msg("could not copy vm object") queue.AddRateLimited(key) @@ -115,12 +130,11 @@ func NewPodControllerWithListWatch(vmCache cache.Store, _ record.EventRecorder, } // Let's check if the job already exists, it can already exist in case we could not update the VM object in a previous run - if _, exists, err := vmService.GetMigrationJob(vmCopy); err != nil { + if _, exists, err := vmService.GetMigrationJob(migration); err != nil { logger.Error().Reason(err).Msg("Checking for an existing migration job failed.") queue.AddRateLimited(key) return true } else if !exists { - // Job does not yet exist, create it. 
sourceNode, err := clientset.CoreV1().Nodes().Get(vmCopy.Status.NodeName, metav1.GetOptions{}) if err != nil { logger.Error().Reason(err).Msgf("Fetching source node %s failed.", vmCopy.Status.NodeName) @@ -133,7 +147,8 @@ func NewPodControllerWithListWatch(vmCache cache.Store, _ record.EventRecorder, queue.AddRateLimited(key) return true } - if err := vmService.StartMigration(vmCopy, sourceNode, targetNode); err != nil { + + if err := vmService.StartMigration(migration, vmCopy, sourceNode, targetNode); err != nil { logger.Error().Reason(err).Msg("Starting the migration job failed.") queue.AddRateLimited(key) return true diff --git a/pkg/virt-controller/watch/pod_test.go b/pkg/virt-controller/watch/pod_test.go index 197072b3ce25..1bba575d957a 100644 --- a/pkg/virt-controller/watch/pod_test.go +++ b/pkg/virt-controller/watch/pod_test.go @@ -154,6 +154,7 @@ var _ = Describe("Pod", func() { pod, err := temlateService.RenderLaunchManifest(vm) Expect(err).ToNot(HaveOccurred()) pod.Spec.NodeName = "targetNode" + pod.Labels[v1.MigrationLabel] = "testvm-migration" // Create the expected VM after the update obj, err := conversion.NewCloner().DeepCopy(vm) @@ -167,9 +168,15 @@ var _ = Describe("Pod", func() { vmInMigrationState := obj.(*v1.VM) vmInMigrationState.Status.Phase = v1.Migrating + migration := v1.NewMinimalMigration("testvm-migration", "testvm") + migration.Status.Phase = v1.MigrationInProgress // Register the expected REST call server.AppendHandlers( + ghttp.CombineHandlers( + ghttp.VerifyRequest("GET", "/apis/kubevirt.io/v1alpha1/namespaces/default/migrations/testvm-migration"), + ghttp.RespondWithJSONEncoded(http.StatusOK, migration), + ), ghttp.CombineHandlers( ghttp.VerifyRequest("PUT", "/apis/kubevirt.io/v1alpha1/namespaces/default/vms/testvm"), ghttp.VerifyJSONRepresenting(vmWithMigrationNodeName), @@ -191,7 +198,7 @@ var _ = Describe("Pod", func() { ) mockVMService.EXPECT().GetMigrationJob(gomock.Any()).Return(nil, false, nil) - mockVMService.EXPECT().StartMigration(gomock.Any(), &srcNode, &targetNode).Return(nil) + mockVMService.EXPECT().StartMigration(gomock.Any(), gomock.Any(), &srcNode, &targetNode).Return(nil) // Tell the controller that there is a new running Pod lw.Add(pod) @@ -201,7 +208,7 @@ var _ = Describe("Pod", func() { podController.ShutDownQueue() podController.WaitUntilDone() - Expect(len(server.ReceivedRequests())).To(Equal(4)) + Expect(len(server.ReceivedRequests())).To(Equal(5)) close(done) }, 10) }) diff --git a/tests/vm_migration_test.go b/tests/vm_migration_test.go index 411c1be4dd5a..795190bef479 100644 --- a/tests/vm_migration_test.go +++ b/tests/vm_migration_test.go @@ -6,9 +6,7 @@ import ( "fmt" . "github.com/onsi/ginkgo" . 
"github.com/onsi/gomega" - "k8s.io/client-go/pkg/api" - batchv1 "k8s.io/client-go/pkg/apis/batch/v1" - metav1 "k8s.io/client-go/pkg/apis/meta/v1" + k8sv1 "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/pkg/labels" "kubevirt.io/kubevirt/pkg/api/v1" "kubevirt.io/kubevirt/pkg/kubecli" @@ -43,10 +41,10 @@ var _ = Describe("VmMigration", func() { Context("New Migration given", func() { It("Should fail if the VM does not exist", func() { - err = restClient.Post().Resource("migrations").Namespace(api.NamespaceDefault).Body(migration).Do().Error() + err = restClient.Post().Resource("migrations").Namespace(k8sv1.NamespaceDefault).Body(migration).Do().Error() Expect(err).To(BeNil()) Eventually(func() v1.MigrationPhase { - r, err := restClient.Get().Resource("migrations").Namespace(api.NamespaceDefault).Name(migration.ObjectMeta.Name).Do().Get() + r, err := restClient.Get().Resource("migrations").Namespace(k8sv1.NamespaceDefault).Name(migration.ObjectMeta.Name).Do().Get() Expect(err).ToNot(HaveOccurred()) var m *v1.Migration = r.(*v1.Migration) return m.Status.Phase @@ -55,15 +53,15 @@ var _ = Describe("VmMigration", func() { It("Should go to MigrationInProgress state if the VM exists", func(done Done) { - vm, err := restClient.Post().Resource("vms").Namespace(api.NamespaceDefault).Body(sourceVM).Do().Get() + vm, err := restClient.Post().Resource("vms").Namespace(k8sv1.NamespaceDefault).Body(sourceVM).Do().Get() Expect(err).ToNot(HaveOccurred()) tests.WaitForSuccessfulVMStart(vm) - err = restClient.Post().Resource("migrations").Namespace(api.NamespaceDefault).Body(migration).Do().Error() + err = restClient.Post().Resource("migrations").Namespace(k8sv1.NamespaceDefault).Body(migration).Do().Error() Expect(err).ToNot(HaveOccurred()) Eventually(func() v1.MigrationPhase { - obj, err := restClient.Get().Resource("migrations").Namespace(api.NamespaceDefault).Name(migration.ObjectMeta.Name).Do().Get() + obj, err := restClient.Get().Resource("migrations").Namespace(k8sv1.NamespaceDefault).Name(migration.ObjectMeta.Name).Do().Get() Expect(err).ToNot(HaveOccurred()) var m *v1.Migration = obj.(*v1.Migration) return m.Status.Phase @@ -74,24 +72,24 @@ var _ = Describe("VmMigration", func() { It("Should update the Status.MigrationNodeName after the migration target pod was started", func(done Done) { // Create the VM - obj, err := restClient.Post().Resource("vms").Namespace(api.NamespaceDefault).Body(sourceVM).Do().Get() + obj, err := restClient.Post().Resource("vms").Namespace(k8sv1.NamespaceDefault).Body(sourceVM).Do().Get() Expect(err).ToNot(HaveOccurred()) tests.WaitForSuccessfulVMStart(obj) // Create the Migration - err = restClient.Post().Resource("migrations").Namespace(api.NamespaceDefault).Body(migration).Do().Error() + err = restClient.Post().Resource("migrations").Namespace(k8sv1.NamespaceDefault).Body(migration).Do().Error() Expect(err).ToNot(HaveOccurred()) // TODO we need events var fetchedVM *v1.VM Eventually(func() string { - obj, err := restClient.Get().Resource("vms").Namespace(api.NamespaceDefault).Name(obj.(*v1.VM).ObjectMeta.Name).Do().Get() + obj, err := restClient.Get().Resource("vms").Namespace(k8sv1.NamespaceDefault).Name(obj.(*v1.VM).ObjectMeta.Name).Do().Get() Expect(err).ToNot(HaveOccurred()) fetchedVM = obj.(*v1.VM) return fetchedVM.Status.MigrationNodeName }, TIMEOUT, POLLING_INTERVAL).ShouldNot(BeEmpty()) Eventually(func() v1.VMPhase { - obj, err := restClient.Get().Resource("vms").Namespace(api.NamespaceDefault).Name(obj.(*v1.VM).ObjectMeta.Name).Do().Get() + obj, err := 
restClient.Get().Resource("vms").Namespace(k8sv1.NamespaceDefault).Name(obj.(*v1.VM).ObjectMeta.Name).Do().Get() Expect(err).ToNot(HaveOccurred()) fetchedVM = obj.(*v1.VM) return fetchedVM.Status.Phase @@ -103,32 +101,41 @@ var _ = Describe("VmMigration", func() { It("Should migrate the VM", func(done Done) { // Create the VM - obj, err := restClient.Post().Resource("vms").Namespace(api.NamespaceDefault).Body(sourceVM).Do().Get() + obj, err := restClient.Post().Resource("vms").Namespace(k8sv1.NamespaceDefault).Body(sourceVM).Do().Get() Expect(err).ToNot(HaveOccurred()) tests.WaitForSuccessfulVMStart(obj) + obj, err = restClient.Get().Resource("vms").Namespace(k8sv1.NamespaceDefault).Name(obj.(*v1.VM).ObjectMeta.Name).Do().Get() + Expect(err).ToNot(HaveOccurred()) + + sourceNode := obj.(*v1.VM).Status.NodeName // Create the Migration - err = restClient.Post().Resource("migrations").Namespace(api.NamespaceDefault).Body(migration).Do().Error() + err = restClient.Post().Resource("migrations").Namespace(k8sv1.NamespaceDefault).Body(migration).Do().Error() Expect(err).ToNot(HaveOccurred()) - selector, err := labels.Parse(fmt.Sprintf("%s in (%s)", v1.DomainLabel, sourceVM.GetObjectMeta().GetName())) - jobsGetter := coreClient.CoreV1().RESTClient().Get().AbsPath("/apis/batch/v1/namespaces/default/jobs").LabelsSelectorParam(selector) + selector, err := labels.Parse(fmt.Sprintf("%s in (%s)", v1.MigrationLabel, migration.GetObjectMeta().GetName()) + + fmt.Sprintf(",%s in (%s)", v1.AppLabel, "migration")) Expect(err).ToNot(HaveOccurred()) // Wait for the job Eventually(func() int { - jobs, err := jobsGetter.Do().Get() + jobs, err := coreClient.CoreV1().Pods(k8sv1.NamespaceDefault).List(k8sv1.ListOptions{LabelSelector: selector.String()}) Expect(err).ToNot(HaveOccurred()) - return len(jobs.(*batchv1.JobList).Items) + return len(jobs.Items) }, TIMEOUT*2, POLLING_INTERVAL).Should(Equal(1)) // Wait for the successful completion of the job - Eventually(func() *metav1.Time { - jobs, err := jobsGetter.Do().Get() - Expect(err).ToNot(HaveOccurred()) + Eventually(func() k8sv1.PodPhase { + jobs, err := coreClient.CoreV1().Pods(k8sv1.NamespaceDefault).List(k8sv1.ListOptions{LabelSelector: selector.String()}) Expect(err).ToNot(HaveOccurred()) - return jobs.(*batchv1.JobList).Items[0].Status.CompletionTime - }, TIMEOUT*2, POLLING_INTERVAL).ShouldNot(BeNil()) + return jobs.Items[0].Status.Phase + }, TIMEOUT*2, POLLING_INTERVAL).Should(Equal(k8sv1.PodSucceeded)) + + obj, err = restClient.Get().Resource("vms").Namespace(k8sv1.NamespaceDefault).Name(obj.(*v1.VM).ObjectMeta.Name).Do().Get() + Expect(err).ToNot(HaveOccurred()) + migratedVM := obj.(*v1.VM) + Expect(migratedVM.Status.Phase).To(Equal(v1.Running)) + Expect(migratedVM.Status.NodeName).ToNot(Equal(sourceNode)) close(done) }, 60)