Skip to content

Commit

Permalink
Update cadence batch command to receive more input (cadence-workflow#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll authored Feb 3, 2022
1 parent b8d47dc commit 19a8526
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 5 deletions.
14 changes: 10 additions & 4 deletions service/worker/batcher/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,15 @@ const (
batchActivityName = "cadence-sys-batch-activity"
// InfiniteDuration is a long duration(20 yrs) we used for infinite workflow running
InfiniteDuration = 20 * 365 * 24 * time.Hour
pageSize = 1000

_nonRetriableReason = "non-retriable-error"

// DefaultRPS is the default RPS
DefaultRPS = 50
// DefaultConcurrency is the default concurrency
DefaultConcurrency = 5
// DefaultPageSize is the default page size
DefaultPageSize = 1000
// DefaultAttemptsOnRetryableError is the default value for AttemptsOnRetryableError
DefaultAttemptsOnRetryableError = 50
// DefaultActivityHeartBeatTimeout is the default value for ActivityHeartBeatTimeout
Expand Down Expand Up @@ -135,6 +136,8 @@ type (
RPS int
// Number of goroutines running in parallel to process
Concurrency int
// Number of workflows processed in a batch
PageSize int
// Number of attempts for each workflow to process in case of retryable error before giving up
AttemptsOnRetryableError int
// timeout for activity heartbeat
Expand Down Expand Up @@ -237,6 +240,9 @@ func setDefaultParams(params BatchParams) BatchParams {
if params.Concurrency <= 0 {
params.Concurrency = DefaultConcurrency
}
if params.PageSize <= 0 {
params.PageSize = DefaultPageSize
}
if params.AttemptsOnRetryableError <= 0 {
params.AttemptsOnRetryableError = DefaultAttemptsOnRetryableError
}
Expand Down Expand Up @@ -298,8 +304,8 @@ func BatchActivity(ctx context.Context, batchParams BatchParams) (HeartBeatDetai
hbd.TotalEstimate = resp.GetCount()
}
rateLimiter := rate.NewLimiter(rate.Limit(batchParams.RPS), batchParams.RPS)
taskCh := make(chan taskDetail, pageSize)
respCh := make(chan error, pageSize)
taskCh := make(chan taskDetail, batchParams.PageSize)
respCh := make(chan error, batchParams.PageSize)
for i := 0; i < batchParams.Concurrency; i++ {
go startTaskProcessor(ctx, batchParams, domainID, taskCh, respCh, rateLimiter, client, adminClient)
}
Expand All @@ -310,7 +316,7 @@ func BatchActivity(ctx context.Context, batchParams BatchParams) (HeartBeatDetai
// And we can't use list API because terminate / reset will mutate the result.
resp, err := client.ScanWorkflowExecutions(ctx, &types.ListWorkflowExecutionsRequest{
Domain: batchParams.DomainName,
PageSize: int32(pageSize),
PageSize: int32(batchParams.PageSize),
NextPageToken: hbd.PageToken,
Query: batchParams.Query,
})
Expand Down
2 changes: 2 additions & 0 deletions tools/cli/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,8 @@ const (
FlagFailoverTypeWithAlias = FlagFailoverType + ", ft"
FlagFailoverTimeout = "failover_timeout_seconds"
FlagFailoverTimeoutWithAlias = FlagFailoverTimeout + ", fts"
FlagActivityHeartBeatTimeout = "heart_beat_timeout_seconds"
FlagActivityHeartBeatTimeoutWithAlias = FlagActivityHeartBeatTimeout + ", hbts"
FlagFailoverWaitTime = "failover_wait_time_second"
FlagFailoverWaitTimeWithAlias = FlagFailoverWaitTime + ", fwts"
FlagFailoverBatchSize = "failover_batch_size"
Expand Down
21 changes: 21 additions & 0 deletions tools/cli/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package cli

import (
"strings"
"time"

"github.com/urfave/cli"

Expand Down Expand Up @@ -511,6 +512,26 @@ func newBatchCommands() []cli.Command {
Name: FlagYes,
Usage: "Optional flag to disable confirmation prompt",
},
cli.IntFlag{
Name: FlagPageSize,
Value: batcher.DefaultPageSize,
Usage: "PageSize of processiing",
},
cli.IntFlag{
Name: FlagRetryAttempts,
Value: batcher.DefaultAttemptsOnRetryableError,
Usage: "Retry attempts for retriable errors",
},
cli.IntFlag{
Name: FlagActivityHeartBeatTimeoutWithAlias,
Value: int(batcher.DefaultActivityHeartBeatTimeout / time.Second),
Usage: "Heartbeat timeout for batcher activity in seconds",
},
cli.IntFlag{
Name: FlagConcurrency,
Value: batcher.DefaultConcurrency,
Usage: "Concurrency of batch activity",
},
},
Action: func(c *cli.Context) {
StartBatchJob(c)
Expand Down
11 changes: 10 additions & 1 deletion tools/cli/workflowBatchCommands.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"fmt"
"os"
"strings"
"time"

"github.com/pborman/uuid"
"github.com/urfave/cli"
Expand Down Expand Up @@ -172,6 +173,10 @@ func StartBatchJob(c *cli.Context) {
targetCluster = getRequiredOption(c, FlagTargetCluster)
}
rps := c.Int(FlagRPS)
pageSize := c.Int(FlagPageSize)
concurrency := c.Int(FlagConcurrency)
retryAttempt := c.Int(FlagRetryAttempts)
heartBeatTimeout := time.Duration(c.Int(FlagActivityHeartBeatTimeout)) * time.Second

svcClient := cFactory.ServerFrontendClient(c)
tcCtx, cancel := newContext(c)
Expand Down Expand Up @@ -221,7 +226,11 @@ func StartBatchJob(c *cli.Context) {
SourceCluster: sourceCluster,
TargetCluster: targetCluster,
},
RPS: rps,
RPS: rps,
Concurrency: concurrency,
PageSize: pageSize,
AttemptsOnRetryableError: retryAttempt,
ActivityHeartBeatTimeout: heartBeatTimeout,
}
input, err := json.Marshal(params)
if err != nil {
Expand Down

0 comments on commit 19a8526

Please sign in to comment.