Skip to content

Commit

Permalink
Fix admin workflow re-replicate command (cadence-workflow#4325)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored Aug 17, 2021
1 parent abe2284 commit f52498a
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 55 deletions.
14 changes: 3 additions & 11 deletions service/frontend/adminHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ type (

var (
adminServiceRetryPolicy = common.CreateAdminServiceRetryPolicy()
resendStartEventID = common.Int64Ptr(0)
)

// NewAdminHandler creates a thrift handler for the cadence admin service
Expand Down Expand Up @@ -1004,10 +1003,10 @@ func (adh *adminHandlerImpl) ResendReplicationTasks(
request.DomainID,
request.GetWorkflowID(),
request.GetRunID(),
resendStartEventID,
request.StartEventID,
request.StartVersion,
nil,
nil,
request.EndEventID,
request.EndVersion,
)
}

Expand Down Expand Up @@ -1054,13 +1053,6 @@ func (adh *adminHandlerImpl) validateGetWorkflowExecutionRawHistoryV2Request(
return &types.BadRequestError{Message: "Invalid PageSize."}
}

if request.StartEventID == nil &&
request.StartEventVersion == nil &&
request.EndEventID == nil &&
request.EndEventVersion == nil {
return &types.BadRequestError{Message: "Invalid event query range."}
}

if (request.StartEventID != nil && request.StartEventVersion == nil) ||
(request.StartEventID == nil && request.StartEventVersion != nil) {
return &types.BadRequestError{Message: "Invalid start event id and start event version combination."}
Expand Down
17 changes: 9 additions & 8 deletions tools/cli/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,7 @@ func newAdminDomainCommands() []cli.Command {
func newAdminKafkaCommands() []cli.Command {
return []cli.Command{
{
// TODO: do we still need this command given that kafka replication has been deprecated?
Name: "parse",
Aliases: []string{"par"},
Usage: "Parse replication tasks from kafka messages",
Expand Down Expand Up @@ -482,6 +483,7 @@ func newAdminKafkaCommands() []cli.Command {
},
},
{
// TODO: move this command be a subcommand of admin workflow
Name: "rereplicate",
Aliases: []string{"rrp"},
Usage: "Rereplicate replication tasks from history tables",
Expand All @@ -494,10 +496,9 @@ func newAdminKafkaCommands() []cli.Command {
Name: FlagNumberOfShards,
Usage: "NumberOfShards is required to calculate shardID. (see server config for numHistoryShards)",
},
// for one workflow
cli.Int64Flag{
Name: FlagMaxEventID,
Usage: "MaxEventID Optional, default to all events",
cli.StringFlag{
Name: FlagDomainID,
Usage: "DomainID",
},
cli.StringFlag{
Name: FlagWorkflowIDWithAlias,
Expand All @@ -507,13 +508,13 @@ func newAdminKafkaCommands() []cli.Command {
Name: FlagRunIDWithAlias,
Usage: "RunID",
},
cli.StringFlag{
Name: FlagDomainID,
Usage: "DomainID",
cli.Int64Flag{
Name: FlagMaxEventID,
Usage: "MaxEventID Optional, default to all events",
},
cli.StringFlag{
Name: FlagEndEventVersion,
Usage: "Workflow end event version",
Usage: "Workflow end event version, required if MaxEventID is specified",
}),
Action: func(c *cli.Context) {
AdminRereplicate(c)
Expand Down
50 changes: 14 additions & 36 deletions tools/cli/adminKafkaCommands.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,46 +452,29 @@ type ClustersConfig struct {

func doRereplicate(
ctx context.Context,
shardID int,
domainID string,
wid string,
rid string,
endEventID int64,
endEventVersion int64,
endEventID *int64,
endEventVersion *int64,
sourceCluster string,
adminClient admin.Client,
exeMgr persistence.ExecutionManager,
) {
fmt.Printf("Start rereplicate for wid: %v, rid:%v \n", wid, rid)
resp, err := exeMgr.GetWorkflowExecution(ctx, &persistence.GetWorkflowExecutionRequest{
DomainID: domainID,
Execution: types.WorkflowExecution{
WorkflowID: wid,
RunID: rid,
},
})
if err != nil {
ErrorAndExit("GetWorkflowExecution error", err)
}

versionHistories := resp.State.VersionHistories
if versionHistories == nil {
ErrorAndExit("The workflow is not a NDC workflow", nil)
}
fmt.Printf("Start rereplication for wid: %v, rid:%v \n", wid, rid)
if err := adminClient.ResendReplicationTasks(
ctx,
&types.ResendReplicationTasksRequest{
DomainID: domainID,
WorkflowID: wid,
RunID: rid,
RemoteCluster: sourceCluster,
EndEventID: common.Int64Ptr(endEventID + 1),
EndVersion: common.Int64Ptr(endEventVersion),
EndEventID: endEventID,
EndVersion: endEventVersion,
},
); err != nil {
ErrorAndExit("Failed to resend ndc workflow", err)
}
fmt.Printf("Done rereplicate for wid: %v, rid:%v \n", wid, rid)
fmt.Printf("Done rereplication for wid: %v, rid:%v \n", wid, rid)
}

// AdminRereplicate parses will re-publish replication tasks to topic
Expand All @@ -502,23 +485,20 @@ func AdminRereplicate(c *cli.Context) {
return
}
sourceCluster := getRequiredOption(c, FlagSourceCluster)
if !c.IsSet(FlagMaxEventID) {
ErrorAndExit("End event ID is not defined", nil)
}
if !c.IsSet(FlagEndEventVersion) {
ErrorAndExit("End event version is not defined", nil)
}

adminClient := cFactory.ServerAdminClient(c)
endEventID := c.Int64(FlagMaxEventID)
endVersion := c.Int64(FlagEndEventVersion)
var endEventID, endVersion *int64
if c.IsSet(FlagMaxEventID) {
endEventID = common.Int64Ptr(c.Int64(FlagMaxEventID) + 1)
}
if c.IsSet(FlagEndEventVersion) {
endVersion = common.Int64Ptr(c.Int64(FlagEndEventVersion))
}
domainID := getRequiredOption(c, FlagDomainID)
wid := getRequiredOption(c, FlagWorkflowID)
rid := getRequiredOption(c, FlagRunID)
shardID := common.WorkflowIDToHistoryShard(wid, numberOfShards)
contextTimeout := defaultResendContextTimeout
executionManager := initializeExecutionStore(c, shardID, 0)
defer executionManager.Close()

if c.GlobalIsSet(FlagContextTimeout) {
contextTimeout = time.Duration(c.GlobalInt(FlagContextTimeout)) * time.Second
}
Expand All @@ -527,15 +507,13 @@ func AdminRereplicate(c *cli.Context) {

doRereplicate(
ctx,
shardID,
domainID,
wid,
rid,
endEventID,
endVersion,
sourceCluster,
adminClient,
executionManager,
)
}

Expand Down

0 comments on commit f52498a

Please sign in to comment.