diff --git a/Makefile b/Makefile index 07705c99375..82dd9edf0d4 100644 --- a/Makefile +++ b/Makefile @@ -8,34 +8,12 @@ THRIFT_SRCS = idl/code.uber.internal/devexp/minions/minions.thrift \ idl/code.uber.internal/devexp/minions/matching.thrift \ # list all executables -PROGS = minions \ - cmd/stress/stress \ - cmd/demo/demo \ +PROGS = minions minions: main.go \ $(wildcard config/*.go) \ $(wildcard service/*.go) \ -cmd/stress/stress: cmd/stress/main.go \ - $(wildcard health/driver/*.go) \ - $(wildcard health/stress/*.go) \ - $(wildcard test/flow/*.go) \ - $(wildcard test/workflow/*.go) \ - $(wildcard common/*.go) \ - $(wildcard common/**/*.go) \ - $(wildcard workflow/*.go) \ - $(wildcard persistence/*.go) \ - $(wildcard store/*.go) \ - -cmd/demo/demo: cmd/demo/*.go \ - $(wildcard test/flow/*.go) \ - $(wildcard test/workflow/*.go) \ - $(wildcard common/*.go) \ - $(wildcard common/**/*.go) \ - $(wildcard workflow/*.go) \ - $(wildcard persistence/*.go) \ - $(wildcard store/*.go) \ - -include go-build/rules.mk go-build/rules.mk: diff --git a/cmd/demo/greetingsPlain.go b/cmd/demo/greetingsPlain.go deleted file mode 100644 index 34f1c0f6ec0..00000000000 --- a/cmd/demo/greetingsPlain.go +++ /dev/null @@ -1,11 +0,0 @@ -package main - -type greeting struct { - operations Operations -} - -func (g greeting) ExecuteWorkflow() { - greeting := g.operations.Greeting() - name := g.operations.Name() - g.operations.sayGreeting(greeting + " " + name) -} diff --git a/cmd/demo/greetingsWorkflow.go b/cmd/demo/greetingsWorkflow.go deleted file mode 100644 index e258c94ee0d..00000000000 --- a/cmd/demo/greetingsWorkflow.go +++ /dev/null @@ -1,75 +0,0 @@ -package main - -import ( - "encoding/json" - "fmt" - - "code.uber.internal/devexp/minions/test/flow" - "code.uber.internal/devexp/minions/test/workflow" -) - -type ( - // Workflow Deciders and Activities. - greetingsWorkflow struct{} - getNameActivity struct{} - getGreetingActivity struct{} - sayGreetingActivity struct{} - - sayGreetingActivityRequest struct { - Name string - Greeting string - } -) - -// Greetings Workflow Decider. -func (w greetingsWorkflow) Execute(ctx workflow.Context, input []byte) (result []byte, err workflow.Error) { - // Get Greeting. - greetResult, err := ctx.ExecuteActivity(activityInfo("getGreetingActivity")) - if err != nil { - return nil, err - } - - // Get Name. - nameResult, err := ctx.ExecuteActivity(activityInfo("getNameActivity")) - if err != nil { - return nil, err - } - - // Say Greeting. - request := &sayGreetingActivityRequest{Name: string(nameResult), Greeting: string(greetResult)} - _, err = ctx.ExecuteActivity(activityInfoWithInput("sayGreetingActivity", request)) - if err != nil { - return nil, err - } - - return nil, nil -} - -// Get Name Activity. -func (g getNameActivity) Execute(context flow.ActivityExecutionContext, input []byte) ([]byte, flow.Error) { - var name string - fmt.Printf("Enter the Name: ") - fmt.Scanf("%s", &name) - return []byte(name), nil -} - -// Get Greeting Activity. -func (ga getGreetingActivity) Execute(context flow.ActivityExecutionContext, input []byte) ([]byte, flow.Error) { - var greeting string - fmt.Printf("Enter the Greeting: ") - fmt.Scanf("%s", &greeting) - return []byte(greeting), nil -} - -// Say Greeting Activity. -func (ga sayGreetingActivity) Execute(context flow.ActivityExecutionContext, input []byte) ([]byte, flow.Error) { - greeetingParams := &sayGreetingActivityRequest{} - err := json.Unmarshal(input, greeetingParams) - if err != nil { - return nil, &workflowError{reason: err.Error()} - } - - fmt.Printf("Saying Final Greeting: ") - fmt.Printf("%s %s!\n", greeetingParams.Greeting, greeetingParams.Name) - return nil, nil -} diff --git a/cmd/demo/main.go b/cmd/demo/main.go deleted file mode 100644 index 65afad6c801..00000000000 --- a/cmd/demo/main.go +++ /dev/null @@ -1,269 +0,0 @@ -package main - -import ( - "flag" - "io/ioutil" - slog "log" - "os" - "strings" - "sync" - "time" - - "github.com/uber-common/bark" - "github.com/uber/tchannel-go" - "github.com/uber/tchannel-go/thrift" - - "code.uber.internal/go-common.git/x/config" - "code.uber.internal/go-common.git/x/log" - "code.uber.internal/go-common.git/x/metrics" - jaeger "github.com/uber/jaeger-client-go/config" - - m "code.uber.internal/devexp/minions/.gen/go/minions" - "code.uber.internal/devexp/minions/common" - "code.uber.internal/devexp/minions/health/driver" - "code.uber.internal/devexp/minions/persistence" - "code.uber.internal/devexp/minions/workflow" - wmetrics "code.uber.internal/devexp/minions/workflow/metrics" - "code.uber.internal/go-common.git/x/tchannel" - "github.com/Sirupsen/logrus" - "github.com/pborman/uuid" -) - -// Host is created for each host running the stress test -type ( - Host struct { - hostName string - engine workflow.Engine - reporter common.Reporter - - instancesWG sync.WaitGroup - doneCh chan struct{} - config Configuration - flowConfig *workflowConfig - runOnMinionsProduction bool - } - - // Configuration is the configuration used by cherami-stress - Configuration struct { - TChannel xtchannel.Configuration - Logging log.Configuration - Jaeger jaeger.Configuration - Metrics metrics.Configuration `yaml:"metrics"` - } -) - -var generalMetrics = map[common.MetricName]common.MetricType{ - common.WorkflowsStartTotalCounter: common.Counter, - common.WorkflowsCompletionTotalCounter: common.Counter, - common.ActivitiesTotalCounter: common.Counter, - common.DecisionsTotalCounter: common.Counter, - common.WorkflowEndToEndLatency: common.Timer, - common.ActivityEndToEndLatency: common.Timer, - common.DecisionsEndToEndLatency: common.Timer, -} - -// NewStressHost creates an instance of stress host -func newHost(engine workflow.Engine, instanceName string, reporter common.Reporter, - runOnMinionsProduction bool, config Configuration, flowConfig *workflowConfig) *Host { - h := &Host{ - engine: engine, - hostName: instanceName, - reporter: reporter, - doneCh: make(chan struct{}), - config: config, - flowConfig: flowConfig, - runOnMinionsProduction: runOnMinionsProduction, - } - - h.reporter.InitMetrics(generalMetrics) - return h -} - -// GetThriftClient gets thrift client. -func (s *Host) GetThriftClient(tchan *tchannel.Channel) m.TChanWorkflowService { - tclient := thrift.NewClient(tchan, "uber-minions", nil) - return m.NewTChanWorkflowServiceClient(tclient) -} - -// Start is used the start the stress host -func (s *Host) Start() { - launchCh := make(chan struct{}) - - go func() { - - var service m.TChanWorkflowService - - if s.runOnMinionsProduction { - // TChannel to production. - tchan, err := s.config.TChannel.NewClient("demo-client", nil) - if err != nil { - log.Panicf("Failed to get a client for the uber-minions: %s\n", err.Error()) - } - service = s.GetThriftClient(tchan) - } else { - serviceMockEngine := driver.NewServiceMockEngine(s.engine) - serviceMockEngine.Start() - service = serviceMockEngine - } - - if s.flowConfig.replayWorkflow { - replayWorkflow(service, s.reporter, s.flowConfig) - } else { - launchDemoWorkflow(service, s.reporter, s.flowConfig) - } - - // close(launchCh) - }() - - if _, ok := s.reporter.(*common.SimpleReporter); ok { - go s.printMetric() - } - - <-launchCh - - close(s.doneCh) -} - -func (s *Host) printMetric() { - sr, ok := s.reporter.(*common.SimpleReporter) - if !ok { - log.Error("Invalid reporter passed to printMetric.") - return - } - - ticker := time.NewTicker(2 * time.Second) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - // sr.PrintStressMetric() - // sr.PrintFinalMetric() - if sr.IsProcessComplete() { - // sr.PrintFinalMetric() - return - } - - case <-s.doneCh: - return - } - } -} - -func main() { - var host string - var emitMetric string - var runActivities bool - var runDeciders bool - var startCount int - var panicWorkflow bool - var runGreeterActivity bool - var runNameActivity bool - var runSayGreetingActivity bool - var runOnMinionsProduction bool - var useWorkflowID string - var useRunID string - var replayWorkflow bool - - flag.StringVar(&host, "host", "127.0.0.1", "Cassandra host to use.") - flag.BoolVar(&runActivities, "runActivities", false, "Run activites.") - flag.BoolVar(&runDeciders, "runDeciders", false, "Run deciders.") - flag.IntVar(&startCount, "startCount", 0, "start count workflows.") - flag.StringVar(&emitMetric, "emitMetric", "local", "Metric source: m3 | local") - flag.BoolVar(&panicWorkflow, "panicWorkflow", false, "To run panic workflow.") - flag.BoolVar(&runGreeterActivity, "runGreeterActivity", false, "To run greeter activity.") - flag.BoolVar(&runNameActivity, "runNameActivity", false, "To run name activity.") - flag.BoolVar(&runSayGreetingActivity, "runSayGreetingActivity", false, "To run say greeting activity.") - flag.BoolVar(&runOnMinionsProduction, "runOnMinionsProduction", false, "Run against the minions production") - flag.StringVar(&useWorkflowID, "useWorkflowID", uuid.New(), "Use this as workflow ID") - flag.StringVar(&useRunID, "useRunID", uuid.New(), "Use this as Run ID") - flag.BoolVar(&replayWorkflow, "replayWorkflow", false, "replay Workflow") - - flag.Parse() - - // TODO: For demo disable go cql logging. - slog.SetOutput(ioutil.Discard) - - // log.Info("Starting Demo.") - var cfg Configuration - if err := config.Load(&cfg); err != nil { - log.WithField(common.TagErr, err).Fatal(`Error initializing configuration`) - } - log.Configure(&cfg.Logging, true) - // log.Infof("Logging Level: %v", cfg.Logging.Level) - - if host == "" { - ip, err := tchannel.ListenIP() - if err != nil { - log.WithField(common.TagErr, err).Fatal(`Failed to find local listen IP`) - } - - host = ip.String() - } - - instanceName, e := os.Hostname() - if e != nil { - log.WithField(common.TagErr, e).Fatal(`Error getting hostname`) - } - instanceName = strings.Replace(instanceName, "-", ".", -1) - - var reporter common.Reporter - if emitMetric == "m3" { - log.Infof("M3 metric reporter: hostport=%v, service=%v", cfg.Metrics.M3.HostPort, cfg.Metrics.M3.Service) - m, e := cfg.Metrics.New() - if e != nil { - log.WithField(common.TagErr, e).Fatal(`Failed to initialize metrics`) - } - - // create the common tags to be used by all services - reporter = common.NewM3Reporter(m, map[string]string{ - common.TagHostname: instanceName, - }) - } else { - reporter = common.NewSimpleReporter(nil, nil) - } - - m3ReporterClient := wmetrics.NewClient(reporter, wmetrics.Workflow) - - var engine workflow.Engine - - if !runOnMinionsProduction { - if host == "127.0.0.1" { - testBase := workflow.TestBase{} - options := workflow.TestBaseOptions{} - options.ClusterHost = host - options.KeySpace = "workflow" - options.DropKeySpace = false - testBase.SetupWorkflowStoreWithOptions(options.TestBaseOptions) - - logrus.SetLevel(logrus.InfoLevel) - - engine = workflow.NewWorkflowEngine(testBase.WorkflowMgr, testBase.TaskMgr, bark.NewLoggerFromLogrus(logrus.New())) - } else { - executionPersistence, err2 := persistence.NewCassandraWorkflowExecutionPersistence(host, "workflow") - if err2 != nil { - panic(err2) - } - - executionPersistenceClient := workflow.NewWorkflowExecutionPersistenceClient(executionPersistence, m3ReporterClient) - - taskPersistence, err3 := persistence.NewCassandraTaskPersistence(host, "workflow") - if err3 != nil { - panic(err3) - } - - taskPersistenceClient := workflow.NewTaskPersistenceClient(taskPersistence, m3ReporterClient) - - engine = workflow.NewEngineWithMetricsImpl( - workflow.NewWorkflowEngine(executionPersistenceClient, taskPersistenceClient, log.WithField("host", "workflow_host")), - m3ReporterClient) - } - } - - config := &workflowConfig{runActivities: runActivities, runDeciders: runDeciders, - startCount: startCount, panicWorkflow: panicWorkflow, runGreeterActivity: runGreeterActivity, - runNameActivity: runNameActivity, runSayGreetingActivity: runSayGreetingActivity, - useWorkflowID: useWorkflowID, useRunID: useRunID, replayWorkflow: replayWorkflow} - h := newHost(engine, instanceName, reporter, runOnMinionsProduction, cfg, config) - h.Start() -} diff --git a/cmd/demo/panicWorkflow.go b/cmd/demo/panicWorkflow.go deleted file mode 100644 index 594f6248c8f..00000000000 --- a/cmd/demo/panicWorkflow.go +++ /dev/null @@ -1,39 +0,0 @@ -package main - -import ( - "code.uber.internal/devexp/minions/test/flow" - "code.uber.internal/devexp/minions/test/workflow" -) - -type ( - panicWorkflow struct { - } - - simpleActivity struct { - } -) - -// panicWorkflow decider code. -func (w panicWorkflow) Execute(ctx workflow.Context, input []byte) (result []byte, err workflow.Error) { - - c1 := ctx.NewChannel() - - ctx.Go(func(ctx workflow.Context) { - simpleActivityParams := serializeParams("simpleActivity", nil) - _, err = ctx.ExecuteActivity(simpleActivityParams) - - if err != nil { - panic("Simulated failure") - } - c1.Send(ctx, true) - }) - - c1.Recv(ctx) - - return nil, nil -} - -// simpleActivity activity that fails with error. -func (g simpleActivity) Execute(context flow.ActivityExecutionContext, input []byte) ([]byte, flow.Error) { - return nil, &workflowError{reason: "failed connection"} -} diff --git a/cmd/demo/startWorkflow.go b/cmd/demo/startWorkflow.go deleted file mode 100644 index 839416b9238..00000000000 --- a/cmd/demo/startWorkflow.go +++ /dev/null @@ -1,216 +0,0 @@ -package main - -import ( - "encoding/json" - "time" - - m "code.uber.internal/devexp/minions/.gen/go/minions" - gen "code.uber.internal/devexp/minions/.gen/go/shared" - "code.uber.internal/devexp/minions/common" - "code.uber.internal/devexp/minions/test/flow" - "code.uber.internal/devexp/minions/test/workflow" - log "github.com/Sirupsen/logrus" - "github.com/pborman/uuid" - "github.com/uber/tchannel-go/thrift" -) - -type ( - workflowConfig struct { - runActivities bool - runDeciders bool - startCount int - panicWorkflow bool - runGreeterActivity bool - runNameActivity bool - runSayGreetingActivity bool - useWorkflowID string - useRunID string - replayWorkflow bool - } - - workflowError struct { - reason string - details []byte - } - - // Operations exposed - Operations interface { - Name() string - Greeting() string - sayGreeting(greeting string) - } -) - -func activityInfo(activityName string) flow.ExecuteActivityParameters { - return serializeParams(activityName, nil) -} - -func activityInfoWithInput(activityName string, request *sayGreetingActivityRequest) flow.ExecuteActivityParameters { - sayGreetInput, err := json.Marshal(request) - if err != nil { - log.Panicf("Marshalling failed with error: %+v", err) - } - return serializeParams(activityName, sayGreetInput) -} - -func serializeParams(activityName string, input []byte) flow.ExecuteActivityParameters { - return flow.ExecuteActivityParameters{ - TaskListName: "testTaskList" + "-" + activityName, - ActivityType: gen.ActivityType{Name: common.StringPtr(activityName)}, - Input: input} -} - -func (we *workflowError) Reason() string { - return we.reason -} - -func (we *workflowError) Details() []byte { - return we.details -} - -func (we *workflowError) Error() string { - return we.reason -} - -func replayWorkflow(service m.TChanWorkflowService, reporter common.Reporter, config *workflowConfig) { - - // greetingsWorkflow := workflow.NewWorkflowDefinition(greetingsWorkflow{}) - // workflowFactory := func(wt m.WorkflowType) (flow.WorkflowDefinition, flow.Error) { - // switch wt.GetName() { - // case "greetingsWorkflow": - // return greetingsWorkflow, nil - // case "panicWorkflow": - // return workflow.NewWorkflowDefinition(panicWorkflow{}), nil - // } - // panic("Invalid workflow type") - // } - - if config.replayWorkflow { - - ctx, cancel := thrift.NewContext(10 * time.Second) - defer cancel() - - executionInfo := &gen.WorkflowExecution{ - WorkflowId: common.StringPtr(config.useWorkflowID), - RunId: common.StringPtr(config.useRunID)} - - request := &gen.GetWorkflowExecutionHistoryRequest{ - Execution: executionInfo} - _, err := service.GetWorkflowExecutionHistory(ctx, request) - if err != nil { - log.Panicf("GetWorkflowExecutionHistory failed with error: %s", err.Error()) - } - - // taskHandler := flow.NewWorkflowTaskHandlerImpl("testTask", "testID", workflowFactory, logger, reporter) - // workflowTask := &flow.WorkflowTask{Task: &m.PollForDecisionTaskResponse{ - // TaskToken: []byte("test-token"), - // History: historyResponse.GetHistory(), - // WorkflowExecution: executionInfo, - // WorkflowType: &m.WorkflowType{Name: common.StringPtr("greetingsWorkflow")}, - // }} - - // _, stackTrace, err := taskHandler.ProcessWorkflowTask(workflowTask, true) - // if err != nil { - // log.Panicf("TaskHandler::ProcessWorkflowTask failed with error: %s", err.Error()) - // } - // log.Infof("ProcessWorkflowTask: Stack Trace: %s", stackTrace) - - return - } -} - -func launchDemoWorkflow(service m.TChanWorkflowService, reporter common.Reporter, config *workflowConfig) { - logger := log.WithFields(log.Fields{}) - - // Workflow execution parameters. - workflowExecutionParameters := flow.WorkerExecutionParameters{} - workflowExecutionParameters.TaskListName = "testTaskList" - workflowExecutionParameters.ConcurrentPollRoutineSize = 4 - - workflowFactory := func(wt gen.WorkflowType) (flow.WorkflowDefinition, flow.Error) { - switch wt.GetName() { - case "greetingsWorkflow": - return workflow.NewWorkflowDefinition(greetingsWorkflow{}), nil - case "panicWorkflow": - return workflow.NewWorkflowDefinition(panicWorkflow{}), nil - } - panic("Invalid workflow type") - } - - activityFactory := func(at gen.ActivityType) (flow.ActivityImplementation, flow.Error) { - switch at.GetName() { - case "getGreetingActivity": - return getGreetingActivity{}, nil - case "getNameActivity": - return getNameActivity{}, nil - case "sayGreetingActivity": - return sayGreetingActivity{}, nil - case "simpleActivity": - return simpleActivity{}, nil - } - panic("Invalid activity type") - } - - if config.runDeciders { - // Launch worker. - workflowWorker := flow.NewWorkflowWorker(workflowExecutionParameters, workflowFactory, service, logger, reporter) - workflowWorker.Start() - log.Infoln("Started Deciders for worklows.") - } - - taskListActivitySuffix := "" - switch { - case config.runGreeterActivity: - taskListActivitySuffix = "-getGreetingActivity" - case config.runNameActivity: - taskListActivitySuffix = "-getNameActivity" - case config.runSayGreetingActivity: - taskListActivitySuffix = "-sayGreetingActivity" - } - - // Create activity execution parameters. - activityExecutionParameters := flow.WorkerExecutionParameters{} - activityExecutionParameters.TaskListName = "testTaskList" + taskListActivitySuffix - activityExecutionParameters.ConcurrentPollRoutineSize = 10 - - if config.runActivities { - // Register activity instances and launch the worker. - activityWorker := flow.NewActivityWorker(activityExecutionParameters, activityFactory, service, logger, reporter) - activityWorker.Start() - log.Infoln("Started activities for workflows.") - } - - startedWorkflowsCount := 0 - - workflowName := "greetingsWorkflow" - if config.panicWorkflow { - workflowName = "panicWorkflow" - } - - for i := 0; i < config.startCount; i++ { - // Start a workflow. - workflowID := config.useWorkflowID - if i > 0 { - workflowID = uuid.New() - } - workflowOptions := flow.StartWorkflowOptions{ - WorkflowID: workflowID, - WorkflowType: gen.WorkflowType{Name: common.StringPtr(workflowName)}, - TaskListName: "testTaskList", - WorkflowInput: nil, - ExecutionStartToCloseTimeoutSeconds: 10, - DecisionTaskStartToCloseTimeoutSeconds: 10, - } - workflowClient := flow.NewWorkflowClient(workflowOptions, service, reporter) - _, err := workflowClient.StartWorkflowExecution() - if err != nil { - panic("Failed to create workflow.") - } else { - startedWorkflowsCount++ - } - } - - if startedWorkflowsCount > 0 { - log.Infof("Started %d %s.\n", startedWorkflowsCount, workflowName) - } -} diff --git a/cmd/stress/main.go b/cmd/stress/main.go deleted file mode 100644 index b0c985a5470..00000000000 --- a/cmd/stress/main.go +++ /dev/null @@ -1,120 +0,0 @@ -package main - -import ( - "flag" - "math/rand" - "os" - "strings" - "time" - - "github.com/uber/tchannel-go" - - "code.uber.internal/go-common.git/x/config" - "code.uber.internal/go-common.git/x/log" - - "code.uber.internal/devexp/minions/common" - s "code.uber.internal/devexp/minions/health/stress" - "code.uber.internal/devexp/minions/persistence" - "code.uber.internal/devexp/minions/workflow" - wmetrics "code.uber.internal/devexp/minions/workflow/metrics" -) - -func main() { - var host string - var emitMetric string - var runOnMinionsProduction bool - - flag.StringVar(&host, "host", "", "Cassandra host to use.") - flag.StringVar(&emitMetric, "emitMetric", "local", "Metric source: m3 | local") - flag.BoolVar(&runOnMinionsProduction, "runOnMinionsProduction", false, "Run against the minions production") - - flag.Parse() - - log.Info("Starting stress.") - var cfg s.Configuration - if err := config.Load(&cfg); err != nil { - log.WithField(common.TagErr, err).Fatal(`Error initializing configuration`) - } - log.Configure(&cfg.Logging, true) - log.Infof("Logging Level: %v", cfg.Logging.Level) - - if host == "" { - ip, err := tchannel.ListenIP() - if err != nil { - log.WithField(common.TagErr, err).Fatal(`Failed to find local listen IP`) - } - - host = ip.String() - } - - instanceName, e := os.Hostname() - if e != nil { - log.WithField(common.TagErr, e).Fatal(`Error getting hostname`) - } - instanceName = strings.Replace(instanceName, "-", ".", -1) - - var reporter common.Reporter - if emitMetric == "m3" { - log.Infof("M3 metric reporter: hostport=%v, service=%v", cfg.Metrics.M3.HostPort, cfg.Metrics.M3.Service) - m, e := cfg.Metrics.New() - if e != nil { - log.WithField(common.TagErr, e).Fatal(`Failed to initialize metrics`) - } - - // create the common tags to be used by all services - reporter = common.NewM3Reporter(m, map[string]string{ - common.TagHostname: instanceName, - }) - } else { - reporter = common.NewSimpleReporter(nil, nil) - } - - m3ReporterClient := wmetrics.NewClient(reporter, wmetrics.Workflow) - - var engine workflow.Engine - - if !runOnMinionsProduction { - if host == "127.0.0.1" { - testBase := workflow.TestBase{} - options := workflow.TestBaseOptions{} - options.ClusterHost = host - options.DropKeySpace = true - testBase.SetupWorkflowStoreWithOptions(options.TestBaseOptions) - engine = workflow.NewWorkflowEngine(testBase.WorkflowMgr, testBase.TaskMgr, log.WithField("host", "workflow_host")) - } else { - executionPersistence, err2 := persistence.NewCassandraWorkflowExecutionPersistence(host, "workflow") - if err2 != nil { - panic(err2) - } - - executionPersistenceClient := workflow.NewWorkflowExecutionPersistenceClient(executionPersistence, m3ReporterClient) - - taskPersistence, err3 := persistence.NewCassandraTaskPersistence(host, "workflow") - if err3 != nil { - panic(err3) - } - - taskPersistenceClient := workflow.NewTaskPersistenceClient(taskPersistence, m3ReporterClient) - - engine = workflow.NewEngineWithMetricsImpl( - workflow.NewWorkflowEngine(executionPersistenceClient, taskPersistenceClient, log.WithField("host", "workflow_host")), - m3ReporterClient) - } - h := s.NewStressHost(engine, instanceName, cfg, reporter, false /* runOnMinionsProduction */) - h.Start() - } else { - // Running against production. - h := s.NewStressHost(nil, instanceName, cfg, reporter, runOnMinionsProduction) - h.Start() - } -} - -func generateRandomKeyspace(n int) string { - rand.Seed(time.Now().UnixNano()) - letterRunes := []rune("workflow") - b := make([]rune, n) - for i := range b { - b[i] = letterRunes[rand.Intn(len(letterRunes))] - } - return string(b) -} diff --git a/common/m3reporter.go b/common/m3reporter.go deleted file mode 100644 index fbf19edce0c..00000000000 --- a/common/m3reporter.go +++ /dev/null @@ -1,214 +0,0 @@ -package common - -import ( - "time" - - "code.uber.internal/go-common.git/x/log" - m3 "code.uber.internal/go-common.git/x/metrics" -) - -// M3Reporter is the struct which implements the Reporter interface to -// report metrics to M3 -type M3Reporter struct { - // ms is the M3 Scope which will be initialized when we start the service - ms m3.Scope - - // tags associated with this object - tags map[string]string - - // timerMetrics is the map of all Timer style metrics - // we keep this so that we don't create the object every single time - timerMetrics map[string]*m3.Timer - - // counterMetrics is the map of all Counter style metrics - counterMetrics map[string]*m3.Counter - - // gaugeMetrics is the map of all Gauge style metrics - gaugeMetrics map[string]*m3.Gauge -} - -// M3Stopwatch is the struct which implements the Stopwatch interface to -// start and stop the timer -type M3Stopwatch struct { - mTimer *m3.Timer - tags map[string]string - startTime time.Time -} - -func newStopWatch(mTimer *m3.Timer, tags map[string]string) *M3Stopwatch { - sWatch := &M3Stopwatch{ - mTimer: mTimer, - tags: tags, - } - - return sWatch -} - -// NewM3Reporter returns a StatsReporter that reports to m3 on the given addr. -func NewM3Reporter(ms m3.Scope, tags map[string]string) Reporter { - m3Report := &M3Reporter{ - ms: ms, - tags: make(map[string]string), - timerMetrics: make(map[string]*m3.Timer), - counterMetrics: make(map[string]*m3.Counter), - gaugeMetrics: make(map[string]*m3.Gauge), - } - - if tags != nil { - copyMap(tags, m3Report.tags) - } - - // Tagged returns a new scope with the given tags, merges with any existing tags for the scope ms - m3Report.ms = ms.Tagged(m3Report.GetTags()) - return m3Report -} - -// GetChildReporter creates the child reporter for this parent reporter -func (r *M3Reporter) GetChildReporter(tags map[string]string) Reporter { - - m3Report := &M3Reporter{ - ms: r.ms, - tags: make(map[string]string), - timerMetrics: make(map[string]*m3.Timer), - counterMetrics: make(map[string]*m3.Counter), - gaugeMetrics: make(map[string]*m3.Gauge), - } - - // copy the parent tags as well - copyMap(r.GetTags(), m3Report.GetTags()) - - if tags != nil { - copyMap(tags, m3Report.tags) - } - - // Tagged returns a new child scope with the given tags, merges with any existing tags - m3Report.ms = r.ms.Tagged(m3Report.GetTags()) - - return m3Report -} - -// GetTags returns the tags for this reporter object -func (r *M3Reporter) GetTags() map[string]string { - return r.tags -} - -// getTimerScopeMap returns the respective scope map for this reporter object -func (r *M3Reporter) getTimerScopeMap() map[string]*m3.Timer { - return r.timerMetrics -} - -// getCounterScopeMap returns the respective scope map for this reporter object -func (r *M3Reporter) getCounterScopeMap() map[string]*m3.Counter { - return r.counterMetrics -} - -// getGaugeScopeMap returns the respective scope map for this reporter object -func (r *M3Reporter) getGaugeScopeMap() map[string]*m3.Gauge { - return r.gaugeMetrics -} - -// InitMetrics is used to create the M3 scope objects for the respective metrics -func (r *M3Reporter) InitMetrics(metricMap map[MetricName]MetricType) { - for mName, mType := range metricMap { - switch mType { - case Counter: - r.counterMetrics[string(mName)] = r.ms.Counter(string(mName)) - case Timer: - r.timerMetrics[string(mName)] = r.ms.Timer(string(mName)) - case Gauge: - r.gaugeMetrics[string(mName)] = r.ms.Gauge(string(mName)) - } - } -} - -// IncCounter reports Counter metric to M3 -func (r *M3Reporter) IncCounter(name string, tags map[string]string, delta int64) { - // First we need to make sure we create a counter type for this metric - var mCounter *m3.Counter - var ok bool - // If we don't find this metric, this is a no-op - if mCounter, ok = r.counterMetrics[name]; ok { - // if there are no tags specified, just use the already existing - if tags == nil { - mCounter.Inc(delta) - } else { - mCounter.Tagged(tags).Inc(delta) - } - } else { - log.Errorf("counter metric: %v doesn't have a scope object", name) - } -} - -// UpdateGauge reports Gauge type metric to M3 -func (r *M3Reporter) UpdateGauge(name string, tags map[string]string, value int64) { - // First we need to make sure we create a gauge type for this metric - var mGauge *m3.Gauge - var ok bool - // If we don't find this metric, this is a no-op - if mGauge, ok = r.gaugeMetrics[name]; ok { - // if there are no tags specified, just use the already existing - if tags == nil { - mGauge.Update(value) - } else { - mGauge.Tagged(tags).Update(value) - } - } else { - log.Errorf("gauge metric: %v doesn't have a scope object", name) - } -} - -// StartTimer returns a Stopwatch which when stopped will report the metric to M3 -func (r *M3Reporter) StartTimer(name string, tags map[string]string) Stopwatch { - // First we need to make sure we create a timer type for this metric - var mTimer *m3.Timer - var ok bool - // If we don't find this metric, return an empty stopwatch - if mTimer, ok = r.timerMetrics[name]; ok { - // We need to create a new stopwatch each time because we can update the - // metric simultaneously - sw := newStopWatch(mTimer, tags) - sw.start() - return sw - } - // just return an empty stopwatch - sw := &M3Stopwatch{} - log.Errorf("timer metric: %v doesn't have a scope object", name) - return sw -} - -// RecordTimer should be used for measuring latency when you cannot start the stop watch. -func (r *M3Reporter) RecordTimer(name string, tags map[string]string, d time.Duration) { - // First we need to make sure we create a timer type for this metric - var mTimer *m3.Timer - var ok bool - // If we don't find this metric, return an empty stopwatch - if mTimer, ok = r.timerMetrics[name]; ok { - // if there are no tags specified, just use the already existing - if tags == nil { - mTimer.Record(d) - } else { - mTimer.Tagged(tags).Record(d) - } - } else { - log.Errorf("timer metric: %v doesn't have a scope object", name) - } -} - -// start just sets the start time -func (r *M3Stopwatch) start() { - r.startTime = time.Now() -} - -// Stop stops the stop watch and records the latency to M3 -func (r *M3Stopwatch) Stop() time.Duration { - d := time.Since(r.startTime) - if r.mTimer != nil { - // if there are no tags specified, just use the already existing - if r.tags == nil { - r.mTimer.Record(d) - } else { - r.mTimer.Tagged(r.tags).Record(d) - } - } - return d -} diff --git a/common/simplereporter.go b/common/simplereporter.go index 2403ae3431f..8713be77f23 100644 --- a/common/simplereporter.go +++ b/common/simplereporter.go @@ -4,16 +4,18 @@ import ( "sync/atomic" "time" - "code.uber.internal/go-common.git/x/log" - "code.uber.internal/go-common.git/x/metrics" + "github.com/uber-common/bark" + "github.com/uber-go/tally" ) type ( // SimpleReporter is the reporter used to dump metric to console for stress runs SimpleReporter struct { - scope metrics.Scope + scope tally.Scope tags map[string]string + logger bark.Logger + startTime time.Time workflowsStartCount int64 activitiesTotalCount int64 @@ -53,7 +55,7 @@ const ( ) // NewSimpleReporter create an instance of Reporter which can be used for driver to emit metric to console -func NewSimpleReporter(scope metrics.Scope, tags map[string]string) Reporter { +func NewSimpleReporter(scope tally.Scope, tags map[string]string, logger bark.Logger) Reporter { reporter := &SimpleReporter{ scope: scope, tags: make(map[string]string), @@ -83,7 +85,7 @@ func (r *SimpleReporter) InitMetrics(metricMap map[MetricName]MetricType) { // GetChildReporter creates the child reporter for this parent reporter func (r *SimpleReporter) GetChildReporter(tags map[string]string) Reporter { - sr := NewSimpleReporter(r.GetScope(), tags) + sr := NewSimpleReporter(r.GetScope(), tags, r.logger) // copy the parent tags as well copyMap(r.GetTags(), sr.GetTags()) @@ -97,7 +99,7 @@ func (r *SimpleReporter) GetTags() map[string]string { } // GetScope returns the metrics scope for this reporter -func (r *SimpleReporter) GetScope() metrics.Scope { +func (r *SimpleReporter) GetScope() tally.Scope { return r.scope } @@ -119,7 +121,7 @@ func (r *SimpleReporter) IncCounter(name string, tags map[string]string, delta i case DecisionsEndToEndLatency: atomic.AddInt64(&r.decisionsEndToEndLatency, delta) default: - log.WithField(`name`, name).Error(`Unknown metric`) + r.logger.WithField(`name`, name).Error(`Unknown metric`) } } @@ -197,13 +199,13 @@ func (r *SimpleReporter) PrintStressMetric() { latency = currentLatency / totalWorkflowsCompleted } - log.Infof("Workflows Started(Count=%v, Throughput=%v)", + r.logger.Infof("Workflows Started(Count=%v, Throughput=%v)", totalWorkflowStarted, creationThroughput) - log.Infof("Workflows Completed(Count=%v, Throughput=%v, Average Latency: %v)", + r.logger.Infof("Workflows Completed(Count=%v, Throughput=%v, Average Latency: %v)", totalWorkflowsCompleted, completionThroughput, latency) - log.Infof("Activites(Count=%v, Throughput=%v, Average Latency: %v)", + r.logger.Infof("Activites(Count=%v, Throughput=%v, Average Latency: %v)", totalActivitiesCount, activitiesThroughput, activityLatency) - log.Infof("Decisions(Count=%v, Throughput=%v, Average Latency: %v)", + r.logger.Infof("Decisions(Count=%v, Throughput=%v, Average Latency: %v)", totalDecisionsCount, decisionsThroughput, decisionsLatency) r.previousWorkflowsStartCount = totalWorkflowStarted @@ -249,11 +251,11 @@ func (r *SimpleReporter) PrintFinalMetric() { decisionLatency = totalDecisionsLatency / decisionsCount } - log.Infof("Total Workflows processed:(Started=%v, Completed=%v), Throughput: %v, Average Latency: %v", + r.logger.Infof("Total Workflows processed:(Started=%v, Completed=%v), Throughput: %v, Average Latency: %v", workflowsCount, workflowsCompletedCount, throughput, time.Duration(latency)) - log.Infof("Total Activites processed:(Count=%v, Throughput=%v, Average Latency=%v)", + r.logger.Infof("Total Activites processed:(Count=%v, Throughput=%v, Average Latency=%v)", activitiesCount, activityThroughput, activityLatency) - log.Infof("Total Decisions processed:(Count=%v, Throughput=%v, Average Latency=%v)", + r.logger.Infof("Total Decisions processed:(Count=%v, Throughput=%v, Average Latency=%v)", decisionsCount, decisionsThroughput, decisionLatency) } diff --git a/glide.lock b/glide.lock index 573eb4e60f6..07e5f2f4a1b 100644 --- a/glide.lock +++ b/glide.lock @@ -1,12 +1,11 @@ -hash: f15cbe176d457e37df646a4cbf1999aef75ce89e1522f3b02ce750d4d85257d0 -updated: 2016-12-05T15:47:18.099871848-08:00 +hash: 7a8e6af1aaecc54a4e19126a0c5b6d101c0b1e1385f79b5f73a48e5f8a5162e8 +updated: 2016-12-15T12:53:35.566424608-08:00 imports: - name: code.uber.internal/go-common.git version: 4b349844a2c93b3c125ebff6d62b12ab73621e9f subpackages: - x/backoff - x/config - - x/jaeger - x/kafka - x/log - x/metrics @@ -21,7 +20,7 @@ imports: - x/tchannel/metrics - x/time - name: github.com/apache/thrift - version: 84d6af4cf903571319e0ebddd7beb12bc93fb752 + version: 0d9b713b173f35ce02552b2f4372899440a99b25 subpackages: - lib/go/thrift - name: github.com/cactus/go-statsd-client @@ -30,6 +29,10 @@ imports: - statsd - name: github.com/codahale/hdrhistogram version: f8ad88b59a584afeee9d334eff879b104439117b +- name: github.com/davecgh/go-spew + version: 346938d642f2ec3594ed81d874461961cd0faa76 + subpackages: + - spew - name: github.com/facebookgo/clock version: 600d898af40aa09a7a93ecb9265d87b0504b6f03 - name: github.com/getsentry/raven-go @@ -56,8 +59,21 @@ imports: - log - name: github.com/pborman/uuid version: 3d4f2ba23642d3cfd06bd4b54cf03d99d95c0f1b +- name: github.com/pmezard/go-difflib + version: 792786c7400a136282c1664665ae0a8db921c6c2 + subpackages: + - difflib - name: github.com/Sirupsen/logrus version: 08a8a7c27e3d058a8989316a850daad1c10bf4ab +- name: github.com/stretchr/objx + version: 1a9d0bb9f541897e62256577b352fdbc1fb4fd94 +- name: github.com/stretchr/testify + version: 18a02ba4a312f95da08ff4cfc0055750ce50ae9e + subpackages: + - assert + - mock + - require + - suite - name: github.com/uber-common/bark version: d52ffa061726911f47fcd3d9e8b9b55f22794772 - name: github.com/uber-go/atomic @@ -91,7 +107,7 @@ imports: - trand - typed - name: golang.org/x/net - version: 4971afdc2f162e82d185353533d3cf16188a9f4e + version: cfae461cedfdcab6e261a26eb77db53695623303 subpackages: - context - name: golang.org/x/sys @@ -104,21 +120,4 @@ imports: version: 3e4f037f12a1221a0864cf0dd2e81c452ab22448 - name: gopkg.in/yaml.v2 version: a5b47d31c556af34a302ce5d659e6fea44d90de0 -testImports: -- name: github.com/davecgh/go-spew - version: 346938d642f2ec3594ed81d874461961cd0faa76 - subpackages: - - spew -- name: github.com/pmezard/go-difflib - version: 792786c7400a136282c1664665ae0a8db921c6c2 - subpackages: - - difflib -- name: github.com/stretchr/objx - version: 1a9d0bb9f541897e62256577b352fdbc1fb4fd94 -- name: github.com/stretchr/testify - version: 18a02ba4a312f95da08ff4cfc0055750ce50ae9e - subpackages: - - assert - - mock - - require - - suite +testImports: [] diff --git a/glide.yaml b/glide.yaml index 08ff19a50d1..49fc870cafd 100644 --- a/glide.yaml +++ b/glide.yaml @@ -1,12 +1,5 @@ package: code.uber.internal/devexp/minions import: -- package: code.uber.internal/go-common.git - version: master - subpackages: - - x/config - - x/log - - x/metrics - - x/tchannel - package: github.com/uber/tchannel-go subpackages: - thrift diff --git a/health/driver/serviceMocker.go b/health/driver/serviceMocker.go deleted file mode 100644 index af2e5370059..00000000000 --- a/health/driver/serviceMocker.go +++ /dev/null @@ -1,73 +0,0 @@ -package driver - -import ( - m "code.uber.internal/devexp/minions/.gen/go/shared" - "code.uber.internal/devexp/minions/workflow" - log "github.com/Sirupsen/logrus" - "github.com/uber-common/bark" - "github.com/uber/tchannel-go/thrift" -) - -type ( - // ServiceMockEngine implements TChanWorkflowService to talk to engine directly - ServiceMockEngine struct { - engine workflow.Engine - logger bark.Logger - } -) - -// NewServiceMockEngine creats an isntance of mocker service layer for the engine -func NewServiceMockEngine(engine workflow.Engine) *ServiceMockEngine { - mockEngine := &ServiceMockEngine{} - mockEngine.logger = bark.NewLoggerFromLogrus(log.New()) - mockEngine.engine = engine - return mockEngine -} - -// PollForActivityTask polls for activity task. -func (se *ServiceMockEngine) PollForActivityTask(ctx thrift.Context, pollRequest *m.PollForActivityTaskRequest) (*m.PollForActivityTaskResponse, error) { - return se.engine.PollForActivityTask(pollRequest) -} - -// PollForDecisionTask polls for decision task. -func (se *ServiceMockEngine) PollForDecisionTask(ctx thrift.Context, pollRequest *m.PollForDecisionTaskRequest) (*m.PollForDecisionTaskResponse, error) { - return se.engine.PollForDecisionTask(pollRequest) -} - -// RecordActivityTaskHeartbeat records activity task heart beat. -func (se *ServiceMockEngine) RecordActivityTaskHeartbeat(ctx thrift.Context, heartbeatRequest *m.RecordActivityTaskHeartbeatRequest) (*m.RecordActivityTaskHeartbeatResponse, error) { - // TODO: - return nil, nil -} - -// RespondActivityTaskCompleted responds to an activity completion. -func (se *ServiceMockEngine) RespondActivityTaskCompleted(ctx thrift.Context, completeRequest *m.RespondActivityTaskCompletedRequest) error { - return se.engine.RespondActivityTaskCompleted(completeRequest) -} - -// RespondActivityTaskFailed responds to an activity failure. -func (se *ServiceMockEngine) RespondActivityTaskFailed(ctx thrift.Context, failRequest *m.RespondActivityTaskFailedRequest) error { - return se.engine.RespondActivityTaskFailed(failRequest) -} - -// RespondDecisionTaskCompleted responds to an decision completion. -func (se *ServiceMockEngine) RespondDecisionTaskCompleted(ctx thrift.Context, completeRequest *m.RespondDecisionTaskCompletedRequest) error { - return se.engine.RespondDecisionTaskCompleted(completeRequest) -} - -// StartWorkflowExecution starts a workflow. -func (se *ServiceMockEngine) StartWorkflowExecution(ctx thrift.Context, startRequest *m.StartWorkflowExecutionRequest) (*m.StartWorkflowExecutionResponse, error) { - workflowExecution, err := se.engine.StartWorkflowExecution(startRequest) - return &m.StartWorkflowExecutionResponse{RunId: workflowExecution.RunId}, err -} - -// Start the workflow engine in a different go routine. -func (se *ServiceMockEngine) Start() { - go se.engine.Start() -} - -// GetWorkflowExecutionHistory retrieves the history for given workflow execution -func (se *ServiceMockEngine) GetWorkflowExecutionHistory(ctx thrift.Context, - request *m.GetWorkflowExecutionHistoryRequest) (*m.GetWorkflowExecutionHistoryResponse, error) { - return se.engine.GetWorkflowExecutionHistory(request) -} diff --git a/health/driver/stressWorkflow.go b/health/driver/stressWorkflow.go deleted file mode 100644 index 322c8440c38..00000000000 --- a/health/driver/stressWorkflow.go +++ /dev/null @@ -1,185 +0,0 @@ -package driver - -import ( - "encoding/json" - "fmt" - "sync" - "sync/atomic" - "time" - - "github.com/pborman/uuid" - - m "code.uber.internal/devexp/minions/.gen/go/minions" - s "code.uber.internal/devexp/minions/.gen/go/shared" - "code.uber.internal/devexp/minions/common" - "code.uber.internal/devexp/minions/test/flow" - "code.uber.internal/devexp/minions/test/workflow" - log "github.com/Sirupsen/logrus" -) - -type ( - // WorkflowParams inputs to workflow. - WorkflowParams struct { - ChainSequence int - ActivitySleepMin time.Duration - ActivitySleepMax time.Duration - } - - // Stress workflow decider - stressWorkflow struct { - } - - // Sleep activity. - stressSleepActivity struct { - } - - sleepActivityParams struct { - sleepMin time.Duration - sleepMax time.Duration - } - - workflowError struct { - reason string - details []byte - } -) - -func (we *workflowError) Reason() string { - return we.reason -} - -func (we *workflowError) Details() []byte { - return we.details -} - -func (we *workflowError) Error() string { - return we.reason -} - -func logrusSettings() { - formatter := &log.TextFormatter{} - formatter.FullTimestamp = true - log.SetFormatter(formatter) - log.SetLevel(log.DebugLevel) -} - -// Stress workflow decider -func (wf stressWorkflow) Execute(ctx workflow.Context, input []byte) (result []byte, err workflow.Error) { - workflowInput := &WorkflowParams{} - err1 := json.Unmarshal(input, workflowInput) - if err != nil { - return nil, &workflowError{reason: err1.Error(), details: nil} - } - - activityParams := &sleepActivityParams{sleepMin: workflowInput.ActivitySleepMin, sleepMax: workflowInput.ActivitySleepMax} - activityInput, err1 := json.Marshal(activityParams) - if err1 != nil { - return nil, &workflowError{reason: err1.Error(), details: []byte("Failed to serialize sleep activity input")} - } - - activityParameters := flow.ExecuteActivityParameters{ - TaskListName: "testTaskList", - ActivityType: s.ActivityType{Name: common.StringPtr("sleepActivity")}, - Input: activityInput, - } - - for i := 0; i < workflowInput.ChainSequence; i++ { - _, err2 := ctx.ExecuteActivity(activityParameters) - if err2 != nil { - return nil, err2 - } - } - return nil, nil -} - -func (sa stressSleepActivity) Execute(context flow.ActivityExecutionContext, input []byte) ([]byte, flow.Error) { - activityParams := &sleepActivityParams{} - err := json.Unmarshal(input, activityParams) - if err != nil { - // TODO: fix this error types. - return nil, &workflowError{reason: err.Error(), details: []byte("Failed to de-serialize sleep activity input")} - } - - // log.Infof("Activity parameters: %+v", activityParams) - // TODO: Input is getting nil input if it has beens stored properly - - // randomMultiple := rand.Intn(int((activityParams.sleepMax - activityParams.sleepMin) / time.Second)) - // sleepDuration := activityParams.sleepMin + time.Duration(randomMultiple)*time.Second - - // time.Sleep(time.Second) - return nil, nil -} - -// LaunchWorkflows starts workflows. -func LaunchWorkflows(countOfWorkflows int, goRoutineCount int, wp *WorkflowParams, - service m.TChanWorkflowService, reporter common.Reporter) error { - logrusSettings() - logger := log.WithFields(log.Fields{}) - - // Workflow execution parameters. - workflowExecutionParameters := flow.WorkerExecutionParameters{} - workflowExecutionParameters.TaskListName = "testTaskList" - workflowExecutionParameters.ConcurrentPollRoutineSize = 4 - - workflowFactory := func(wt s.WorkflowType) (flow.WorkflowDefinition, flow.Error) { - return workflow.NewWorkflowDefinition(stressWorkflow{}), nil - } - activityFactory := func(at s.ActivityType) (flow.ActivityImplementation, flow.Error) { - return &stressSleepActivity{}, nil - } - - // Launch worker. - workflowWorker := flow.NewWorkflowWorker(workflowExecutionParameters, workflowFactory, service, logger, reporter) - workflowWorker.Start() - - // Create activity execution parameters. - activityExecutionParameters := flow.WorkerExecutionParameters{} - activityExecutionParameters.TaskListName = "testTaskList" - activityExecutionParameters.ConcurrentPollRoutineSize = 10 - - // Register activity instances and launch the worker. - activityWorker := flow.NewActivityWorker(activityExecutionParameters, activityFactory, service, logger, reporter) - activityWorker.Start() - - // Start a workflow. - workflowInput, err := json.Marshal(wp) - if err != nil { - log.Error("Unable to marshal workflow input with") - return err - } - workflowOptions := flow.StartWorkflowOptions{ - WorkflowType: s.WorkflowType{Name: common.StringPtr("stressWorkfow")}, - TaskListName: "testTaskList", - WorkflowInput: workflowInput, - ExecutionStartToCloseTimeoutSeconds: 10, - DecisionTaskStartToCloseTimeoutSeconds: 10, - } - - var totalWorkflowCount int32 - var goWaitGroup sync.WaitGroup - - workflowCreator := func(routineId int, createCount int, options flow.StartWorkflowOptions) { - defer goWaitGroup.Done() - - for i := 0; i < createCount; i++ { - options.WorkflowID = fmt.Sprintf("%s-%d-%d", uuid.New(), routineId, i) - workflowClient := flow.NewWorkflowClient(options, service, reporter) - _, err := workflowClient.StartWorkflowExecution() - if err == nil { - atomic.AddInt32(&totalWorkflowCount, 1) - // log.Infof("Created Workflow - workflow Id: %s, run Id: %s \n", we.GetWorkflowId(), we.GetRunId()) - } else { - log.Error(err) - } - time.Sleep(100 * time.Millisecond) - } - } - - goWaitGroup.Add(goRoutineCount) - for i := 0; i < goRoutineCount; i++ { - go workflowCreator(i, countOfWorkflows/goRoutineCount, workflowOptions) - } - goWaitGroup.Wait() - log.Infof("Total Created Workflow Count: %d \n", totalWorkflowCount) - return nil -} diff --git a/health/stress/config.go b/health/stress/config.go deleted file mode 100644 index 4731c79c6a5..00000000000 --- a/health/stress/config.go +++ /dev/null @@ -1,29 +0,0 @@ -package stress - -import ( - "code.uber.internal/go-common.git/x/log" - "code.uber.internal/go-common.git/x/metrics" - "code.uber.internal/go-common.git/x/tchannel" -) - -// Configuration is the configuration used by cherami-stress -type Configuration struct { - TChannel xtchannel.Configuration - Logging log.Configuration - Metrics metrics.Configuration `yaml:"metrics"` - StressConfig WorkflowStressConfiguration `yaml:"stress"` -} - -// WorkflowStressConfiguration encompasses stress configuration -type WorkflowStressConfiguration struct { - WorkflowConfig WorkflowConfiguration `yaml:"workflow"` -} - -// WorkflowConfiguration is the configuration of number of workflows to launch. -type WorkflowConfiguration struct { - TotalLaunchCount int `yaml:"totalLaunchCount"` - RoutineCount int `yaml:"routineCount"` - ChainSequence int `yaml:"chainSequence"` - ActivitySleepMin int `yaml:"activitySleepMin"` - ActivitySleepMax int `yaml:"activitySleepMax"` -} diff --git a/health/stress/host.go b/health/stress/host.go deleted file mode 100644 index 7d7bbd47b19..00000000000 --- a/health/stress/host.go +++ /dev/null @@ -1,134 +0,0 @@ -package stress - -import ( - "sync" - "time" - - tchannel "github.com/uber/tchannel-go" - "github.com/uber/tchannel-go/thrift" - - "code.uber.internal/go-common.git/x/log" - - m "code.uber.internal/devexp/minions/.gen/go/minions" - "code.uber.internal/devexp/minions/common" - "code.uber.internal/devexp/minions/health/driver" - "code.uber.internal/devexp/minions/workflow" -) - -// Host is created for each host running the stress test -type Host struct { - hostName string - config Configuration - engine workflow.Engine - reporter common.Reporter - - instancesWG sync.WaitGroup - doneCh chan struct{} - runOnMinionsProduction bool -} - -var stressMetrics = map[common.MetricName]common.MetricType{ - common.WorkflowsStartTotalCounter: common.Counter, - common.WorkflowsCompletionTotalCounter: common.Counter, - common.ActivitiesTotalCounter: common.Counter, - common.DecisionsTotalCounter: common.Counter, - common.WorkflowEndToEndLatency: common.Timer, - common.ActivityEndToEndLatency: common.Timer, - common.DecisionsEndToEndLatency: common.Timer, -} - -// NewStressHost creates an instance of stress host -func NewStressHost(engine workflow.Engine, instanceName string, config Configuration, - reporter common.Reporter, runOnMinionsProduction bool) *Host { - h := &Host{ - engine: engine, - hostName: instanceName, - config: config, - reporter: reporter, - doneCh: make(chan struct{}), - runOnMinionsProduction: runOnMinionsProduction, - } - - h.reporter.InitMetrics(stressMetrics) - return h -} - -// GetThriftClient gets thrift client. -func (s *Host) GetThriftClient(tchan *tchannel.Channel) m.TChanWorkflowService { - tclient := thrift.NewClient(tchan, "uber-minions", nil) - return m.NewTChanWorkflowServiceClient(tclient) -} - -// Start is used the start the stress host -func (s *Host) Start() { - launchCh := make(chan struct{}) - - workflowConfig := s.config.StressConfig.WorkflowConfig - log.Infof("Launching stress workflow with configuration: %+v", workflowConfig) - - go func() { - - var service m.TChanWorkflowService - - if s.runOnMinionsProduction { - // TChannel to production. - tchan, err := s.config.TChannel.NewClient("stress-client", nil) - if err != nil { - log.Panicf("Failed to get a client for the uber-minions: %s\n", err.Error()) - } - service = s.GetThriftClient(tchan) - } else { - serviceMockEngine := driver.NewServiceMockEngine(s.engine) - serviceMockEngine.Start() - service = serviceMockEngine - } - - workflowPrams := &driver.WorkflowParams{ - ChainSequence: workflowConfig.ChainSequence, - ActivitySleepMin: time.Duration(workflowConfig.ActivitySleepMin) * time.Second, - ActivitySleepMax: time.Duration(workflowConfig.ActivitySleepMax) * time.Second} - - driver.LaunchWorkflows( - workflowConfig.TotalLaunchCount, - workflowConfig.RoutineCount, - workflowPrams, - service, - s.reporter) - - // close(launchCh) - }() - - if _, ok := s.reporter.(*common.SimpleReporter); ok { - go s.printMetric() - } - - <-launchCh - - close(s.doneCh) -} - -func (s *Host) printMetric() { - sr, ok := s.reporter.(*common.SimpleReporter) - if !ok { - log.Error("Invalid reporter passed to printMetric.") - return - } - - ticker := time.NewTicker(2 * time.Second) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - sr.PrintStressMetric() - sr.PrintFinalMetric() - if sr.IsProcessComplete() { - sr.PrintFinalMetric() - return - } - - case <-s.doneCh: - return - } - } -} diff --git a/persistence/persistenceTestBase.go b/persistence/persistenceTestBase.go index 3dc6d5786de..73a5402b816 100644 --- a/persistence/persistenceTestBase.go +++ b/persistence/persistenceTestBase.go @@ -7,7 +7,7 @@ import ( workflow "code.uber.internal/devexp/minions/.gen/go/shared" "code.uber.internal/devexp/minions/common" - "code.uber.internal/go-common.git/x/log" + log "github.com/Sirupsen/logrus" "github.com/gocql/gocql" )