Skip to content

Commit

Permalink
Merge pull request kubevirt#8575 from iholder101/feature/QEMU-level-p…
Browse files Browse the repository at this point in the history
…arallel-migrations

QEMU-level migration parallelism (a.k.a. multifd) + Upgrade QEMU to 7.2.0-14.el9
  • Loading branch information
kubevirt-bot authored Apr 14, 2023
2 parents 319ef9d + decdf0b commit 9c8b40b
Show file tree
Hide file tree
Showing 15 changed files with 954 additions and 1,110 deletions.
1,041 changes: 360 additions & 681 deletions WORKSPACE

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion hack/bootstrap.sh
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ KUBEVIRT_NO_BAZEL=${KUBEVIRT_NO_BAZEL:-false}
HOST_ARCHITECTURE="$(uname -m)"

sandbox_root=${SANDBOX_DIR}/default/root
sandbox_hash="a63496d9667d5658621639ab62fa40cba6163823"
sandbox_hash="3c2f965b686874b607d7065cad6136446b8e1663"

function kubevirt::bootstrap::regenerate() {
(
Expand Down
4 changes: 2 additions & 2 deletions hack/rpm-deps.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ source hack/bootstrap.sh
source hack/config.sh

LIBVIRT_VERSION=${LIBVIRT_VERSION:-0:9.0.0-3.el9}
QEMU_VERSION=${QEMU_VERSION:-17:7.2.0-6.el9}
QEMU_VERSION=${QEMU_VERSION:-17:7.2.0-14.el9}
SEABIOS_VERSION=${SEABIOS_VERSION:-0:1.16.1-1.el9}
EDK2_VERSION=${EDK2_VERSION:-0:20221207gitfff6d81270b5-5.el9}
EDK2_VERSION=${EDK2_VERSION:-0:20221207gitfff6d81270b5-9}
LIBGUESTFS_VERSION=${LIBGUESTFS_VERSION:-1:1.48.4-4.el9}
GUESTFSTOOLS_VERSION=${GUESTFSTOOLS_VERSION:-0:1.48.2-8.el9}
PASST_VERSION=${PASST_VERSION:-0:0^20221110.g4129764-1.el9}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ go_library(
"//pkg/util/webhooks/validating-webhooks:go_default_library",
"//pkg/virt-api/webhooks:go_default_library",
"//pkg/virt-config:go_default_library",
"//pkg/virt-handler/cmd-client:go_default_library",
"//staging/src/kubevirt.io/api/clone:go_default_library",
"//staging/src/kubevirt.io/api/clone/v1alpha1:go_default_library",
"//staging/src/kubevirt.io/api/core:go_default_library",
Expand Down Expand Up @@ -99,6 +100,7 @@ go_test(
"//pkg/util/webhooks:go_default_library",
"//pkg/virt-api/webhooks:go_default_library",
"//pkg/virt-config:go_default_library",
"//pkg/virt-handler/cmd-client:go_default_library",
"//pkg/virt-handler/node-labeller/util:go_default_library",
"//pkg/virt-operator/resource/generate/components:go_default_library",
"//staging/src/kubevirt.io/api/clone:go_default_library",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@ import (
"net"
"path/filepath"
"regexp"
"strconv"
"strings"

cmdclient "kubevirt.io/kubevirt/pkg/virt-handler/cmd-client"

admissionv1 "k8s.io/api/admission/v1"
k8sv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
Expand Down Expand Up @@ -1627,6 +1630,25 @@ func ValidateVirtualMachineInstanceMetadata(field *k8sfield.Path, metadata *meta
})
}

if threadCountStr, exists := metadata.Annotations[cmdclient.MultiThreadedQemuMigrationAnnotation]; exists {
threadCount, err := strconv.Atoi(threadCountStr)
invalidEntry := field.Child("annotations", cmdclient.MultiThreadedQemuMigrationAnnotation).String()

if err != nil {
causes = append(causes, metav1.StatusCause{
Type: metav1.CauseTypeFieldValueInvalid,
Message: fmt.Sprintf("cannot parse %s to int: %s, invalid entry %s", threadCountStr, err.Error(), invalidEntry),
Field: field.Child("annotations").String(),
})
} else if threadCount <= 1 {
causes = append(causes, metav1.StatusCause{
Type: metav1.CauseTypeFieldValueInvalid,
Message: fmt.Sprintf("thread count (%s) must be larger than 1. invalid entry %s", threadCountStr, invalidEntry),
Field: field.Child("annotations").String(),
})
}
}

return causes
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"fmt"
"strings"

cmdclient "kubevirt.io/kubevirt/pkg/virt-handler/cmd-client"

"kubevirt.io/client-go/api"

. "github.com/onsi/ginkgo/v2"
Expand Down Expand Up @@ -4590,6 +4592,24 @@ var _ = Describe("Validating VMICreate Admitter", func() {
})
})
})

