Skip to content

Commit

Permalink
Remove watchdog informer and wrap watchdog logic entirely into domain…
Browse files Browse the repository at this point in the history
… informer

Signed-off-by: David Vossel <[email protected]>
  • Loading branch information
davidvossel committed Jan 31, 2018
1 parent 84163cd commit 9de6be5
Show file tree
Hide file tree
Showing 8 changed files with 121 additions and 264 deletions.
13 changes: 1 addition & 12 deletions cmd/virt-handler/virt-handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ import (
virtcache "kubevirt.io/kubevirt/pkg/virt-handler/cache"
virtlauncher "kubevirt.io/kubevirt/pkg/virt-launcher"
virt_api "kubevirt.io/kubevirt/pkg/virt-launcher/virtwrap/api"
watchdog "kubevirt.io/kubevirt/pkg/watchdog"
)

const (
Expand Down Expand Up @@ -105,7 +104,7 @@ func (app *virtHandlerApp) Run() {
// Wire VM controller

// Wire Domain controller
domainSharedInformer, err := virtcache.NewSharedInformer(app.VirtShareDir)
domainSharedInformer, err := virtcache.NewSharedInformer(app.VirtShareDir, int(app.WatchdogTimeoutDuration.Seconds()))
if err != nil {
panic(err)
}
Expand All @@ -119,14 +118,6 @@ func (app *virtHandlerApp) Run() {

virtlauncher.InitializeSharedDirectories(app.VirtShareDir)

watchdogInformer := cache.NewSharedIndexInformer(
watchdog.NewWatchdogListWatchFromClient(
app.VirtShareDir,
int(app.WatchdogTimeoutDuration.Seconds())),
&virt_api.Domain{},
0,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})

gracefulShutdownInformer := cache.NewSharedIndexInformer(
inotifyinformer.NewFileListWatchFromClient(
virtlauncher.GracefulShutdownTriggerDir(app.VirtShareDir)),
Expand All @@ -139,10 +130,8 @@ func (app *virtHandlerApp) Run() {
virtCli,
app.HostOverride,
app.VirtShareDir,
int(app.WatchdogTimeoutDuration.Seconds()),
vmSharedInformer,
domainSharedInformer,
watchdogInformer,
gracefulShutdownInformer,
)

Expand Down
42 changes: 33 additions & 9 deletions pkg/virt-handler/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package cache

import (
"sync"
"time"

k8sv1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -31,12 +32,14 @@ import (
"kubevirt.io/kubevirt/pkg/virt-launcher/virtwrap/api"
cmdclient "kubevirt.io/kubevirt/pkg/virt-launcher/virtwrap/cmd-server/client"
notifyserver "kubevirt.io/kubevirt/pkg/virt-launcher/virtwrap/notify-server"
"kubevirt.io/kubevirt/pkg/watchdog"
)

func newListWatchFromNotify(virtShareDir string) cache.ListerWatcher {
func newListWatchFromNotify(virtShareDir string, watchdogTimeout int) cache.ListerWatcher {
d := &DomainWatcher{
backgroundWatcherStarted: false,
virtShareDir: virtShareDir,
watchdogTimeout: watchdogTimeout,
}

return d
Expand All @@ -49,6 +52,7 @@ type DomainWatcher struct {
eventChan chan watch.Event
backgroundWatcherStarted bool
virtShareDir string
watchdogTimeout int
}

func (d *DomainWatcher) startBackground() error {
Expand All @@ -66,18 +70,25 @@ func (d *DomainWatcher) startBackground() error {
go func() {
defer d.wg.Done()

expiredWatchdogTicker := time.NewTicker(time.Duration(d.watchdogTimeout) * time.Second).C
srvErr := make(chan error)
go func() {
defer close(srvErr)
err := notifyserver.RunServer(d.virtShareDir, d.stopChan, d.eventChan)
srvErr <- err
}()

// wait for server to exit.
select {
case err := <-srvErr:
if err != nil {
log.Log.Reason(err).Errorf("Unexpeted err encountered with Domain Notify aggregation server")
for {
select {
case <-expiredWatchdogTicker:
d.handleStaleWatchdogFiles()
case err := <-srvErr:
if err != nil {
log.Log.Reason(err).Errorf("Unexpected err encountered with Domain Notify aggregation server")
}

// server exitted so this goroutine is done.
return
}
}
}()
Expand All @@ -86,6 +97,19 @@ func (d *DomainWatcher) startBackground() error {
return nil
}

func (d *DomainWatcher) handleStaleWatchdogFiles() error {
domains, err := watchdog.GetExpiredDomains(d.watchdogTimeout, d.virtShareDir)
if err != nil {
log.Log.Reason(err).Error("failed to detect expired watchdog files in domain informer")
return err
}

for _, domain := range domains {
d.eventChan <- watch.Event{Type: watch.Deleted, Object: domain}
}
return nil
}

func (d *DomainWatcher) listAllKnownDomains() ([]*api.Domain, error) {
var domains []*api.Domain

Expand Down Expand Up @@ -156,18 +180,18 @@ func (d *DomainWatcher) Stop() {
if d.backgroundWatcherStarted == false {
return
}
close(d.eventChan)
close(d.stopChan)
d.wg.Wait()
d.backgroundWatcherStarted = false
close(d.eventChan)
}

func (d *DomainWatcher) ResultChan() <-chan watch.Event {
return d.eventChan
}

func NewSharedInformer(virtShareDir string) (cache.SharedInformer, error) {
lw := newListWatchFromNotify(virtShareDir)
func NewSharedInformer(virtShareDir string, watchdogTimeout int) (cache.SharedInformer, error) {
lw := newListWatchFromNotify(virtShareDir, watchdogTimeout)
informer := cache.NewSharedInformer(lw, &api.Domain{}, 0)
return informer, nil
}
38 changes: 37 additions & 1 deletion pkg/virt-handler/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"os"
"path/filepath"
"reflect"
"time"

"github.com/golang/mock/gomock"
. "github.com/onsi/ginkgo"
Expand All @@ -38,6 +39,7 @@ import (
cmdserver "kubevirt.io/kubevirt/pkg/virt-launcher/virtwrap/cmd-server"
cmdclient "kubevirt.io/kubevirt/pkg/virt-launcher/virtwrap/cmd-server/client"
notifyclient "kubevirt.io/kubevirt/pkg/virt-launcher/virtwrap/notify-server/client"
"kubevirt.io/kubevirt/pkg/watchdog"
)

var _ = Describe("Domain informer", func() {
Expand All @@ -58,7 +60,7 @@ var _ = Describe("Domain informer", func() {
socketsDir = filepath.Join(shareDir, "sockets")
os.Mkdir(socketsDir, 0755)

informer, err = NewSharedInformer(shareDir)
informer, err = NewSharedInformer(shareDir, 10)
Expect(err).ToNot(HaveOccurred())

ctrl = gomock.NewController(GinkgoT())
Expand Down Expand Up @@ -134,6 +136,40 @@ var _ = Describe("Domain informer", func() {

verifyObj("default/test", domain)
})

It("should detect expired watchdog file.", func() {
socketPath := filepath.Join(socketsDir, "default_test_sock")
f, err := os.Create(socketPath)
Expect(err).ToNot(HaveOccurred())
f.Close()

d := &DomainWatcher{
backgroundWatcherStarted: false,
virtShareDir: shareDir,
watchdogTimeout: 1,
}

watchdogFile := watchdog.WatchdogFileFromNamespaceName(shareDir, "default", "test")
os.MkdirAll(filepath.Dir(watchdogFile), 0755)
watchdog.WatchdogFileUpdate(watchdogFile)

err = d.startBackground()
Expect(err).ToNot(HaveOccurred())
defer d.Stop()

timedOut := false
timeout := time.After(3 * time.Second)
select {
case event := <-d.eventChan:
Expect(event.Type).To(Equal(watch.Deleted))
case <-timeout:
timedOut = true
}

Expect(timedOut).To(Equal(false))

}, 5)

It("should not return errors when encountering disconnected clients at startup.", func() {
var list []*api.Domain

Expand Down
36 changes: 2 additions & 34 deletions pkg/virt-handler/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,8 @@ func NewController(
clientset kubecli.KubevirtClient,
host string,
virtShareDir string,
watchdogTimeoutSeconds int,
vmInformer cache.SharedIndexInformer,
domainInformer cache.SharedInformer,
watchdogInformer cache.SharedIndexInformer,
gracefulShutdownInformer cache.SharedIndexInformer,
) *VirtualMachineController {

Expand All @@ -71,10 +69,8 @@ func NewController(
clientset: clientset,
host: host,
virtShareDir: virtShareDir,
watchdogTimeoutSeconds: watchdogTimeoutSeconds,
vmInformer: vmInformer,
domainInformer: domainInformer,
watchdogInformer: watchdogInformer,
gracefulShutdownInformer: gracefulShutdownInformer,
}

Expand All @@ -90,12 +86,6 @@ func NewController(
UpdateFunc: c.updateDomainFunc,
})

watchdogInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.addFunc,
DeleteFunc: c.deleteFunc,
UpdateFunc: c.updateFunc,
})

gracefulShutdownInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.addFunc,
DeleteFunc: c.deleteFunc,
Expand All @@ -112,11 +102,9 @@ type VirtualMachineController struct {
clientset kubecli.KubevirtClient
host string
virtShareDir string
watchdogTimeoutSeconds int
Queue workqueue.RateLimitingInterface
vmInformer cache.SharedIndexInformer
domainInformer cache.SharedInformer
watchdogInformer cache.SharedIndexInformer
gracefulShutdownInformer cache.SharedIndexInformer
launcherClients map[string]cmdclient.LauncherClient
launcherClientLock sync.Mutex
Expand Down Expand Up @@ -247,9 +235,8 @@ func (c *VirtualMachineController) Run(threadiness int, stopCh chan struct{}) {
}

go c.vmInformer.Run(stopCh)
go c.watchdogInformer.Run(stopCh)
go c.gracefulShutdownInformer.Run(stopCh)
cache.WaitForCacheSync(stopCh, c.domainInformer.HasSynced, c.vmInformer.HasSynced, c.watchdogInformer.HasSynced, c.gracefulShutdownInformer.HasSynced)
cache.WaitForCacheSync(stopCh, c.domainInformer.HasSynced, c.vmInformer.HasSynced, c.gracefulShutdownInformer.HasSynced)

// Start the actual work
for i := 0; i < threadiness; i++ {
Expand Down Expand Up @@ -337,25 +324,6 @@ func (d *VirtualMachineController) execute(key string) error {
return err
}

// Determine if VM's watchdog has expired
watchdogExpired, err := watchdog.WatchdogFileIsExpired(d.watchdogTimeoutSeconds, d.virtShareDir, vm)
if err != nil {
return err
} else if watchdogExpired && vm.IsRunning() {
log.Log.Object(vm).Info("Shutting down due to expired watchdog.")
shouldShutdownAndDelete = true
if domainExists {
// Virt-launcher provids domain state. If virt-launcher
// is down, the domain needs to be deleted from the cache.
err = d.domainInformer.GetStore().Delete(domain)
if err != nil {
return err
}
}
domainExists = false
domain = nil
}

// Determine if gracefulShutdown has been triggered by virt-launcher
gracefulShutdown, err := virtlauncher.VmHasGracefulShutdownTrigger(d.virtShareDir, vm)
if err != nil {
Expand Down Expand Up @@ -418,7 +386,7 @@ func (d *VirtualMachineController) execute(key string) error {
var syncErr error

// Process the VM update in this order.
// * Shutdown and Deletion due to VM deletion, process stopping, graceful shutdown trigger, expired watchdog, etc...
// * Shutdown and Deletion due to VM deletion, process stopping, graceful shutdown trigger, etc...
// * Cleanup of already shutdown and Deleted VMs
// * Update due to spec change and initial start flow.
if shouldShutdownAndDelete {
Expand Down
8 changes: 1 addition & 7 deletions pkg/virt-handler/vm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@ var _ = Describe("VM", func() {
var vmInformer cache.SharedIndexInformer
var domainSource *framework.FakeControllerSource
var domainInformer cache.SharedIndexInformer
var watchdogSource *framework.FakeControllerSource
var watchdogInformer cache.SharedIndexInformer
var gracefulShutdownSource *framework.FakeControllerSource
var gracefulShutdownInformer cache.SharedIndexInformer
var mockQueue *testutils.MockWorkQueue
Expand Down Expand Up @@ -90,7 +88,6 @@ var _ = Describe("VM", func() {

vmInformer, vmSource = testutils.NewFakeInformerFor(&v1.VirtualMachine{})
domainInformer, domainSource = testutils.NewFakeInformerFor(&api.Domain{})
watchdogInformer, watchdogSource = testutils.NewFakeInformerFor(&api.Domain{})
gracefulShutdownInformer, gracefulShutdownSource = testutils.NewFakeInformerFor(&api.Domain{})
recorder = record.NewFakeRecorder(100)

Expand All @@ -106,10 +103,8 @@ var _ = Describe("VM", func() {
virtClient,
host,
shareDir,
10,
vmInformer,
domainInformer,
watchdogInformer,
gracefulShutdownInformer)

client = cmdclient.NewMockLauncherClient(ctrl)
Expand All @@ -124,9 +119,8 @@ var _ = Describe("VM", func() {

go vmInformer.Run(stop)
go domainInformer.Run(stop)
go watchdogInformer.Run(stop)
go gracefulShutdownInformer.Run(stop)
Expect(cache.WaitForCacheSync(stop, vmInformer.HasSynced, domainInformer.HasSynced, watchdogInformer.HasSynced, gracefulShutdownInformer.HasSynced)).To(BeTrue())
Expect(cache.WaitForCacheSync(stop, vmInformer.HasSynced, domainInformer.HasSynced, gracefulShutdownInformer.HasSynced)).To(BeTrue())
})

AfterEach(func() {
Expand Down
14 changes: 14 additions & 0 deletions pkg/virt-launcher/virtwrap/cmd-server/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ package cmdclient

import (
"encoding/json"
goerror "errors"
"fmt"
"io"
"io/ioutil"
"net/rpc"
"path/filepath"
"strings"

k8sv1 "k8s.io/api/core/v1"

Expand Down Expand Up @@ -103,6 +105,18 @@ func SocketFromNamespaceName(baseDir string, namespace string, name string) stri
return filepath.Join(SocketsDirectory(baseDir), sockFile)
}

func DomainFromSocketPath(socketPath string) (*api.Domain, error) {
splitName := strings.SplitN(filepath.Base(socketPath), "_", 3)
if len(splitName) != 3 {
return nil, goerror.New(fmt.Sprintf("malformed domain socket %s", socketPath))
}
namespace := splitName[0]
name := splitName[1]
domain := api.NewDomainReferenceFromName(namespace, name)

return domain, nil
}

func GetClient(socketPath string) (LauncherClient, error) {
conn, err := rpc.Dial("unix", socketPath)
if err != nil {
Expand Down
Loading

0 comments on commit 9de6be5

Please sign in to comment.