Skip to content

Commit

Permalink
fix(controller): Implement offloading for workflow updates that are r…
Browse files Browse the repository at this point in the history
…e-applied. Fixes argoproj#2856 (argoproj#2941)
  • Loading branch information
alexec authored May 23, 2020
1 parent 6c369e6 commit 6464bd1
Show file tree
Hide file tree
Showing 31 changed files with 587 additions and 677 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,11 @@ CI ?= false
DB ?= postgres
K3D := $(shell if [ "`which kubectl`" != '' ] && [ "`kubectl config current-context`" = "k3s-default" ]; then echo true; else echo false; fi)
LOG_LEVEL := debug
ALWAYS_OFFLOAD_NODE_STATUS := true

ifeq ($(DB),no-db)
ALWAYS_OFFLOAD_NODE_STATUS := false
else
ALWAYS_OFFLOAD_NODE_STATUS := true
endif

ifeq ($(CI),true)
Expand Down Expand Up @@ -318,7 +319,6 @@ $(VERSION_FILE):
touch $(VERSION_FILE)

dist/$(DB).yaml: $(MANIFESTS) $(E2E_MANIFESTS) $(VERSION_FILE)
# We additionally disable ALWAYS_OFFLOAD_NODE_STATUS
kustomize build --load_restrictor=none test/e2e/manifests/$(DB) | sed 's/:$(MANIFESTS_VERSION)/:$(VERSION)/' | sed 's/pns/$(E2E_EXECUTOR)/' > dist/$(DB).yaml

.PHONY: install
Expand Down
12 changes: 6 additions & 6 deletions persist/sqldb/explosive_offload_node_status_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

// ExplosiveOffloadNodeStatusRepo is an OffloadNodeStatusRepo backed by
// explosiveOffloadNodeStatusRepo; its Save, Get, List, Delete and
// ListOldOffloads methods return a "not supported" error instead of doing work.
var ExplosiveOffloadNodeStatusRepo OffloadNodeStatusRepo = &explosiveOffloadNodeStatusRepo{}
// NOTE(review): the next two declarations are the pre-change (unexported) and
// post-change (exported) names of the same sentinel error shown by the diff.
var notSupportedError = fmt.Errorf("offload node status is not supported")
var OffloadNotSupportedError = fmt.Errorf("offload node status is not supported")

// explosiveOffloadNodeStatusRepo carries no state; it exists only to provide
// the methods behind ExplosiveOffloadNodeStatusRepo.
type explosiveOffloadNodeStatusRepo struct {
}
Expand All @@ -17,21 +17,21 @@ func (n *explosiveOffloadNodeStatusRepo) IsEnabled() bool {
}

// Save never stores anything: it returns an empty version string together with
// the "offload node status is not supported" sentinel error.
func (n *explosiveOffloadNodeStatusRepo) Save(string, string, wfv1.Nodes) (string, error) {
return "", notSupportedError
return "", OffloadNotSupportedError
}

// Get never loads anything: it returns nil nodes together with the
// "offload node status is not supported" sentinel error.
func (n *explosiveOffloadNodeStatusRepo) Get(string, string) (wfv1.Nodes, error) {
return nil, notSupportedError
return nil, OffloadNotSupportedError
}

// List never lists anything: it returns a nil map together with the
// "offload node status is not supported" sentinel error.
func (n *explosiveOffloadNodeStatusRepo) List(string) (map[UUIDVersion]wfv1.Nodes, error) {
return nil, notSupportedError
return nil, OffloadNotSupportedError
}

// Delete never deletes anything: it always returns the
// "offload node status is not supported" sentinel error.
func (n *explosiveOffloadNodeStatusRepo) Delete(string, string) error {
return notSupportedError
return OffloadNotSupportedError
}

