Skip to content

Commit

Permalink
Merge pull request kubernetes#52290 from jiayingz/deviceplugin-failure
Browse files Browse the repository at this point in the history
Automatic merge from submit-queue (batch tested with PRs 52452, 52115, 52260, 52290)

Fixes device plugin re-registration handling logic to make sure:

- If a device plugin exits, its exported resource will be removed.
- No capacity change if a new device plugin instance comes up to replace the old instance.



**What this PR does / why we need it**:

**Which issue this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close that issue when PR gets merged)*: fixes kubernetes#52510

**Special notes for your reviewer**:

**Release note**:

```release-note
```
  • Loading branch information
Kubernetes Submit Queue authored Sep 15, 2017
2 parents b7953a7 + 5cac9fc commit b5fbd71
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 37 deletions.
25 changes: 25 additions & 0 deletions pkg/kubelet/deviceplugin/device_plugin_stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"log"
"net"
"os"
"path"
"time"

"golang.org/x/net/context"
Expand Down Expand Up @@ -86,6 +87,30 @@ func (m *Stub) Stop() error {
return m.cleanup()
}

// Register registers the device plugin for the given resourceName with Kubelet.
func (m *Stub) Register(kubeletEndpoint, resourceName string) error {
conn, err := grpc.Dial(kubeletEndpoint, grpc.WithInsecure(),
grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout("unix", addr, timeout)
}))
defer conn.Close()
if err != nil {
return err
}
client := pluginapi.NewRegistrationClient(conn)
reqt := &pluginapi.RegisterRequest{
Version: pluginapi.Version,
Endpoint: path.Base(m.socket),
ResourceName: resourceName,
}

_, err = client.Register(context.Background(), reqt)
if err != nil {
return err
}
return nil
}

