Skip to content

Commit

Permalink
feat: Allow for setting default configurations for workflows, Fixes a…
Browse files Browse the repository at this point in the history
  • Loading branch information
NikeNano authored Mar 6, 2020
1 parent 81ab538 commit 3890a12
Show file tree
Hide file tree
Showing 3 changed files with 201 additions and 2 deletions.
2 changes: 2 additions & 0 deletions workflow/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ type WorkflowControllerConfig struct {

// Config customized Docker Sock path
DockerSockPath string `json:"dockerSockPath,omitempty"`

DefautWorkflowSpec *wfv1.WorkflowSpec `json:"workflowDefaults,omitempty"`
}

// KubeConfig is used for wait & init sidecar containers to communicate with a k8s apiserver by a outofcluster method,
Expand Down
41 changes: 39 additions & 2 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package controller

import (
"context"
"encoding/json"
"fmt"
"os"
"strings"
Expand All @@ -17,6 +18,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -45,8 +47,10 @@ type WorkflowController struct {
// namespace of the workflow controller
namespace string
managedNamespace string

// configMap is the name of the config map in which to derive configuration of the controller from
configMap string

// Config is the workflow controller's configuration
Config config.WorkflowControllerConfig

Expand Down Expand Up @@ -355,16 +359,22 @@ func (wfc *WorkflowController) processNextItem() bool {
wfc.throttler.Remove(key)
return true
}
err = wfc.addingWorkflowDefaultValueIfValueNotExist(wf)
if err != nil {
log.Warnf("Failed to unmarshal key '%s' to workflow object: %v", key, err)
woc := newWorkflowOperationCtx(wf, wfc)
woc.markWorkflowFailed(fmt.Sprintf("invalid spec: %s", err.Error()))
woc.persistUpdates()
wfc.throttler.Remove(key)
}

if wf.ObjectMeta.Labels[common.LabelKeyCompleted] == "true" {
wfc.throttler.Remove(key)
// can get here if we already added the completed=true label,
// but we are still draining the controller's workflow workqueue
return true
}

woc := newWorkflowOperationCtx(wf, wfc)

// Loading running workflow from persistence storage if nodeStatusOffload enabled
if wf.Status.IsOffloadNodeStatus() {
nodes, err := wfc.offloadNodeStatusRepo.Get(string(wf.UID), wf.GetOffloadNodeStatusVersion())
Expand Down Expand Up @@ -417,6 +427,33 @@ func (wfc *WorkflowController) processNextItem() bool {
return true
}

// addingWorkflowDefaultValueIfValueNotExist sets values in the workflow.Spec with defaults from the
// workflowController. Values in the workflow will be given the upper hand over the defaults.
// The defaults for the workflow controller is set in the WorkflowController.Config.DefautWorkflowSpec
func (wfc *WorkflowController) addingWorkflowDefaultValueIfValueNotExist(wf *wfv1.Workflow) error {
//var workflowSpec *wfv1.WorkflowSpec = &wf.Spec
if wfc.Config.DefautWorkflowSpec != nil {
defaultsSpec, err := json.Marshal(*wfc.Config.DefautWorkflowSpec)
if err != nil {
return err
}
workflowSpec, err := json.Marshal(wf.Spec)
if err != nil {
return err
}
// https://github.com/kubernetes/apimachinery/blob/2373d029717c4d169463414a6127cd1d0d12680e/pkg/util/strategicpatch/patch.go#L94
new, err := strategicpatch.StrategicMergePatch(defaultsSpec, workflowSpec, wfv1.WorkflowSpec{})
if err != nil {
return err
}
err = json.Unmarshal(new, &wf.Spec)
if err != nil {
return err
}
}
return nil
}

func (wfc *WorkflowController) podWorker() {
for wfc.processNextPodItem() {
}
Expand Down
160 changes: 160 additions & 0 deletions workflow/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,55 @@ spec:
args: ["hello world"]
`

var testDefaultWf = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: hello-world
spec:
entrypoint: whalesay
serviceAccountName: whalesay
templates:
- name: whalesay
metadata:
annotations:
annotationKey1: "annotationValue1"
annotationKey2: "annotationValue2"
labels:
labelKey1: "labelValue1"
labelKey2: "labelValue2"
container:
image: docker/whalesay:latest
command: [cowsay]
args: ["hello world"]
`

var testDefaultWfTTL = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: hello-world
spec:
entrypoint: whalesay
serviceAccountName: whalesay
ttlSecondsAfterFinished: 7
ttlStrategy:
secondsAfterCompletion: 5
templates:
- name: whalesay
metadata:
annotations:
annotationKey1: "annotationValue1"
annotationKey2: "annotationValue2"
labels:
labelKey1: "labelValue1"
labelKey2: "labelValue2"
container:
image: docker/whalesay:latest
command: [cowsay]
args: ["hello world"]
`

func newController() *WorkflowController {
wfclientset := fakewfclientset.NewSimpleClientset()
informerFactory := wfextv.NewSharedInformerFactory(wfclientset, 10*time.Minute)
Expand All @@ -65,6 +114,68 @@ func newController() *WorkflowController {
}
}

func newControllerWithDefaults() *WorkflowController {
wfclientset := fakewfclientset.NewSimpleClientset()
informerFactory := wfextv.NewSharedInformerFactory(wfclientset, 10*time.Minute)
wftmplInformer := informerFactory.Argoproj().V1alpha1().WorkflowTemplates()
ctx := context.Background()
go wftmplInformer.Informer().Run(ctx.Done())
if !cache.WaitForCacheSync(ctx.Done(), wftmplInformer.Informer().HasSynced) {
panic("Timed out waiting for caches to sync")
}
myBool := true
return &WorkflowController{
Config: config.WorkflowControllerConfig{
ExecutorImage: "executor:latest",
DefautWorkflowSpec: &wfv1.WorkflowSpec{
HostNetwork: &myBool,
},
},
kubeclientset: fake.NewSimpleClientset(),
wfclientset: wfclientset,
completedPods: make(chan string, 512),
wftmplInformer: wftmplInformer,
wfQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
wfArchive: sqldb.NullWorkflowArchive,
}
}

func newControllerWithComplexDefaults() *WorkflowController {
wfclientset := fakewfclientset.NewSimpleClientset()
informerFactory := wfextv.NewSharedInformerFactory(wfclientset, 10*time.Minute)
wftmplInformer := informerFactory.Argoproj().V1alpha1().WorkflowTemplates()
ctx := context.Background()
go wftmplInformer.Informer().Run(ctx.Done())
if !cache.WaitForCacheSync(ctx.Done(), wftmplInformer.Informer().HasSynced) {
panic("Timed out waiting for caches to sync")
}
myBool := true
var ten int32 = 10
var seven int32 = 10
return &WorkflowController{
Config: config.WorkflowControllerConfig{
ExecutorImage: "executor:latest",
DefautWorkflowSpec: &wfv1.WorkflowSpec{
HostNetwork: &myBool,
Entrypoint: "good_entrypoint",
ServiceAccountName: "my_service_account",
TTLStrategy: &wfv1.TTLStrategy{
SecondsAfterCompletion: &ten,
SecondsAfterSuccess: &ten,
SecondsAfterFailure: &ten,
},
TTLSecondsAfterFinished: &seven,
},
},
kubeclientset: fake.NewSimpleClientset(),
wfclientset: wfclientset,
completedPods: make(chan string, 512),
wftmplInformer: wftmplInformer,
wfQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
wfArchive: sqldb.NullWorkflowArchive,
}
}

func unmarshalWF(yamlStr string) *wfv1.Workflow {
var wf wfv1.Workflow
err := yaml.Unmarshal([]byte(yamlStr), &wf)
Expand Down Expand Up @@ -98,3 +209,52 @@ func makePodsPhase(t *testing.T, phase apiv1.PodPhase, kubeclientset kubernetes.
}
}
}

func TestAddingWorkflowDefaultValueIfValueNotExist(t *testing.T) {
assert.Equal(t, "hello", "hello")
ans := true
controller := newController()
workflow := unmarshalWF(helloWorldWf)
err := controller.addingWorkflowDefaultValueIfValueNotExist(workflow)
assert.NoError(t, err)
assert.Equal(t, workflow, unmarshalWF(helloWorldWf))
controllerDefaults := newControllerWithDefaults()
defautWorkflowSpec := unmarshalWF(helloWorldWf)
err = controllerDefaults.addingWorkflowDefaultValueIfValueNotExist(defautWorkflowSpec)
assert.NoError(t, err)
assert.Equal(t, defautWorkflowSpec.Spec.HostNetwork, &ans)
assert.NotEqual(t, defautWorkflowSpec, unmarshalWF(helloWorldWf))
assert.Equal(t, *defautWorkflowSpec.Spec.HostNetwork, true)
}

func TestAddingWorkflowDefaultComplex(t *testing.T) {
assert.Equal(t, "hello", "hello")
controller := newControllerWithComplexDefaults()
workflow := unmarshalWF(testDefaultWf)
var ten int32 = 10
assert.Equal(t, workflow.Spec.Entrypoint, "whalesay")
assert.Nil(t, workflow.Spec.TTLStrategy)
err := controller.addingWorkflowDefaultValueIfValueNotExist(workflow)
assert.NoError(t, err)
assert.NotEqual(t, workflow, unmarshalWF(testDefaultWf))
assert.Equal(t, workflow.Spec.Entrypoint, "whalesay")
assert.Equal(t, workflow.Spec.ServiceAccountName, "whalesay")
assert.Equal(t, *workflow.Spec.TTLStrategy.SecondsAfterFailure, ten)
}

func TestAddingWorkflowDefaultComplexTwo(t *testing.T) {
assert.Equal(t, "hello", "hello")
controller := newControllerWithComplexDefaults()
workflow := unmarshalWF(testDefaultWfTTL)
var ten int32 = 10
var seven int32 = 7
var five int32 = 5
err := controller.addingWorkflowDefaultValueIfValueNotExist(workflow)
assert.NoError(t, err)
assert.NotEqual(t, workflow, unmarshalWF(testDefaultWfTTL))
assert.Equal(t, workflow.Spec.Entrypoint, "whalesay")
assert.Equal(t, workflow.Spec.ServiceAccountName, "whalesay")
assert.Equal(t, *workflow.Spec.TTLStrategy.SecondsAfterCompletion, five)
assert.Equal(t, *workflow.Spec.TTLStrategy.SecondsAfterFailure, ten)
assert.Equal(t, *workflow.Spec.TTLSecondsAfterFinished, seven)
}

0 comments on commit 3890a12

Please sign in to comment.