Skip to content

Commit

Permalink
Reset CLI: reset/batch-reset allow reset by bad binary (cadence-workf…
Browse files Browse the repository at this point in the history
  • Loading branch information
longquanzheng authored May 8, 2019
1 parent 0abd288 commit 4027955
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 36 deletions.
2 changes: 1 addition & 1 deletion tools/cli/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
const (
// Version is the controlled version string. It should be updated every time
// before we release a new version.
Version = "0.6.0"
Version = "0.6.2"
)

// NewCliApp instantiates a new instance of the CLI application.
Expand Down
116 changes: 81 additions & 35 deletions tools/cli/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ import (
"sync"
"time"

"github.com/uber/cadence/service/history"

"github.com/uber/cadence/.gen/go/cadence/workflowserviceclient"

"github.com/fatih/color"
Expand Down Expand Up @@ -79,10 +81,11 @@ var envKeysForUserName = []string{
"HOME",
}

var resetTypesMap = map[string]interface{}{
"FirstDecisionCompleted": nil,
"LastDecisionCompleted": nil,
"LastContinuedAsNew": nil,
var resetTypesMap = map[string]string{
"FirstDecisionCompleted": "",
"LastDecisionCompleted": "",
"LastContinuedAsNew": "",
"BadBinary": FlagResetBadBinaryChecksum,
}

type jsonType int
Expand Down Expand Up @@ -129,7 +132,7 @@ var (
}
)

func mapKeysToArray(m map[string]interface{}) []string {
func mapKeysToArray(m map[string]string) []string {
var out []string
for k := range m {
out = append(out, k)
Expand Down Expand Up @@ -1330,9 +1333,14 @@ func ResetWorkflow(c *cli.Context) {
}
eventID := c.Int64(FlagEventID)
resetType := c.String(FlagResetType)
if _, ok := resetTypesMap[resetType]; !ok && eventID <= 0 {
extraForResetType, ok := resetTypesMap[resetType]
if !ok && eventID <= 0 {
ErrorAndExit("Must specify valid eventID or valid resetType", nil)
}
if ok {
getRequiredOption(c, extraForResetType)
}

ctx, cancel := newContext(c)
defer cancel()

Expand All @@ -1342,7 +1350,7 @@ func ResetWorkflow(c *cli.Context) {
decisionFinishID := eventID
var err error
if resetType != "" {
resetBaseRunID, decisionFinishID, err = getResetEventIDByType(ctx, resetType, domain, wid, rid, frontendClient)
resetBaseRunID, decisionFinishID, err = getResetEventIDByType(c, resetType, domain, wid, rid, ctx, frontendClient)
if err != nil {
ErrorAndExit("getResetEventIDByType failed", err)
}
Expand Down Expand Up @@ -1398,8 +1406,11 @@ func ResetInBatch(c *cli.Context) {
separator := getRequiredOption(c, FlagInputSeparator)
reason := getRequiredOption(c, FlagReason)
resetType := getRequiredOption(c, FlagResetType)
if _, ok := resetTypesMap[resetType]; !ok {
extraForResetType, ok := resetTypesMap[resetType]
if !ok {
ErrorAndExit("Not supported reset type", nil)
} else {
getRequiredOption(c, extraForResetType)
}

if !c.IsSet(FlagSkipCurrent) {
Expand All @@ -1418,39 +1429,40 @@ func ResetInBatch(c *cli.Context) {
}

// read exclude
excFile, err := os.Open(excFileName)
if err != nil {
ErrorAndExit("Open failed2", err)
}
defer excFile.Close()
scanner := bufio.NewScanner(excFile)
idx := 0
excludes := map[string]string{}
for scanner.Scan() {
idx++
line := strings.TrimSpace(scanner.Text())
if len(line) == 0 {
fmt.Printf("line %v is empty, skipped\n", idx)
continue
}
cols := strings.Split(line, separator)
if len(cols) < 1 {
ErrorAndExit("Split failed", fmt.Errorf("line %v has less than 1 cols separated by comma, only %v ", idx, len(cols)))
if len(excFileName) > 0 {
excFile, err := os.Open(excFileName)
if err != nil {
ErrorAndExit("Open failed2", err)
}
defer excFile.Close()
scanner := bufio.NewScanner(excFile)
idx := 0
for scanner.Scan() {
idx++
line := strings.TrimSpace(scanner.Text())
if len(line) == 0 {
fmt.Printf("line %v is empty, skipped\n", idx)
continue
}
cols := strings.Split(line, separator)
if len(cols) < 1 {
ErrorAndExit("Split failed", fmt.Errorf("line %v has less than 1 cols separated by comma, only %v ", idx, len(cols)))
}
wid := strings.TrimSpace(cols[0])
rid := "not-needed"
excludes[wid] = rid
}
wid := strings.TrimSpace(cols[0])
rid := "not-needed"
excludes[wid] = rid
}

fmt.Println("num of excludes:", len(excludes))

inFile, err := os.Open(inFileName)
if err != nil {
ErrorAndExit("Open failed", err)
}
defer inFile.Close()
scanner = bufio.NewScanner(inFile)
idx = 0
scanner := bufio.NewScanner(inFile)
idx := 0
for scanner.Scan() {
idx++
line := strings.TrimSpace(scanner.Text())
Expand Down Expand Up @@ -1523,7 +1535,7 @@ func doReset(c *cli.Context, domain, wid, rid string, reason, resetType string,
}
}

resetBaseRunID, decisionFinishID, err := getResetEventIDByType(ctx, resetType, domain, wid, rid, frontendClient)
resetBaseRunID, decisionFinishID, err := getResetEventIDByType(c, resetType, domain, wid, rid, ctx, frontendClient)
if err != nil {
return printErrorAndReturn("getResetEventIDByType failed", err)
}
Expand All @@ -1547,7 +1559,7 @@ func doReset(c *cli.Context, domain, wid, rid string, reason, resetType string,
return nil
}

func getResetEventIDByType(ctx context.Context, resetType, domain, wid, rid string, frontendClient workflowserviceclient.Interface) (resetBaseRunID string, decisionFinishID int64, err error) {
func getResetEventIDByType(c *cli.Context, resetType, domain, wid, rid string, ctx context.Context, frontendClient workflowserviceclient.Interface) (resetBaseRunID string, decisionFinishID int64, err error) {
fmt.Println("switch", resetType)
switch resetType {
case "LastDecisionCompleted":
Expand All @@ -1561,7 +1573,13 @@ func getResetEventIDByType(ctx context.Context, resetType, domain, wid, rid stri
return
}
case "FirstDecisionCompleted":
resetBaseRunID, decisionFinishID, err = getFirstDecisionCompletedID(ctx, domain, wid, rid, frontendClient)
resetBaseRunID, decisionFinishID, err = getFirstDecisionCompletedID(domain, wid, rid, ctx, frontendClient)
if err != nil {
return
}
case "BadBinary":
binCheckSum := c.String(FlagResetBadBinaryChecksum)
resetBaseRunID, decisionFinishID, err = getBadDecisionCompletedID(domain, wid, rid, binCheckSum, ctx, frontendClient)
if err != nil {
return
}
Expand Down Expand Up @@ -1605,7 +1623,35 @@ func getLastDecisionCompletedID(ctx context.Context, domain, wid, rid string, fr
return
}

func getFirstDecisionCompletedID(ctx context.Context, domain, wid, rid string, frontendClient workflowserviceclient.Interface) (resetBaseRunID string, decisionFinishID int64, err error) {
func getBadDecisionCompletedID(domain, wid, rid, binChecksum string, ctx context.Context, frontendClient workflowserviceclient.Interface) (resetBaseRunID string, decisionFinishID int64, err error) {
resetBaseRunID = rid
resp, err := frontendClient.DescribeWorkflowExecution(ctx, &shared.DescribeWorkflowExecutionRequest{
Domain: common.StringPtr(domain),
Execution: &shared.WorkflowExecution{
WorkflowId: common.StringPtr(wid),
RunId: common.StringPtr(rid),
},
})
if err != nil {
return "", 0, printErrorAndReturn("DescribeWorkflowExecution failed", err)
}

_, p := history.FindAutoResetPoint(&shared.BadBinaries{
Binaries: map[string]*shared.BadBinaryInfo{
binChecksum: {},
},
}, resp.WorkflowExecutionInfo.AutoResetPoints)
if p != nil {
decisionFinishID = p.GetFirstDecisionCompletedId()
}

if decisionFinishID == 0 {
return "", 0, printErrorAndReturn("Get DecisionFinishID failed", fmt.Errorf("no DecisionFinishID"))
}
return
}

func getFirstDecisionCompletedID(domain, wid, rid string, ctx context.Context, frontendClient workflowserviceclient.Interface) (resetBaseRunID string, decisionFinishID int64, err error) {
resetBaseRunID = rid
req := &shared.GetWorkflowExecutionHistoryRequest{
Domain: common.StringPtr(domain),
Expand Down
1 change: 1 addition & 0 deletions tools/cli/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ const (
FlagRemoveBadBinary = "remove_bad_binary"
FlagResetType = "reset_type"
FlagResetPointsOnly = "reset_points_only"
FlagResetBadBinaryChecksum = "reset_bad_binary_checksum"
)

var flagsForExecution = []cli.Flag{
Expand Down
9 changes: 9 additions & 0 deletions tools/cli/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,10 @@ func newWorkflowCommands() []cli.Command {
Name: FlagResetType,
Usage: "where to reset. Support one of these: " + strings.Join(mapKeysToArray(resetTypesMap), ","),
},
cli.StringFlag{
Name: FlagResetBadBinaryChecksum,
Usage: "Binary checksum for resetType of BadBinary",
},
},
Action: func(c *cli.Context) {
ResetWorkflow(c)
Expand All @@ -240,6 +244,7 @@ func newWorkflowCommands() []cli.Command {
},
cli.StringFlag{
Name: FlagExcludeFile,
Value: "",
Usage: "Another input file to use for excluding from resetting, only workflowID is needed.",
},
cli.StringFlag{
Expand All @@ -264,6 +269,10 @@ func newWorkflowCommands() []cli.Command {
Name: FlagResetType,
Usage: "where to reset. Support one of these: " + strings.Join(mapKeysToArray(resetTypesMap), ","),
},
cli.StringFlag{
Name: FlagResetBadBinaryChecksum,
Usage: "Binary checksum for resetType of BadBinary",
},
},
Action: func(c *cli.Context) {
ResetInBatch(c)
Expand Down

0 comments on commit 4027955

Please sign in to comment.