
Commit 2f62d1b

Issue argoproj#1161 - Use correct resource version in K8S watch API calls to avoid lost update events (argoproj#1173)
1 parent ee617d1 commit 2f62d1b
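
The fix applies the standard Kubernetes list-then-watch pattern: rather than opening watches with an empty resource version, the controller resumes each watch from the resourceVersion of the last successful list, so update events that occur between a list and the (re)start of a watch are not silently dropped. A minimal, self-contained sketch of that pattern against a recent client-go API (kubeconfig path, namespace, and error handling are illustrative, not taken from this commit):

package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Assumes a kubeconfig in the default location; error handling kept minimal.
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	// 1. List first. The resourceVersion of the returned list marks the exact
	//    point in the cluster's history that the local cache now reflects.
	pods, err := clientset.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		panic(err)
	}

	// 2. Watch from that resourceVersion rather than from "" so that updates
	//    which happened between the list and the watch are replayed, not lost.
	w, err := clientset.CoreV1().Pods("").Watch(context.TODO(), metav1.ListOptions{
		ResourceVersion: pods.ResourceVersion,
	})
	if err != nil {
		panic(err)
	}
	defer w.Stop()

	for ev := range w.ResultChan() {
		fmt.Printf("%s %T\n", ev.Type, ev.Object)
	}
}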

File tree: 6 files changed (+292, -116 lines)


controller/cache/cache.go (+47, -18)

@@ -93,7 +93,7 @@ func (c *liveStateCache) getCluster(server string) (*clusterInfo, error) {
         return nil, err
     }
     info = &clusterInfo{
-        apis:    make(map[schema.GroupVersionKind]metav1.APIResource),
+        apis:    make(map[schema.GroupKind]*gkInfo),
         lock:    &sync.Mutex{},
         nodes:   make(map[kube.ResourceKey]*node),
         nsIndex: make(map[string]map[kube.ResourceKey]*node),
@@ -148,7 +148,7 @@ func (c *liveStateCache) IsNamespaced(server string, gvk schema.GroupVersionKind
     if err != nil {
         return false, err
     }
-    return clusterInfo.isNamespaced(gvk), nil
+    return clusterInfo.isNamespaced(gvk.GroupKind()), nil
 }

 func (c *liveStateCache) GetChildren(server string, obj *unstructured.Unstructured) ([]appv1.ResourceNode, error) {
@@ -178,6 +178,7 @@ func isClusterHasApps(apps []interface{}, cluster *appv1.Cluster) bool {

 // Run watches for resource changes annotated with application label on all registered clusters and schedule corresponding app refresh.
 func (c *liveStateCache) Run(ctx context.Context) {
+    watchingClustersLock := sync.Mutex{}
     watchingClusters := make(map[string]struct {
         cancel  context.CancelFunc
         cluster *appv1.Cluster
@@ -191,9 +192,12 @@ func (c *liveStateCache) Run(ctx context.Context) {
         // cluster resources must be watched only if cluster has at least one app
         if (event.Type == watch.Deleted || !hasApps) && ok {
             info.cancel()
+            watchingClustersLock.Lock()
             delete(watchingClusters, event.Cluster.Server)
+            watchingClustersLock.Unlock()
         } else if event.Type != watch.Deleted && !ok && hasApps {
             ctx, cancel := context.WithCancel(ctx)
+            watchingClustersLock.Lock()
             watchingClusters[event.Cluster.Server] = struct {
                 cancel  context.CancelFunc
                 cluster *appv1.Cluster
@@ -204,6 +208,7 @@ func (c *liveStateCache) Run(ctx context.Context) {
                 },
                 cluster: event.Cluster,
             }
+            watchingClustersLock.Unlock()
             go c.watchClusterResources(ctx, c.settings, *event.Cluster)
         }
     }
@@ -262,35 +267,59 @@ func (c *liveStateCache) watchClusterResources(ctx context.Context, resourceFilt
         if err != nil {
             return err
         }
-        ch, err := c.kubectl.WatchResources(ctx, config, resourceFilter, "")
+
+        ch, err := c.kubectl.WatchResources(ctx, config, resourceFilter, func(gk schema.GroupKind) (s string, e error) {
+            clusterInfo, err := c.getSyncedCluster(item.Server)
+            if err != nil {
+                return "", err
+            }
+            return clusterInfo.getResourceVersion(gk), nil
+        })
         if err != nil {
             return err
         }
         for event := range ch {
-            eventObj := event.Object.(*unstructured.Unstructured)
-            if kube.IsCRD(eventObj) {
-                // restart if new CRD has been created after watch started
-                if event.Type == watch.Added {
-                    if !knownCRDs[eventObj.GetName()] {
+            if event.WatchEvent != nil {
+                eventObj := event.WatchEvent.Object.(*unstructured.Unstructured)
+                if kube.IsCRD(eventObj) {
+                    // restart if new CRD has been created after watch started
+                    if event.WatchEvent.Type == watch.Added {
+                        if !knownCRDs[eventObj.GetName()] {
+                            c.removeCluster(item.Server)
+                            return fmt.Errorf("Restarting the watch because a new CRD %s was added", eventObj.GetName())
+                        } else {
+                            log.Infof("CRD %s updated", eventObj.GetName())
+                        }
+                    } else if event.WatchEvent.Type == watch.Deleted {
                         c.removeCluster(item.Server)
-                        return fmt.Errorf("Restarting the watch because a new CRD %s was added", eventObj.GetName())
-                    } else {
-                        log.Infof("CRD %s updated", eventObj.GetName())
+                        return fmt.Errorf("Restarting the watch because CRD %s was deleted", eventObj.GetName())
                     }
-                } else if event.Type == watch.Deleted {
-                    c.removeCluster(item.Server)
-                    return fmt.Errorf("Restarting the watch because CRD %s was deleted", eventObj.GetName())
+                }
+                err = c.processEvent(event.WatchEvent.Type, eventObj, item.Server)
+                if err != nil {
+                    log.Warnf("Failed to process event %s for obj %v: %v", event.WatchEvent.Type, event.WatchEvent.Object, err)
+                }
+            } else {
+                err = c.updateCache(item.Server, event.CacheRefresh.GVK.GroupKind(), event.CacheRefresh.ResourceVersion, event.CacheRefresh.Objects)
+                if err != nil {
+                    log.Warnf("Failed to process event %s for obj %v: %v", event.WatchEvent.Type, event.WatchEvent.Object, err)
                 }
             }
-            err = c.processEvent(event.Type, eventObj, item.Server)
-            if err != nil {
-                log.Warnf("Failed to process event %s for obj %v: %v", event.Type, event.Object, err)
-            }
+
         }
         return fmt.Errorf("resource updates channel has closed")
     }, fmt.Sprintf("watch app resources on %s", item.Server), ctx, clusterRetryTimeout)
 }

+func (c *liveStateCache) updateCache(server string, gk schema.GroupKind, resourceVersion string, objs []unstructured.Unstructured) error {
+    clusterInfo, err := c.getSyncedCluster(server)
+    if err != nil {
+        return err
+    }
+    clusterInfo.updateCache(gk, resourceVersion, objs)
+    return nil
+}
+
 // getCRDs returns a map of crds
 func getCRDs(config *rest.Config) (map[string]bool, error) {
     crdsByName := make(map[string]bool)
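
The reworked WatchResources call above now takes a callback that returns the last known resourceVersion for a GroupKind, and its channel can carry either a plain WatchEvent or a CacheRefresh holding a freshly listed set of objects. The producer side is not part of this excerpt; the following is only a rough sketch, under assumed type and function names, of how such a producer could resume at the stored version and fall back to a relist when the API server reports the version as expired (HTTP 410 Gone):

package watcher

import (
	"context"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/dynamic"
)

// Event mirrors the two-armed shape consumed above: either a single watch
// event or a full refresh for one GroupKind. Names are illustrative only.
type Event struct {
	WatchEvent   *watch.Event
	CacheRefresh *CacheRefresh
}

type CacheRefresh struct {
	GVK             schema.GroupVersionKind
	ResourceVersion string
	Objects         []unstructured.Unstructured
}

// watchOne resumes the watch at the caller-supplied resourceVersion; if the
// server rejects that version as expired, it relists and emits a CacheRefresh
// so the consumer can rebuild its cache for that GroupKind.
func watchOne(ctx context.Context, client dynamic.Interface, gvr schema.GroupVersionResource,
	gvk schema.GroupVersionKind, lastVersion func() string, out chan<- Event) error {

	w, err := client.Resource(gvr).Watch(ctx, metav1.ListOptions{ResourceVersion: lastVersion()})
	if apierrors.IsGone(err) || apierrors.IsResourceExpired(err) {
		// Stored version is too old: relist and hand the full result to the cache.
		list, listErr := client.Resource(gvr).List(ctx, metav1.ListOptions{})
		if listErr != nil {
			return listErr
		}
		out <- Event{CacheRefresh: &CacheRefresh{
			GVK:             gvk,
			ResourceVersion: list.GetResourceVersion(),
			Objects:         list.Items,
		}}
		return nil
	}
	if err != nil {
		return err
	}
	defer w.Stop()

	for ev := range w.ResultChan() {
		ev := ev // copy before taking the address
		out <- Event{WatchEvent: &ev}
	}
	return nil
}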

controller/cache/cluster.go (+97, -51)

@@ -21,12 +21,17 @@ import (
 )

 const (
-    clusterSyncTimeout  = 1 * time.Hour
+    clusterSyncTimeout  = 24 * time.Hour
     clusterRetryTimeout = 10 * time.Second
 )

+type gkInfo struct {
+    resource    metav1.APIResource
+    listVersion string
+}
+
 type clusterInfo struct {
-    apis    map[schema.GroupVersionKind]metav1.APIResource
+    apis    map[schema.GroupKind]*gkInfo
     nodes   map[kube.ResourceKey]*node
     nsIndex map[string]map[kube.ResourceKey]*node
     lock    *sync.Mutex
@@ -40,6 +45,46 @@ type clusterInfo struct {
     settings *settings.ArgoCDSettings
 }

+func (c *clusterInfo) getResourceVersion(gk schema.GroupKind) string {
+    c.lock.Lock()
+    defer c.lock.Unlock()
+    info, ok := c.apis[gk]
+    if ok {
+        return info.listVersion
+    }
+    return ""
+}
+
+func (c *clusterInfo) updateCache(gk schema.GroupKind, resourceVersion string, objs []unstructured.Unstructured) {
+    c.lock.Lock()
+    defer c.lock.Unlock()
+    info, ok := c.apis[gk]
+    if ok {
+        objByKind := make(map[kube.ResourceKey]*unstructured.Unstructured)
+        for i := range objs {
+            objByKind[kube.GetResourceKey(&objs[i])] = &objs[i]
+        }
+
+        for i := range objs {
+            obj := &objs[i]
+            key := kube.GetResourceKey(&objs[i])
+            existingNode, exists := c.nodes[key]
+            c.onNodeUpdated(exists, existingNode, obj, key)
+        }
+
+        for key, existingNode := range c.nodes {
+            if key.Kind != gk.Kind || key.Group != gk.Group {
+                continue
+            }
+
+            if _, ok := objByKind[key]; !ok {
+                c.onNodeRemoved(key, existingNode)
+            }
+        }
+        info.listVersion = resourceVersion
+    }
+}
+
 func createObjInfo(un *unstructured.Unstructured, appInstanceLabel string) *node {
     ownerRefs := un.GetOwnerReferences()
     // Special case for endpoint. Remove after https://github.com/kubernetes/kubernetes/issues/28483 is fixed
@@ -113,34 +158,29 @@ func (c *clusterInfo) sync() (err error) {

     c.log.Info("Start syncing cluster")

-    clusterResources, err := c.kubectl.GetAPIResources(c.cluster.RESTConfig())
-    if err != nil {
-        if len(clusterResources) == 0 {
-            return err
-        }
-        log.Warnf("Partial success when getting API resources during sync: %v", err)
-    }
-    c.apis = make(map[schema.GroupVersionKind]metav1.APIResource)
-    for _, r := range clusterResources {
-        gv, err := schema.ParseGroupVersion(r.GroupVersion)
-        if err != nil {
-            gv = schema.GroupVersion{}
-        }
-        for i := range r.APIResources {
-            c.apis[gv.WithKind(r.APIResources[i].Kind)] = r.APIResources[i]
-        }
-    }
-
+    c.apis = make(map[schema.GroupKind]*gkInfo)
     c.nodes = make(map[kube.ResourceKey]*node)
+
     resources, err := c.kubectl.GetResources(c.cluster.RESTConfig(), c.settings, "")
     if err != nil {
         log.Errorf("Failed to sync cluster %s: %v", c.cluster.Server, err)
         return err
     }

     appLabelKey := c.settings.GetAppInstanceLabelKey()
-    for i := range resources {
-        c.setNode(createObjInfo(resources[i], appLabelKey))
+    for res := range resources {
+        if res.Error != nil {
+            return res.Error
+        }
+        if _, ok := c.apis[res.GVK.GroupKind()]; !ok {
+            c.apis[res.GVK.GroupKind()] = &gkInfo{
+                listVersion: res.ListResourceVersion,
+                resource:    res.ResourceInfo,
+            }
+        }
+        for i := range res.Objects {
+            c.setNode(createObjInfo(&res.Objects[i], appLabelKey))
+        }
     }

     c.log.Info("Cluster successfully synced")
@@ -179,8 +219,8 @@ func (c *clusterInfo) getChildren(obj *unstructured.Unstructured) []appv1.Resour
     return children
 }

-func (c *clusterInfo) isNamespaced(gvk schema.GroupVersionKind) bool {
-    if api, ok := c.apis[gvk]; ok && !api.Namespaced {
+func (c *clusterInfo) isNamespaced(gk schema.GroupKind) bool {
+    if api, ok := c.apis[gk]; ok && !api.resource.Namespaced {
         return false
     }
     return true
@@ -202,7 +242,7 @@ func (c *clusterInfo) getManagedLiveObjs(a *appv1.Application, targetObjs []*uns
     lock := &sync.Mutex{}
     err := util.RunAllAsync(len(targetObjs), func(i int) error {
         targetObj := targetObjs[i]
-        key := GetTargetObjKey(a, targetObj, c.isNamespaced(targetObj.GroupVersionKind()))
+        key := GetTargetObjKey(a, targetObj, c.isNamespaced(targetObj.GroupVersionKind().GroupKind()))
         lock.Lock()
         managedObj := managedObjs[key]
         lock.Unlock()
@@ -274,38 +314,44 @@ func (c *clusterInfo) processEvent(event watch.EventType, un *unstructured.Unstr
     existingNode, exists := c.nodes[key]
     if event == watch.Deleted {
         if exists {
-            c.removeNode(key)
-            if existingNode.appName != "" {
-                c.onAppUpdated(existingNode.appName)
-            }
+            c.onNodeRemoved(key, existingNode)
         }
     } else if event != watch.Deleted {
-        nodes := make([]*node, 0)
-        if exists {
-            nodes = append(nodes, existingNode)
-        }
-        newObj := createObjInfo(un, c.settings.GetAppInstanceLabelKey())
-        c.setNode(newObj)
-        nodes = append(nodes, newObj)
-
-        toNotify := make(map[string]bool)
-        for i := range nodes {
-            n := nodes[i]
-            if ns, ok := c.nsIndex[n.ref.Namespace]; ok {
-                app := n.getApp(ns)
-                if app == "" || skipAppRequeing(key) {
-                    continue
-                }
-                toNotify[app] = true
-            }
-        }
+        c.onNodeUpdated(exists, existingNode, un, key)
+    }
+
+    return nil
+}

-        for name := range toNotify {
-            c.onAppUpdated(name)
+func (c *clusterInfo) onNodeUpdated(exists bool, existingNode *node, un *unstructured.Unstructured, key kube.ResourceKey) {
+    nodes := make([]*node, 0)
+    if exists {
+        nodes = append(nodes, existingNode)
+    }
+    newObj := createObjInfo(un, c.settings.GetAppInstanceLabelKey())
+    c.setNode(newObj)
+    nodes = append(nodes, newObj)
+    toNotify := make(map[string]bool)
+    for i := range nodes {
+        n := nodes[i]
+        if ns, ok := c.nsIndex[n.ref.Namespace]; ok {
+            app := n.getApp(ns)
+            if app == "" || skipAppRequeing(key) {
+                continue
+            }
+            toNotify[app] = true
         }
     }
+    for name := range toNotify {
+        c.onAppUpdated(name)
+    }
+}

-    return nil
+func (c *clusterInfo) onNodeRemoved(key kube.ResourceKey, existingNode *node) {
+    c.removeNode(key)
+    if existingNode.appName != "" {
+        c.onAppUpdated(existingNode.appName)
+    }
 }

 var (
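
The pruning loop inside updateCache above is what recovers deletions missed while a watch was down: any cached object of the relisted GroupKind that is absent from the fresh list is removed, and the stored list version is advanced. Stripped of the Kubernetes types, the reconciliation amounts to the following standalone sketch (illustrative code, not part of the commit):

package sketch

// key identifies a cached object the way the controller's ResourceKey does:
// by group, kind, namespace and name.
type key struct {
	group, kind, namespace, name string
}

// reconcile applies a full relist of one GroupKind to an existing cache:
// every object in the fresh list is upserted, and cached entries of that
// GroupKind that no longer appear in the list are dropped.
func reconcile(cache map[key]string, group, kind string, fresh map[key]string) {
	for k, resourceVersion := range fresh {
		cache[k] = resourceVersion // add or update everything returned by the relist
	}
	for k := range cache {
		if k.group != group || k.kind != kind {
			continue // only the relisted GroupKind is authoritative here
		}
		if _, ok := fresh[k]; !ok {
			delete(cache, k) // object was deleted while no watch was running
		}
	}
}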

controller/cache/cluster_test.go (+34, -1)

@@ -84,7 +84,7 @@ func newCluster(resources ...*unstructured.Unstructured) *clusterInfo {
         cluster:  &appv1.Cluster{},
         syncTime: nil,
         syncLock: &sync.Mutex{},
-        apis:     make(map[schema.GroupVersionKind]metav1.APIResource),
+        apis:     make(map[schema.GroupKind]*gkInfo),
         log:      log.WithField("cluster", "test"),
         settings: &settings.ArgoCDSettings{},
     }
@@ -274,3 +274,36 @@ func TestCircularReference(t *testing.T) {
     children := cluster.getChildren(dep)
     assert.Len(t, children, 1)
 }
+
+func TestWatchCacheUpdated(t *testing.T) {
+    removed := testPod.DeepCopy()
+    removed.SetName(testPod.GetName() + "-removed-pod")
+
+    updated := testPod.DeepCopy()
+    updated.SetName(testPod.GetName() + "-updated-pod")
+    updated.SetResourceVersion("updated-pod-version")
+
+    cluster := newCluster(removed, updated)
+    err := cluster.ensureSynced()
+
+    assert.Nil(t, err)
+
+    added := testPod.DeepCopy()
+    added.SetName(testPod.GetName() + "-new-pod")
+
+    podGroupKind := testPod.GroupVersionKind().GroupKind()
+
+    cluster.updateCache(podGroupKind, "updated-list-version", []unstructured.Unstructured{*updated, *added})
+
+    _, ok := cluster.nodes[kube.GetResourceKey(removed)]
+    assert.False(t, ok)
+
+    updatedNode, ok := cluster.nodes[kube.GetResourceKey(updated)]
+    assert.True(t, ok)
+    assert.Equal(t, updatedNode.resourceVersion, "updated-pod-version")
+
+    _, ok = cluster.nodes[kube.GetResourceKey(added)]
+    assert.True(t, ok)
+
+    assert.Equal(t, cluster.getResourceVersion(podGroupKind), "updated-list-version")
+}
