Skip to content

Commit

Permalink
fix: Restart server ConfigMap watch when closed (argoproj#2360)
Browse files Browse the repository at this point in the history
  • Loading branch information
simster7 authored Mar 4, 2020
1 parent 64d0cec commit 9743831
Showing 1 changed file with 24 additions and 20 deletions.
44 changes: 24 additions & 20 deletions server/apiserver/argoserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,7 @@ func (as *argoServer) Run(ctx context.Context, port int, browserOpenFunc func(st
if err != nil {
log.Fatal(err)
}
err = as.restartOnConfigChange(ctx.Done())
if err != nil {
log.Fatal(err)
}
as.restartOnConfigChange(ctx.Done())
var offloadRepo = sqldb.ExplosiveOffloadNodeStatusRepo
var wfArchive = sqldb.NullWorkflowArchive
persistence := configMap.Persistence
Expand Down Expand Up @@ -270,29 +267,36 @@ func (as *argoServer) rsyncConfig() (*config.WorkflowControllerConfig, error) {
// Unlike the controller, the server creates object based on the config map at init time, and will not pick-up on
// changes unless we restart.
// Instead of opting to re-write the server, instead we'll just listen for any old change and restart.
func (as *argoServer) restartOnConfigChange(stopCh <-chan struct{}) error {
w, err := as.kubeClientset.CoreV1().ConfigMaps(as.namespace).
Watch(metav1.ListOptions{FieldSelector: "metadata.name=" + as.configName})
if err != nil {
return err
}
func (as *argoServer) restartOnConfigChange(stopCh <-chan struct{}) {
go func() {
defer w.Stop()
main:
for {
select {
// normal exit, e.g. due to user interupt
case <-stopCh:
return
case e := <-w.ResultChan():
if e.Type != watch.Added && e.Type != watch.Bookmark {
log.WithField("eventType", e.Type).Info("config map event, exiting gracefully")
as.stopCh <- struct{}{}
log.Info("establishing configmap watch")
w, err := as.kubeClientset.CoreV1().ConfigMaps(as.namespace).
Watch(metav1.ListOptions{FieldSelector: "metadata.name=" + as.configName})
if err != nil {
log.Fatalf("error establishing watch: %s", err)
}
log.Info("configmap watch established")
for {
select {
// normal exit, e.g. due to user interrupt
case <-stopCh:
return
case e, open := <-w.ResultChan():
if !open {
// The channel is closed, reopen it
continue main
}
if e.Type == watch.Modified || e.Type == watch.Deleted {
log.WithField("eventType", e.Type).Info("config map event, exiting gracefully")
as.stopCh <- struct{}{}
return
}
}
}
}
}()
return nil
}

func (as *argoServer) updateConfig(cm *apiv1.ConfigMap) (*config.WorkflowControllerConfig, error) {
Expand Down

0 comments on commit 9743831

Please sign in to comment.