Skip to content

Commit

Permalink
Merge pull request #988 from siyanshen/release-1.5
Browse files Browse the repository at this point in the history
[Release-1.5] Manual Cherrypick-Merge pull request #977 from siyanshen/lock-release-unit-test
  • Loading branch information
k8s-ci-robot authored Nov 5, 2024
2 parents c57ff71 + 5153351 commit 8b57e99
Show file tree
Hide file tree
Showing 40 changed files with 11,925 additions and 78 deletions.
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ require (
github.com/prashanthpai/sunrpc v0.0.0-20210303180433-689a3880d90a
github.com/spf13/cobra v1.6.0
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.9.0
golang.org/x/net v0.19.0
golang.org/x/oauth2 v0.15.0
golang.org/x/sys v0.15.0
golang.org/x/time v0.5.0
google.golang.org/api v0.152.0
google.golang.org/genproto/googleapis/rpc v0.0.0-20231127180814-3a041ad873d4
google.golang.org/grpc v1.59.0
Expand Down Expand Up @@ -69,17 +71,18 @@ require (
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/nxadm/tail v1.4.8 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.14.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/rasky/go-xdr v0.0.0-20170124162913-1a41d1a06c93 // indirect
github.com/sirupsen/logrus v1.8.1 // indirect
github.com/stretchr/objx v0.5.2 // indirect
go.opencensus.io v0.24.0 // indirect
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/term v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.5.0 // indirect
google.golang.org/appengine v1.6.8 // indirect
gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
Expand Down
5 changes: 4 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1288,6 +1288,8 @@ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v0.0.0-20151208002404-e3a8ff8ce365/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
Expand All @@ -1297,8 +1299,9 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/syndtr/gocapability v0.0.0-20170704070218-db04d3cc01c8/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww=
github.com/syndtr/gocapability v0.0.0-20200815063812-42c35b437635/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww=
Expand Down
2 changes: 1 addition & 1 deletion pkg/csi_driver/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func initTestNodeServerWithKubeClient(t *testing.T, client kubernetes.Interface)
mounter: mounter,
metaService: metaserice,
volumeLocks: util.NewVolumeLocks(),
lockReleaseController: lockrelease.NewFakeLockReleaseControllerWithClient(client),
lockReleaseController: lockrelease.NewControllerBuilder().WithClient(client).Build(),
features: &GCFSDriverFeatureOptions{FeatureLockRelease: &FeatureLockRelease{Enabled: true}},
}
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/releaselock/configmap_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func TestGetConfigMap(t *testing.T) {
}
for _, test := range cases {
client := fake.NewSimpleClientset(test.existingCM)
controller := NewFakeLockReleaseControllerWithClient(client)
controller := NewControllerBuilder().WithClient(client).Build()
cm, err := controller.GetConfigMap(context.Background(), test.cmName, test.cmNamespace)
if gotExpected := gotExpectedError(test.name, test.expectErr, err); gotExpected != nil {
t.Fatal(gotExpected)
Expand Down Expand Up @@ -274,7 +274,7 @@ func TestUpdateConfigMapWithKeyValue(t *testing.T) {
}
for _, test := range cases {
client := fake.NewSimpleClientset(test.existingCM)
controller := NewFakeLockReleaseControllerWithClient(client)
controller := NewControllerBuilder().WithClient(client).Build()
ctx := context.Background()
err := controller.UpdateConfigMapWithKeyValue(ctx, test.existingCM, test.key, test.value)
if gotExpected := gotExpectedError(test.name, test.expectErr, err); gotExpected != nil {
Expand Down Expand Up @@ -372,7 +372,7 @@ func TestRemoveKeyFromConfigMap(t *testing.T) {
}
for _, test := range cases {
client := fake.NewSimpleClientset(test.existingCM)
controller := NewFakeLockReleaseControllerWithClient(client)
controller := NewControllerBuilder().WithClient(client).Build()
ctx := context.Background()
err := controller.RemoveKeyFromConfigMap(ctx, test.existingCM, test.key)
if gotExpected := gotExpectedError(test.name, test.expectErr, err); gotExpected != nil {
Expand Down Expand Up @@ -470,7 +470,7 @@ func TestRemoveKeyFromConfigMapWithRetry(t *testing.T) {
}
for _, test := range cases {
client := fake.NewSimpleClientset(test.existingCM)
controller := NewFakeLockReleaseControllerWithClient(client)
controller := NewControllerBuilder().WithClient(client).Build()
ctx := context.Background()
err := controller.RemoveKeyFromConfigMapWithRetry(ctx, test.existingCM, test.key)
if gotExpected := gotExpectedError(test.name, test.expectErr, err); gotExpected != nil {
Expand Down
162 changes: 97 additions & 65 deletions pkg/releaselock/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,77 @@ type NodeUpdatePair struct {
NewObj *corev1.Node
}

// LockService is the lock-release dependency of LockReleaseController.
// The controller stores one in its lockService field and calls it whenever a
// lock has to be released, which lets the implementation be substituted.
type LockService interface {
// ReleaseLock releases the lock identified by hostIP and clientIP.
// Callers in this file pass the Filestore IP as hostIP and the GKE node
// internal IP as clientIP. A non-nil error means the release failed.
ReleaseLock(hostIP, clientIP string) error
}

// EventProcessor reconciles individual config map entries in response to node
// events. The controller calls it once per config map key while handling a
// node creation or a node update. Because two of its methods are unexported,
// only types declared in this package can implement it.
type EventProcessor interface {
// processConfigMapEntryOnNodeCreation handles the entry key -> filestoreIP of
// cm for the created node.
processConfigMapEntryOnNodeCreation(ctx context.Context, key string, filestoreIP string, node *corev1.Node, cm *corev1.ConfigMap) error
// processConfigMapEntryOnNodeUpdate handles the entry key -> filestoreIP of
// cm for a node update, given the node before (oldNode) and after (newNode).
processConfigMapEntryOnNodeUpdate(ctx context.Context, key string, filestoreIP string, newNode *corev1.Node, oldNode *corev1.Node, cm *corev1.ConfigMap) error
// SetController hands the processor the controller it operates on.
SetController(ctrl *LockReleaseController)
}

// DefaultEventProcessor is the EventProcessor that NewLockReleaseController
// installs on the controller it builds.
type DefaultEventProcessor struct {
// ctrl is the controller whose client, lock service and config map helpers
// the processor uses. It is nil until SetController is called; the process
// methods return a "controller not set" error while it is nil.
ctrl *LockReleaseController
}

// SetController stores ctrl as the controller this processor operates on.
// It must be called with a non-nil controller before any entry is processed,
// because the process methods fail while the stored controller is nil.
func (p *DefaultEventProcessor) SetController(ctrl *LockReleaseController) {
p.ctrl = ctrl
}

// processConfigMapEntryOnNodeCreation reconciles a single lock-info entry of cm
// against a newly created node.
//
// key is parsed with ParseConfigMapKey to obtain the GCE instance ID and the
// GKE node internal IP recorded for the lock; filestoreIP is the value stored
// under that key. The entry is left alone and nil is returned when it matches
// node, when the node can no longer be found in the API server, or when it
// matches the latest copy of the node fetched from the API server. Otherwise
// the lock is released through the controller's LockService, the outcome is
// recorded in the lock release metrics, and key is removed from cm.
//
// An error is returned when no controller has been set, or when parsing,
// verification, the node lookup, the lock release or the config map update
// fails. Underlying errors are wrapped with %w so callers can inspect the
// cause with errors.Is / errors.As.
func (p *DefaultEventProcessor) processConfigMapEntryOnNodeCreation(ctx context.Context, key string, filestoreIP string, node *corev1.Node, cm *corev1.ConfigMap) error {
	if p.ctrl == nil {
		return fmt.Errorf("controller not set")
	}

	c := p.ctrl
	_, _, _, _, gceInstanceID, gkeNodeInternalIP, err := ParseConfigMapKey(key)
	if err != nil {
		return fmt.Errorf("failed to parse configmap key %s: %w", key, err)
	}
	klog.V(6).Infof("Verifying GKE node %s with nodeId %s nodeInternalIP %s exists or not", node.Name, gceInstanceID, gkeNodeInternalIP)
	entryMatchesNode, err := c.verifyConfigMapEntry(node, gceInstanceID, gkeNodeInternalIP)
	if err != nil {
		return fmt.Errorf("failed to verify GKE node %s with nodeId %s nodeInternalIP %s still exists: %w", node.Name, gceInstanceID, gkeNodeInternalIP, err)
	}
	if entryMatchesNode {
		klog.V(6).Infof("GKE node %s with nodeId %s nodeInternalIP %s still exists in API server, skip lock info reconciliation", node.Name, gceInstanceID, gkeNodeInternalIP)
		return nil
	}

	// Try to match the latest node, to prevent incorrectly releasing the lock
	// in case of a lagging informer/watch.
	latestNode, err := c.client.CoreV1().Nodes().Get(ctx, node.Name, metav1.GetOptions{})
	if err != nil {
		// A node that is no longer in the API server is not treated as an
		// error here; the entry is simply left untouched.
		if apiError.IsNotFound(err) {
			return nil
		}
		return fmt.Errorf("failed to get node in namespace %w", err)
	}
	entryMatchesLatestNode, err := c.verifyConfigMapEntry(latestNode, gceInstanceID, gkeNodeInternalIP)
	if err != nil {
		return fmt.Errorf("failed to verify GKE node %s with nodeId %s nodeInternalIP %s still exists: %w", node.Name, gceInstanceID, gkeNodeInternalIP, err)
	}
	if entryMatchesLatestNode {
		klog.V(6).Infof("GKE node %s with nodeId %s nodeInternalIP %s exists in API server, skip lock info reconciliation", node.Name, gceInstanceID, gkeNodeInternalIP)
		return nil
	}

	klog.Infof("GKE node %s with nodeId %s nodeInternalIP %s no longer exists, releasing lock for Filestore IP %s", node.Name, gceInstanceID, gkeNodeInternalIP, filestoreIP)
	opErr := c.lockService.ReleaseLock(filestoreIP, gkeNodeInternalIP)
	// Metrics are recorded for both outcomes, before the error is returned.
	c.RecordLockReleaseMetrics(opErr)
	if opErr != nil {
		return fmt.Errorf("failed to release lock: %w", opErr)
	}
	klog.Infof("Removing lock info key %s from configmap %s/%s with data %v", key, cm.Namespace, cm.Name, cm.Data)
	// Apply the "Get() and Update(), or retry" logic in RemoveKeyFromConfigMap().
	// This will increase the number of k8s api calls,
	// but reduce repetitive ReleaseLock() due to kubeclient api failures in each reconcile loop.
	if err := c.RemoveKeyFromConfigMapWithRetry(ctx, cm, key); err != nil {
		return fmt.Errorf("failed to remove key %s from configmap %s/%s: %w", key, cm.Namespace, cm.Name, err)
	}
	return nil
}

type LockReleaseController struct {
client kubernetes.Interface

Expand All @@ -63,6 +134,9 @@ type LockReleaseController struct {

updateEventQueue workqueue.RateLimitingInterface
createEventQueue workqueue.RateLimitingInterface

eventProcessor EventProcessor
lockService LockService
}

type LockReleaseControllerConfig struct {
Expand Down Expand Up @@ -98,6 +172,9 @@ func NewLockReleaseController(
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(50), 300)},
)

eventProcessor := &DefaultEventProcessor{}
lockService := &FileStoreRPCClient{}

lc := &LockReleaseController{
id: id,
hostname: hostname,
Expand All @@ -106,6 +183,8 @@ func NewLockReleaseController(
nodeInformer: nodeInformer,
updateEventQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
createEventQueue: workqueue.NewRateLimitingQueue(ratelimiter),
eventProcessor: eventProcessor,
lockService: lockService,
}

if config.MetricEndpoint != "" {
Expand All @@ -116,6 +195,7 @@ func NewLockReleaseController(
lc.metricsManager = mm
}

eventProcessor.SetController(lc)
return lc, nil
}

Expand Down Expand Up @@ -160,74 +240,20 @@ func (c *LockReleaseController) handleCreateEvent(ctx context.Context, obj inter

var configMapReconcileErrors []error
for key, filestoreIP := range data {
err = c.processConfigMapEntryOnNodeCreation(ctx, key, filestoreIP, node, cm)
eventProcessor := c.eventProcessor
err = eventProcessor.processConfigMapEntryOnNodeCreation(ctx, key, filestoreIP, node, cm)
if err != nil {
configMapReconcileErrors = append(configMapReconcileErrors, err)
}
}
klog.Infof("skipped processing %d entries in config map", len(configMapReconcileErrors))
if len(configMapReconcileErrors) > 0 {
return errors.Join(configMapReconcileErrors...)
}
return nil

}

// processConfigMapEntryOnNodeCreation reconciles a single lock-info entry of cm
// against a newly created node.
//
// key is parsed with ParseConfigMapKey to obtain the GCE instance ID and the
// GKE node internal IP recorded for the lock; filestoreIP is the value stored
// under that key. Nothing is done when the entry matches node or the latest
// copy of the node fetched from the API server. When the node is not found in
// the API server, or when neither copy matches, the lock is released with
// ReleaseLock, the outcome is recorded in the lock release metrics, and key is
// removed from cm. Failures are returned wrapped with %w.
func (c *LockReleaseController) processConfigMapEntryOnNodeCreation(ctx context.Context, key string, filestoreIP string, node *corev1.Node, cm *corev1.ConfigMap) error {
_, _, _, _, gceInstanceID, gkeNodeInternalIP, err := ParseConfigMapKey(key)
if err != nil {
return fmt.Errorf("failed to parse configmap key %s: %w", key, err)
}
klog.V(6).Infof("Verifying GKE node %s with nodeId %s nodeInternalIP %s exists or not", node.Name, gceInstanceID, gkeNodeInternalIP)
entryMatchesNode, err := c.verifyConfigMapEntry(node, gceInstanceID, gkeNodeInternalIP)
if err != nil {
return fmt.Errorf("failed to verify GKE node %s with nodeId %s nodeInternalIP %s still exists: %w", node.Name, gceInstanceID, gkeNodeInternalIP, err)
}
if entryMatchesNode {
klog.V(6).Infof("GKE node %s with nodeId %s nodeInternalIP %s still exists in API server, skip lock info reconciliation", node.Name, gceInstanceID, gkeNodeInternalIP)
return nil
}

// Try to match the latest node, to prevent incorrectly releasing the lock in case of a lagging informer/watch
latestNode, err := c.client.CoreV1().Nodes().Get(ctx, node.Name, metav1.GetOptions{})
if err != nil {
// The node is not in the API server: release the lock and drop the key
// without the second verification below.
if apiError.IsNotFound(err) {
opErr := ReleaseLock(filestoreIP, gkeNodeInternalIP)
c.RecordLockReleaseMetrics(opErr)
if opErr != nil {
return fmt.Errorf("failed to release lock: %w", opErr)
}
if err := c.RemoveKeyFromConfigMapWithRetry(ctx, cm, key); err != nil {
return fmt.Errorf("failed to remove key %s from configmap %s/%s: %w", key, cm.Namespace, cm.Name, err)
}
return nil
}
return fmt.Errorf("failed to get node in namespace %w", err)
}
entryMatchesLatestNode, err := c.verifyConfigMapEntry(latestNode, gceInstanceID, gkeNodeInternalIP)
if err != nil {
return fmt.Errorf("failed to verify GKE node %s with nodeId %s nodeInternalIP %s still exists: %w", node.Name, gceInstanceID, gkeNodeInternalIP, err)
}
if entryMatchesLatestNode {
klog.V(6).Infof("GKE node %s with nodeId %s nodeInternalIP %s exists in API server, skip lock info reconciliation", node.Name, gceInstanceID, gkeNodeInternalIP)
return nil
}

klog.Infof("GKE node %s with nodeId %s nodeInternalIP %s no longer exists, releasing lock for Filestore IP %s", node.Name, gceInstanceID, gkeNodeInternalIP, filestoreIP)
opErr := ReleaseLock(filestoreIP, gkeNodeInternalIP)
// Metrics are recorded for both outcomes, before the error is returned.
c.RecordLockReleaseMetrics(opErr)
if opErr != nil {
return fmt.Errorf("failed to release lock: %w", opErr)
}
klog.Infof("Removing lock info key %s from configmap %s/%s with data %v", key, cm.Namespace, cm.Name, cm.Data)
// Apply the "Get() and Update(), or retry" logic in RemoveKeyFromConfigMap().
// This will increase the number of k8s api calls,
// but reduce repetitive ReleaseLock() due to kubeclient api failures in each reconcile loop.
if err := c.RemoveKeyFromConfigMapWithRetry(ctx, cm, key); err != nil {
return fmt.Errorf("failed to remove key %s from configmap %s/%s: %w", key, cm.Namespace, cm.Name, err)
}
return nil
}

func (c *LockReleaseController) processNextCreateEvent(ctx context.Context) bool {
obj, shutdown := c.createEventQueue.Get()
if shutdown {
Expand Down Expand Up @@ -306,7 +332,7 @@ func (c *LockReleaseController) handleUpdateEvent(ctx context.Context, oldObj in
data := cm.DeepCopy().Data
var configMapReconcileErrors []error
for key, filestoreIP := range data {
err = c.processConfigMapEntryOnNodeUpdate(ctx, key, filestoreIP, newNode, oldNode, cm)
err = c.eventProcessor.processConfigMapEntryOnNodeUpdate(ctx, key, filestoreIP, newNode, oldNode, cm)
if err != nil {
configMapReconcileErrors = append(configMapReconcileErrors, err)
}
Expand All @@ -317,7 +343,11 @@ func (c *LockReleaseController) handleUpdateEvent(ctx context.Context, oldObj in
return nil
}

func (c *LockReleaseController) processConfigMapEntryOnNodeUpdate(ctx context.Context, key string, filestoreIP string, newNode *corev1.Node, oldNode *corev1.Node, cm *corev1.ConfigMap) error {
func (p *DefaultEventProcessor) processConfigMapEntryOnNodeUpdate(ctx context.Context, key string, filestoreIP string, newNode *corev1.Node, oldNode *corev1.Node, cm *corev1.ConfigMap) error {
if p.ctrl == nil {
return fmt.Errorf("controller not set")
}
c := p.ctrl
_, _, _, _, gceInstanceID, gkeNodeInternalIP, err := ParseConfigMapKey(key)
if err != nil {
return fmt.Errorf("failed to parse configmap key %s: %w", key, err)
Expand All @@ -327,18 +357,20 @@ func (c *LockReleaseController) processConfigMapEntryOnNodeUpdate(ctx context.Co
if err != nil {
return fmt.Errorf("failed to verify GKE node %s with nodeId %s nodeInternalIP %s still exists: %w", newNode.Name, gceInstanceID, gkeNodeInternalIP, err)
}
entryMatchesOldNode, err := c.verifyConfigMapEntry(oldNode, gceInstanceID, gkeNodeInternalIP)
if err != nil {
return fmt.Errorf("failed to verify GKE node %s with nodeId %s nodeInternalIP %s still exists: %w", newNode.Name, gceInstanceID, gkeNodeInternalIP, err)
}
klog.Infof("Checked config map entry against old node(matching result %t), and new node(matching result %t)", entryMatchesOldNode, entryMatchesNewNode)
if entryMatchesNewNode {
klog.V(6).Infof("GKE node %s with nodeId %s nodeInternalIP %s still exists in API server, skip lock info reconciliation", newNode.Name, gceInstanceID, gkeNodeInternalIP)
return nil
}

entryMatchesOldNode, err := c.verifyConfigMapEntry(oldNode, gceInstanceID, gkeNodeInternalIP)
if err != nil {
return fmt.Errorf("failed to verify GKE node %s with nodeId %s nodeInternalIP %s still exists: %v", newNode.Name, gceInstanceID, gkeNodeInternalIP, err)
}
klog.Infof("Checked config map entry against old node(matching result %t), and new node(matching result %t)", entryMatchesOldNode, entryMatchesNewNode)

if entryMatchesOldNode {
klog.Infof("GKE node %s with nodeId %s nodeInternalIP %s matches a node before update, releasing lock for Filestore IP %s", newNode.Name, gceInstanceID, gkeNodeInternalIP, filestoreIP)
opErr := ReleaseLock(filestoreIP, gkeNodeInternalIP)
opErr := c.lockService.ReleaseLock(filestoreIP, gkeNodeInternalIP)
c.RecordLockReleaseMetrics(opErr)
if opErr != nil {
return fmt.Errorf("failed to release lock: %w", opErr)
Expand Down
Loading

0 comments on commit 8b57e99

Please sign in to comment.