Skip to content

Commit

Permalink
feat: Add 'outputs.result' to Container templates (argoproj#2584)
Browse files Browse the repository at this point in the history
  • Loading branch information
simster7 authored Apr 8, 2020
1 parent 51bc876 commit 4da6f4f
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 29 deletions.
8 changes: 4 additions & 4 deletions docs/variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,17 @@ The following variables are made available to reference various metadata of a wo
| Variable | Description|
|----------|------------|
| `steps.<STEPNAME>.ip` | IP address of a previous daemon container step |
| `steps.<STEPNAME>.status` | Phase status of any previous script step |
| `steps.<STEPNAME>.outputs.result` | Output result of any previous script step |
| `steps.<STEPNAME>.status` | Phase status of any previous step |
| `steps.<STEPNAME>.outputs.result` | Output result of any previous container or script step |
| `steps.<STEPNAME>.outputs.parameters.<NAME>` | Output parameter of any previous step |
| `steps.<STEPNAME>.outputs.artifacts.<NAME>` | Output artifact of any previous step |

## DAG Templates
| Variable | Description|
|----------|------------|
| `tasks.<TASKNAME>.ip` | IP address of a previous daemon container task |
| `tasks.<TASKNAME>.status` | Phase status of any previous task step |
| `tasks.<TASKNAME>.outputs.result` | Output result of any previous script task |
| `tasks.<TASKNAME>.status` | Phase status of any previous task |
| `tasks.<TASKNAME>.outputs.result` | Output result of any previous container or script task |
| `tasks.<TASKNAME>.outputs.parameters.<NAME>` | Output parameter of any previous task |
| `tasks.<TASKNAME>.outputs.artifacts.<NAME>` | Output artifact of any previous task |

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ require (
golang.org/x/crypto v0.0.0-20200311171314-f7b00557c8c4
golang.org/x/net v0.0.0-20200301022130-244492dfa37a
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
golang.org/x/tools v0.0.0-20200408014516-4d14fc9c00ce // indirect
golang.org/x/tools v0.0.0-20200408132156-9ee5ef7a2c0d // indirect
google.golang.org/api v0.20.0
google.golang.org/genproto v0.0.0-20200317114155-1f3552e48f24
google.golang.org/grpc v1.28.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -618,8 +618,8 @@ golang.org/x/tools v0.0.0-20200207183749-b753a1ba74fa/go.mod h1:TB2adYChydJhpapK
golang.org/x/tools v0.0.0-20200212150539-ea181f53ac56/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.0.0-20200224181240-023911ca70b2/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.0.0-20200317043434-63da46f3035e/go.mod h1:Sl4aGygMT6LrqrWclx+PTx3U+LnKx/seiNR+3G19Ar8=
golang.org/x/tools v0.0.0-20200408014516-4d14fc9c00ce h1:Zc5zydnQGVfragUHga1rdKr1tk2yKSlLIK3winWj5sc=
golang.org/x/tools v0.0.0-20200408014516-4d14fc9c00ce/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20200408132156-9ee5ef7a2c0d h1:2DXIdtvIYvvWOcAOsX81FwOUBoQoMZhosWn7KjXEl94=
golang.org/x/tools v0.0.0-20200408132156-9ee5ef7a2c0d/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898 h1:/atklqdjdhuosWIl6AIbOeHJjicWYPqR9bpxqxYG2pA=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
59 changes: 40 additions & 19 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1773,8 +1773,18 @@ func (woc *wfOperationCtx) executeContainer(nodeName string, templateScope strin
return node, nil
}

// Check if the output of this container is referenced elsewhere in the Workflow. If so, make sure to include it during
// execution.
includeScriptOutput, err := woc.includeScriptOutput(nodeName, opts.boundaryID)
if err != nil {
return node, err
}

woc.log.Debugf("Executing node %s with container template: %v\n", nodeName, tmpl)
_, err := woc.createWorkflowPod(nodeName, *tmpl.Container, tmpl, &createWorkflowPodOpts{onExitPod: opts.onExitTemplate})
_, err = woc.createWorkflowPod(nodeName, *tmpl.Container, tmpl, &createWorkflowPodOpts{
includeScriptOutput: includeScriptOutput,
onExitPod: opts.onExitTemplate,
})

if apierr.IsForbidden(err) && isResubmitAllowed(tmpl) {
// Our error was most likely caused by a lack of resources. If pod resubmission is allowed, keep the node pending
Expand Down Expand Up @@ -1900,28 +1910,16 @@ func (woc *wfOperationCtx) executeScript(nodeName string, templateScope string,
}
node = woc.initializeExecutableNode(nodeName, wfv1.NodeTypePod, templateScope, tmpl, orgTmpl, opts.boundaryID, wfv1.NodePending)

includeScriptOutput := false
if boundaryNode, ok := woc.wf.Status.Nodes[opts.boundaryID]; ok {
tmplCtx, err := woc.createTemplateContext(boundaryNode.GetTemplateScope())
if err != nil {
return node, err
}
_, parentTemplate, templateStored, err := tmplCtx.ResolveTemplate(&boundaryNode)
if err != nil {
return node, err
}
// A new template was stored during resolution, persist it
if templateStored {
woc.updated = true
}

name := getStepOrDAGTaskName(nodeName)
includeScriptOutput = hasOutputResultRef(name, parentTemplate)
// Check if the output of this script is referenced elsewhere in the Workflow. If so, make sure to include it during
// execution.
includeScriptOutput, err := woc.includeScriptOutput(nodeName, opts.boundaryID)
if err != nil {
return node, err
}

mainCtr := tmpl.Script.Container
mainCtr.Args = append(mainCtr.Args, common.ExecutorScriptSourcePath)
_, err := woc.createWorkflowPod(nodeName, mainCtr, tmpl, &createWorkflowPodOpts{
_, err = woc.createWorkflowPod(nodeName, mainCtr, tmpl, &createWorkflowPodOpts{
includeScriptOutput: includeScriptOutput,
onExitPod: opts.onExitTemplate,
})
Expand Down Expand Up @@ -2535,3 +2533,26 @@ func (woc *wfOperationCtx) deletePDBResource() error {
woc.log.Infof("Deleted PDB resource for workflow.")
return nil
}

// Check if the output of this node is referenced elsewhere in the Workflow. If so, make sure to include it during
// execution.
func (woc *wfOperationCtx) includeScriptOutput(nodeName, boundaryID string) (bool, error) {
if boundaryNode, ok := woc.wf.Status.Nodes[boundaryID]; ok {
tmplCtx, err := woc.createTemplateContext(boundaryNode.GetTemplateScope())
if err != nil {
return false, err
}
_, parentTemplate, templateStored, err := tmplCtx.ResolveTemplate(&boundaryNode)
if err != nil {
return false, err
}
// A new template was stored during resolution, persist it
if templateStored {
woc.updated = true
}

name := getStepOrDAGTaskName(nodeName)
return hasOutputResultRef(name, parentTemplate), nil
}
return false, nil
}
54 changes: 54 additions & 0 deletions workflow/controller/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2682,3 +2682,57 @@ func TestRetryNodeOutputs(t *testing.T) {
woc.buildLocalScope(scope, "steps.influx", retryNode)
assert.Contains(t, scope.scope, "steps.influx.ip")
}

var containerOutputsResult = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: steps-
spec:
entrypoint: hello-hello-hello
templates:
- name: hello-hello-hello
steps:
- - name: hello1
template: whalesay
arguments:
parameters: [{name: message, value: "hello1"}]
- - name: hello2
template: whalesay
arguments:
parameters: [{name: message, value: "{{steps.hello1.outputs.result}}"}]
- name: whalesay
inputs:
parameters:
- name: message
container:
image: alpine:latest
command: [echo]
args: ["{{pod.name}}: {{inputs.parameters.message}}"]
`

func TestContainerOutputsResult(t *testing.T) {

controller := newController()
wfcset := controller.wfclientset.ArgoprojV1alpha1().Workflows("")

// operate the workflow. it should create a pod.
wf := unmarshalWF(containerOutputsResult)
wf, err := wfcset.Create(wf)
assert.NoError(t, err)

assert.True(t, hasOutputResultRef("hello1", &wf.Spec.Templates[0]))
assert.False(t, hasOutputResultRef("hello2", &wf.Spec.Templates[0]))

woc := newWorkflowOperationCtx(wf, controller)
woc.operate()

for _, node := range wf.Status.Nodes {
if strings.Contains(node.Name, "hello1") {
assert.True(t, getStepOrDAGTaskName(node.Name) == "hello1")
} else if strings.Contains(node.Name, "hello2") {
assert.True(t, getStepOrDAGTaskName(node.Name) == "hello2")
}
}
}
2 changes: 1 addition & 1 deletion workflow/controller/workflowpod.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,8 +545,8 @@ func (woc *wfOperationCtx) addMetadata(pod *apiv1.Pod, tmpl *wfv1.Template, incl

if woc.workflowDeadline != nil {
execCtl.Deadline = woc.workflowDeadline

}

if woc.workflowDeadline != nil || includeScriptOutput {
execCtlBytes, err := json.Marshal(execCtl)
if err != nil {
Expand Down
11 changes: 10 additions & 1 deletion workflow/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,8 @@ func (we *WorkflowExecutor) CaptureScriptResult() error {
log.Infof("No Script output reference in workflow. Capturing script output ignored")
return nil
}
if we.Template.Script == nil {
if we.Template.Script == nil && we.Template.Container == nil {
log.Infof("Template type is neither of Script or Container. Capturing script output ignored")
return nil
}
log.Infof("Capturing script output")
Expand All @@ -722,6 +723,14 @@ func (we *WorkflowExecutor) CaptureScriptResult() error {
if outputLen > 0 && out[outputLen-1] == '\n' {
out = out[0 : outputLen-1]
}

const maxAnnotationSize int = 256 * (1 << 10) // 256 kB
// A character in a string is a byte
if len(out) > maxAnnotationSize {
log.Warnf("Output is larger than the maximum allowed size of 256 kB, only the last 256 kB were saved")
out = out[len(out)-maxAnnotationSize:]
}

we.Template.Outputs.Result = &out
return nil
}
Expand Down
4 changes: 3 additions & 1 deletion workflow/validate/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -767,7 +767,7 @@ func (ctx *templateValidationCtx) addOutputsToScope(tmpl *wfv1.Template, prefix
if tmpl.Daemon != nil && *tmpl.Daemon {
scope[fmt.Sprintf("%s.ip", prefix)] = true
}
if tmpl.Script != nil {
if tmpl.Script != nil || tmpl.Container != nil {
scope[fmt.Sprintf("%s.outputs.result", prefix)] = true
}
for _, param := range tmpl.Outputs.Parameters {
Expand Down Expand Up @@ -798,6 +798,8 @@ func (ctx *templateValidationCtx) addOutputsToScope(tmpl *wfv1.Template, prefix
}
if aggregate {
switch tmpl.GetType() {
// Not that we don't also include TemplateTypeContainer here, even though it uses `outputs.result` it uses
// `outputs.parameters` as its aggregator.
case wfv1.TemplateTypeScript:
scope[fmt.Sprintf("%s.outputs.result", prefix)] = true
default:
Expand Down

0 comments on commit 4da6f4f

Please sign in to comment.