Skip to content

Commit

Permalink
Merge pull request kubernetes#70515 from davidz627/feature/csiNodeInfo
Browse files Browse the repository at this point in the history
Add explicit "Installed" field to CSINodeInfo and change update semantics
  • Loading branch information
k8s-ci-robot authored Nov 9, 2018
2 parents 8825843 + 06f3b26 commit e133ab2
Show file tree
Hide file tree
Showing 13 changed files with 498 additions and 191 deletions.
54 changes: 38 additions & 16 deletions cluster/addons/storage-crds/csinodeinfo.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,42 @@ spec:
validation:
openAPIV3Schema:
properties:
csiDrivers:
description: List of CSI drivers running on the node and their properties.
items:
properties:
driver:
description: The CSI driver that this object refers to.
type: string
nodeID:
description: The node from the driver point of view.
type: string
topologyKeys:
description: List of keys supported by the driver.
items:
type: string
type: array
type: array
spec:
description: Specification of CSINodeInfo
properties:
drivers:
description: List of CSI drivers running on the node and their specs.
type: array
items:
properties:
name:
description: The CSI driver that this object refers to.
type: string
nodeID:
description: The node from the driver point of view.
type: string
topologyKeys:
description: List of keys supported by the driver.
items:
type: string
type: array
status:
description: Status of CSINodeInfo
properties:
drivers:
description: List of CSI drivers running on the node and their statuses.
type: array
items:
properties:
name:
description: The CSI driver that this object refers to.
type: string
available:
description: Whether the CSI driver is installed.
type: boolean
volumePluginMechanism:
description: Indicates to external components the required mechanism
to use for any in-tree plugins replaced by this driver.
pattern: in-tree|csi
type: string
version: v1alpha1
9 changes: 6 additions & 3 deletions pkg/volume/csi/csi_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string)
return err
}

