Skip to content

Commit

Permalink
fix(controller): Correct the order merging the fields in WorkflowTemp…
Browse files Browse the repository at this point in the history
…lateRef scenario. Fixes argoproj#4044 (argoproj#4063)
  • Loading branch information
sarabala1979 authored Sep 23, 2020
1 parent 764b56b commit 6b350b0
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 5 deletions.
11 changes: 6 additions & 5 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3037,19 +3037,20 @@ func (woc *wfOperationCtx) loadExecutionSpec() (wfv1.TemplateReferenceHolder, wf
}
}

//In WorkflowTemplateRef scenario, we need to merge the Workflow Default, Workflow spec and storedWorkflowspec for execWf.
//In WorkflowTemplateRef scenario, we need to merge the Workflow spec, :StoredWorkflowspec and Workflow Default for execWf.
// Overlay
targetWf := wfv1.Workflow{Spec: *woc.wf.Spec.DeepCopy()}
err := woc.controller.setWorkflowDefaults(&targetWf)

err := wfutil.MergeTo(&wfv1.Workflow{Spec: *woc.wf.Status.StoredWorkflowSpec}, &targetWf)
if err != nil {
log.WithFields(log.Fields{"key": woc.wf.Name, "error": err}).Warn("Failed to apply default workflow values")
return nil, executionParameters, err
}

err = wfutil.MergeTo(&wfv1.Workflow{Spec: *woc.wf.Status.StoredWorkflowSpec}, &targetWf)
err = woc.controller.setWorkflowDefaults(&targetWf)
if err != nil {
log.WithFields(log.Fields{"key": woc.wf.Name, "error": err}).Warn("Failed to apply default workflow values")
return nil, executionParameters, err
}

// Setting the merged workflow to executable workflow
woc.execWf = &targetWf

Expand Down
36 changes: 36 additions & 0 deletions workflow/controller/operator_workflow_template_ref_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,42 @@ func TestWorkflowTemplateRefWithWorkflowTemplateArgs(t *testing.T) {
assert.Equal(t, "test", woc.globalParams["workflow.parameters.param1"])

})

t.Run("CheckMergingWFDefaults", func(t *testing.T) {
wfDefaultActiveS := int64(5)
cancel, controller := newController(wf, wftmpl)
defer cancel()
controller.Config.WorkflowDefaults = &wfv1.Workflow{Spec: wfv1.WorkflowSpec{
ActiveDeadlineSeconds: &wfDefaultActiveS,
},
}
woc := newWorkflowOperationCtx(wf, controller)
woc.operate()
assert.Equal(t, wfDefaultActiveS, *woc.execWf.Spec.ActiveDeadlineSeconds)

})
t.Run("CheckMergingWFTandWF", func(t *testing.T) {
wfActiveS := int64(10)
wftActiveS := int64(10)
wfDefaultActiveS := int64(5)

wftmpl.Spec.ActiveDeadlineSeconds = &wftActiveS
cancel, controller := newController(wf, wftmpl)
defer cancel()
controller.Config.WorkflowDefaults = &wfv1.Workflow{Spec: wfv1.WorkflowSpec{
ActiveDeadlineSeconds: &wfDefaultActiveS,
},
}
wf.Spec.ActiveDeadlineSeconds = &wfActiveS
woc := newWorkflowOperationCtx(wf, controller)
woc.operate()
assert.Equal(t, wfActiveS, *woc.execWf.Spec.ActiveDeadlineSeconds)

wf.Spec.ActiveDeadlineSeconds = nil
woc = newWorkflowOperationCtx(wf, controller)
woc.operate()
assert.Equal(t, wftActiveS, *woc.execWf.Spec.ActiveDeadlineSeconds)
})
}

const wfWithStatus = `
Expand Down

0 comments on commit 6b350b0

Please sign in to comment.