Skip to content

Commit

Permalink
fix(server): Report v1.Status errors. Fixes argoproj#3608 (argoproj#3652
Browse files Browse the repository at this point in the history
)
  • Loading branch information
alexec authored Jul 31, 2020
1 parent a3a4ea0 commit a8f4da0
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 18 deletions.
14 changes: 8 additions & 6 deletions server/workflow/workflow_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
apierr "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/argoproj/argo/errors"
Expand Down Expand Up @@ -168,12 +169,8 @@ func (s *workflowServer) WatchWorkflows(req *workflowpkg.WatchWorkflowsRequest,
select {
case <-ctx.Done():
return nil
case event, ok := <-watch.ResultChan():
var wf *wfv1.Workflow
if ok {
wf, ok = event.Object.(*wfv1.Workflow)
}
if !ok {
case event, open := <-watch.ResultChan():
if !open {
log.Debug("Re-establishing workflow watch")
watch.Stop()
watch, err = wfIf.Watch(*opts)
Expand All @@ -183,6 +180,11 @@ func (s *workflowServer) WatchWorkflows(req *workflowpkg.WatchWorkflowsRequest,
continue
}
log.Debug("Received event")
wf, ok := event.Object.(*wfv1.Workflow)
if !ok {
// object is probably probably metav1.Status, `FromObject` can deal with anything
return apierr.FromObject(event.Object)
}
logCtx := log.WithFields(log.Fields{"workflow": wf.Name, "type": event.Type, "phase": wf.Status.Phase})
err := s.hydrator.Hydrate(wf)
if err != nil {
Expand Down
29 changes: 17 additions & 12 deletions util/logs/workflow-logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
apierr "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
Expand Down Expand Up @@ -163,12 +164,8 @@ func WorkflowLogs(ctx context.Context, wfClient versioned.Interface, kubeClient
select {
case <-ctx.Done():
return
case event, ok := <-wfWatch.ResultChan():
var wf *wfv1.Workflow
if ok {
wf, ok = event.Object.(*wfv1.Workflow)
}
if !ok {
case event, open := <-wfWatch.ResultChan():
if !open {
logCtx.Debug("Re-establishing workflow watch")
wfWatch.Stop()
wfWatch, err = wfInterface.Watch(wfListOptions)
Expand All @@ -178,6 +175,12 @@ func WorkflowLogs(ctx context.Context, wfClient versioned.Interface, kubeClient
}
continue
}
wf, ok := event.Object.(*wfv1.Workflow)
if !ok {
// object is probably probably metav1.Status
logCtx.WithError(apierr.FromObject(event.Object)).Warn("watch object was not a workflow")
return
}
logCtx.WithFields(log.Fields{"eventType": event.Type, "completed": wf.Status.Fulfilled()}).Debug("Workflow event")
if event.Type == watch.Deleted || wf.Status.Fulfilled() {
return
Expand All @@ -198,12 +201,8 @@ func WorkflowLogs(ctx context.Context, wfClient versioned.Interface, kubeClient
select {
case <-stopWatchingPods:
return
case event, ok := <-podWatch.ResultChan():
var pod *corev1.Pod
if ok {
pod, ok = event.Object.(*corev1.Pod)
}
if !ok {
case event, open := <-podWatch.ResultChan():
if !open {
logCtx.Info("Re-establishing pod watch")
podWatch.Stop()
podWatch, err = podInterface.Watch(podListOptions)
Expand All @@ -213,6 +212,12 @@ func WorkflowLogs(ctx context.Context, wfClient versioned.Interface, kubeClient
}
continue
}
pod, ok := event.Object.(*corev1.Pod)
if !ok {
// object is probably probably metav1.Status
logCtx.WithError(apierr.FromObject(event.Object)).Warn("watch object was not a pod")
return
}
logCtx.WithFields(log.Fields{"eventType": event.Type, "podName": pod.GetName(), "phase": pod.Status.Phase}).Debug("Pod event")
if pod.Status.Phase == corev1.PodRunning {
ensureWeAreStreaming(pod)
Expand Down

0 comments on commit a8f4da0

Please sign in to comment.