Skip to content

Commit

Permalink
Virt-handler watchdog integration
Browse files Browse the repository at this point in the history
  • Loading branch information
davidvossel committed Oct 19, 2017
1 parent 8c75cc9 commit 34bafd0
Show file tree
Hide file tree
Showing 9 changed files with 615 additions and 41 deletions.
55 changes: 42 additions & 13 deletions cmd/virt-handler/virt-handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes/scheme"
k8coresv1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"

"kubevirt.io/kubevirt/pkg/api/v1"
Expand All @@ -52,17 +53,19 @@ import (
virtcache "kubevirt.io/kubevirt/pkg/virt-handler/virtwrap/cache"
virtcli "kubevirt.io/kubevirt/pkg/virt-handler/virtwrap/cli"
"kubevirt.io/kubevirt/pkg/virt-handler/virtwrap/isolation"
watchdog "kubevirt.io/kubevirt/pkg/watchdog"
)

type virtHandlerApp struct {
Service *service.Service
HostOverride string
LibvirtUri string
VirtShareDir string
EphemeralDiskDir string
Service *service.Service
HostOverride string
LibvirtUri string
VirtShareDir string
EphemeralDiskDir string
WatchdogTimeoutDuration time.Duration
}

func newVirtHandlerApp(host *string, port *int, hostOverride *string, libvirtUri *string, virtShareDir *string, ephemeralDiskDir *string) *virtHandlerApp {
func newVirtHandlerApp(host *string, port *int, hostOverride *string, libvirtUri *string, virtShareDir *string, ephemeralDiskDir *string, watchdogTimeoutDuration *time.Duration) *virtHandlerApp {
if *hostOverride == "" {
defaultHostName, err := os.Hostname()
if err != nil {
Expand All @@ -72,11 +75,12 @@ func newVirtHandlerApp(host *string, port *int, hostOverride *string, libvirtUri
}

return &virtHandlerApp{
Service: service.NewService("virt-handler", host, port),
HostOverride: *hostOverride,
LibvirtUri: *libvirtUri,
VirtShareDir: *virtShareDir,
EphemeralDiskDir: *ephemeralDiskDir,
Service: service.NewService("virt-handler", host, port),
HostOverride: *hostOverride,
LibvirtUri: *libvirtUri,
VirtShareDir: *virtShareDir,
EphemeralDiskDir: *ephemeralDiskDir,
WatchdogTimeoutDuration: *watchdogTimeoutDuration,
}
}

Expand Down Expand Up @@ -135,7 +139,16 @@ func (app *virtHandlerApp) Run() {

// Wire VM controller
vmListWatcher := controller.NewListWatchFromClient(virtCli.RestClient(), "virtualmachines", k8sv1.NamespaceAll, fields.Everything(), l)
vmStore, vmQueue, vmController := virthandler.NewVMController(vmListWatcher, domainManager, recorder, *virtCli.RestClient(), virtCli, app.HostOverride, configDiskClient)
vmStore, vmQueue, vmController := virthandler.NewVMController(
vmListWatcher,
domainManager,
recorder,
*virtCli.RestClient(),
virtCli,
app.HostOverride,
configDiskClient,
app.VirtShareDir,
int(app.WatchdogTimeoutDuration.Seconds()))

// Wire Domain controller
domainSharedInformer, err := virtcache.NewSharedInformer(domainConn)
Expand All @@ -148,6 +161,16 @@ func (app *virtHandlerApp) Run() {
panic(err)
}

watchdogInformer := cache.NewSharedIndexInformer(
watchdog.NewWatchdogListWatchFromClient(
app.VirtShareDir,
int(app.WatchdogTimeoutDuration.Seconds())),
&virt_api.Domain{},
0,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})

watchdogInformer.AddEventHandler(controller.NewResourceEventHandlerFuncsForWorkqueue(vmQueue))

// Bootstrapping. From here on the startup order matters
stop := make(chan struct{})
defer close(stop)
Expand Down Expand Up @@ -176,6 +199,9 @@ func (app *virtHandlerApp) Run() {
panic(err)
}

go watchdogInformer.Run(stop)
cache.WaitForCacheSync(stop, watchdogInformer.HasSynced)

go domainController.Run(3, stop)
go vmController.Run(3, stop)

Expand All @@ -193,6 +219,8 @@ func (app *virtHandlerApp) Run() {
}

func main() {
defaultExpires := 30 * time.Second

logging.InitializeLogging("virt-handler")
libvirt.EventRegisterDefaultImpl()
libvirtUri := flag.String("libvirt-uri", "qemu:///system", "Libvirt connection string.")
Expand All @@ -201,9 +229,10 @@ func main() {
hostOverride := flag.String("hostname-override", "", "Kubernetes Pod to monitor for changes")
virtShareDir := flag.String("kubevirt-share-dir", "/var/run/kubevirt", "Shared directory between virt-handler and virt-launcher")
ephemeralDiskDir := flag.String("ephemeral-disk-dir", "/var/run/libvirt/kubevirt-ephemeral-disk", "Base directory for ephemeral disk data")
watchdogTimeoutDuration := flag.Duration("watchdog-timeout", defaultExpires, "Watchdog file timeout.")
pflag.CommandLine.AddGoFlagSet(flag.CommandLine)
pflag.Parse()

app := newVirtHandlerApp(host, port, hostOverride, libvirtUri, virtShareDir, ephemeralDiskDir)
app := newVirtHandlerApp(host, port, hostOverride, libvirtUri, virtShareDir, ephemeralDiskDir, watchdogTimeoutDuration)
app.Run()
}
15 changes: 8 additions & 7 deletions cmd/virt-launcher/virt-launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"kubevirt.io/kubevirt/pkg/logging"
"kubevirt.io/kubevirt/pkg/virt-handler/virtwrap/isolation"
virtlauncher "kubevirt.io/kubevirt/pkg/virt-launcher"
watchdog "kubevirt.io/kubevirt/pkg/watchdog"
)

func markReady(readinessFile string) {
Expand All @@ -40,7 +41,7 @@ func markReady(readinessFile string) {
panic(err)
}
f.Close()
log.Printf("Marked as ready")
log.Printf("Marked as ready\n")
}

func createSocket(virtShareDir string, namespace string, name string) net.Listener {
Expand All @@ -65,7 +66,7 @@ func createSocket(virtShareDir string, namespace string, name string) net.Listen

func main() {
startTimeout := 0 * time.Second
defaultInterval := 15 * time.Second
defaultInterval := 10 * time.Second

logging.InitializeLogging("virt-launcher")
qemuTimeout := flag.Duration("qemu-timeout", startTimeout, "Amount of time to wait for qemu")
Expand All @@ -86,12 +87,13 @@ func main() {
panic(err)
}

watchdogFile := virtlauncher.WatchdogFileFromNamespaceName(*virtShareDir, *namespace, *name)
f, err := os.Create(watchdogFile)
watchdogFile := watchdog.WatchdogFileFromNamespaceName(*virtShareDir, *namespace, *name)
err = watchdog.WatchdogFileUpdate(watchdogFile)
if err != nil {
panic(err)
}
f.Close()

log.Printf("Watchdog file created at %s\n", watchdogFile)

stopChan := make(chan struct{})
defer close(stopChan)
Expand All @@ -103,11 +105,10 @@ func main() {
case <-stopChan:
return
case <-ticker:
f, err := os.Create(watchdogFile)
err := watchdog.WatchdogFileUpdate(watchdogFile)
if err != nil {
panic(err)
}
f.Close()
}
}
}()
Expand Down
5 changes: 5 additions & 0 deletions images/libvirt-kubevirt/qemu-kube
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ set -e
exec sudo -C 10000 bash -s << END
function _term() {
rm -f $PIDFILE
pkill -P \$! --signal SIG\$1
}
Expand All @@ -70,4 +71,8 @@ exec sudo -C 10000 bash -s << END
echo "\$pid" > $PIDFILE
echo "LAUNCHED PID \$pid" >> $LOG
wait
res=\$?
rm -f $PIDFILE
exit \$res
END
76 changes: 61 additions & 15 deletions pkg/virt-handler/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
registrydisk "kubevirt.io/kubevirt/pkg/registry-disk"
"kubevirt.io/kubevirt/pkg/virt-handler/virtwrap"
"kubevirt.io/kubevirt/pkg/virt-handler/virtwrap/api"
watchdog "kubevirt.io/kubevirt/pkg/watchdog"
)

func NewVMController(lw cache.ListerWatcher,
Expand All @@ -53,10 +54,19 @@ func NewVMController(lw cache.ListerWatcher,
restClient rest.RESTClient,
clientset kubecli.KubevirtClient,
host string,
configDiskClient configdisk.ConfigDiskClient) (cache.Store, workqueue.RateLimitingInterface, *controller.Controller) {
configDiskClient configdisk.ConfigDiskClient,
virtShareDir string,
watchdogTimeoutSeconds int) (cache.Store, workqueue.RateLimitingInterface, *controller.Controller) {
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

dispatch := NewVMHandlerDispatch(domainManager, recorder, &restClient, clientset, host, configDiskClient)
dispatch := NewVMHandlerDispatch(domainManager,
recorder,
&restClient,
clientset,
host,
configDiskClient,
virtShareDir,
watchdogTimeoutSeconds)

indexer, informer := controller.NewController(lw, queue, &v1.VirtualMachine{}, dispatch)
return indexer, queue, informer
Expand All @@ -67,24 +77,30 @@ func NewVMHandlerDispatch(domainManager virtwrap.DomainManager,
restClient *rest.RESTClient,
clientset kubecli.KubevirtClient,
host string,
configDiskClient configdisk.ConfigDiskClient) controller.ControllerDispatch {
configDiskClient configdisk.ConfigDiskClient,
virtShareDir string,
watchdogTimeoutSeconds int) controller.ControllerDispatch {
return &VMHandlerDispatch{
domainManager: domainManager,
recorder: recorder,
restClient: *restClient,
clientset: clientset,
host: host,
configDisk: configDiskClient,
domainManager: domainManager,
recorder: recorder,
restClient: *restClient,
clientset: clientset,
host: host,
configDisk: configDiskClient,
virtShareDir: virtShareDir,
watchdogTimeoutSeconds: watchdogTimeoutSeconds,
}
}

type VMHandlerDispatch struct {
domainManager virtwrap.DomainManager
recorder record.EventRecorder
restClient rest.RESTClient
clientset kubecli.KubevirtClient
host string
configDisk configdisk.ConfigDiskClient
domainManager virtwrap.DomainManager
recorder record.EventRecorder
restClient rest.RESTClient
clientset kubecli.KubevirtClient
host string
configDisk configdisk.ConfigDiskClient
virtShareDir string
watchdogTimeoutSeconds int
}

func (d *VMHandlerDispatch) getVMNodeAddress(vm *v1.VirtualMachine) (string, error) {
Expand Down Expand Up @@ -194,6 +210,21 @@ func (d *VMHandlerDispatch) Execute(store cache.Store, queue workqueue.RateLimit
// The VM is deleted on the cluster, continue with processing the deletion on the host.
shouldDeleteVm = true
}

watchdogExpired, _ := watchdog.WatchdogFileIsExpired(d.watchdogTimeoutSeconds, d.virtShareDir, vm)
if watchdogExpired {
if vm.IsRunning() {
logging.DefaultLogger().V(2).Info().Object(vm).Msg("Detected expired watchdog file for running VM.")
shouldDeleteVm = true
} else if vm.IsFinal() {
err = watchdog.WatchdogFileRemove(d.virtShareDir, vm)
if err != nil {
queue.AddRateLimited(key)
return
}
}
}

logging.DefaultLogger().V(3).Info().Object(vm).Msg("Processing VM update.")

// Process the VM
Expand Down Expand Up @@ -397,12 +428,27 @@ func (d *VMHandlerDispatch) processVmUpdate(vm *v1.VirtualMachine, shouldDeleteV
return false, err
}

err = watchdog.WatchdogFileRemove(d.virtShareDir, vm)
if err != nil {
return false, err
}

return false, d.configDisk.Undefine(vm)
} else if isWorthSyncing(vm) == false {
// nothing to do here.
return false, nil
}

hasWatchdog, err := watchdog.WatchdogFileExists(d.virtShareDir, vm)
if err != nil {
logging.DefaultLogger().Object(vm).Error().Reason(err).V(3).Msgf("Error accessing virt-launcher watchdog file.")
return false, err
}
if hasWatchdog == false {
logging.DefaultLogger().Object(vm).Error().Reason(err).V(3).Msgf("Could not detect virt-launcher watchdog file.")
return false, goerror.New(fmt.Sprintf("No watchdog file found for vm"))
}

isPending, err := d.configDisk.Define(vm)
if err != nil || isPending == true {
return isPending, err
Expand Down
10 changes: 9 additions & 1 deletion pkg/virt-handler/vm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
package virthandler

import (
"io/ioutil"
"net/http"
"os"
"testing"

"github.com/golang/mock/gomock"
Expand Down Expand Up @@ -59,9 +61,14 @@ var _ = Describe("VM", func() {

var recorder record.EventRecorder

var shareDir string

logging.DefaultLogger().SetIOWriter(GinkgoWriter)

BeforeEach(func() {
shareDir, err := ioutil.TempDir("", "kubevirt-share")
Expect(err).ToNot(HaveOccurred())

server = ghttp.NewServer()
host := ""

Expand All @@ -79,7 +86,7 @@ var _ = Describe("VM", func() {
configDiskClient := configdisk.NewConfigDiskClient(virtClient)

recorder = record.NewFakeRecorder(100)
dispatch = NewVMHandlerDispatch(domainManager, recorder, restClient, virtClient, host, configDiskClient)
dispatch = NewVMHandlerDispatch(domainManager, recorder, restClient, virtClient, host, configDiskClient, shareDir, 1)

})

Expand Down Expand Up @@ -136,6 +143,7 @@ var _ = Describe("VM", func() {
AfterEach(func() {
server.Close()
ctrl.Finish()
os.RemoveAll(shareDir)
})
})

Expand Down
7 changes: 2 additions & 5 deletions pkg/virt-launcher/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"time"

diskutils "kubevirt.io/kubevirt/pkg/ephemeral-disk-utils"
watchdog "kubevirt.io/kubevirt/pkg/watchdog"
)

type monitor struct {
Expand All @@ -55,11 +56,7 @@ func InitializeSharedDirectories(baseDir string) error {
return err
}

return os.MkdirAll(baseDir+"/watchdog-files", 0755)
}

func WatchdogFileFromNamespaceName(baseDir string, namespace string, name string) string {
return filepath.Clean(baseDir) + "/watchdog-files/" + namespace + "_" + name
return os.MkdirAll(watchdog.WatchdogFileDirectory(baseDir), 0755)
}

func QemuPidfileFromNamespaceName(baseDir string, namespace string, name string) string {
Expand Down
Loading

0 comments on commit 34bafd0

Please sign in to comment.