Skip to content

Commit

Permalink
fix: Couldn't Terminate/Stop the ResourceTemplate Workflow (argoproj#…
Browse files Browse the repository at this point in the history
  • Loading branch information
sarabala1979 authored Aug 11, 2020
1 parent 12ddc1f commit 54c2134
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 5 deletions.
46 changes: 46 additions & 0 deletions test/e2e/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1014,6 +1014,52 @@ func (s *CLISuite) TestRetryOmit() {
WaitForWorkflow(20 * time.Second)
}


func (s *CLISuite) TestResourceTemplateStopAndTerminate() {
s.testNeedsOffloading()
s.Run("ResourceTemplateStop", func() {
s.Given().
WorkflowName("resource-tmpl-wf").
When().
RunCli([]string{"submit", "functional/resource-template.yaml"}, func(t *testing.T, output string, err error) {
assert.Contains(t, output, "Pending")
}).
RunCli([]string{"get", "resource-tmpl-wf"}, func(t *testing.T, output string, err error) {
assert.Contains(t, output, "Running")
}).
RunCli([]string{"stop", "resource-tmpl-wf"}, func(t *testing.T, output string, err error) {
assert.Contains(t, output, "workflow resource-tmpl-wf stopped")
}).
WaitForWorkflow(10 * time.Second).
RunCli([]string{"get", "resource-tmpl-wf"}, func(t *testing.T, output string, err error) {
assert.Contains(t, output, "Stopped with strategy 'Stop'")
}).
RunCli([]string{"delete", "resource-tmpl-wf"}, func(t *testing.T, output string, err error) {
assert.Contains(t, output, "deleted")
})

})
s.Run("ResourceTemplateTerminate", func() {
s.Given().
WorkflowName("resource-tmpl-wf-1").
When().
RunCli([]string{"submit", "functional/resource-template.yaml", "--name", "resource-tmpl-wf-1"}, func(t *testing.T, output string, err error) {
assert.Contains(t, output, "Pending")
}).
RunCli([]string{"get", "resource-tmpl-wf-1"}, func(t *testing.T, output string, err error) {
assert.Contains(t, output, "Running")
}).
RunCli([]string{"terminate", "resource-tmpl-wf-1"}, func(t *testing.T, output string, err error) {
assert.Contains(t, output, "workflow resource-tmpl-wf-1 terminated")
}).
WaitForWorkflow(10 * time.Second).
RunCli([]string{"get", "resource-tmpl-wf-1"}, func(t *testing.T, output string, err error) {
assert.Contains(t, output, "Stopped with strategy 'Terminate'")
})

})
}

func (s *CLISuite) TestMetaDataNamespace() {
s.testNeedsOffloading()
s.Given().
Expand Down
34 changes: 34 additions & 0 deletions test/e2e/functional/resource-template.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: resource-tmpl-wf
labels:
argo-e2e: true
spec:
entrypoint: main
templates:
- name: main
steps:
- - name: a
template: wf1
- name: wf1
resource:
action: create
successCondition: status.phase == Test
manifest: |
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: hello-world-
labels:
argo-e2e: true
spec:
entrypoint: whalesay
templates:
- name: whalesay
resources:
requests:
cpu: 3
container:
image: argoproj/argosay:v2
args: [echo, ":) Hello Argo!"]
16 changes: 11 additions & 5 deletions workflow/controller/exec_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,20 +72,26 @@ func (woc *wfOperationCtx) applyExecutionControl(pod *apiv1.Pod, wfNodesLock *sy
woc.log.Warnf("Failed to unmarshal execution control from pod %s", pod.Name)
}
}
containerName := common.WaitContainerName
// A resource template does not have a wait container,
// instead the only container is the main container (which is running argoexec)
if len(pod.Spec.Containers) == 1 {
containerName = common.MainContainerName
}