// ListOldOffloads never lists anything: it returns a nil slice together with
// the "offload node status is not supported" sentinel error.
func (n *explosiveOffloadNodeStatusRepo) ListOldOffloads(string) ([]UUIDVersion, error) {
return nil, notSupportedError
return nil, OffloadNotSupportedError
}
3 changes: 2 additions & 1 deletion server/apiserver/argoserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
grpcutil "github.com/argoproj/argo/util/grpc"
"github.com/argoproj/argo/util/instanceid"
"github.com/argoproj/argo/util/json"
"github.com/argoproj/argo/workflow/hydrator"
)

const (
Expand Down Expand Up @@ -142,7 +143,7 @@ func (as *argoServer) Run(ctx context.Context, port int, browserOpenFunc func(st
// disable the archiving - and still read old records
wfArchive = sqldb.NewWorkflowArchive(session, persistence.GetClusterName(), as.managedNamespace, instanceIDService)
}
artifactServer := artifacts.NewArtifactServer(as.authenticator, offloadRepo, wfArchive, instanceIDService)
artifactServer := artifacts.NewArtifactServer(as.authenticator, hydrator.New(offloadRepo), wfArchive, instanceIDService)
grpcServer := as.newGRPCServer(instanceIDService, offloadRepo, wfArchive, configMap.Links)
httpServer := as.newHTTPServer(ctx, port, artifactServer)

Expand Down
27 changes: 8 additions & 19 deletions server/artifacts/artifact_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,18 @@ import (
"github.com/argoproj/argo/server/auth"
"github.com/argoproj/argo/util/instanceid"
artifact "github.com/argoproj/argo/workflow/artifacts"
"github.com/argoproj/argo/workflow/packer"
"github.com/argoproj/argo/workflow/hydrator"
)

// ArtifactServer holds the dependencies used by its handlers (e.g. GetArtifact).
// NOTE(review): the field list below shows both sides of the diff — the first
// four fields are the pre-change layout, the last four the post-change layout
// in which offloadNodeStatusRepo is replaced by hydrator.
type ArtifactServer struct {
authN auth.Gatekeeper
offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo
wfArchive sqldb.WorkflowArchive
instanceIDService instanceid.Service
authN auth.Gatekeeper
// hydrator.Hydrate is applied to each workflow fetched by getWorkflowAndValidate.
hydrator hydrator.Interface
wfArchive sqldb.WorkflowArchive
instanceIDService instanceid.Service
}

// NewArtifactServer returns an ArtifactServer populated positionally with the
// given arguments.
// NOTE(review): two signatures appear because the diff shows the pre-change
// form (taking an OffloadNodeStatusRepo) followed by the post-change form
// (taking a hydrator.Interface).
func NewArtifactServer(authN auth.Gatekeeper, offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo, wfArchive sqldb.WorkflowArchive, instanceIDService instanceid.Service) *ArtifactServer {
return &ArtifactServer{authN, offloadNodeStatusRepo, wfArchive, instanceIDService}
func NewArtifactServer(authN auth.Gatekeeper, hydrator hydrator.Interface, wfArchive sqldb.WorkflowArchive, instanceIDService instanceid.Service) *ArtifactServer {
return &ArtifactServer{authN, hydrator, wfArchive, instanceIDService}
}

func (a *ArtifactServer) GetArtifact(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -171,21 +171,10 @@ func (a *ArtifactServer) getWorkflowAndValidate(ctx context.Context, namespace s
if err != nil {
return nil, err
}
err = packer.DecompressWorkflow(wf)
err = a.hydrator.Hydrate(wf)
if err != nil {
return nil, err
}
if wf.Status.IsOffloadNodeStatus() {
if a.offloadNodeStatusRepo.IsEnabled() {
offloadedNodes, err := a.offloadNodeStatusRepo.Get(string(wf.UID), wf.GetOffloadNodeStatusVersion())
if err != nil {
return nil, err
}
wf.Status.Nodes = offloadedNodes
} else {
log.WithFields(log.Fields{"namespace": namespace, "name": workflowName}).Warn(sqldb.OffloadNodeStatusDisabled)
}
}
return wf, nil
}

Expand Down
40 changes: 9 additions & 31 deletions server/workflow/workflow_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"github.com/argoproj/argo/util/instanceid"
"github.com/argoproj/argo/util/logs"
"github.com/argoproj/argo/workflow/common"
"github.com/argoproj/argo/workflow/packer"
"github.com/argoproj/argo/workflow/hydrator"
"github.com/argoproj/argo/workflow/templateresolution"
"github.com/argoproj/argo/workflow/util"
"github.com/argoproj/argo/workflow/validate"
Expand All @@ -27,11 +27,12 @@ import (
// workflowServer groups the collaborators shared by the workflow service methods.
type workflowServer struct {
instanceIDService instanceid.Service
offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo
// hydrator is created from offloadNodeStatusRepo via hydrator.New in
// NewWorkflowServer; its Hydrate method is applied to workflows before they
// are returned or streamed.
hydrator hydrator.Interface
}

// NewWorkflowServer returns a new workflowServer holding the given instance ID
// service and offload repository.
// NOTE(review): the two return lines are the pre- and post-change sides of the
// diff; only the second, which also passes hydrator.New(offloadNodeStatusRepo),
// fills all three fields of workflowServer.
func NewWorkflowServer(instanceIDService instanceid.Service, offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo) workflowpkg.WorkflowServiceServer {
return &workflowServer{instanceIDService, offloadNodeStatusRepo}
return &workflowServer{instanceIDService, offloadNodeStatusRepo, hydrator.New(offloadNodeStatusRepo)}
}

func (s *workflowServer) CreateWorkflow(ctx context.Context, req *workflowpkg.WorkflowCreateRequest) (*v1alpha1.Workflow, error) {
Expand Down Expand Up @@ -83,23 +84,11 @@ func (s *workflowServer) GetWorkflow(ctx context.Context, req *workflowpkg.Workf
if err != nil {
return nil, err
}

if wf.Status.IsOffloadNodeStatus() {
if s.offloadNodeStatusRepo.IsEnabled() {
offloadedNodes, err := s.offloadNodeStatusRepo.Get(string(wf.UID), wf.GetOffloadNodeStatusVersion())
if err != nil {
return nil, err
}
wf.Status.Nodes = offloadedNodes
} else {
log.WithFields(log.Fields{"namespace": wf.Namespace, "name": wf.Name}).Warn(sqldb.OffloadNodeStatusDisabled)
}
}
err = packer.DecompressWorkflow(wf)
err = s.hydrator.Hydrate(wf)
if err != nil {
return nil, err
}
return wf, nil
return wf, err
}

func (s *workflowServer) ListWorkflows(ctx context.Context, req *workflowpkg.WorkflowListRequest) (*v1alpha1.WorkflowList, error) {
Expand Down Expand Up @@ -173,21 +162,10 @@ func (s *workflowServer) WatchWorkflows(req *workflowpkg.WatchWorkflowsRequest,
return fmt.Errorf("watch object was not a workflow %v", reflect.TypeOf(event.Object))
}
logCtx := log.WithFields(log.Fields{"workflow": wf.Name, "type": event.Type, "phase": wf.Status.Phase})
err := packer.DecompressWorkflow(wf)
err := s.hydrator.Hydrate(wf)
if err != nil {
return err
}
if wf.Status.IsOffloadNodeStatus() {
if s.offloadNodeStatusRepo.IsEnabled() {
offloadedNodes, err := s.offloadNodeStatusRepo.Get(string(wf.UID), wf.GetOffloadNodeStatusVersion())
if err != nil {
return err
}
wf.Status.Nodes = offloadedNodes
} else {
log.WithFields(log.Fields{"namespace": wf.Namespace, "name": wf.Name}).Warn(sqldb.OffloadNodeStatusDisabled)
}
}
logCtx.Debug("Sending event")
err = ws.Send(&workflowpkg.WorkflowWatchEvent{Type: string(event.Type), Object: wf})
if err != nil {
Expand Down Expand Up @@ -218,7 +196,7 @@ func (s *workflowServer) RetryWorkflow(ctx context.Context, req *workflowpkg.Wor
return nil, err
}

wf, err = util.RetryWorkflow(kubeClient, s.offloadNodeStatusRepo, wfClient.ArgoprojV1alpha1().Workflows(req.Namespace), wf, req.RestartSuccessful, req.NodeFieldSelector)
wf, err = util.RetryWorkflow(kubeClient, s.hydrator, wfClient.ArgoprojV1alpha1().Workflows(req.Namespace), wf, req.RestartSuccessful, req.NodeFieldSelector)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -251,7 +229,7 @@ func (s *workflowServer) ResumeWorkflow(ctx context.Context, req *workflowpkg.Wo
return nil, err
}

err = util.ResumeWorkflow(wfClient.ArgoprojV1alpha1().Workflows(req.Namespace), s.offloadNodeStatusRepo, req.Name, req.NodeFieldSelector)
err = util.ResumeWorkflow(wfClient.ArgoprojV1alpha1().Workflows(req.Namespace), s.hydrator, req.Name, req.NodeFieldSelector)
if err != nil {
log.Warnf("Failed to resume %s: %+v", req.Name, err)
return nil, err
Expand Down Expand Up @@ -312,7 +290,7 @@ func (s *workflowServer) StopWorkflow(ctx context.Context, req *workflowpkg.Work
if err != nil {
return nil, err
}
err = util.StopWorkflow(wfClient.ArgoprojV1alpha1().Workflows(req.Namespace), s.offloadNodeStatusRepo, req.Name, req.NodeFieldSelector, req.Message)
err = util.StopWorkflow(wfClient.ArgoprojV1alpha1().Workflows(req.Namespace), s.hydrator, req.Name, req.NodeFieldSelector, req.Message)
if err != nil {
return nil, err
}
Expand Down
5 changes: 1 addition & 4 deletions server/workflow/workflow_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,10 +581,7 @@ func generateNameReactor(action ktesting.Action) (handled bool, ret runtime.Obje
}

// getWorkflow is a test helper that calls server.GetWorkflow with a
// WorkflowGetRequest built from wfName and namespace.
// NOTE(review): the multi-line and single-line return statements are the pre-
// and post-change formatting of the same call.
func getWorkflow(ctx context.Context, server workflowpkg.WorkflowServiceServer, namespace string, wfName string) (*v1alpha1.Workflow, error) {
return server.GetWorkflow(ctx, &workflowpkg.WorkflowGetRequest{
Name: wfName,
Namespace: namespace,
})
return server.GetWorkflow(ctx, &workflowpkg.WorkflowGetRequest{Name: wfName, Namespace: namespace})
}

func getWorkflowList(ctx context.Context, server workflowpkg.WorkflowServiceServer, namespace string) (*v1alpha1.WorkflowList, error) {
Expand Down
11 changes: 0 additions & 11 deletions test/e2e/argo_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,11 +487,6 @@ func (s *ArgoServerSuite) TestWorkflowService() {
Array().
Length().
Equal(1)
if s.Persistence.IsEnabled() {
// check we are loading offloaded node status
j.Path("$.items[0].status.offloadNodeStatusVersion").
NotNull()
}
j.Path("$.items[0].status.nodes").
NotNull()
})
Expand All @@ -518,12 +513,6 @@ func (s *ArgoServerSuite) TestWorkflowService() {
Expect().
Status(200).
JSON()
if s.Persistence.IsEnabled() {
// check we are loading offloaded node status
j.
Path("$.status.offloadNodeStatusVersion").
NotNull()
}
j.Path("$.status.nodes").
NotNull()
s.e(s.T()).GET("/api/v1/workflows/argo/not-found").
Expand Down
38 changes: 17 additions & 21 deletions test/e2e/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ func (s *CLISuite) BeforeTest(suiteName, testName string) {
_ = os.Unsetenv("ARGO_TOKEN")
}

// testNeedsOffloading skips the calling test when persistence is enabled but
// the ARGO_SERVER environment variable is empty.
// NOTE(review): presumably offloaded node status can only be read through the
// Argo Server in that configuration — confirm against the suite setup.
func (s *CLISuite) testNeedsOffloading() {
	if s.Persistence.IsEnabled() && os.Getenv("ARGO_SERVER") == "" {
		s.T().Skip("test needs offloading, but no Argo Server available")
	}
}

func (s *CLISuite) TestCompletion() {
s.Given().RunCli([]string{"completion", "bash"}, func(t *testing.T, output string, err error) {
assert.NoError(t, err)
Expand Down Expand Up @@ -214,6 +221,7 @@ func (s *CLISuite) TestRoot() {
})
})
s.Run("List", func() {
s.testNeedsOffloading()
for i := 0; i < 3; i++ {
s.Given().
Workflow("@smoke/basic-generate-name.yaml").
Expand All @@ -232,6 +240,7 @@ func (s *CLISuite) TestRoot() {
})
})
s.Run("Get", func() {
s.testNeedsOffloading()
s.Given().RunCli([]string{"get", "basic"}, func(t *testing.T, output string, err error) {
if assert.NoError(t, err) {
assert.Contains(t, output, "Name:")
Expand Down Expand Up @@ -267,11 +276,7 @@ func (s *CLISuite) TestRoot() {
}

func (s *CLISuite) TestWorkflowSuspendResume() {
if s.Persistence.IsEnabled() {
// Persistence is enabled for this test, but it is not enabled for the Argo Server in this test suite.
// When this is the case, this behavior is tested in cli_with_server_test.go
s.T().SkipNow()
}
s.testNeedsOffloading()
s.Given().
Workflow("@testdata/sleep-3s.yaml").
When().
Expand All @@ -294,15 +299,8 @@ func (s *CLISuite) TestWorkflowSuspendResume() {
})
}

func (s *CLISuite) TestNodeSuspendResumeNoPersistence() {
if s.Persistence.IsEnabled() {
// Persistence is enabled for this test, but it is not enabled for the Argo Server in this test suite.
s.T().SkipNow()
}
NodeSuspendResumeCommon(s.E2ESuite)
}

func NodeSuspendResumeCommon(s fixtures.E2ESuite) {
func (s *CLISuite) TestNodeSuspendResume() {
s.testNeedsOffloading()
s.Given().
Workflow("@testdata/node-suspend.yaml").
When().
Expand Down Expand Up @@ -497,13 +495,8 @@ func (s *CLISuite) TestWorkflowLint() {
})
}

func (s *CLISuite) TestWorkflowRetryNoPersistence() {
if s.Persistence.IsEnabled() {
// Persistence is enabled for this test, but it is not enabled for the Argo Server in this test suite.
// When this is the case, this behavior is tested in cli_with_server_test.go
s.T().SkipNow()
}

func (s *CLISuite) TestWorkflowRetry() {
s.testNeedsOffloading()
var retryTime corev1.Time

s.Given().
Expand Down Expand Up @@ -556,6 +549,7 @@ func (s *CLISuite) TestWorkflowTerminate() {
}

func (s *CLISuite) TestWorkflowWait() {
s.testNeedsOffloading()
s.Given().
Workflow("@smoke/basic.yaml").
When().
Expand All @@ -569,6 +563,7 @@ func (s *CLISuite) TestWorkflowWait() {
}

func (s *CLISuite) TestWorkflowWatch() {
s.testNeedsOffloading()
s.Given().
Workflow("@smoke/basic.yaml").
When().
Expand Down Expand Up @@ -620,6 +615,7 @@ func (s *CLISuite) TestTemplate() {
})
})
s.Run("Submittable-Template", func() {
s.testNeedsOffloading()
s.Given().RunCli([]string{"submit", "--from", "workflowtemplate/workflow-template-whalesay-template", "-l", "argo-e2e=true"}, func(t *testing.T, output string, err error) {
if assert.NoError(t, err) {
assert.Contains(t, output, "Name:")
Expand Down
Loading

0 comments on commit 6464bd1

Please sign in to comment.