// ListAndWatch lists devices and update that list according to the Update call
func (m *Stub) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error {
log.Println("ListAndWatch")
Expand Down
42 changes: 21 additions & 21 deletions pkg/kubelet/deviceplugin/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type ManagerImpl struct {
socketname string
socketdir string

Endpoints map[string]*endpoint // Key is ResourceName
endpoints map[string]*endpoint // Key is ResourceName
mutex sync.Mutex

callback MonitorCallback
Expand All @@ -55,7 +55,7 @@ func NewManagerImpl(socketPath string, f MonitorCallback) (*ManagerImpl, error)

dir, file := filepath.Split(socketPath)
return &ManagerImpl{
Endpoints: make(map[string]*endpoint),
endpoints: make(map[string]*endpoint),

socketname: file,
socketdir: dir,
Expand Down Expand Up @@ -138,7 +138,7 @@ func (m *ManagerImpl) Devices() map[string][]*pluginapi.Device {
defer m.mutex.Unlock()

devs := make(map[string][]*pluginapi.Device)
for k, e := range m.Endpoints {
for k, e := range m.endpoints {
glog.V(3).Infof("Endpoint: %+v: %+v", k, e)
devs[k] = e.getDevices()
}
Expand All @@ -157,7 +157,7 @@ func (m *ManagerImpl) Allocate(resourceName string, devs []string) (*pluginapi.A
glog.V(3).Infof("Recieved allocation request for devices %v for device plugin %s",
devs, resourceName)
m.mutex.Lock()
e, ok := m.Endpoints[resourceName]
e, ok := m.endpoints[resourceName]
m.mutex.Unlock()
if !ok {
return nil, fmt.Errorf("Unknown Device Plugin %s", resourceName)
Expand Down Expand Up @@ -189,48 +189,48 @@ func (m *ManagerImpl) Register(ctx context.Context,

// Stop is the function that can stop the gRPC server.
func (m *ManagerImpl) Stop() error {
for _, e := range m.Endpoints {
for _, e := range m.endpoints {
e.stop()
}
m.server.Stop()
return nil
}

func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
// Stops existing endpoint if there is any.
m.mutex.Lock()
old, ok := m.Endpoints[r.ResourceName]
m.mutex.Unlock()
if ok && old != nil {
old.stop()
}

socketPath := filepath.Join(m.socketdir, r.Endpoint)
e, err := newEndpoint(socketPath, r.ResourceName, m.callback)
if err != nil {
glog.Errorf("Failed to dial device plugin with request %v: %v", r, err)
return
}

stream, err := e.list()
if err != nil {
glog.Errorf("Failed to List devices for plugin %v: %v", r.ResourceName, err)
return
}

// Associates the newly created endpoint with the corresponding resource name.
// Stops existing endpoint if there is any.
m.mutex.Lock()
old, ok := m.endpoints[r.ResourceName]
m.endpoints[r.ResourceName] = e
m.mutex.Unlock()
glog.V(2).Infof("Registered endpoint %v", e)
if ok && old != nil {
old.stop()
}

go func() {
e.listAndWatch(stream)

m.mutex.Lock()
if old, ok := m.Endpoints[r.ResourceName]; ok && old == e {
delete(m.Endpoints, r.ResourceName)
if old, ok := m.endpoints[r.ResourceName]; ok && old == e {
glog.V(2).Infof("Delete resource for endpoint %v", e)
delete(m.endpoints, r.ResourceName)
// Issues callback to delete all of devices.
e.callback(e.resourceName, []*pluginapi.Device{}, []*pluginapi.Device{}, e.getDevices())
}
glog.V(2).Infof("Unregistered endpoint %v", e)
m.mutex.Unlock()
}()

m.mutex.Lock()
m.Endpoints[r.ResourceName] = e
glog.V(2).Infof("Registered endpoint %v", e)
m.mutex.Unlock()
}
61 changes: 48 additions & 13 deletions pkg/kubelet/deviceplugin/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,43 +17,78 @@ limitations under the License.
package deviceplugin

import (
"os"
"path"
"testing"
"time"

"github.com/stretchr/testify/require"

pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha1"
)

const (
msocketName = "/tmp/server.sock"
socketName = "/tmp/device_plugin/server.sock"
pluginSocketName = "/tmp/device_plugin/device-plugin.sock"
testResourceName = "fake-domain/resource"
)

func TestNewManagerImpl(t *testing.T) {
wd, _ := os.Getwd()
socket := path.Join(wd, msocketName)

_, err := NewManagerImpl("", func(n string, a, u, r []*pluginapi.Device) {})
require.Error(t, err)

_, err = NewManagerImpl(socket, func(n string, a, u, r []*pluginapi.Device) {})
_, err = NewManagerImpl(socketName, func(n string, a, u, r []*pluginapi.Device) {})
require.NoError(t, err)
}

func TestNewManagerImplStart(t *testing.T) {
wd, _ := os.Getwd()
socket := path.Join(wd, msocketName)
setup(t, []*pluginapi.Device{}, func(n string, a, u, r []*pluginapi.Device) {})
}

_, err := NewManagerImpl(socket, func(n string, a, u, r []*pluginapi.Device) {})
// Tests that the device plugin manager correctly handles registration and re-registration by
// making sure that after registration, devices are correctly updated and if a re-registration
// happens, we will NOT delete devices.
func TestDevicePluginReRegistration(t *testing.T) {
devs := []*pluginapi.Device{
{ID: "Dev1", Health: pluginapi.Healthy},
{ID: "Dev2", Health: pluginapi.Healthy},
}

callbackCount := 0
callbackChan := make(chan int)
callback := func(n string, a, u, r []*pluginapi.Device) {
// Should be called twice, one for each plugin.
if callbackCount > 1 {
t.FailNow()
}
callbackCount++
callbackChan <- callbackCount
}
m, p1 := setup(t, devs, callback)
p1.Register(socketName, testResourceName)
// Wait for the first callback to be issued.
<-callbackChan
devices := m.Devices()
require.Equal(t, 2, len(devices[testResourceName]), "Devices are not updated.")

p2 := NewDevicePluginStub(devs, pluginSocketName+".new")
err := p2.Start()
require.NoError(t, err)
p2.Register(socketName, testResourceName)
// Wait for the second callback to be issued.
<-callbackChan

devices2 := m.Devices()
require.Equal(t, 2, len(devices2[testResourceName]), "Devices shouldn't change.")
// Wait long enough to catch unexpected callbacks.
time.Sleep(5 * time.Second)
}

func setup(t *testing.T, devs []*pluginapi.Device, pluginSocket, serverSocket string, callback MonitorCallback) (Manager, *Stub) {
m, err := NewManagerImpl(serverSocket, callback)
func setup(t *testing.T, devs []*pluginapi.Device, callback MonitorCallback) (Manager, *Stub) {
m, err := NewManagerImpl(socketName, callback)
require.NoError(t, err)
err = m.Start()
require.NoError(t, err)

p := NewDevicePluginStub(devs, pluginSocket)
p := NewDevicePluginStub(devs, pluginSocketName)
err = p.Start()
require.NoError(t, err)

Expand Down
16 changes: 13 additions & 3 deletions pkg/kubelet/kubelet_node_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,13 +598,23 @@ func (kl *Kubelet) setNodeStatusMachineInfo(node *v1.Node) {
}
}

initialCapacity := kl.containerManager.GetCapacity()
if initialCapacity != nil {
for k, v := range initialCapacity {
currentCapacity := kl.containerManager.GetCapacity()
if currentCapacity != nil {
for k, v := range currentCapacity {
if v1helper.IsExtendedResourceName(k) {
glog.V(2).Infof("Update capacity for %s to %d", k, v.Value())
node.Status.Capacity[k] = v
}
}
// Remove stale extended resources.
for k := range node.Status.Capacity {
if v1helper.IsExtendedResourceName(k) {
if _, ok := currentCapacity[k]; !ok {
glog.V(2).Infof("delete capacity for %s", k)
delete(node.Status.Capacity, k)
}
}
}
}
}

Expand Down

0 comments on commit b5fbd71

Please sign in to comment.