Context("with multi-threaded QEMU migrations", func() {
DescribeTable("should", func(threadCountStr string, isValid bool) {
meta := metav1.ObjectMeta{Annotations: map[string]string{cmdclient.MultiThreadedQemuMigrationAnnotation: threadCountStr}}
causes := ValidateVirtualMachineInstanceMetadata(k8sfield.NewPath("metadata"), &meta, config, "fake-account")

if isValid {
Expect(causes).To(BeEmpty())
} else {
Expect(causes).To(HaveLen(1))
}
},
Entry("deny if thread count is negative", "-123", false),
Entry("deny if thread count is 0", "0", false),
Entry("deny if thread count is 1", "1", false),
Entry("allow otherwise", "5", true),
)
})
})

var _ = Describe("Function getNumberOfPodInterfaces()", func() {
Expand Down
1 change: 1 addition & 0 deletions pkg/virt-handler/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ go_library(
"//pkg/network/errors:go_default_library",
"//pkg/network/setup:go_default_library",
"//pkg/network/vmispec:go_default_library",
"//pkg/pointer:go_default_library",
"//pkg/safepath:go_default_library",
"//pkg/storage/reservation:go_default_library",
"//pkg/storage/types:go_default_library",
Expand Down
15 changes: 9 additions & 6 deletions pkg/virt-handler/cmd-client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,16 @@ const StandardLauncherSocketFileName = "launcher-sock"
const StandardInitLauncherSocketFileName = "launcher-init-sock"
const StandardLauncherUnresponsiveFileName = "launcher-unresponsive"

const MultiThreadedQemuMigrationAnnotation = "kubevirt.io/multiThreadedQemuMigration"

type MigrationOptions struct {
Bandwidth resource.Quantity
ProgressTimeout int64
CompletionTimeoutPerGiB int64
UnsafeMigration bool
AllowAutoConverge bool
AllowPostCopy bool
Bandwidth resource.Quantity
ProgressTimeout int64
CompletionTimeoutPerGiB int64
UnsafeMigration bool
AllowAutoConverge bool
AllowPostCopy bool
ParallelMigrationThreads *uint
}

type LauncherClient interface {
Expand Down
12 changes: 12 additions & 0 deletions pkg/virt-handler/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import (
"strings"
"time"

"kubevirt.io/kubevirt/pkg/pointer"

"k8s.io/apimachinery/pkg/util/errors"

"kubevirt.io/kubevirt/pkg/virt-controller/watch/topology"
Expand Down Expand Up @@ -2560,6 +2562,16 @@ func (d *VirtualMachineController) vmUpdateHelperMigrationSource(origVMI *v1.Vir
AllowPostCopy: *migrationConfiguration.AllowPostCopy,
}

if threadCountStr, exists := origVMI.Annotations[cmdclient.MultiThreadedQemuMigrationAnnotation]; exists {
threadCount, err := strconv.Atoi(threadCountStr)

if err != nil {
log.Log.Object(origVMI).Reason(err).Infof("cannot parse %s to int", threadCountStr)
} else {
options.ParallelMigrationThreads = pointer.P(uint(threadCount))
}
}

marshalledOptions, err := json.Marshal(options)
if err != nil {
log.Log.Object(origVMI).Warning("failed to marshall matched migration options")
Expand Down
40 changes: 40 additions & 0 deletions pkg/virt-handler/vm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2973,6 +2973,46 @@ var _ = Describe("VirtualMachineInstance", func() {
Expect(reason).To(Equal(agentSupported))
})
})

Context("Migration options", func() {
It("multi-threaded qemu migrations", func() {
const threadCount uint = 123

vmi := api2.NewMinimalVMI("testvmi")
vmi.UID = vmiTestUUID
vmi.Status.Phase = v1.Running
vmi.Status.MigrationState = &v1.VirtualMachineInstanceMigrationState{
TargetNode: "othernode",
TargetNodeAddress: "127.0.0.1:12345",
SourceNode: host,
MigrationUID: "123",
TargetDirectMigrationNodePorts: map[string]int{"49152": 12132},
}
vmi.Status.Conditions = []v1.VirtualMachineInstanceCondition{
{
Type: v1.VirtualMachineInstanceIsMigratable,
Status: k8sv1.ConditionTrue,
},
}
vmi = addActivePods(vmi, podTestUUID, host)

mockWatchdog.CreateFile(vmi)
domain := api.NewMinimalDomainWithUUID("testvmi", vmiTestUUID)
domain.Status.Status = api.Running
domainFeeder.Add(domain)
vmiFeeder.Add(vmi)

vmi.Annotations = map[string]string{cmdclient.MultiThreadedQemuMigrationAnnotation: fmt.Sprintf("%d", threadCount)}

client.EXPECT().MigrateVirtualMachine(gomock.Any(), gomock.Any()).Do(func(_ *v1.VirtualMachineInstance, options *cmdclient.MigrationOptions) {
Expect(options.ParallelMigrationThreads).ToNot(BeNil())
Expect(*options.ParallelMigrationThreads).To(Equal(threadCount))
}).Times(1).Return(nil)

controller.Execute()
testutils.ExpectEvent(recorder, VMIMigrating)
})
})
})

var _ = Describe("DomainNotifyServerRestarts", func() {
Expand Down
38 changes: 25 additions & 13 deletions pkg/virt-launcher/virtwrap/live-migration-source.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,24 +90,27 @@ type inflightMigrationAborted struct {
abortStatus v1.MigrationAbortStatus
}

func generateMigrationFlags(isBlockMigration, isUnsafeMigration, allowAutoConverge, allowPostyCopy, migratePaused bool) libvirt.DomainMigrateFlags {
func generateMigrationFlags(isBlockMigration, migratePaused bool, options *cmdclient.MigrationOptions) libvirt.DomainMigrateFlags {
migrateFlags := libvirt.MIGRATE_LIVE | libvirt.MIGRATE_PEER2PEER | libvirt.MIGRATE_PERSIST_DEST

if isBlockMigration {
migrateFlags |= libvirt.MIGRATE_NON_SHARED_INC
}
if isUnsafeMigration {
if options.UnsafeMigration {
migrateFlags |= libvirt.MIGRATE_UNSAFE
}
if allowAutoConverge {
if options.AllowAutoConverge {
migrateFlags |= libvirt.MIGRATE_AUTO_CONVERGE
}
if allowPostyCopy {
if options.AllowPostCopy {
migrateFlags |= libvirt.MIGRATE_POSTCOPY
}
if migratePaused {
migrateFlags |= libvirt.MIGRATE_PAUSED
}
if options.ParallelMigrationThreads != nil {
migrateFlags |= libvirt.MIGRATE_PARALLEL
}

return migrateFlags

Expand Down Expand Up @@ -813,17 +816,26 @@ func generateMigrationParams(dom cli.VirDomain, vmi *v1.VirtualMachineInstance,
return nil, err
}

parallelMigrationSet := false
var parallelMigrationThreads int
if options.ParallelMigrationThreads != nil {
parallelMigrationSet = true
parallelMigrationThreads = int(*options.ParallelMigrationThreads)
}

key := migrationproxy.ConstructProxyKey(string(vmi.UID), migrationproxy.LibvirtDirectMigrationPort)
migrURI := fmt.Sprintf("unix://%s", migrationproxy.SourceUnixFile(virtShareDir, key))
params := &libvirt.DomainMigrateParameters{
URI: migrURI,
URISet: true,
Bandwidth: bandwidth, // MiB/s
BandwidthSet: bandwidth > 0,
DestXML: xmlstr,
DestXMLSet: true,
PersistXML: xmlstr,
PersistXMLSet: true,
URI: migrURI,
URISet: true,
Bandwidth: bandwidth, // MiB/s
BandwidthSet: bandwidth > 0,
DestXML: xmlstr,
DestXMLSet: true,
PersistXML: xmlstr,
PersistXMLSet: true,
ParallelConnectionsSet: parallelMigrationSet,
ParallelConnections: parallelMigrationThreads,
}

copyDisks := getDiskTargetsForMigration(dom, vmi)
Expand Down Expand Up @@ -857,7 +869,7 @@ func (l *LibvirtDomainManager) migrateHelper(vmi *v1.VirtualMachineInstance, opt
if err != nil {
return fmt.Errorf("failed to retrive domain state")
}
migrateFlags := generateMigrationFlags(isBlockMigration(vmi), options.UnsafeMigration, options.AllowAutoConverge, options.AllowPostCopy, migratePaused)
migrateFlags := generateMigrationFlags(isBlockMigration(vmi), migratePaused, options)

// anything that modifies the domain needs to be performed with the domainModifyLock held
// The domain params and unHotplug need to be performed in a critical section together.
Expand Down
26 changes: 20 additions & 6 deletions pkg/virt-launcher/virtwrap/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1517,20 +1517,30 @@ var _ = Describe("Manager", func() {
DescribeTable("check migration flags",
func(migrationType string) {
isBlockMigration := migrationType == "block"
isUnsafeMigration := migrationType == "unsafe"
allowAutoConverge := migrationType == "autoConverge"
migrationMode := migrationType == "postCopy"
isVmiPaused := migrationType == "paused"

flags := generateMigrationFlags(isBlockMigration, isUnsafeMigration, allowAutoConverge, migrationMode, isVmiPaused)
var parallelMigrationThreads *uint = nil
if migrationType == "parallel" {
var fakeNumberOfThreads uint = 123
parallelMigrationThreads = &fakeNumberOfThreads
}

options := &cmdclient.MigrationOptions{
UnsafeMigration: migrationType == "unsafe",
AllowAutoConverge: migrationType == "autoConverge",
AllowPostCopy: migrationType == "postCopy",
ParallelMigrationThreads: parallelMigrationThreads,
}

flags := generateMigrationFlags(isBlockMigration, isVmiPaused, options)
expectedMigrateFlags := libvirt.MIGRATE_LIVE | libvirt.MIGRATE_PEER2PEER | libvirt.MIGRATE_PERSIST_DEST

if isBlockMigration {
expectedMigrateFlags |= libvirt.MIGRATE_NON_SHARED_INC
} else if migrationType == "unsafe" {
expectedMigrateFlags |= libvirt.MIGRATE_UNSAFE
}
if allowAutoConverge {
if options.AllowAutoConverge {
expectedMigrateFlags |= libvirt.MIGRATE_AUTO_CONVERGE
}
if migrationType == "postCopy" {
Expand All @@ -1539,14 +1549,18 @@ var _ = Describe("Manager", func() {
if migrationType == "paused" {
expectedMigrateFlags |= libvirt.MIGRATE_PAUSED
}
Expect(flags).To(Equal(expectedMigrateFlags))
if migrationType == "parallel" {
expectedMigrateFlags |= libvirt.MIGRATE_PARALLEL
}
Expect(flags).To(Equal(expectedMigrateFlags), "libvirt migration flags are not set as expected")
},
Entry("with block migration", "block"),
Entry("without block migration", "live"),
Entry("unsafe migration", "unsafe"),
Entry("migration auto converge", "autoConverge"),
Entry("migration using postcopy", "postCopy"),
Entry("migration of paused vmi", "paused"),
Entry("migration with parallel threads", "parallel"),
)

DescribeTable("on successful list all domains",
Expand Down
Loading

0 comments on commit 9c8b40b

Please sign in to comment.