Skip to content

Commit

Permalink
Open source bench test (Part 2) (cadence-workflow#3998)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored Feb 19, 2021
1 parent 8aba053 commit 1106daa
Show file tree
Hide file tree
Showing 24 changed files with 2,298 additions and 79 deletions.
141 changes: 140 additions & 1 deletion bench/lib/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,35 @@ type (
NumTaskLists int `yaml:"numTaskLists"`
}

// TODO: add comment for each config field

// CronTestConfig contains the configuration for running a set of
// testsuites in parallel based on a cron schedule
CronTestConfig struct {
TestSuites []TestSuiteConfig `yaml:"testSuites"`
}

// TestSuiteConfig contains the configration for running a set of
// tests sequentially
TestSuiteConfig struct {
Name string `yaml:"name"`
Domain string `yaml:"domain"`
Configs []AggregatedTestConfig `yaml:"configs"`
}

// AggregatedTestConfig contains the configration for a single test
AggregatedTestConfig struct {
Name string `yaml:"name"`
Description string `yaml:"description"`
TimeoutInSeconds int32 `yaml:"timeoutInSeconds"`
Basic *BasicTestConfig `yaml:"basic"`
Signal *SignalTestConfig `yaml:"signal"`
Timer *TimerTestConfig `yaml:"timer"`
ConcurrentExec *ConcurrentExecTestConfig `yaml:"concurrentExec"`
Cancellation *CancellationTestConfig `yaml:"cancellation"`
}

// BasicTestConfig contains the configuration for running the Basic test scenario
// TODO: update comment
BasicTestConfig struct {
TotalLaunchCount int `yaml:"totalLaunchCount"`
RoutineCount int `yaml:"routineCount"`
Expand All @@ -63,6 +90,118 @@ type (
PanicStressWorkflow bool `yaml:"panicStressWorkflow"` // default false
FailureThreshold float64 `yaml:"failureThreshold"`
}

// SignalTestConfig is the parameters for signalLoadTestWorkflow
SignalTestConfig struct {
// LoaderCount defines how many loader activities
LoaderCount int `yaml:"loaderCount"`
// LoadTestWorkflowCount defines how many load test workflow in total
LoadTestWorkflowCount int `yaml:"loadTestWorkflowCount"`
// SignalCount is the number of signals per workflow
SignalCount int `yaml:"signalCount"`
// SignalDataSize is the size of signal data
SignalDataSize int `yaml:"signalDataSize"`
// RateLimit is per loader rate limit to hit cadence server
RateLimit int `yaml:"rateLimit"`
WorkflowExecutionTimeoutInSeconds int `yaml:"workflowExecutionTimeoutInSeconds"`
DecisionTaskTimeoutInSeconds int `yaml:"decisionTaskTimeoutInSeconds"`
FailureThreshold float64 `yaml:"failureThreshold"`
ProcessSignalWorkflowConfig
}

// ProcessSignalWorkflowConfig is the parameters to process signal workflow
ProcessSignalWorkflowConfig struct {
// CampaignCount is the number of local activities to be executed
CampaignCount int `yaml:"campaignCount"`
// ActionRate is probability that local activities result in actual action
ActionRate float64 `yaml:"actionRate"`
// Local activity failure rate
FailureRate float64 `yaml:"failureRate"`
// SignalCount before continue as new
SignalBeforeContinueAsNew int `yaml:"signalBeforeContinueAsNew"`
EnableRollingWindow bool `yaml:"enableRollingWindow"`
ScheduleTimeNano int64 `yaml:"scheduleTimeNano"`
MaxSignalDelayInSeconds int `yaml:"maxSignalDelayInSeconds"`
MaxSignalDelayCount int `yaml:"maxSignalDelayCount"`
}

// TimerTestConfig contains the config for running timer bench test
TimerTestConfig struct {
// TotalTimerCount is the total number of timers to fire
TotalTimerCount int `yaml:"totalTimerCount"`

// TimerPerWorkflow is the number of timers in each workflow
// workflow will continue execution and complete when the first timer fires
// Set this number larger than one to test no-op timer case
// TotalTimerCount / TimerPerWorkflow = total number of workflows
TimerPerWorkflow int `yaml:"timerPerWorkflow"`

// ShortestTimerDurationInSeconds after test start, the first timer will fire
ShortestTimerDurationInSeconds int `yaml:"shortestTimerDurationInSeconds"`

// LongestTimerDurationInSeconds after test start, the last timer will fire
LongestTimerDurationInSeconds int `yaml:"longestTimerDurationInSeconds"`

// MaxTimerLatencyInSeconds specifies the maximum latency for the first timer in the workflow
// if a timer's latency is larger than this value, that timer will be considered as failed
// if > 1% timer fire beyond this threshold, the test will fail
MaxTimerLatencyInSeconds int `yaml:"maxTimerLatencyInSeconds"`

// TimerTimeoutInSeconds specifies the duration beyond which a timer is considered as lost and fail the test
TimerTimeoutInSeconds int `yaml:"timerTimeoutInSeconds"`

// RoutineCount is the number of goroutines used for starting workflows
// approx. RPS = 10 * RoutineCount
// # of workflows = TotalTimerCount / TimerPerWorkflow
// please make sure ShortestTimerDurationInSeconds > # of workflows / RPS, so that timer starts firing
// after all workflows has been started.
// please also make sure test timeout > LongestTimerDurationInSeconds + TimerTimeoutInSeconds
RoutineCount int `yaml:"routineCount"`
}

// ConcurrentExecTestConfig contains the config for running concurrent execution test
ConcurrentExecTestConfig struct {
// TotalBatches is the total number of batches
TotalBatches int `yaml:"totalBatches"`

// Concurrency specifies the number of batches that will be run concurrently
Concurrency int `yaml:"concurrency"`

// BatchType specifies the type of batch, can be either "activity" or "childWorkflow", case insensitive
BatchType string `yaml:"batchType"`

// BatchSize specifies the number of activities or childWorkflows scheduled in a single decision batch
BatchSize int `yaml:"batchSize"`

// BatchPeriodInSeconds specifies the time interval between two set of batches (each set has #concurrency batches)
BatchPeriodInSeconds int `yaml:"batchPeriodInSeconds"`

// BatchMaxLatencyInSeconds specifies the max latency for scheduling/starting the activity or childWorkflow.
// if latency is higher than this number, the corresponding activity or childWorkflow will be
// considered as failed. This bench test is considered as success if:
// avg(succeed activity or childWorkflow / batchSize) >= 0.99
// If any of the activity or childWorkflow returns an error (for example, execution takes longer than BatchTimeoutInSeconds),
// the bench test will fail immediately
BatchMaxLatencyInSeconds int `yaml:"batchMaxLatencyInSeconds"`

// BatchTimeoutInSeconds specifies the timeout for each batch execution
BatchTimeoutInSeconds int `yaml:"batchTimeoutInSeconds"`
}

// CancellationTestConfig contains the config for running workflow cancellation test
CancellationTestConfig struct {
// TotalLaunchCount is the total number of workflow to start
// note: make sure TotalLaunchCount mod Concurrency = 0 otherwise the validation for the test will fail
TotalLaunchCount int `yaml:"totalLaunchCount"`

// Concurrency specifies the concurrency for start and cancel workflow execution
// approx. RPS = 10 * concurrency, approx. duration = totalLaunchCount / RPS
Concurrency int `yaml:"concurrency"`

// ContextTimeoutInSeconds specifies the context timeout for start and cancel workflow execution call
// default: 3s
ContextTimeoutInSeconds int `yaml:"contextTimeoutInSeconds"`
}
)

func (c *Config) Validate() error {
Expand Down
6 changes: 4 additions & 2 deletions bench/lib/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"github.com/uber-go/tally"
"go.uber.org/cadence/workflow"
"go.uber.org/zap"

"github.com/uber/cadence/common"
)

// counters go here
Expand Down Expand Up @@ -84,7 +86,7 @@ func RecordActivityStart(
scheduledTimeNanos int64,
) (tally.Scope, tally.Stopwatch) {
scope = scope.Tagged(map[string]string{"operation": name})
elapsed := MaxInt64(0, time.Now().UnixNano()-scheduledTimeNanos)
elapsed := common.MaxInt64(0, time.Now().UnixNano()-scheduledTimeNanos)
scope.Timer(startLatency).Record(time.Duration(elapsed))
scope.Counter(startedCount).Inc(1)
sw := scope.Timer(latency).Start()
Expand Down Expand Up @@ -123,7 +125,7 @@ func recordWorkflowStart(
) *WorkflowMetricsProfile {
now := workflow.Now(ctx).UnixNano()
scope := workflowMetricScope(ctx, wfType)
elapsed := MaxInt64(0, now-scheduledTimeNanos)
elapsed := common.MaxInt64(0, now-scheduledTimeNanos)
scope.Timer(startLatency).Record(time.Duration(elapsed))
scope.Counter(startedCount).Inc(1)
return &WorkflowMetricsProfile{
Expand Down
66 changes: 0 additions & 66 deletions bench/lib/util.go

This file was deleted.

11 changes: 6 additions & 5 deletions bench/load/basic/launchWorkflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (

"github.com/uber/cadence/bench/lib"
"github.com/uber/cadence/bench/load/common"
c "github.com/uber/cadence/common"
)

const (
Expand Down Expand Up @@ -259,14 +260,14 @@ func verifyResultActivity(

// verify if any open workflow
listWorkflowRequest := &shared.ListOpenWorkflowExecutionsRequest{
Domain: lib.StringPtr(info.WorkflowDomain),
Domain: c.StringPtr(info.WorkflowDomain),
MaximumPageSize: &maxPageSize,
StartTimeFilter: &shared.StartTimeFilter{
EarliestTime: lib.Int64Ptr(params.WorkflowStartTime),
LatestTime: lib.Int64Ptr(time.Now().UnixNano()),
EarliestTime: c.Int64Ptr(params.WorkflowStartTime),
LatestTime: c.Int64Ptr(time.Now().UnixNano()),
},
TypeFilter: &shared.WorkflowTypeFilter{
Name: lib.StringPtr(stressWorkflowName),
Name: c.StringPtr(stressWorkflowName),
},
}
openWorkflow, err := cc.ListOpenWorkflow(ctx, listWorkflowRequest)
Expand All @@ -291,7 +292,7 @@ func verifyResultActivity(
params.WorkflowStartTime,
time.Now().UnixNano())
request := &shared.CountWorkflowExecutionsRequest{
Domain: lib.StringPtr(info.WorkflowDomain),
Domain: c.StringPtr(info.WorkflowDomain),
Query: &query,
}
resp, err := cc.CountWorkflow(ctx, request)
Expand Down
Loading

0 comments on commit 1106daa

Please sign in to comment.