diff --git a/bench/README.md b/bench/README.md
index 534f17ab2ad..bc07513408b 100644
--- a/bench/README.md
+++ b/bench/README.md
@@ -7,7 +7,7 @@ Setup
 -----------
 ### Cadence server
 
-Bench suite is running against a Cadence server/cluster.
+Bench suite is running against a Cadence server/cluster. See [documentation](https://cadenceworkflow.io/docs/operation-guide/setup/) for Cadence server cluster setup.
 
 Note that only the Basic bench test don't require Advanced Visibility.
 
@@ -20,7 +20,7 @@ For local env you can run it through:
 See more [documentation here](https://cadenceworkflow.io/docs/concepts/search-workflows/).
 
 ### Bench Workers
-:warning: NOTE: unlike canary, starting bench worker will not automatically start a bench test. Next two sections will cover how to start and configure it.
+:warning: NOTE: Starting this bench worker will not automatically start a bench test. The next two sections cover how to start and configure it.
 
 Different ways of start the bench workers:
 
diff --git a/canary/README.md b/canary/README.md
new file mode 100644
index 00000000000..393aa701120
--- /dev/null
+++ b/canary/README.md
@@ -0,0 +1,175 @@
+# Periodic feature health check workflow tools (aka Canary)
+
+This README describes how to set up Cadence canary, different types of canary test cases, and how to start the canary.
+
+Setup
+-----------
+### Cadence server
+
+The canary test suite runs against a Cadence server/cluster. See [documentation](https://cadenceworkflow.io/docs/operation-guide/setup/) for Cadence server cluster setup.
+
+Note that some tests require features like [Advanced Visibility](https://cadenceworkflow.io/docs/concepts/search-workflows/) and [History Archival](https://cadenceworkflow.io/docs/concepts/archival/).
+
+For a local server environment, you can run it through:
+- Docker: Instructions for running Cadence server through docker can be found in `docker/README.md`. Either `docker-compose-es-v7.yml` or `docker-compose-es.yml` can be used to start the server.
+- Build from source: Please check [CONTRIBUTING](/CONTRIBUTING.md) for how to build and run Cadence server from source. Please also make sure Kafka and ElasticSearch are running before starting the server with `./cadence-server --zone es start`. If ElasticSearch v7 is used, change the value for `--zone` flag to `es_v7`.
+
+### Start canary
+
+:warning: NOTE: By default, starting this canary worker will not automatically start a canary test. The next two sections cover how to start and configure it.
+
+Different ways to start the canary workers:
+
+#### 1. Use docker image `ubercadence/cadence-canary:master`
+
+For now, this image has no release versions, to simplify the release process. Always use the `master` tag for the image.
+
+Similar to server/CLI images, the canary image will be built and published automatically by Github on every commit onto the `master` branch.
+
+You can use the [pre-built docker-compose file](../docker/docker-compose-canary.yml) to run against a local server.
+In the `docker/` directory, run:
+```
+docker-compose -f docker-compose-canary.yml up
+```
+You can modify [the canary worker config](../docker/config/canary/development.yaml) to run against a prod server cluster.
+
+
+#### 2. Build & Run the worker/canary
+
+In the project root, build the cadence canary binary:
+ ```
+ make cadence-canary
+ ```
+
+Then start the canary worker:
+ ```
+ ./cadence-canary start
+ ```
+This is essentially the same as
+```
+ ./cadence-canary start -mode worker
+ ```
+
+By default, it will load [the configuration in `config/canary/development.yaml`](../config/canary/development.yaml).
+Run `./cadence-canary -h` for details on the start options, including how to change the directory the configuration is loaded from.
+This will only start the workers.
+
+Configurations
+----------------------
+The canary worker configuration contains the following parts:
+- **Canary**: this part controls which domains the canary workers are responsible for and which tests the sanity workflow will exclude.
+```yaml
+canary:
+  domains: ["cadence-canary"] # it will start workers on all those domains (and try to register them if they do not exist)
+  excludes: ["workflow.searchAttributes", "workflow.batch", "workflow.archival.visibility"] # it will exclude the three test cases
+  cron:
+    cronSchedule: # the schedule of the cron canary, defaults to "@every 30s"
+    cronExecutionTimeout: # the timeout of each run of the cron execution, defaults to 18 minutes
+    startJobTimeout: # the timeout of each run of the sanity test suite, defaults to 9 minutes
+```
+An exception: the `HistoryArchival` and `VisibilityArchival` test cases always use the `canary-archival-domain` domain.
+
+- **Cadence**: this part controls how the canary worker talks to the Cadence server, including the server's service name and address.
+```yaml
+cadence:
+  service: "cadence-frontend" # frontend service name
+  host: "127.0.0.1:7933" # frontend address
+```
+- **Metrics**: metrics configuration. Similar to the server metric emitter, only M3/Statsd/Prometheus is supported.
+- **Log**: logging configuration. Similar to the server logging configuration.
+
+Canary Test Cases
+----------------------
+
+#### Cron Canary: periodically running Sanity test suite
+
+The Cron workflow is not a test case. It's a top-level workflow to kick off the Sanity suite (described below) periodically.
+To start the cron canary:
+```
+ ./cadence-canary start -mode cronCanary
+ ```
+
+For local development, you can also start the cron canary workflows along with the worker:
+```
+ ./cadence-canary start -m all
+ ```
+
+The cron schedule is taken from the configuration.
+However, a schedule change only takes effect after you manually terminate the existing cron workflow.
+It can be [improved](https://github.com/uber/cadence/issues/4469) in the future.
+
+The workflowID is fixed: `"cadence.canary.cron"`
+
+#### Test case starter & Sanity suite
+The sanity workflow is a test suite workflow. It kicks off child workflows for all the tests to verify that the Cadence server is operating correctly.
+
+An error result of the sanity workflow indicates that at least one of the test cases failed.
+
+You can start the sanity workflow as a one-off run:
+```
+cadence --do workflow start --tl canary-task-queue --et 1200 --wt workflow.sanity -i 0
+```
+Note:
+* tasklist (tl) is fixed to `canary-task-queue`
+* the recommended execution timeout (et) is 20 minutes (`1200` seconds), but you can adjust it
+* the only required input is the scheduled Unix timestamp (in nanoseconds); `0` uses the workflow start time
+
+Or start it as a cron job (e.g. every minute):
+```
+cadence --do workflow start --tl canary-task-queue --et 1200 --wt workflow.sanity -i 0 --cron "* * * * *"
+```
+
+[This list of test cases](./sanity.go) shows what the sanity workflow starts; by default, all supported test cases are started if no excludes are configured.
+You can find [the workflow names of the test cases in this file](./const.go) if you want to manually start certain test cases.
+For example, manually start an `Echo` test case:
+```
+cadence --do <> workflow start --tl canary-task-queue --et 10 --wt workflow.echo
+```
+
+Once you start the test cases, you can observe the progress:
+```
+cadence --do cadence-canary workflow ob -w <...workflowID from the start command output>
+```
+
+#### Echo
+Echo workflow tests the very basic workflow functionality. It executes an activity to return some output and verifies it as the workflow result.
+
+#### Signal
+Signal workflow tests the signal feature.
+
+#### Visibility
+Visibility workflow tests the basic visibility feature. No advanced visibility needed, but advanced visibility should also support it.
+
+#### SearchAttributes
+SearchAttributes workflow tests the advanced visibility feature. Make sure the advanced visibility feature is configured on the server. Otherwise, it should be excluded from the sanity test suite/case.
+
+#### ConcurrentExec
+ConcurrentExec workflow tests executing activities concurrently.
+
+#### Query
+Query workflow tests the Query feature.
+
+#### Timeout
+Timeout workflow makes sure the activity timeout is enforced.
+
+#### LocalActivity
+LocalActivity workflow tests the local activity feature.
+
+#### Cancellation
+Cancellation workflow tests the cancellation feature.
+
+#### Retry
+Retry workflow tests the activity retry policy.
+
+#### Reset
+Reset workflow tests the reset feature.
+
+#### HistoryArchival
+HistoryArchival tests the history archival feature. Make sure the history archival feature is configured on the server. Otherwise, it should be excluded from the sanity test suite/case.
+This test case always uses the `canary-archival-domain` domain.
+
+#### VisibilityArchival
+VisibilityArchival tests the visibility archival feature. Make sure the visibility archival feature is configured on the server. Otherwise, it should be excluded from the sanity test suite/case.
+
+#### Batch
+Batch workflow tests the batch job feature. Make sure the advanced visibility feature is configured on the server. Otherwise, it should be excluded from the sanity test suite/case.
\ No newline at end of file diff --git a/canary/batch.go b/canary/batch.go index 2d883a2f9ca..a10980b8cac 100644 --- a/canary/batch.go +++ b/canary/batch.go @@ -63,7 +63,10 @@ type ( } ) -func batchWorkflow(ctx workflow.Context, scheduledTimeNanos int64, domain string) error { +func batchWorkflow(ctx workflow.Context, inputScheduledTimeNanos int64) error { + scheduledTimeNanos := getScheduledTimeFromInputIfNonZero(ctx, inputScheduledTimeNanos) + domain := workflow.GetInfo(ctx).Domain + profile, err := beginWorkflow(ctx, wfTypeBatch, scheduledTimeNanos) if err != nil { return err diff --git a/canary/canary.go b/canary/canary.go index d8349b2813a..823e67ad937 100644 --- a/canary/canary.go +++ b/canary/canary.go @@ -22,6 +22,7 @@ package canary import ( "context" + "fmt" "github.com/opentracing/opentracing-go" "go.uber.org/cadence/.gen/go/shared" @@ -32,7 +33,7 @@ import ( type ( // Runnable is an interface for anything that exposes a Run method Runnable interface { - Run() error + Run(mode string) error } canaryImpl struct { @@ -41,6 +42,7 @@ type ( archivalClient cadenceClient systemClient cadenceClient runtime *RuntimeContext + canaryConfig *Canary } activityContext struct { @@ -57,7 +59,7 @@ const ( ) // new returns a new instance of Canary runnable -func newCanary(domain string, rc *RuntimeContext) Runnable { +func newCanary(domain string, rc *RuntimeContext, canaryConfig *Canary) Runnable { canaryClient := newCadenceClient(domain, rc) archivalClient := newCadenceClient(archivalDomain, rc) systemClient := newCadenceClient(systemDomain, rc) @@ -67,11 +69,15 @@ func newCanary(domain string, rc *RuntimeContext) Runnable { archivalClient: archivalClient, systemClient: systemClient, runtime: rc, + canaryConfig: canaryConfig, } } // Run runs the canary -func (c *canaryImpl) Run() error { +func (c *canaryImpl) Run(mode string) error { + if mode != ModeCronCanary && mode != ModeAll && mode != ModeWorker { + return fmt.Errorf("wrong mode to start canary") + } var err 
error log := c.runtime.logger @@ -85,18 +91,25 @@ func (c *canaryImpl) Run() error { return err } - // start the initial cron workflow - c.startCronWorkflow() + if mode == ModeAll || mode == ModeCronCanary { + // start the initial cron workflow + c.startCronWorkflow() + } - err = c.startWorker() - if err != nil { - log.Error("start worker failed", zap.Error(err)) - return err + if mode == ModeAll || mode == ModeWorker { + err = c.startWorker() + if err != nil { + log.Error("start worker failed", zap.Error(err)) + return err + } } + return nil } func (c *canaryImpl) startWorker() error { + c.runtime.logger.Info("starting canary worker...") + options := worker.Options{ Logger: c.runtime.logger, MetricsScope: c.runtime.metrics, @@ -115,18 +128,24 @@ func (c *canaryImpl) startWorker() error { } func (c *canaryImpl) startCronWorkflow() { + c.runtime.logger.Info("starting canary cron workflow...") wfID := "cadence.canary.cron" - opts := newWorkflowOptions(wfID, cronWFExecutionTimeout) - opts.CronSchedule = "@every 30s" // run every 30s + opts := newWorkflowOptions(wfID, c.canaryConfig.Cron.CronExecutionTimeout) + opts.CronSchedule = c.canaryConfig.Cron.CronSchedule + // create the cron workflow span ctx := context.Background() span := opentracing.StartSpan("start-cron-workflow-span") defer span.Finish() ctx = opentracing.ContextWithSpan(ctx, span) - _, err := c.canaryClient.StartWorkflow(ctx, opts, cronWorkflow, c.canaryDomain, wfTypeSanity) + _, err := c.canaryClient.StartWorkflow(ctx, opts, cronWorkflow, wfTypeSanity) if err != nil { + // TODO: improvement: compare the cron schedule to decide whether or not terminating the current one + // https://github.com/uber/cadence/issues/4469 if _, ok := err.(*shared.WorkflowExecutionAlreadyStartedError); !ok { c.runtime.logger.Error("error starting cron workflow", zap.Error(err)) + } else { + c.runtime.logger.Info("cron workflow already started, you may need to terminate and restart if cron schedule is changed...") } } } @@ 
-137,6 +156,7 @@ func (c *canaryImpl) newActivityContext() context.Context { ctx := context.WithValue(context.Background(), ctxKeyActivityRuntime, &activityContext{cadence: c.canaryClient}) ctx = context.WithValue(ctx, ctxKeyActivityArchivalRuntime, &activityContext{cadence: c.archivalClient}) ctx = context.WithValue(ctx, ctxKeyActivitySystemClient, &activityContext{cadence: c.systemClient}) + ctx = context.WithValue(ctx, ctxKeyConfig, c.canaryConfig) return overrideWorkerOptions(ctx) } diff --git a/canary/cancellation.go b/canary/cancellation.go index 5622f7730f8..d8903ff7b80 100644 --- a/canary/cancellation.go +++ b/canary/cancellation.go @@ -43,7 +43,10 @@ func init() { } // cancellationWorkflow is the workflow implementation to test for cancellation of workflows -func cancellationWorkflow(ctx workflow.Context, scheduledTimeNanos int64, domain string) error { +func cancellationWorkflow(ctx workflow.Context, inputScheduledTimeNanos int64) error { + scheduledTimeNanos := getScheduledTimeFromInputIfNonZero(ctx, inputScheduledTimeNanos) + domain := workflow.GetInfo(ctx).Domain + profile, err := beginWorkflow(ctx, wfTypeCancellation, scheduledTimeNanos) if err != nil { return err diff --git a/canary/common.go b/canary/common.go index 22605f27045..153dabb1c41 100644 --- a/canary/common.go +++ b/canary/common.go @@ -105,3 +105,10 @@ func beginWorkflow(ctx workflow.Context, wfType string, scheduledTimeNanos int64 func concat(first string, second string) string { return first + "/" + second } + +func getScheduledTimeFromInputIfNonZero(ctx workflow.Context, nanos int64) int64 { + if nanos == 0 { + return workflow.Now(ctx).UnixNano() + } + return nanos +} diff --git a/canary/concurrentExec.go b/canary/concurrentExec.go index 513158b9b55..7fc05bdc3d6 100644 --- a/canary/concurrentExec.go +++ b/canary/concurrentExec.go @@ -43,7 +43,9 @@ func init() { // concurrentExecWorkflow is the workflow implementation to test // 1. 
client side events pagination when reconstructing workflow state // 2. concurrent execution of activities -func concurrentExecWorkflow(ctx workflow.Context, scheduledTimeNanos int64, domain string) error { +func concurrentExecWorkflow(ctx workflow.Context, inputScheduledTimeNanos int64) error { + scheduledTimeNanos := getScheduledTimeFromInputIfNonZero(ctx, inputScheduledTimeNanos) + profile, err := beginWorkflow(ctx, wfTypeConcurrentExec, scheduledTimeNanos) if err != nil { return err diff --git a/canary/config.go b/canary/config.go index 37c86b7bd72..25773bf6da2 100644 --- a/canary/config.go +++ b/canary/config.go @@ -22,6 +22,7 @@ package canary import ( "errors" + "time" "github.com/uber-go/tally" "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient" @@ -64,6 +65,14 @@ type ( Canary struct { Domains []string `yaml:"domains"` Excludes []string `yaml:"excludes"` + Cron Cron `yaml:"cron"` + } + + // Cron contains configuration for the cron workflow for canary + Cron struct { + CronSchedule string `yaml:"cronSchedule"` // default to "@every 30s" + CronExecutionTimeout time.Duration `yaml:"cronExecutionTimeout"` //default to 18 minutes + StartJobTimeout time.Duration `yaml:"startJobTimeout"` // default to 9 minutes } // Cadence contains the configuration for cadence service diff --git a/canary/const.go b/canary/const.go index 6a81520ad66..13e279d8b7e 100644 --- a/canary/const.go +++ b/canary/const.go @@ -38,11 +38,19 @@ const ( ctxKeyActivityRuntime = "runtime" ctxKeyActivityArchivalRuntime = "runtime-archival" ctxKeyActivitySystemClient = "system-client" + ctxKeyConfig = "runtime-config" archivalDomain = "canary-archival-domain" systemDomain = "cadence-system" archivalTaskListName = "canary-archival-task-queue" ) +// canary running modes +const ( + ModeAll = "all" + ModeWorker = "worker" + ModeCronCanary = "cronCanary" +) + // workflowVersion represents the current version of every single // workflow function in this canary. 
Every workflow function verifies // that the decision task it is executing is compatible with this version @@ -51,11 +59,6 @@ const ( const workflowVersion = workflow.Version(3) const workflowChangeID = "initial version" -const ( - cronJobTimeout = 9 * time.Minute - cronWFExecutionTimeout = 18 * time.Minute -) - // wfType/activityType refers to the friendly short names given to // workflows and activities - at the time of registration, these names // will be used to associate with a workflow or activity function diff --git a/canary/cron.go b/canary/cron.go index 411b1d8bd67..c6e9ed45ade 100644 --- a/canary/cron.go +++ b/canary/cron.go @@ -51,8 +51,8 @@ func init() { // - every instance of job completes within 10 mins func cronWorkflow( ctx workflow.Context, - domain string, jobName string) error { + domain := workflow.GetInfo(ctx).Domain profile, err := beginWorkflow(ctx, wfTypeCron, workflow.Now(ctx).UnixNano()) aCtx := workflow.WithActivityOptions(ctx, newActivityOptions()) @@ -85,7 +85,8 @@ func cronActivity( parentID := activity.GetInfo(ctx).WorkflowExecution.ID jobID := fmt.Sprintf("%s-%s-%v", parentID, jobName, time.Now().Format(time.RFC3339)) - wf, err := startJob(&cadenceClient, scope, jobID, jobName, domain) + config := getContextValue(ctx, ctxKeyConfig).(*Canary) + wf, err := startJob(&cadenceClient, scope, jobID, jobName, domain, config) if err != nil { logger.Error("cronActivity: failed to start job", zap.Error(err)) if isDomainNotActiveErr(err) { @@ -104,7 +105,8 @@ func startJob( scope tally.Scope, jobID string, jobName string, - domain string) (*workflow.Execution, error) { + domain string, + config *Canary) (*workflow.Execution, error) { scope.Counter(startWorkflowCount).Inc(1) sw := scope.Timer(startWorkflowLatency).Start() @@ -116,7 +118,7 @@ func startJob( defer span.Finish() ctx = opentracing.ContextWithSpan(ctx, span) - opts := newWorkflowOptions(jobID, cronJobTimeout) + opts := newWorkflowOptions(jobID, config.Cron.StartJobTimeout) wf, err := 
cadenceClient.StartWorkflow(ctx, opts, jobName, time.Now().UnixNano(), domain) if err != nil { scope.Counter(startWorkflowFailureCount).Inc(1) diff --git a/canary/echo.go b/canary/echo.go index c674b8c2759..ce2d390e6ac 100644 --- a/canary/echo.go +++ b/canary/echo.go @@ -52,7 +52,9 @@ func init() { // echoWorkflow is a workflow implementation which simply executes an // activity that echoes back the input as output -func echoWorkflow(ctx workflow.Context, scheduledTimeNanos int64, domain string) error { +func echoWorkflow(ctx workflow.Context, inputScheduledTimeNanos int64) error { + scheduledTimeNanos := getScheduledTimeFromInputIfNonZero(ctx, inputScheduledTimeNanos) + profile, err := beginWorkflow(ctx, wfTypeEcho, scheduledTimeNanos) if err != nil { return err diff --git a/canary/historyArchival.go b/canary/historyArchival.go index 7f9b6eec31e..083eb7cd7c8 100644 --- a/canary/historyArchival.go +++ b/canary/historyArchival.go @@ -46,7 +46,9 @@ func init() { registerActivity(largeResultActivity, activityTypeLargeResult) } -func historyArchivalWorkflow(ctx workflow.Context, scheduledTimeNanos int64, _ string) error { +func historyArchivalWorkflow(ctx workflow.Context, inputScheduledTimeNanos int64) error { + scheduledTimeNanos := getScheduledTimeFromInputIfNonZero(ctx, inputScheduledTimeNanos) + profile, err := beginWorkflow(ctx, wfTypeHistoryArchival, scheduledTimeNanos) if err != nil { return err diff --git a/canary/localactivity.go b/canary/localactivity.go index 52ca69371d2..dd0c7570870 100644 --- a/canary/localactivity.go +++ b/canary/localactivity.go @@ -54,8 +54,10 @@ var checks = []conditionAndAction{ {checkCondition4, activityForCondition4}, } -func localActivityWorkfow(ctx workflow.Context) (string, error) { - profile, err := beginWorkflow(ctx, wfTypeSignal, workflow.Now(ctx).UnixNano()) +func localActivityWorkfow(ctx workflow.Context, inputScheduledTimeNanos int64) (string, error) { + scheduledTimeNanos := getScheduledTimeFromInputIfNonZero(ctx, 
inputScheduledTimeNanos) + + profile, err := beginWorkflow(ctx, wfTypeSignal, scheduledTimeNanos) if err != nil { return "", err } diff --git a/canary/query.go b/canary/query.go index 19f1506b942..88622e2aac2 100644 --- a/canary/query.go +++ b/canary/query.go @@ -43,7 +43,9 @@ func init() { } // queryWorkflow is the workflow implementation to test for querying workflow status -func queryWorkflow(ctx workflow.Context, scheduledTimeNanos int64, domain string) error { +func queryWorkflow(ctx workflow.Context, inputScheduledTimeNanos int64) error { + scheduledTimeNanos := getScheduledTimeFromInputIfNonZero(ctx, inputScheduledTimeNanos) + status := queryStatusStarted err := workflow.SetQueryHandler(ctx, queryType, func() (string, error) { return status, nil diff --git a/canary/reset.go b/canary/reset.go index 0202bf82155..2987c10f0f3 100644 --- a/canary/reset.go +++ b/canary/reset.go @@ -48,7 +48,10 @@ func init() { registerActivity(resetBaseActivity, activityTypeResetBase) } -func resetWorkflow(ctx workflow.Context, scheduledTimeNanos int64, domain string) error { +func resetWorkflow(ctx workflow.Context, inputScheduledTimeNanos int64) error { + scheduledTimeNanos := getScheduledTimeFromInputIfNonZero(ctx, inputScheduledTimeNanos) + domain := workflow.GetInfo(ctx).Domain + profile, err := beginWorkflow(ctx, wfTypeReset, scheduledTimeNanos) if err != nil { return err diff --git a/canary/retry.go b/canary/retry.go index 0f078cde71e..dbf364c1e55 100644 --- a/canary/retry.go +++ b/canary/retry.go @@ -43,7 +43,7 @@ func init() { registerActivity(retryOnFailureActivity, activityTypeRetryOnFailure) } -func retryWorkflow(ctx workflow.Context, scheduledTimeNanos int64, domain string) error { +func retryWorkflow(ctx workflow.Context, scheduledTimeNanos int64) error { profile, err := beginWorkflow(ctx, wfTypeRetry, scheduledTimeNanos) if err != nil { return err diff --git a/canary/runner.go b/canary/runner.go index e965231c5bd..ea7e78e4636 100644 --- a/canary/runner.go +++ 
b/canary/runner.go @@ -23,6 +23,7 @@ package canary import ( "fmt" "sync" + "time" "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient" "go.uber.org/yarpc" @@ -87,26 +88,36 @@ func NewCanaryRunner(cfg *Config) (Runnable, error) { } // Run runs the canaries -func (r *canaryRunner) Run() error { +func (r *canaryRunner) Run(mode string) error { r.metrics.Counter("restarts").Inc(1) if len(r.config.Excludes) != 0 { updateSanityChildWFList(r.config.Excludes) } + if r.config.Cron.CronSchedule == "" { + r.config.Cron.CronSchedule = "@every 30s" + } + if r.config.Cron.CronExecutionTimeout == 0 { + r.config.Cron.CronExecutionTimeout = 18 * time.Minute + } + if r.config.Cron.StartJobTimeout == 0 { + r.config.Cron.StartJobTimeout = 9 * time.Minute + } + var wg sync.WaitGroup for _, d := range r.config.Domains { - canary := newCanary(d, r.RuntimeContext) + canary := newCanary(d, r.RuntimeContext, r.config) r.logger.Info("starting canary", zap.String("domain", d)) - r.execute(canary, &wg) + r.execute(canary, mode, &wg) } wg.Wait() return nil } -func (r *canaryRunner) execute(task Runnable, wg *sync.WaitGroup) { +func (r *canaryRunner) execute(task Runnable, mode string, wg *sync.WaitGroup) { wg.Add(1) go func() { - task.Run() + task.Run(mode) wg.Done() }() } diff --git a/canary/sanity.go b/canary/sanity.go index b8e8807abd5..d0b1470f57a 100644 --- a/canary/sanity.go +++ b/canary/sanity.go @@ -52,7 +52,10 @@ func init() { // exercises the frontend APIs and the basic functionality of the // client library - each probe / test MUST be implemented as a // child workflow of this workflow -func sanityWorkflow(ctx workflow.Context, scheduledTimeNanos int64, domain string) error { +func sanityWorkflow(ctx workflow.Context, inputScheduledTimeNanos int64) error { + scheduledTimeNanos := getScheduledTimeFromInputIfNonZero(ctx, inputScheduledTimeNanos) + domain := workflow.GetInfo(ctx).Domain + var err error profile, err := beginWorkflow(ctx, wfTypeSanity, scheduledTimeNanos) if err 
!= nil { @@ -86,7 +89,7 @@ func forkChildWorkflows(ctx workflow.Context, domain string, names []string) (wo for _, childName := range names { cwo := newChildWorkflowOptions(domain, concat(myID, childName)) childCtx := workflow.WithChildOptions(ctx, cwo) - future := workflow.ExecuteChildWorkflow(childCtx, childName, now, domain) + future := workflow.ExecuteChildWorkflow(childCtx, childName, now) selector.AddFuture(future, func(f workflow.Future) { if err := f.Get(ctx, nil); err != nil { workflow.GetLogger(ctx).Error("child workflow failed", zap.Error(err)) diff --git a/canary/searchAttributes.go b/canary/searchAttributes.go index b2937d1cc24..bdb27bae0ac 100644 --- a/canary/searchAttributes.go +++ b/canary/searchAttributes.go @@ -42,7 +42,9 @@ func init() { } // searchAttributesWorkflow tests the search attributes apis -func searchAttributesWorkflow(ctx workflow.Context, scheduledTimeNanos int64, domain string) error { +func searchAttributesWorkflow(ctx workflow.Context, inputScheduledTimeNanos int64) error { + scheduledTimeNanos := getScheduledTimeFromInputIfNonZero(ctx, inputScheduledTimeNanos) + var err error profile, err := beginWorkflow(ctx, wfTypeSearchAttributes, scheduledTimeNanos) if err != nil { diff --git a/canary/signal.go b/canary/signal.go index 27a6d761579..13bfb211f54 100644 --- a/canary/signal.go +++ b/canary/signal.go @@ -43,7 +43,10 @@ func init() { // signalWorkflow is the workflow implementation to test SignalWorkflowExecution API // it simply spins up an activity which sends a signal to the parent -func signalWorkflow(ctx workflow.Context, scheduledTimeNanos int64, domain string) error { +func signalWorkflow(ctx workflow.Context, inputScheduledTimeNanos int64) error { + scheduledTimeNanos := getScheduledTimeFromInputIfNonZero(ctx, inputScheduledTimeNanos) + domain := workflow.GetInfo(ctx).Domain + var err error profile, err := beginWorkflow(ctx, wfTypeSignal, scheduledTimeNanos) if err != nil { diff --git a/canary/timeout.go b/canary/timeout.go 
index 157349a5f6c..22114f9b471 100644 --- a/canary/timeout.go +++ b/canary/timeout.go @@ -43,7 +43,9 @@ func init() { } // timeoutWorkflow is the workflow implementation to test for querying workflow status -func timeoutWorkflow(ctx workflow.Context, scheduledTimeNanos int64, domain string) error { +func timeoutWorkflow(ctx workflow.Context, inputScheduledTimeNanos int64) error { + scheduledTimeNanos := getScheduledTimeFromInputIfNonZero(ctx, inputScheduledTimeNanos) + profile, err := beginWorkflow(ctx, wfTypeConcurrentExec, scheduledTimeNanos) if err != nil { return err diff --git a/canary/visibility.go b/canary/visibility.go index ce3735aa245..90a44a11173 100644 --- a/canary/visibility.go +++ b/canary/visibility.go @@ -43,7 +43,9 @@ func init() { } // visibilityWorkflow tests the visibility apis -func visibilityWorkflow(ctx workflow.Context, scheduledTimeNanos int64, domain string) error { +func visibilityWorkflow(ctx workflow.Context, inputScheduledTimeNanos int64) error { + scheduledTimeNanos := getScheduledTimeFromInputIfNonZero(ctx, inputScheduledTimeNanos) + var err error profile, err := beginWorkflow(ctx, wfTypeVisibility, scheduledTimeNanos) if err != nil { diff --git a/canary/visibilityArchival.go b/canary/visibilityArchival.go index 83c669f08b7..88dc304f11d 100644 --- a/canary/visibilityArchival.go +++ b/canary/visibilityArchival.go @@ -57,7 +57,9 @@ func init() { registerActivity(visibilityArchivalActivity, activityTypeVisibilityArchival) } -func visibilityArchivalWorkflow(ctx workflow.Context, scheduledTimeNanos int64) error { +func visibilityArchivalWorkflow(ctx workflow.Context, inputScheduledTimeNanos int64) error { + scheduledTimeNanos := getScheduledTimeFromInputIfNonZero(ctx, inputScheduledTimeNanos) + profile, err := beginWorkflow(ctx, wfTypeVisibilityArchival, scheduledTimeNanos) if err != nil { return err diff --git a/cmd/canary/main.go b/cmd/canary/main.go index 401e50f6f77..50f35ae1cb0 100644 --- a/cmd/canary/main.go +++ b/cmd/canary/main.go 
@@ -21,6 +21,7 @@ package main import ( + "fmt" "log" "os" "path" @@ -48,12 +49,13 @@ func startHandler(c *cli.Context) { log.Fatal("Invalid config: ", err) } + mode := c.String("mode") canary, err := canary.NewCanaryRunner(&cfg) if err != nil { log.Fatal("Failed to initialize canary: ", err) } - if err := canary.Run(); err != nil { + if err := canary.Run(mode); err != nil { log.Fatal("Failed to run canary: ", err) } } @@ -119,7 +121,14 @@ func buildCLI() *cli.App { app.Commands = []cli.Command{ { Name: "start", - Usage: "start cadence canary", + Usage: "start cadence canary worker or cron, or both", + Flags: []cli.Flag{ + cli.StringFlag{ + Name: "mode, m", + Value: canary.ModeWorker, + Usage: fmt.Sprintf("%v, %v or %v", canary.ModeWorker, canary.ModeCronCanary, canary.ModeAll), + }, + }, Action: func(c *cli.Context) { startHandler(c) },
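
Reading aid for the change above: the sketch below isolates three behaviors this diff introduces, namely the cron defaults applied in `canaryRunner.Run`, the mode gating in `canaryImpl.Run`, and the zero-input fallback of `getScheduledTimeFromInputIfNonZero`. It is a standalone approximation, not code from the canary package: the helper names `applyCronDefaults`, `plan`, and `scheduledTimeOrNow` are hypothetical, the `Cron` struct omits its yaml tags, and the workflow clock is replaced by a plain `int64` argument so the example runs without the Cadence client.

```go
package main

import (
	"fmt"
	"time"
)

// Mode values mirror the constants added in canary/const.go.
const (
	ModeAll        = "all"
	ModeWorker     = "worker"
	ModeCronCanary = "cronCanary"
)

// Cron mirrors the struct added in canary/config.go (yaml tags omitted here).
type Cron struct {
	CronSchedule         string
	CronExecutionTimeout time.Duration
	StartJobTimeout      time.Duration
}

// applyCronDefaults is a hypothetical helper. It fills zero-valued fields
// with the same defaults that canaryRunner.Run sets inline in the diff.
func applyCronDefaults(c Cron) Cron {
	if c.CronSchedule == "" {
		c.CronSchedule = "@every 30s"
	}
	if c.CronExecutionTimeout == 0 {
		c.CronExecutionTimeout = 18 * time.Minute
	}
	if c.StartJobTimeout == 0 {
		c.StartJobTimeout = 9 * time.Minute
	}
	return c
}

// plan is a hypothetical helper. It reports which components a given mode
// starts, following the if-conditions in canaryImpl.Run.
func plan(mode string) (startCron, startWorker bool, err error) {
	switch mode {
	case ModeAll:
		return true, true, nil
	case ModeCronCanary:
		return true, false, nil
	case ModeWorker:
		return false, true, nil
	default:
		return false, false, fmt.Errorf("wrong mode to start canary: %q", mode)
	}
}

// scheduledTimeOrNow is a hypothetical stand-in for
// getScheduledTimeFromInputIfNonZero: an input of 0 means "use the current
// time", which is passed in explicitly instead of read from workflow.Now.
func scheduledTimeOrNow(inputNanos, nowNanos int64) int64 {
	if inputNanos == 0 {
		return nowNanos
	}
	return inputNanos
}

func main() {
	// Only the schedule is configured; both timeouts fall back to defaults.
	cfg := applyCronDefaults(Cron{CronSchedule: "@every 1m"})
	fmt.Println(cfg.CronSchedule, cfg.CronExecutionTimeout, cfg.StartJobTimeout)

	for _, m := range []string{ModeWorker, ModeCronCanary, ModeAll, "bogus"} {
		cron, worker, err := plan(m)
		fmt.Printf("mode=%s cron=%v worker=%v err=%v\n", m, cron, worker, err)
	}

	// `-i 0` on the command line corresponds to the first argument being 0.
	fmt.Println(scheduledTimeOrNow(0, time.Now().UnixNano()) > 0)
}
```

One difference from the diff worth noting: the sketch returns defaults on a copy of the config, whereas `canaryRunner.Run` mutates `r.config.Cron` in place before handing the config to each canary.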