Skip to content

Commit

Permalink
code change: Unify health channel logic in device-manager
Browse files Browse the repository at this point in the history
Different device types use the same logic for health channels.

Signed-off-by: Andrej Krejcir <[email protected]>
  • Loading branch information
akrejcir committed Feb 7, 2022
1 parent fec9dd8 commit 4297935
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 44 deletions.
5 changes: 5 additions & 0 deletions pkg/virt-handler/device-manager/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,3 +259,8 @@ func formatVFIODeviceSpecs(devID string) []*v1beta1.DeviceSpec {
})
return devSpecs
}

type deviceHealth struct {
DevId string
Health string
}
14 changes: 7 additions & 7 deletions pkg/virt-handler/device-manager/generic_device.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type GenericDevicePlugin struct {
server *grpc.Server
socketPath string
stop <-chan struct{}
health chan string
health chan deviceHealth
devicePath string
deviceName string
resourceName string
Expand All @@ -73,7 +73,7 @@ func NewGenericDevicePlugin(deviceName string, devicePath string, maxDevices int
dpi := &GenericDevicePlugin{
devs: []*pluginapi.Device{},
socketPath: serverSock,
health: make(chan string),
health: make(chan deviceHealth),
deviceName: deviceName,
devicePath: devicePath,
deviceRoot: util.HostRootMount,
Expand Down Expand Up @@ -215,11 +215,11 @@ func (dpi *GenericDevicePlugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.Dev
done := false
for {
select {
case health := <-dpi.health:
case devHealth := <-dpi.health:
// There's only one shared generic device
// so update each plugin device to reflect overall device health
for _, dev := range dpi.devs {
dev.Health = health
dev.Health = devHealth.Health
}
s.Send(&pluginapi.ListAndWatchResponse{Devices: dpi.devs})
case <-dpi.stop:
Expand Down Expand Up @@ -302,7 +302,7 @@ func (dpi *GenericDevicePlugin) healthCheck() error {
return fmt.Errorf("could not stat the device: %v", err)
}
logger.Warningf("device '%s' is not present, the device plugin can't expose it.", dpi.devicePath)
dpi.health <- pluginapi.Unhealthy
dpi.health <- deviceHealth{Health: pluginapi.Unhealthy}
}
logger.Infof("device '%s' is present.", dpi.devicePath)

Expand All @@ -329,10 +329,10 @@ func (dpi *GenericDevicePlugin) healthCheck() error {
// Health in this case is if the device path actually exists
if event.Op == fsnotify.Create {
logger.Infof("monitored device %s appeared", dpi.deviceName)
dpi.health <- pluginapi.Healthy
dpi.health <- deviceHealth{Health: pluginapi.Healthy}
} else if (event.Op == fsnotify.Remove) || (event.Op == fsnotify.Rename) {
logger.Infof("monitored device %s disappeared", dpi.deviceName)
dpi.health <- pluginapi.Unhealthy
dpi.health <- deviceHealth{Health: pluginapi.Unhealthy}
}
} else if event.Name == dpi.socketPath && event.Op == fsnotify.Remove {
logger.Infof("device socket file for device %s was removed, kubelet probably restarted.", dpi.deviceName)
Expand Down
4 changes: 2 additions & 2 deletions pkg/virt-handler/device-manager/generic_device_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ var _ = Describe("Generic Device", func() {

By("waiting for healthcheck to send Unhealthy message")
Eventually(func() string {
return <-dpi.health
return (<-dpi.health).Health
}, 5*time.Second).Should(Equal(pluginapi.Unhealthy))

By("Creating a new (fake) device node")
Expand All @@ -83,7 +83,7 @@ var _ = Describe("Generic Device", func() {

By("waiting for healthcheck to send Healthy message")
Eventually(func() string {
return <-dpi.health
return (<-dpi.health).Health
}, 5*time.Second).Should(Equal(pluginapi.Healthy))
})
})
31 changes: 13 additions & 18 deletions pkg/virt-handler/device-manager/mediated_device.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,12 @@ type MediatedDevicePlugin struct {
server *grpc.Server
socketPath string
stop <-chan struct{}
health chan string
health chan deviceHealth
devicePath string
deviceName string
resourceName string
done chan struct{}
deviceRoot string
healthy chan string
unhealthy chan string
iommuToMDEVMap map[string]string
initialized bool
lock *sync.Mutex
Expand All @@ -84,13 +82,11 @@ func NewMediatedDevicePlugin(mdevs []*MDEV, resourceName string) *MediatedDevice
dpi := &MediatedDevicePlugin{
devs: devs,
socketPath: serverSock,
health: make(chan string),
health: make(chan deviceHealth),
deviceName: resourceName,
resourceName: resourceName,
devicePath: vfioDevicePath,
deviceRoot: util.HostRootMount,
healthy: make(chan string),
unhealthy: make(chan string),
iommuToMDEVMap: iommuToMDEVMap,
initialized: false,
lock: &sync.Mutex{},
Expand Down Expand Up @@ -268,17 +264,10 @@ func (dpi *MediatedDevicePlugin) ListAndWatch(_ *pluginapi.Empty, s pluginapi.De
done := false
for {
select {
case unhealthy := <-dpi.unhealthy:
case devHealth := <-dpi.health:
for _, dev := range dpi.devs {
if unhealthy == dev.ID {
dev.Health = pluginapi.Unhealthy
}
}
s.Send(&pluginapi.ListAndWatchResponse{Devices: dpi.devs})
case healthy := <-dpi.healthy:
for _, dev := range dpi.devs {
if healthy == dev.ID {
dev.Health = pluginapi.Healthy
if devHealth.DevId == dev.ID {
dev.Health = devHealth.Health
}
}
s.Send(&pluginapi.ListAndWatchResponse{Devices: dpi.devs})
Expand Down Expand Up @@ -422,10 +411,16 @@ func (dpi *MediatedDevicePlugin) healthCheck() error {
// Health in this case is if the device path actually exists
if event.Op == fsnotify.Create {
logger.Infof("monitored device %s appeared", dpi.deviceName)
dpi.healthy <- monDevId
dpi.health <- deviceHealth{
DevId: monDevId,
Health: pluginapi.Healthy,
}
} else if (event.Op == fsnotify.Remove) || (event.Op == fsnotify.Rename) {
logger.Infof("monitored device %s disappeared", dpi.deviceName)
dpi.unhealthy <- monDevId
dpi.health <- deviceHealth{
DevId: monDevId,
Health: pluginapi.Unhealthy,
}
}
} else if event.Name == dpi.socketPath && event.Op == fsnotify.Remove {
logger.Infof("device socket file for device %s was removed, kubelet probably restarted.", dpi.deviceName)
Expand Down
30 changes: 13 additions & 17 deletions pkg/virt-handler/device-manager/pci_device.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,12 @@ type PCIDevicePlugin struct {
server *grpc.Server
socketPath string
stop <-chan struct{}
health chan string
health chan deviceHealth
devicePath string
deviceName string
resourceName string
done chan struct{}
deviceRoot string
healthy chan string
unhealthy chan string
iommuToPCIMap map[string]string
initialized bool
lock *sync.Mutex
Expand All @@ -87,8 +85,7 @@ func NewPCIDevicePlugin(pciDevices []*PCIDevice, resourceName string) *PCIDevice
devicePath: vfioDevicePath,
deviceRoot: util.HostRootMount,
iommuToPCIMap: iommuToPCIMap,
healthy: make(chan string),
unhealthy: make(chan string),
health: make(chan deviceHealth),
initialized: false,
lock: &sync.Mutex{},
}
Expand Down Expand Up @@ -175,17 +172,10 @@ func (dpi *PCIDevicePlugin) ListAndWatch(_ *pluginapi.Empty, s pluginapi.DeviceP
done := false
for {
select {
case unhealthy := <-dpi.unhealthy:
case devHealth := <-dpi.health:
for _, dev := range dpi.devs {
if unhealthy == dev.ID {
dev.Health = pluginapi.Unhealthy
}
}
s.Send(&pluginapi.ListAndWatchResponse{Devices: dpi.devs})
case healthy := <-dpi.healthy:
for _, dev := range dpi.devs {
if healthy == dev.ID {
dev.Health = pluginapi.Healthy
if devHealth.DevId == dev.ID {
dev.Health = devHealth.Health
}
}
s.Send(&pluginapi.ListAndWatchResponse{Devices: dpi.devs})
Expand Down Expand Up @@ -294,10 +284,16 @@ func (dpi *PCIDevicePlugin) healthCheck() error {
// Health in this case is if the device path actually exists
if event.Op == fsnotify.Create {
logger.Infof("monitored device %s appeared", dpi.deviceName)
dpi.healthy <- monDevId
dpi.health <- deviceHealth{
DevId: monDevId,
Health: pluginapi.Healthy,
}
} else if (event.Op == fsnotify.Remove) || (event.Op == fsnotify.Rename) {
logger.Infof("monitored device %s disappeared", dpi.deviceName)
dpi.unhealthy <- monDevId
dpi.health <- deviceHealth{
DevId: monDevId,
Health: pluginapi.Unhealthy,
}
}
} else if event.Name == dpi.socketPath && event.Op == fsnotify.Remove {
logger.Infof("device socket file for device %s was removed, kubelet probably restarted.", dpi.deviceName)
Expand Down

0 comments on commit 4297935

Please sign in to comment.