if woc.wf.Spec.Shutdown != "" {
if _, onExitPod := pod.Labels[common.LabelKeyOnExit]; !woc.wf.Spec.Shutdown.ShouldExecute(onExitPod) {
podExecCtl.Deadline = &time.Time{}
woc.log.Infof("Applying shutdown deadline for pod %s", pod.Name)
return woc.updateExecutionControl(pod.Name, podExecCtl)
return woc.updateExecutionControl(pod.Name, podExecCtl, containerName)
}
}

if woc.workflowDeadline != nil {
if podExecCtl.Deadline == nil || woc.workflowDeadline.Before(*podExecCtl.Deadline) {
podExecCtl.Deadline = woc.workflowDeadline
woc.log.Infof("Applying sooner Workflow Deadline for pod %s at: %v", pod.Name, woc.workflowDeadline)
return woc.updateExecutionControl(pod.Name, podExecCtl)
return woc.updateExecutionControl(pod.Name, podExecCtl, containerName)
}
}

Expand All @@ -106,7 +112,7 @@ func (woc *wfOperationCtx) killDaemonedChildren(nodeID string) error {
if childNode.Daemoned == nil || !*childNode.Daemoned {
continue
}
err := woc.updateExecutionControl(childNode.ID, execCtl)
err := woc.updateExecutionControl(childNode.ID, execCtl, common.WaitContainerName)
if err != nil {
woc.log.Errorf("Failed to update execution control of node %s: %+v", childNode.ID, err)
if firstErr == nil {
Expand All @@ -118,7 +124,7 @@ func (woc *wfOperationCtx) killDaemonedChildren(nodeID string) error {
}

// updateExecutionControl updates the execution control parameters
func (woc *wfOperationCtx) updateExecutionControl(podName string, execCtl common.ExecutionControl) error {
func (woc *wfOperationCtx) updateExecutionControl(podName string, execCtl common.ExecutionControl, containerName string) error {
execCtlBytes, err := json.Marshal(execCtl)
if err != nil {
return errors.InternalWrapError(err)
Expand All @@ -144,7 +150,7 @@ func (woc *wfOperationCtx) updateExecutionControl(podName string, execCtl common
woc.log.Infof("Signalling %s of updates", podName)
exec, err := common.ExecPodContainer(
woc.controller.restConfig, woc.wf.ObjectMeta.Namespace, podName,
common.WaitContainerName, true, true, "sh", "-c", "kill -s USR2 $(pidof argoexec)",
containerName, true, true, "sh", "-c", "kill -s USR2 $(pidof argoexec)",
)
if err != nil {
return err
Expand Down
25 changes: 25 additions & 0 deletions workflow/executor/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"os/exec"
"os/signal"
"strings"
"time"

Expand All @@ -18,6 +20,8 @@ import (

"github.com/argoproj/argo/errors"
"github.com/argoproj/argo/util/intstr"
"github.com/argoproj/argo/workflow/common"
os_specific "github.com/argoproj/argo/workflow/executor/os-specific"
)

// ExecResource will run kubectl action against a manifest
Expand Down Expand Up @@ -114,8 +118,29 @@ func (g gjsonLabels) Get(label string) string {
return gjson.GetBytes(g.json, label).String()
}

// signalMonitoring start the goroutine which listens for a SIGUSR2.
// Upon receiving of the signal, We update the pod annotation and exit the process.
func (we *WorkflowExecutor) signalMonitoring() {
log.Infof("Starting SIGUSR2 signal monitor")
sigs := make(chan os.Signal, 1)

signal.Notify(sigs, os_specific.GetOsSignal())
go func() {
for {
<-sigs
log.Infof("Received SIGUSR2 signal. Process is terminated")
_ = we.AddAnnotation(common.AnnotationKeyNodeMessage, "Received user signal to terminate the workflow")
os.Exit(130)
}
}()
}

// WaitResource waits for a specific resource to satisfy either the success or failure condition
func (we *WorkflowExecutor) WaitResource(resourceNamespace string, resourceName string) error {

// Monitor the SIGTERM
we.signalMonitoring()

if we.Template.Resource.SuccessCondition == "" && we.Template.Resource.FailureCondition == "" {
return nil
}
Expand Down

0 comments on commit 54c2134

Please sign in to comment.