Skip to content

Commit

Permalink
Replace watchdog logic with command client unresponsive checks
Browse files Browse the repository at this point in the history
Signed-off-by: David Vossel <[email protected]>
  • Loading branch information
davidvossel committed Apr 13, 2020
1 parent d328a9b commit 54bdc7a
Show file tree
Hide file tree
Showing 9 changed files with 314 additions and 30 deletions.
2 changes: 1 addition & 1 deletion cmd/virt-handler/virt-handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ import (
)

const (
defaultWatchdogTimeout = 15 * time.Second
defaultWatchdogTimeout = 30 * time.Second

// Default port that virt-handler listens on.
defaultPort = 8185
Expand Down
7 changes: 7 additions & 0 deletions cmd/virt-launcher/virt-launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,13 @@ func main() {
socketPath := cmdclient.SocketFromUID(*virtShareDir, *uid, false)
cmdServerDone := startCmdServer(socketPath, domainManager, stopChan, options)

// Set socket info file after starting server
err = cmdclient.SetSocketInfo(socketPath, *uid, *name, *namespace)
if err != nil {
log.Log.Reason(err).Errorf("Unable to write server info file.")
panic(err)
}

gracefulShutdownTriggerFile := virtlauncher.GracefulShutdownTriggerFromNamespaceName(*virtShareDir,
*namespace,
*name)
Expand Down
1 change: 1 addition & 0 deletions pkg/virt-handler/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ go_test(
embed = [":go_default_library"],
deps = [
"//pkg/certificates:go_default_library",
"//pkg/ephemeral-disk-utils:go_default_library",
"//pkg/handler-launcher-com/cmd/v1:go_default_library",
"//pkg/testutils:go_default_library",
"//pkg/virt-config:go_default_library",
Expand Down
1 change: 1 addition & 0 deletions pkg/virt-handler/cache/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_library(
"//staging/src/kubevirt.io/client-go/log:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
Expand Down
95 changes: 94 additions & 1 deletion pkg/virt-handler/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@
package cache

import (
"net"
"os"
"sync"
"time"

"k8s.io/client-go/tools/record"

k8sv1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"

Expand All @@ -44,6 +47,7 @@ func newListWatchFromNotify(virtShareDir string, watchdogTimeout int, recorder r
watchdogTimeout: watchdogTimeout,
recorder: recorder,
vmiStore: vmiStore,
unresponsiveSockets: make(map[string]int64),
}

return d
Expand All @@ -59,6 +63,9 @@ type DomainWatcher struct {
watchdogTimeout int
recorder record.EventRecorder
vmiStore cache.Store

watchDogLock sync.Mutex
unresponsiveSockets map[string]int64
}

func (d *DomainWatcher) startBackground() error {
Expand All @@ -76,7 +83,10 @@ func (d *DomainWatcher) startBackground() error {
go func() {
defer d.wg.Done()

expiredWatchdogTicker := time.NewTicker(time.Duration(d.watchdogTimeout) * time.Second).C
// Divide the watchdogTimeout by 3 for our ticker.
// This ensures we always have at least 2 response failures
// in a row before we mark the socket as unavailable (which results in shutdown of VMI)
expiredWatchdogTicker := time.NewTicker(time.Duration((d.watchdogTimeout/3)+1) * time.Second).C
srvErr := make(chan error)
go func() {
defer close(srvErr)
Expand All @@ -88,6 +98,7 @@ func (d *DomainWatcher) startBackground() error {
select {
case <-expiredWatchdogTicker:
d.handleStaleWatchdogFiles()
d.handleStaleSocketConnections()
case err := <-srvErr:
if err != nil {
log.Log.Reason(err).Errorf("Unexpected err encountered with Domain Notify aggregation server")
Expand All @@ -103,6 +114,8 @@ func (d *DomainWatcher) startBackground() error {
return nil
}

// TODO remove watchdog file usage eventually and only rely on detecting stale socket connections
// for now we have to keep watchdog files around for backwards compatiblity with old VMIs
func (d *DomainWatcher) handleStaleWatchdogFiles() error {
domains, err := watchdog.GetExpiredDomains(d.watchdogTimeout, d.virtShareDir)
if err != nil {
Expand All @@ -117,6 +130,86 @@ func (d *DomainWatcher) handleStaleWatchdogFiles() error {
return nil
}

func (d *DomainWatcher) handleStaleSocketConnections() error {
var unresponsive []string

socketFiles, err := cmdclient.ListAllSockets(d.virtShareDir)
if err != nil {
log.Log.Reason(err).Error("failed to list sockets")
return err
}

for _, socket := range socketFiles {
if !cmdclient.SocketMonitoringEnabled(socket) {
// don't process legacy sockets here. They still use the
// old watchdog file method
continue
}

sock, err := net.DialTimeout("unix", socket, 3*time.Second)
if err == nil {
// socket is alive still
sock.Close()
continue
}
unresponsive = append(unresponsive, socket)
}

d.watchDogLock.Lock()
defer d.watchDogLock.Unlock()

now := time.Now().UTC().Unix()

// Add new unresponsive sockets
for _, socket := range unresponsive {
_, ok := d.unresponsiveSockets[socket]
if !ok {
d.unresponsiveSockets[socket] = now
}
}

for key, timeStamp := range d.unresponsiveSockets {
found := false
for _, socket := range unresponsive {
if socket == key {
found = true
break
}
}
// reap old unresponsive sockets
// remove from unresponsive list if not found unresponsive this iteration
if !found {
delete(d.unresponsiveSockets, key)
break
}

diff := now - timeStamp

if diff > int64(d.watchdogTimeout) {
socketInfo, err := cmdclient.GetSocketInfo(key)
if err != nil && os.IsNotExist(err) {
// ignore if info file doesn't exist
// this is possible with legacy VMIs that haven't
// been updated. The watchdog file will catch these.
} else if err != nil {
log.Log.Reason(err).Errorf("Unable to retrieve info about unresponsive vmi with socket %s", key)
} else {
domain := api.NewMinimalDomainWithNS(socketInfo.Namespace, socketInfo.Name)
domain.ObjectMeta.UID = types.UID(socketInfo.UID)
log.Log.Object(domain).Warning("detected unresponsive virt-launcher command socket for domain")
d.eventChan <- watch.Event{Type: watch.Deleted, Object: domain}

err := cmdclient.MarkSocketUnresponsive(key)
if err != nil {
log.Log.Reason(err).Errorf("Unable to mark vmi as unresponsive socket %s", key)
}
}
}
}

return nil
}

func (d *DomainWatcher) listAllKnownDomains() ([]*api.Domain, error) {
var domains []*api.Domain

Expand Down
1 change: 1 addition & 0 deletions pkg/virt-handler/cmd-client/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ go_library(
"//pkg/util/net/grpc:go_default_library",
"//pkg/virt-launcher/virtwrap/api:go_default_library",
"//pkg/virt-launcher/virtwrap/stats:go_default_library",
"//pkg/watchdog:go_default_library",
"//staging/src/kubevirt.io/client-go/api/v1:go_default_library",
"//staging/src/kubevirt.io/client-go/log:go_default_library",
"//vendor/github.com/golang/mock/gomock:go_default_library",
Expand Down
126 changes: 126 additions & 0 deletions pkg/virt-handler/cmd-client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
grpcutil "kubevirt.io/kubevirt/pkg/util/net/grpc"
"kubevirt.io/kubevirt/pkg/virt-launcher/virtwrap/api"
"kubevirt.io/kubevirt/pkg/virt-launcher/virtwrap/stats"
"kubevirt.io/kubevirt/pkg/watchdog"
)

var (
Expand All @@ -62,6 +63,8 @@ var (
)

const StandardLauncherSocketFileName = "launcher-sock"
const StandardLauncherInfoFileName = "launcher-info"
const StandardLauncherUnresponsiveFileName = "launcher-unresponsive"

type MigrationOptions struct {
Bandwidth resource.Quantity
Expand All @@ -71,6 +74,12 @@ type MigrationOptions struct {
AllowAutoConverge bool
}

type SocketInfo struct {
Name string `json:"name"`
Namespace string `json:"namespace"`
UID string `json:"uid"`
}

type LauncherClient interface {
SyncVirtualMachine(vmi *v1.VirtualMachineInstance, options *cmdv1.VirtualMachineOptions) error
PauseVirtualMachine(vmi *v1.VirtualMachineInstance) error
Expand Down Expand Up @@ -101,6 +110,43 @@ const (
longTimeout time.Duration = 20 * time.Second
)

func FindLastKnownUIDForKey(baseDir string, name string, namespace string) (string, error) {
socketFiles, err := ListAllSockets(baseDir)
if err != nil {
return "", err
}

// Attempt to detect UID by traversing all known UIDs cached on the system
for _, socket := range socketFiles {
if SocketMonitoringEnabled(socket) {
info, err := GetSocketInfo(socket)
if err != nil {
continue
}
if info.Name == name && info.Namespace == namespace {
return info.UID, nil
}
}
}

// Fallback to legacy watchdog file detection.
// This works for old VMIs that haven't been updated
filePath := watchdog.WatchdogFileFromNamespaceName(baseDir, namespace, name)
watchdogExists, err := diskutils.FileExists(filePath)
if err != nil {
return "", err
}
if watchdogExists {
b, err := ioutil.ReadFile(filePath)
if err != nil {
return "", err
}

return string(b), nil
}
return "", nil
}

func ListAllSockets(baseDir string) ([]string, error) {
var socketFiles []string

Expand Down Expand Up @@ -144,6 +190,86 @@ func SocketsDirectory(baseDir string) string {
return filepath.Join(baseDir, "sockets")
}

func SocketMonitoringEnabled(socket string) bool {
if filepath.Base(socket) == StandardLauncherSocketFileName {
return true
}
return false
}

func IsSocketUnresponsive(socket string) bool {
file := filepath.Join(filepath.Dir(socket), StandardLauncherUnresponsiveFileName)
exists, _ := diskutils.FileExists(file)
// if the unresponsive socket monitor marked this socket
// as being unresponsive, return true
if exists {
return true
}

exists, _ = diskutils.FileExists(socket)
// if the socket file doesn't exist, it's definitely unresponsive as well
if !exists {
return true
}

return false
}

func MarkSocketUnresponsive(socket string) error {
file := filepath.Join(filepath.Dir(socket), StandardLauncherUnresponsiveFileName)
f, err := os.Create(file)
if err != nil {
return err
}
f.Close()
return nil
}

func SetSocketInfo(socket string, uid string, name string, namespace string) error {
file := filepath.Join(filepath.Dir(socket), StandardLauncherInfoFileName)

socketInfo := &SocketInfo{
Name: name,
Namespace: namespace,
UID: uid,
}

fileBytes, err := json.Marshal(socketInfo)
if err != nil {
return err
}

f, err := os.Create(file)
if err != nil {
return err
}
defer f.Close()

_, err = f.Write(fileBytes)
if err != nil {
return err
}
return nil
}

func GetSocketInfo(socket string) (*SocketInfo, error) {

infoFile := filepath.Join(filepath.Dir(socket), StandardLauncherInfoFileName)

fileBytes, err := ioutil.ReadFile(infoFile)
if err != nil {
return nil, err
}

socketInfo := &SocketInfo{}
err = json.Unmarshal(fileBytes, socketInfo)
if err != nil {
return nil, err
}

return socketInfo, nil
}

func SocketFromUID(baseDir string, uid string, isHost bool) string {
sockFile := StandardLauncherSocketFileName
legacySockFile := uid + "_sock"
Expand Down
Loading

0 comments on commit 54bdc7a

Please sign in to comment.