err = nim.AddNodeInfo(pluginName, driverNodeID, maxVolumePerNode, accessibleTopology)
err = nim.InstallCSIDriver(pluginName, driverNodeID, maxVolumePerNode, accessibleTopology)
if err != nil {
glog.Error(log("registrationHandler.RegisterPlugin failed at AddNodeInfo: %v", err))
if unregErr := unregisterDriver(pluginName); unregErr != nil {
Expand Down Expand Up @@ -188,6 +188,9 @@ func (p *csiPlugin) Init(host volume.VolumeHost) error {
csiDrivers = csiDriversStore{driversMap: map[string]csiDriver{}}
nim = nodeinfomanager.NewNodeInfoManager(host.GetNodeName(), host)

// TODO(#70514) Init CSINodeInfo object if the CRD exists and create Driver
// objects for migrated drivers.

return nil
}

Expand Down Expand Up @@ -583,8 +586,8 @@ func unregisterDriver(driverName string) error {
delete(csiDrivers.driversMap, driverName)
}()

if err := nim.RemoveNodeInfo(driverName); err != nil {
glog.Errorf("Error unregistering CSI driver: %v", err)
if err := nim.UninstallCSIDriver(driverName); err != nil {
glog.Errorf("Error uninstalling CSI driver: %v", err)
return err
}

Expand Down
1 change: 0 additions & 1 deletion pkg/volume/csi/nodeinfomanager/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ go_test(
"//pkg/volume/testing:go_default_library",
"//pkg/volume/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
Expand Down
148 changes: 80 additions & 68 deletions pkg/volume/csi/nodeinfomanager/nodeinfomanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,17 @@ type nodeUpdateFunc func(*v1.Node) (newNode *v1.Node, updated bool, err error)

// Interface implements an interface for managing labels of a node
type Interface interface {
CreateCSINodeInfo() (*csiv1alpha1.CSINodeInfo, error)

// Record in the cluster the given node information from the CSI driver with the given name.
// Concurrent calls to AddNodeInfo() is allowed, but they should not be intertwined with calls
// Concurrent calls to InstallCSIDriver() is allowed, but they should not be intertwined with calls
// to other methods in this interface.
AddNodeInfo(driverName string, driverNodeID string, maxVolumeLimit int64, topology *csipb.Topology) error
InstallCSIDriver(driverName string, driverNodeID string, maxVolumeLimit int64, topology *csipb.Topology) error

// Remove in the cluster node information from the CSI driver with the given name.
// Concurrent calls to RemoveNodeInfo() is allowed, but they should not be intertwined with calls
// Concurrent calls to UninstallCSIDriver() is allowed, but they should not be intertwined with calls
// to other methods in this interface.
RemoveNodeInfo(driverName string) error
UninstallCSIDriver(driverName string) error
}

// NewNodeInfoManager initializes nodeInfoManager
Expand All @@ -79,11 +81,11 @@ func NewNodeInfoManager(
}
}

// AddNodeInfo updates the node ID annotation in the Node object and CSIDrivers field in the
// InstallCSIDriver updates the node ID annotation in the Node object and CSIDrivers field in the
// CSINodeInfo object. If the CSINodeInfo object doesn't yet exist, it will be created.
// If multiple calls to AddNodeInfo() are made in parallel, some calls might receive Node or
// If multiple calls to InstallCSIDriver() are made in parallel, some calls might receive Node or
// CSINodeInfo update conflicts, which causes the function to retry the corresponding update.
func (nim *nodeInfoManager) AddNodeInfo(driverName string, driverNodeID string, maxAttachLimit int64, topology *csipb.Topology) error {
func (nim *nodeInfoManager) InstallCSIDriver(driverName string, driverNodeID string, maxAttachLimit int64, topology *csipb.Topology) error {
if driverNodeID == "" {
return fmt.Errorf("error adding CSI driver node info: driverNodeID must not be empty")
}
Expand Down Expand Up @@ -114,14 +116,14 @@ func (nim *nodeInfoManager) AddNodeInfo(driverName string, driverNodeID string,
return nil
}

// RemoveNodeInfo removes the node ID annotation from the Node object and CSIDrivers field from the
// UninstallCSIDriver removes the node ID annotation from the Node object and CSIDrivers field from the
// CSINodeInfo object. If the CSINOdeInfo object contains no CSIDrivers, it will be deleted.
// If multiple calls to RemoveNodeInfo() are made in parallel, some calls might receive Node or
// If multiple calls to UninstallCSIDriver() are made in parallel, some calls might receive Node or
// CSINodeInfo update conflicts, which causes the function to retry the corresponding update.
func (nim *nodeInfoManager) RemoveNodeInfo(driverName string) error {
err := nim.removeCSINodeInfo(driverName)
func (nim *nodeInfoManager) UninstallCSIDriver(driverName string) error {
err := nim.uninstallDriverFromCSINodeInfo(driverName)
if err != nil {
return fmt.Errorf("error removing CSI driver node info from CSINodeInfo object %v", err)
return fmt.Errorf("error uninstalling CSI driver from CSINodeInfo object %v", err)
}

err = nim.updateNode(
Expand Down Expand Up @@ -333,45 +335,35 @@ func (nim *nodeInfoManager) updateCSINodeInfo(
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
nodeInfo, err := csiKubeClient.CsiV1alpha1().CSINodeInfos().Get(string(nim.nodeName), metav1.GetOptions{})
if nodeInfo == nil || errors.IsNotFound(err) {
return nim.createNodeInfoObject(driverName, driverNodeID, topology)
nodeInfo, err = nim.CreateCSINodeInfo()
}
if err != nil {
return err // do not wrap error
}

return nim.updateNodeInfoObject(nodeInfo, driverName, driverNodeID, topology)
return nim.installDriverToCSINodeInfo(nodeInfo, driverName, driverNodeID, topology)
})
if retryErr != nil {
return fmt.Errorf("CSINodeInfo update failed: %v", retryErr)
}
return nil
}

func (nim *nodeInfoManager) createNodeInfoObject(
driverName string,
driverNodeID string,
topology *csipb.Topology) error {
func (nim *nodeInfoManager) CreateCSINodeInfo() (*csiv1alpha1.CSINodeInfo, error) {

kubeClient := nim.volumeHost.GetKubeClient()
if kubeClient == nil {
return fmt.Errorf("error getting kube client")
return nil, fmt.Errorf("error getting kube client")
}

csiKubeClient := nim.volumeHost.GetCSIClient()
if csiKubeClient == nil {
return fmt.Errorf("error getting CSI client")
}

topologyKeys := []string{} // must be an empty slice instead of nil to satisfy CRD OpenAPI Schema validation
if topology != nil {
for k := range topology.Segments {
topologyKeys = append(topologyKeys, k)
}
return nil, fmt.Errorf("error getting CSI client")
}

node, err := kubeClient.CoreV1().Nodes().Get(string(nim.nodeName), metav1.GetOptions{})
if err != nil {
return err // do not wrap error
return nil, err // do not wrap error
}

nodeInfo := &csiv1alpha1.CSINodeInfo{
Expand All @@ -386,20 +378,18 @@ func (nim *nodeInfoManager) createNodeInfoObject(
},
},
},
CSIDrivers: []csiv1alpha1.CSIDriverInfo{
{
Driver: driverName,
NodeID: driverNodeID,
TopologyKeys: topologyKeys,
},
Spec: csiv1alpha1.CSINodeInfoSpec{
Drivers: []csiv1alpha1.CSIDriverInfoSpec{},
},
Status: csiv1alpha1.CSINodeInfoStatus{
Drivers: []csiv1alpha1.CSIDriverInfoStatus{},
},
}

_, err = csiKubeClient.CsiV1alpha1().CSINodeInfos().Create(nodeInfo)
return err // do not wrap error
return csiKubeClient.CsiV1alpha1().CSINodeInfos().Create(nodeInfo)
}

func (nim *nodeInfoManager) updateNodeInfoObject(
func (nim *nodeInfoManager) installDriverToCSINodeInfo(
nodeInfo *csiv1alpha1.CSINodeInfo,
driverName string,
driverNodeID string,
Expand All @@ -417,36 +407,62 @@ func (nim *nodeInfoManager) updateNodeInfoObject(
}
}

// Clone driver list, omitting the driver that matches the given driverName,
// unless the driver is identical to information provided, in which case no update is necessary.
var newDriverInfos []csiv1alpha1.CSIDriverInfo
for _, driverInfo := range nodeInfo.CSIDrivers {
if driverInfo.Driver == driverName {
prevTopologyKeys := sets.NewString(driverInfo.TopologyKeys...)
if driverInfo.NodeID == driverNodeID && prevTopologyKeys.Equal(topologyKeys) {
// No update needed
return nil
specModified := true
statusModified := true
// Clone driver list, omitting the driver that matches the given driverName
newDriverSpecs := []csiv1alpha1.CSIDriverInfoSpec{}
for _, driverInfoSpec := range nodeInfo.Spec.Drivers {
if driverInfoSpec.Name == driverName {
if driverInfoSpec.NodeID == driverNodeID &&
sets.NewString(driverInfoSpec.TopologyKeys...).Equal(topologyKeys) {
specModified = false
}
} else {
// Omit driverInfoSpec matching given driverName
newDriverSpecs = append(newDriverSpecs, driverInfoSpec)
}
}
newDriverStatuses := []csiv1alpha1.CSIDriverInfoStatus{}
for _, driverInfoStatus := range nodeInfo.Status.Drivers {
if driverInfoStatus.Name == driverName {
if driverInfoStatus.Available &&
/* TODO(https://github.com/kubernetes/enhancements/issues/625): Add actual migration status */
driverInfoStatus.VolumePluginMechanism == csiv1alpha1.VolumePluginMechanismInTree {
statusModified = false
}
} else {
// Omit driverInfo matching given driverName
newDriverInfos = append(newDriverInfos, driverInfo)
// Omit driverInfoSpec matching given driverName
newDriverStatuses = append(newDriverStatuses, driverInfoStatus)
}
}

if !specModified && !statusModified {
return nil
}

// Append new driver
driverInfo := csiv1alpha1.CSIDriverInfo{
Driver: driverName,
driverSpec := csiv1alpha1.CSIDriverInfoSpec{
Name: driverName,
NodeID: driverNodeID,
TopologyKeys: topologyKeys.List(),
}
newDriverInfos = append(newDriverInfos, driverInfo)
nodeInfo.CSIDrivers = newDriverInfos
driverStatus := csiv1alpha1.CSIDriverInfoStatus{
Name: driverName,
Available: true,
// TODO(https://github.com/kubernetes/enhancements/issues/625): Add actual migration status
VolumePluginMechanism: csiv1alpha1.VolumePluginMechanismInTree,
}

newDriverSpecs = append(newDriverSpecs, driverSpec)
newDriverStatuses = append(newDriverStatuses, driverStatus)
nodeInfo.Spec.Drivers = newDriverSpecs
nodeInfo.Status.Drivers = newDriverStatuses

_, err := csiKubeClient.CsiV1alpha1().CSINodeInfos().Update(nodeInfo)
return err // do not wrap error
}

func (nim *nodeInfoManager) removeCSINodeInfo(csiDriverName string) error {
func (nim *nodeInfoManager) uninstallDriverFromCSINodeInfo(csiDriverName string) error {
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {

csiKubeClient := nim.volumeHost.GetCSIClient()
Expand All @@ -456,32 +472,28 @@ func (nim *nodeInfoManager) removeCSINodeInfo(csiDriverName string) error {

nodeInfoClient := csiKubeClient.CsiV1alpha1().CSINodeInfos()
nodeInfo, err := nodeInfoClient.Get(string(nim.nodeName), metav1.GetOptions{})
if nodeInfo == nil || errors.IsNotFound(err) {
// do nothing
return nil
}
if err != nil {
return err // do not wrap error
}

// Remove matching driver from driver list
var newDriverInfos []csiv1alpha1.CSIDriverInfo
for _, driverInfo := range nodeInfo.CSIDrivers {
if driverInfo.Driver != csiDriverName {
newDriverInfos = append(newDriverInfos, driverInfo)
hasModified := false
newDriverStatuses := []csiv1alpha1.CSIDriverInfoStatus{}
for _, driverStatus := range nodeInfo.Status.Drivers {
if driverStatus.Name == csiDriverName {
// Uninstall the driver if we find it
hasModified = driverStatus.Available
driverStatus.Available = false
}
newDriverStatuses = append(newDriverStatuses, driverStatus)
}

if len(newDriverInfos) == len(nodeInfo.CSIDrivers) {
nodeInfo.Status.Drivers = newDriverStatuses

if !hasModified {
// No changes, don't update
return nil
}

if len(newDriverInfos) == 0 {
// No drivers left, delete CSINodeInfo object
return nodeInfoClient.Delete(string(nim.nodeName), &metav1.DeleteOptions{})
}

// TODO (verult) make sure CSINodeInfo has validation logic to prevent duplicate driver names
_, updateErr := nodeInfoClient.Update(nodeInfo)
return updateErr // do not wrap error
Expand Down
Loading

0 comments on commit e133ab2

Please sign in to comment.