Skip to content

Commit

Permalink
fix: Improve better handling on Pod deletion scenario (argoproj#4064)
Browse files Browse the repository at this point in the history
  • Loading branch information
sarabala1979 authored Sep 19, 2020
1 parent e2f4966 commit ed59408
Showing 1 changed file with 19 additions and 12 deletions.
31 changes: 19 additions & 12 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,25 +572,33 @@ func (wfc *WorkflowController) processNextPodItem() bool {
// we dequeued it.
return true
}

err = wfc.enqueueWfFromPodLabel(obj)
if err != nil {
log.WithError(err).Warnf("Failed to enqueue the workflow for %s", key)
}
return true
}

// enqueueWfFromPodLabel will extract the workflow name from pod label and
// enqueue workflow for processing
func (wfc *WorkflowController) enqueueWfFromPodLabel(obj interface{}) error {
pod, ok := obj.(*apiv1.Pod)
if !ok {
log.WithFields(log.Fields{"key": key}).Warn("Key in index is not a pod")
return true
return fmt.Errorf("Key in index is not a pod")
}
if pod.Labels == nil {
log.WithFields(log.Fields{"key": key}).Warn("Pod did not have labels")
return true
return fmt.Errorf("Pod did not have labels")
}
workflowName, ok := pod.Labels[common.LabelKeyWorkflow]
if !ok {
// Ignore pods unrelated to workflow (this shouldn't happen unless the watch is setup incorrectly)
log.WithFields(log.Fields{"key": pod.ObjectMeta.Name}).Warn("Watch returned pod unrelated to any workflow")
return true
return fmt.Errorf("Watch returned pod unrelated to any workflow")
}
// add this change after 1s - this reduces the number of workflow reconciliations -
//with each reconciliation doing more work
wfc.wfQueue.AddAfter(pod.ObjectMeta.Namespace+"/"+workflowName, enoughTimeForInformerSync)
return true
return nil
}

func (wfc *WorkflowController) tweakListOptions(options *metav1.ListOptions) {
Expand Down Expand Up @@ -816,11 +824,10 @@ func (wfc *WorkflowController) newPodInformer() cache.SharedIndexInformer {
DeleteFunc: func(obj interface{}) {
// IndexerInformer uses a delta queue, therefore for deletes we have to use this
// key function.
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
return
}
wfc.podQueue.Add(key)

// Enqueue the workflow for deleted pod
_ = wfc.enqueueWfFromPodLabel(obj)

},
},
)
Expand Down

0 comments on commit ed59408

Please sign in to comment.