Skip to content

Commit

Permalink
Add operator to admin failover
Browse files Browse the repository at this point in the history
  • Loading branch information
vancexu authored Oct 12, 2020
1 parent 396af91 commit d072276
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 1 deletion.
3 changes: 3 additions & 0 deletions common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,3 +174,6 @@ const (

// StickyTaskConditionFailedErrorMsg error msg for sticky task ConditionFailedError
const StickyTaskConditionFailedErrorMsg = "StickyTaskConditionFailedError"

// MemoKeyForOperator is the memo key for operator
const MemoKeyForOperator = "operator"
23 changes: 23 additions & 0 deletions service/worker/failovermanager/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package failovermanager

import (
"context"
"encoding/json"
"errors"
"strings"
"time"
Expand Down Expand Up @@ -71,6 +72,8 @@ const (
WorkflowCompleted = "complete"
// WorkflowAborted state
WorkflowAborted = "aborted"

unknownOperator = "unknown"
)

type (
Expand Down Expand Up @@ -123,6 +126,7 @@ type (
SourceCluster string
SuccessDomains []string // SuccessDomains are guaranteed succeed processed
FailedDomains []string // FailedDomains contains false positive
Operator string
}
)

Expand All @@ -144,6 +148,7 @@ func FailoverWorkflow(ctx workflow.Context, params *FailoverParams) (*FailoverRe
var successDomains []string
var totalNumOfDomains int
wfState := WorkflowRunning
operator := getOperator(ctx)
err = workflow.SetQueryHandler(ctx, QueryType, func(input []byte) (*QueryResult, error) {
return &QueryResult{
TotalDomains: totalNumOfDomains,
Expand All @@ -154,6 +159,7 @@ func FailoverWorkflow(ctx workflow.Context, params *FailoverParams) (*FailoverRe
SourceCluster: params.SourceCluster,
SuccessDomains: successDomains,
FailedDomains: failedDomains,
Operator: operator,
}, nil
})
if err != nil {
Expand Down Expand Up @@ -217,6 +223,23 @@ func FailoverWorkflow(ctx workflow.Context, params *FailoverParams) (*FailoverRe
}, nil
}

func getOperator(ctx workflow.Context) string {
memo := workflow.GetInfo(ctx).Memo
if memo == nil || len(memo.Fields) == 0 {
return unknownOperator
}
opBytes, ok := memo.Fields[common.MemoKeyForOperator]
if !ok {
return unknownOperator
}
var operator string
err := json.Unmarshal(opBytes, &operator)
if err != nil {
return unknownOperator
}
return operator
}

func getGetDomainsActivityOptions() workflow.ActivityOptions {
return workflow.ActivityOptions{
ScheduleToStartTimeout: 10 * time.Second,
Expand Down
29 changes: 28 additions & 1 deletion service/worker/failovermanager/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (s *failoverWorkflowTestSuite) TestWorkflow_Success() {
s.Equal("s", res.SourceCluster)
s.Equal(domains, res.SuccessDomains)
s.Equal(0, len(res.FailedDomains))

s.Equal(unknownOperator, res.Operator)
}

func (s *failoverWorkflowTestSuite) TestWorkflow_Success_Batches() {
Expand Down Expand Up @@ -432,3 +432,30 @@ func (s *failoverWorkflowTestSuite) TestFailoverActivity_Error() {
s.Equal([]string{"d1"}, result.SuccessDomains)
s.Equal([]string{"d2"}, result.FailedDomains)
}

func (s *failoverWorkflowTestSuite) TestGetOperator() {
env := s.NewTestWorkflowEnvironment()

operator := "testOperator"
env.SetMemoOnStart(map[string]interface{}{
common.MemoKeyForOperator: operator,
})

env.OnActivity(getDomainsActivityName, mock.Anything, mock.Anything).Return(nil, nil)
env.OnActivity(failoverActivityName, mock.Anything, mock.Anything).Return(nil, nil)
params := &FailoverParams{
TargetCluster: "t",
SourceCluster: "s",
}

env.ExecuteWorkflow(WorkflowTypeName, params)
var result FailoverResult
s.NoError(env.GetWorkflowResult(&result))

queryResult, err := env.QueryWorkflow(QueryType)
s.NoError(err)
var res QueryResult
s.NoError(queryResult.Get(&res))

s.Equal(operator, res.Operator)
}
13 changes: 13 additions & 0 deletions tools/cli/adminFailoverCommands.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package cli
import (
"context"
"fmt"
"os/user"
"time"

"go.uber.org/cadence/.gen/go/shared"
Expand Down Expand Up @@ -69,6 +70,9 @@ func failoverStart(c *cli.Context, params *startParams) {
WorkflowIDReusePolicy: cclient.WorkflowIDReusePolicyAllowDuplicate,
TaskList: failovermanager.TaskListName,
ExecutionStartToCloseTimeout: workflowTimeout,
Memo: map[string]interface{}{
common.MemoKeyForOperator: getOperator(),
},
}
foParams := failovermanager.FailoverParams{
TargetCluster: targetCluster,
Expand Down Expand Up @@ -263,3 +267,12 @@ func getRunID(c *cli.Context) string {
}
return ""
}

func getOperator() string {
user, err := user.Current()
if err != nil {
ErrorAndExit("Unable to get operator info", err)
}

return fmt.Sprintf("%s (username: %s)", user.Name, user.Username)
}

0 comments on commit d072276

Please sign in to comment.