Skip to content

Commit

Permalink
Add more options to reset batch: skip_base_not_current, non_determini…
Browse files Browse the repository at this point in the history
…stic_only (cadence-workflow#2931)
  • Loading branch information
longquanzheng authored Dec 30, 2019
1 parent 2348e96 commit 533e36b
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 28 deletions.
5 changes: 4 additions & 1 deletion tools/cli/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,10 @@ const (
FlagExcludeFile = "exclude_file"
FlagInputSeparator = "input_separator"
FlagParallism = "input_parallism"
FlagSkipCurrent = "skip_current_open"
FlagSkipCurrentOpen = "skip_current_open"
FlagSkipBaseIsNotCurrent = "skip_base_is_not_current"
FlagDryRun = "dry_run"
FlagNonDeterministicOnly = "only_non_deterministic"
FlagInputTopic = "input_topic"
FlagInputTopicWithAlias = FlagInputTopic + ", it"
FlagHostFile = "host_file"
Expand Down
25 changes: 20 additions & 5 deletions tools/cli/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ func newWorkflowCommands() []cli.Command {
{
Name: "reset-batch",
Usage: "reset workflow in batch by resetType: " + strings.Join(mapKeysToArray(resetTypesMap), ",") +
"batch source is from input file or visibility query.",
"To get base workflowIDs/runIDs to reset, source is from input file or visibility query.",
Flags: []cli.Flag{
cli.StringFlag{
Name: FlagInputFileWithAlias,
Expand All @@ -283,8 +283,8 @@ func newWorkflowCommands() []cli.Command {
},
cli.StringFlag{
Name: FlagInputSeparator,
Value: ",",
Usage: "Separator for input file",
Value: "\t",
Usage: "Separator for input file(default to tab)",
},
cli.StringFlag{
Name: FlagReason,
Expand All @@ -296,8 +296,23 @@ func newWorkflowCommands() []cli.Command {
Usage: "Number of goroutines to run in parallel. Each goroutine would process one line for every second.",
},
cli.BoolFlag{
Name: FlagSkipCurrent,
Usage: "Skip the workflow if the current run is open.",
Name: FlagSkipCurrentOpen,
Usage: "Skip the workflow if the current run is open for the same workflowID as base.",
},
cli.BoolFlag{
Name: FlagSkipBaseIsNotCurrent,
// TODO https://github.com/uber/cadence/issues/2930
// The right way to prevent needs server side implementation .
// This client side is only best effort
Usage: "Skip if base run is not current run.",
},
cli.BoolFlag{
Name: FlagNonDeterministicOnly,
Usage: "Only apply onto workflows whose last event is decisionTaskFailed with non deterministic error.",
},
cli.BoolFlag{
Name: FlagDryRun,
Usage: "Not do real action of reset(just logging in STDOUT)",
},
cli.StringFlag{
Name: FlagResetType,
Expand Down
122 changes: 100 additions & 22 deletions tools/cli/workflowCommands.go
Original file line number Diff line number Diff line change
Expand Up @@ -1434,7 +1434,7 @@ func ResetWorkflow(c *cli.Context) {
prettyPrintJSONObject(resp)
}

func processResets(c *cli.Context, domain string, wes chan shared.WorkflowExecution, done chan bool, wg *sync.WaitGroup, reason, resetType string, skipOpen bool) {
func processResets(c *cli.Context, domain string, wes chan shared.WorkflowExecution, done chan bool, wg *sync.WaitGroup, params batchResetParamsType) {
for {
select {
case we := <-wes:
Expand All @@ -1443,7 +1443,7 @@ func processResets(c *cli.Context, domain string, wes chan shared.WorkflowExecut
rid := we.GetRunId()
var err error
for i := 0; i < 3; i++ {
err = doReset(c, domain, wid, rid, reason, resetType, skipOpen)
err = doReset(c, domain, wid, rid, params)
if err == nil {
break
}
Expand All @@ -1464,17 +1464,24 @@ func processResets(c *cli.Context, domain string, wes chan shared.WorkflowExecut
}
}

type batchResetParamsType struct {
reason string
skipOpen bool
nonDeterministicOnly bool
skipBaseNotCurrent bool
dryRun bool
resetType string
}

// ResetInBatch resets workflow in batch
func ResetInBatch(c *cli.Context) {
domain := getRequiredGlobalOption(c, FlagDomain)
reason := getRequiredOption(c, FlagReason)
resetType := getRequiredOption(c, FlagResetType)

inFileName := c.String(FlagInputFile)
query := c.String(FlagListQuery)
excFileName := c.String(FlagExcludeFile)
separator := c.String(FlagInputSeparator)
skipOpen := c.Bool(FlagSkipCurrent)
parallel := c.Int(FlagParallism)

extraForResetType, ok := resetTypesMap[resetType]
Expand All @@ -1484,6 +1491,15 @@ func ResetInBatch(c *cli.Context) {
getRequiredOption(c, extraForResetType)
}

batchResetParams := batchResetParamsType{
reason: getRequiredOption(c, FlagReason),
skipOpen: c.Bool(FlagSkipCurrentOpen),
nonDeterministicOnly: c.Bool(FlagNonDeterministicOnly),
skipBaseNotCurrent: c.Bool(FlagSkipBaseIsNotCurrent),
dryRun: c.Bool(FlagDryRun),
resetType: resetType,
}

if inFileName == "" && query == "" {
ErrorAndExit("Must provide input file or list query to get target workflows to reset", nil)
}
Expand All @@ -1494,7 +1510,7 @@ func ResetInBatch(c *cli.Context) {
done := make(chan bool)
for i := 0; i < parallel; i++ {
wg.Add(1)
go processResets(c, domain, wes, done, wg, reason, resetType, skipOpen)
go processResets(c, domain, wes, done, wg, batchResetParams)
}

// read exclude
Expand Down Expand Up @@ -1615,7 +1631,7 @@ func printErrorAndReturn(msg string, err error) error {
return err
}

func doReset(c *cli.Context, domain, wid, rid string, reason, resetType string, skipOpen bool) error {
func doReset(c *cli.Context, domain, wid, rid string, params batchResetParamsType) error {
ctx, cancel := newContext(c)
defer cancel()

Expand All @@ -1631,44 +1647,106 @@ func doReset(c *cli.Context, domain, wid, rid string, reason, resetType string,
}

currentRunID := resp.WorkflowExecutionInfo.Execution.GetRunId()
if currentRunID == rid {
fmt.Println("current run is the reset run: ", wid, rid)
//return nil
if currentRunID != rid && params.skipBaseNotCurrent {
fmt.Println("skip because base run is different from current run: ", wid, rid, currentRunID)
return nil
}
if rid == "" {
rid = currentRunID
}

if resp.WorkflowExecutionInfo.CloseStatus == nil || resp.WorkflowExecutionInfo.CloseTime == nil {
if skipOpen {
if params.skipOpen {
fmt.Println("skip because current run is open: ", wid, rid, currentRunID)
//skip and not terminate current if open
return nil
}
}

resetBaseRunID, decisionFinishID, err := getResetEventIDByType(ctx, c, resetType, domain, wid, rid, frontendClient)
if params.nonDeterministicOnly {
isLDN, err := isLastEventDecisionTaskFailedWithNonDeterminism(ctx, domain, wid, rid, frontendClient)
if err != nil {
return printErrorAndReturn("check isLastEventDecisionTaskFailedWithNonDeterminism failed", err)
}
if !isLDN {
fmt.Println("skip because last event is not DecisionTaskFailedWithNonDeterminism")
return nil
}
}

resetBaseRunID, decisionFinishID, err := getResetEventIDByType(ctx, c, params.resetType, domain, wid, rid, frontendClient)
if err != nil {
return printErrorAndReturn("getResetEventIDByType failed", err)
}
fmt.Println("DecisionFinishEventId for reset:", wid, rid, resetBaseRunID, decisionFinishID)

resp2, err := frontendClient.ResetWorkflowExecution(ctx, &shared.ResetWorkflowExecutionRequest{
if params.dryRun {
fmt.Printf("dry run to reset wid: %v, rid:%v to baseRunID:%v, eventID:%v \n", wid, rid, resetBaseRunID, decisionFinishID)
}else{
resp2, err := frontendClient.ResetWorkflowExecution(ctx, &shared.ResetWorkflowExecutionRequest{
Domain: common.StringPtr(domain),
WorkflowExecution: &shared.WorkflowExecution{
WorkflowId: common.StringPtr(wid),
RunId: common.StringPtr(resetBaseRunID),
},
DecisionFinishEventId: common.Int64Ptr(decisionFinishID),
RequestId: common.StringPtr(uuid.New()),
Reason: common.StringPtr(fmt.Sprintf("%v:%v", getCurrentUserFromEnv(), params.reason)),
})

if err != nil {
return printErrorAndReturn("ResetWorkflowExecution failed", err)
}
fmt.Println("new runID for wid/rid is ,", wid, rid, resp2.GetRunId())
}

return nil
}

func isLastEventDecisionTaskFailedWithNonDeterminism(ctx context.Context, domain, wid, rid string, frontendClient workflowserviceclient.Interface) (bool, error) {
req := &shared.GetWorkflowExecutionHistoryRequest{
Domain: common.StringPtr(domain),
WorkflowExecution: &shared.WorkflowExecution{
Execution: &shared.WorkflowExecution{
WorkflowId: common.StringPtr(wid),
RunId: common.StringPtr(resetBaseRunID),
RunId: common.StringPtr(rid),
},
DecisionFinishEventId: common.Int64Ptr(decisionFinishID),
RequestId: common.StringPtr(uuid.New()),
Reason: common.StringPtr(fmt.Sprintf("%v:%v", getCurrentUserFromEnv(), reason)),
})
MaximumPageSize: common.Int32Ptr(1000),
NextPageToken: nil,
}

if err != nil {
return printErrorAndReturn("ResetWorkflowExecution failed", err)
var firstEvent, decisionFailed *shared.HistoryEvent
for {
resp, err := frontendClient.GetWorkflowExecutionHistory(ctx, req)
if err != nil {
return false, printErrorAndReturn("GetWorkflowExecutionHistory failed", err)
}
for _, e := range resp.GetHistory().GetEvents() {
if firstEvent == nil {
firstEvent = e
}
if e.GetEventType() == shared.EventTypeDecisionTaskFailed {
decisionFailed = e
} else if e.GetEventType() == shared.EventTypeDecisionTaskCompleted {
decisionFailed = nil
}
}
if len(resp.NextPageToken) != 0 {
req.NextPageToken = resp.NextPageToken
} else {
break
}
}
fmt.Println("new runID for wid/rid is ,", wid, rid, resp2.GetRunId())
return nil

if decisionFailed != nil {
attr := decisionFailed.GetDecisionTaskFailedEventAttributes()
if attr.GetCause() == shared.DecisionTaskFailedCauseWorkflowWorkerUnhandledFailure ||
strings.Contains(string(attr.GetDetails()), "nondeterministic") {
fmt.Printf("found non determnistic workflow wid:%v, rid:%v, orignalStartTime:%v \n", wid, rid, time.Unix(0, firstEvent.GetTimestamp()))
return true, nil
}
}

return false, nil
}

func getResetEventIDByType(ctx context.Context, c *cli.Context, resetType, domain, wid, rid string, frontendClient workflowserviceclient.Interface) (resetBaseRunID string, decisionFinishID int64, err error) {
Expand Down

0 comments on commit 533e36b

Please sign in to comment.