Skip to content

Commit

Permalink
fix(controller): Do not panic on nil output value. Fixes argoproj#3505 (
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec authored Jul 23, 2020
1 parent c409624 commit 5afbc13
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 60 deletions.
34 changes: 11 additions & 23 deletions test/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@ import (
"path/filepath"
"runtime"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"sigs.k8s.io/yaml"

wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/test/util"
)

var (
Expand Down Expand Up @@ -43,12 +41,9 @@ func LoadTestWorkflow(path string) *wfv1.Workflow {

// LoadWorkflowFromBytes returns a workflow unmarshalled from an yaml byte array
func LoadWorkflowFromBytes(yamlBytes []byte) *wfv1.Workflow {
var wf wfv1.Workflow
err := yaml.Unmarshal(yamlBytes, &wf)
if err != nil {
panic(err)
}
return &wf
v := &wfv1.Workflow{}
util.MustUnmarshallYAML(string(yamlBytes), v)
return v
}

// LoadTestWorkflow returns a workflow relative to the test file
Expand All @@ -62,20 +57,13 @@ func LoadTestWorkflowTemplate(path string) *wfv1.WorkflowTemplate {

// LoadWorkflowFromBytes returns a workflow unmarshalled from an yaml byte array
func LoadWorkflowTemplateFromBytes(yamlBytes []byte) *wfv1.WorkflowTemplate {
var wf wfv1.WorkflowTemplate
err := yaml.Unmarshal(yamlBytes, &wf)
if err != nil {
panic(err)
}
return &wf
v := &wfv1.WorkflowTemplate{}
util.MustUnmarshallYAML(string(yamlBytes), v)
return v
}

// LoadUnstructuredFromBytes returns an Unstructured unmarshalled from an yaml byte array
func LoadUnstructuredFromBytes(yamlBytes []byte) *unstructured.Unstructured {
var un unstructured.Unstructured
err := yaml.Unmarshal(yamlBytes, &un)
if err != nil {
panic(err)
}
return &un
func LoadClusterWorkflowTemplateFromBytes(yamlBytes []byte) *wfv1.ClusterWorkflowTemplate {
v := &wfv1.ClusterWorkflowTemplate{}
util.MustUnmarshallYAML(string(yamlBytes), v)
return v
}
11 changes: 9 additions & 2 deletions test/util/yaml.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
package util

import "sigs.k8s.io/yaml"
import (
log "github.com/sirupsen/logrus"
"sigs.k8s.io/yaml"
)

func MustUnmarshallYAML(text string, v interface{}) {
err := yaml.Unmarshal([]byte(text), v)
err := yaml.UnmarshalStrict([]byte(text), v)
if err != nil {
log.Warn("invalid YAML: %w", err)
err = yaml.Unmarshal([]byte(text), v)
}
if err != nil {
panic(err)
}
Expand Down
23 changes: 4 additions & 19 deletions workflow/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ import (
k8stesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/yaml"

"github.com/argoproj/argo/config"
"github.com/argoproj/argo/persist/sqldb"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
fakewfclientset "github.com/argoproj/argo/pkg/client/clientset/versioned/fake"
wfextv "github.com/argoproj/argo/pkg/client/informers/externalversions"
"github.com/argoproj/argo/test"
controllercache "github.com/argoproj/argo/workflow/controller/cache"
hydratorfake "github.com/argoproj/argo/workflow/hydrator/fake"
"github.com/argoproj/argo/workflow/metrics"
Expand Down Expand Up @@ -193,30 +193,15 @@ func newControllerWithComplexDefaults() (context.CancelFunc, *WorkflowController
}

func unmarshalWF(yamlStr string) *wfv1.Workflow {
var wf wfv1.Workflow
err := yaml.Unmarshal([]byte(yamlStr), &wf)
if err != nil {
panic(err)
}
return &wf
return test.LoadWorkflowFromBytes([]byte(yamlStr))
}

func unmarshalWFTmpl(yamlStr string) *wfv1.WorkflowTemplate {
var wftmpl wfv1.WorkflowTemplate
err := yaml.Unmarshal([]byte(yamlStr), &wftmpl)
if err != nil {
panic(err)
}
return &wftmpl
return test.LoadWorkflowTemplateFromBytes([]byte(yamlStr))
}

func unmarshalCWFTmpl(yamlStr string) *wfv1.ClusterWorkflowTemplate {
var cwftmpl wfv1.ClusterWorkflowTemplate
err := yaml.Unmarshal([]byte(yamlStr), &cwftmpl)
if err != nil {
panic(err)
}
return &cwftmpl
return test.LoadClusterWorkflowTemplateFromBytes([]byte(yamlStr))
}

// makePodsPhase acts like a pod controller and simulates the transition of pods transitioning into a specified state
Expand Down
22 changes: 6 additions & 16 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2149,32 +2149,22 @@ func (woc *wfOperationCtx) buildLocalScope(scope *wfScope, prefix string, node *
}

func (woc *wfOperationCtx) addOutputsToLocalScope(prefix string, outputs *wfv1.Outputs, scope *wfScope) {
if outputs == nil {
if outputs == nil || scope == nil {
return
}
if prefix != "workflow" && outputs.Result != nil {
key := fmt.Sprintf("%s.outputs.result", prefix)
if scope != nil {
scope.addParamToScope(key, *outputs.Result)
}
scope.addParamToScope(fmt.Sprintf("%s.outputs.result", prefix), *outputs.Result)
}
if prefix != "workflow" && outputs.ExitCode != nil {
key := fmt.Sprintf("%s.exitCode", prefix)
if scope != nil {
scope.addParamToScope(key, *outputs.ExitCode)
}
scope.addParamToScope(fmt.Sprintf("%s.exitCode", prefix), *outputs.ExitCode)
}
for _, param := range outputs.Parameters {
key := fmt.Sprintf("%s.outputs.parameters.%s", prefix, param.Name)
if scope != nil {
scope.addParamToScope(key, param.Value.String())
if param.Value != nil {
scope.addParamToScope(fmt.Sprintf("%s.outputs.parameters.%s", prefix, param.Name), param.Value.String())
}
}
for _, art := range outputs.Artifacts {
key := fmt.Sprintf("%s.outputs.artifacts.%s", prefix, art.Name)
if scope != nil {
scope.addArtifactToScope(key, art)
}
scope.addArtifactToScope(fmt.Sprintf("%s.outputs.artifacts.%s", prefix, art.Name), art)
}
}

Expand Down
47 changes: 47 additions & 0 deletions workflow/controller/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4227,6 +4227,53 @@ status:
}
}

func TestWorkflowOutputs(t *testing.T) {
wf := unmarshalWF(`
metadata:
name: my-wf
namespace: my-ns
spec:
entrypoint: main
templates:
- name: main
dag:
tasks:
- name: step-1
template: child
- name: child
container:
image: my-image
outputs:
parameters:
- name: my-param
valueFrom:
path: /my-path
`)
cancel, controller := newController(wf)
defer cancel()
woc := newWorkflowOperationCtx(wf, controller)

// reconcille
woc.operate()
assert.Equal(t, wfv1.NodeRunning, woc.wf.Status.Phase)

// make all created pods as successful
podInterface := controller.kubeclientset.CoreV1().Pods("my-ns")
list, err := podInterface.List(metav1.ListOptions{})
assert.NoError(t, err)
assert.Len(t, list.Items, 1)
for _, pod := range list.Items {
pod.Status.Phase = apiv1.PodSucceeded
pod.GetAnnotations()[common.AnnotationKeyOutputs] = `{"parameters": [{"name": "my-param"}]}`
_, err := podInterface.Update(&pod)
assert.NoError(t, err)
}

// reconcille
woc.operate()
assert.Equal(t, wfv1.NodeSucceeded, woc.wf.Status.Phase)
}

var globalVarsOnExit = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
Expand Down

0 comments on commit 5afbc13

Please sign in to comment.