Skip to content

Commit

Permalink
Add engine metrics
Browse files Browse the repository at this point in the history
Summary:
Merge branch 'master' of code.uber.internal:devexp/minions into stress

Add Start/Stop to engine interface.

Fix start/stop

Add timer and schema script

Cleanup of schema.

Update drop key space.

Reviewers: samar

Subscribers: jenkins

Differential Revision: https://code.uberinternal.com/D654615
  • Loading branch information
sivakku committed Dec 2, 2016
1 parent 017f955 commit 66b4cc6
Show file tree
Hide file tree
Showing 16 changed files with 335 additions and 71 deletions.
44 changes: 33 additions & 11 deletions cmd/stress/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package main

import (
"flag"
"math/rand"
"os"
"strings"
"time"

"github.com/uber/tchannel-go"

Expand Down Expand Up @@ -66,21 +68,41 @@ func main() {

m3ReporterClient := wmetrics.NewClient(reporter, wmetrics.Workflow)

executionPersistence, err2 := workflow.NewCassandraWorkflowExecutionPersistence(host, "workflow")
if err2 != nil {
panic(err2)
}
var engine workflow.Engine

executionPersistenceClient := workflow.NewWorkflowExecutionPersistenceClient(executionPersistence, m3ReporterClient)
if host == "127.0.0.1" {
testBase := workflow.TestBase{}
testBase.SetupWorkflowStoreWithOptions(workflow.TestBaseOptions{ClusterHost: host})
engine = workflow.NewWorkflowEngine(testBase.WorkflowMgr, testBase.TaskMgr, log.WithField("host", "workflow_host"))
} else {
executionPersistence, err2 := workflow.NewCassandraWorkflowExecutionPersistence(host, "workflow")
if err2 != nil {
panic(err2)
}

taskPersistence, err3 := workflow.NewCassandraTaskPersistence(host, "workflow")
if err3 != nil {
panic(err3)
}
executionPersistenceClient := workflow.NewWorkflowExecutionPersistenceClient(executionPersistence, m3ReporterClient)

taskPersistence, err3 := workflow.NewCassandraTaskPersistence(host, "workflow")
if err3 != nil {
panic(err3)
}

taskPersistenceClient := workflow.NewTaskPersistenceClient(taskPersistence, m3ReporterClient)
taskPersistenceClient := workflow.NewTaskPersistenceClient(taskPersistence, m3ReporterClient)

engine := workflow.NewWorkflowEngine(executionPersistenceClient, taskPersistenceClient, log.WithField("host", "workflow_host")).(*workflow.EngineImpl)
engine = workflow.NewEngineWithMetricsImpl(
workflow.NewWorkflowEngine(executionPersistenceClient, taskPersistenceClient, log.WithField("host", "workflow_host")),
m3ReporterClient)
}
h := s.NewStressHost(engine, instanceName, cfg, reporter)
h.Start()
}

func generateRandomKeyspace(n int) string {
rand.Seed(time.Now().UnixNano())
letterRunes := []rune("workflow")
b := make([]rune, n)
for i := range b {
b[i] = letterRunes[rand.Intn(len(letterRunes))]
}
return string(b)
}
94 changes: 73 additions & 21 deletions common/simplereporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,23 @@ type (
scope metrics.Scope
tags map[string]string

startTime time.Time
workflowsStartCount int64
activitiesTotalCount int64
decisionsTotalCount int64
workflowsCompletionCount int64
workflowsEndToEndLatency int64

previousReportTime time.Time
previousWorkflowsStartCount int64
previousActivitiesTotalCount int64
previousDecisionsTotalCount int64
previousWorkflowsCompletionCount int64
previousWorkflowsEndToEndLatency int64
startTime time.Time
workflowsStartCount int64
activitiesTotalCount int64
decisionsTotalCount int64
workflowsCompletionCount int64
workflowsEndToEndLatency int64
activitiesEndToEndLatency int64
decisionsEndToEndLatency int64

previousReportTime time.Time
previousWorkflowsStartCount int64
previousActivitiesTotalCount int64
previousDecisionsTotalCount int64
previousWorkflowsCompletionCount int64
previousWorkflowsEndToEndLatency int64
previousActivitiesEndToEndLatency int64
previousDecisionsEndToEndLatency int64
}

simpleStopWatch struct {
Expand All @@ -42,8 +46,10 @@ const (
WorkflowsStartTotalCounter = "workflows-start-total"
ActivitiesTotalCounter = "activities-total"
DecisionsTotalCounter = "decisions-total"
WorkflowEndToEndLatency = "workflows-endtoend-latency"
WorkflowsCompletionTotalCounter = "workflows-completion-total"
WorkflowEndToEndLatency = "workflows-endtoend-latency"
ActivityEndToEndLatency = "activities-endtoend-latency"
DecisionsEndToEndLatency = "decisions-endtoend-latency"
)

// NewSimpleReporter create an instance of Reporter which can be used for driver to emit metric to console
Expand All @@ -63,6 +69,8 @@ func NewSimpleReporter(scope metrics.Scope, tags map[string]string) Reporter {
reporter.decisionsTotalCount = 0
reporter.workflowsCompletionCount = 0
reporter.workflowsEndToEndLatency = 0
reporter.activitiesEndToEndLatency = 0
reporter.decisionsEndToEndLatency = 0
reporter.startTime = time.Now()

return reporter
Expand Down Expand Up @@ -106,6 +114,10 @@ func (r *SimpleReporter) IncCounter(name string, tags map[string]string, delta i
atomic.AddInt64(&r.workflowsCompletionCount, delta)
case WorkflowEndToEndLatency:
atomic.AddInt64(&r.workflowsEndToEndLatency, delta)
case ActivityEndToEndLatency:
atomic.AddInt64(&r.activitiesEndToEndLatency, delta)
case DecisionsEndToEndLatency:
atomic.AddInt64(&r.decisionsEndToEndLatency, delta)
default:
log.WithField(`name`, name).Error(`Unknown metric`)
}
Expand Down Expand Up @@ -152,12 +164,26 @@ func (r *SimpleReporter) PrintStressMetric() {
activitiesThroughput = (totalActivitiesCount - r.previousActivitiesTotalCount) / int64(elapsed)
}

var activityLatency int64
activitiesEndToEndLatency := atomic.LoadInt64(&r.activitiesEndToEndLatency)
if totalActivitiesCount > 0 && activitiesEndToEndLatency > r.previousActivitiesEndToEndLatency {
currentLatency := activitiesEndToEndLatency - r.previousActivitiesEndToEndLatency
activityLatency = currentLatency / totalActivitiesCount
}

totalDecisionsCount := atomic.LoadInt64(&r.decisionsTotalCount)
decisionsThroughput := int64(0)
if elapsed > 0 && totalDecisionsCount > r.previousDecisionsTotalCount {
decisionsThroughput = (totalDecisionsCount - r.previousDecisionsTotalCount) / int64(elapsed)
}

var decisionsLatency int64
decisionsEndToEndLatency := atomic.LoadInt64(&r.decisionsEndToEndLatency)
if totalDecisionsCount > 0 && decisionsEndToEndLatency > r.previousDecisionsEndToEndLatency {
currentLatency := decisionsEndToEndLatency - r.previousDecisionsEndToEndLatency
activityLatency = currentLatency / totalDecisionsCount
}

totalWorkflowsCompleted := atomic.LoadInt64(&r.workflowsCompletionCount)
completionThroughput := int64(0)
if elapsed > 0 && totalWorkflowsCompleted > r.previousWorkflowsCompletionCount {
Expand All @@ -171,16 +197,22 @@ func (r *SimpleReporter) PrintStressMetric() {
latency = currentLatency / totalWorkflowsCompleted
}

log.Infof("Workflows Started(Count=%v, Throughput=%v)", totalWorkflowStarted, creationThroughput)
log.Infof("Workflows Completed(Count=%v, Throughput=%v, Average Latency: %v)", totalWorkflowsCompleted, completionThroughput, latency)
log.Infof("Activites(Count=%v, Throughput=%v)", totalActivitiesCount, activitiesThroughput)
log.Infof("Decisions(Count=%v, Throughput=%v)", totalDecisionsCount, decisionsThroughput)
log.Infof("Workflows Started(Count=%v, Throughput=%v)",
totalWorkflowStarted, creationThroughput)
log.Infof("Workflows Completed(Count=%v, Throughput=%v, Average Latency: %v)",
totalWorkflowsCompleted, completionThroughput, latency)
log.Infof("Activites(Count=%v, Throughput=%v, Average Latency: %v)",
totalActivitiesCount, activitiesThroughput, activityLatency)
log.Infof("Decisions(Count=%v, Throughput=%v, Average Latency: %v)",
totalDecisionsCount, decisionsThroughput, decisionsLatency)

r.previousWorkflowsStartCount = totalWorkflowStarted
r.previousActivitiesTotalCount = totalActivitiesCount
r.previousDecisionsTotalCount = totalDecisionsCount
r.previousWorkflowsCompletionCount = totalWorkflowsCompleted
r.previousWorkflowsEndToEndLatency = workflowsLatency
r.previousActivitiesEndToEndLatency = activitiesEndToEndLatency
r.previousDecisionsEndToEndLatency = decisionsEndToEndLatency
r.previousReportTime = currentTime
}

Expand All @@ -190,19 +222,39 @@ func (r *SimpleReporter) PrintFinalMetric() {
workflowsCompletedCount := atomic.LoadInt64(&r.workflowsCompletionCount)
totalLatency := atomic.LoadInt64(&r.workflowsEndToEndLatency)
activitiesCount := atomic.LoadInt64(&r.activitiesTotalCount)
totalActivitiesLatency := atomic.LoadInt64(&r.activitiesEndToEndLatency)
decisionsCount := atomic.LoadInt64(&r.decisionsTotalCount)
totalDecisionsLatency := atomic.LoadInt64(&r.decisionsEndToEndLatency)

elapsed := time.Since(r.startTime) / time.Second

var throughput int64
var latency int64
if workflowsCompletedCount > 0 {
elapsed := time.Since(r.startTime) / time.Second
throughput = workflowsCompletedCount / int64(elapsed)
latency = totalLatency / workflowsCompletedCount
}

log.Infof("Total workflows processed:(Started=%v, Completed=%v), Throughput: %v, Average Latency: %v",
var activityThroughput int64
var activityLatency int64
if activitiesCount > 0 {
activityThroughput = activitiesCount / int64(elapsed)
activityLatency = totalActivitiesLatency / activitiesCount
}

var decisionsThroughput int64
var decisionLatency int64
if decisionsCount > 0 {
decisionsThroughput = decisionsCount / int64(elapsed)
decisionLatency = totalDecisionsLatency / decisionsCount
}

log.Infof("Total Workflows processed:(Started=%v, Completed=%v), Throughput: %v, Average Latency: %v",
workflowsCount, workflowsCompletedCount, throughput, time.Duration(latency))
log.Infof("Total activites processed: %v, decisions processed: %v", activitiesCount, decisionsCount)
log.Infof("Total Activites processed:(Count=%v, Throughput=%v, Average Latency=%v)",
activitiesCount, activityThroughput, activityLatency)
log.Infof("Total Decisions processed:(Count=%v, Throughput=%v, Average Latency=%v)",
decisionsCount, decisionsThroughput, decisionLatency)
}

// IsProcessComplete indicates if we have completed processing.
Expand Down
4 changes: 2 additions & 2 deletions config/base.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ jaeger:

stress:
workflow:
totalLaunchCount: 10000
routineCount: 1
totalLaunchCount: 100000
routineCount: 100
chainSequence: 4
activitySleepMin: 1
activitySleepMax: 60
Expand Down
4 changes: 2 additions & 2 deletions health/driver/serviceMocker.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ import (
type (
// ServiceMockEngine implements TChanWorkflowService to talk to engine directly
ServiceMockEngine struct {
engine *workflow.EngineImpl
engine workflow.Engine
logger bark.Logger
}
)

// NewServiceMockEngine creats an isntance of mocker service layer for the engine
func NewServiceMockEngine(engine *workflow.EngineImpl) *ServiceMockEngine {
func NewServiceMockEngine(engine workflow.Engine) *ServiceMockEngine {
mockEngine := &ServiceMockEngine{}
mockEngine.logger = bark.NewLoggerFromLogrus(log.New())
mockEngine.engine = engine
Expand Down
6 changes: 4 additions & 2 deletions health/driver/stressWorkflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"sync/atomic"
"time"

"github.com/pborman/uuid"

m "code.uber.internal/devexp/minions/.gen/go/minions"
"code.uber.internal/devexp/minions/common"
"code.uber.internal/devexp/minions/test/flow"
Expand Down Expand Up @@ -103,7 +105,7 @@ func (sa stressSleepActivity) Execute(context flow.ActivityExecutionContext, inp
// LaunchWorkflows starts workflows.
func LaunchWorkflows(countOfWorkflows int, goRoutineCount int, wp *WorkflowParams,
service *ServiceMockEngine, reporter common.Reporter) error {
// logrusSettings()
logrusSettings()

workerOverrides := &flow.WorkerOverrides{Reporter: reporter}
// Workflow execution parameters.
Expand Down Expand Up @@ -152,7 +154,7 @@ func LaunchWorkflows(countOfWorkflows int, goRoutineCount int, wp *WorkflowParam
defer goWaitGroup.Done()

for i := 0; i < createCount; i++ {
options.WorkflowID = fmt.Sprintf("stressWorkflowId-%d-%d", routineId, i)
options.WorkflowID = fmt.Sprintf("%s-%d-%d", uuid.New(), routineId, i)
workflowClient := flow.NewWorkflowClient(options, service, reporter)
_, err := workflowClient.StartWorkflowExecution()
if err == nil {
Expand Down
7 changes: 5 additions & 2 deletions health/stress/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
type Host struct {
hostName string
config Configuration
engine *workflow.EngineImpl
engine workflow.Engine
reporter common.Reporter

instancesWG sync.WaitGroup
Expand All @@ -28,10 +28,12 @@ var stressMetrics = map[common.MetricName]common.MetricType{
common.ActivitiesTotalCounter: common.Counter,
common.DecisionsTotalCounter: common.Counter,
common.WorkflowEndToEndLatency: common.Timer,
common.ActivityEndToEndLatency: common.Timer,
common.DecisionsEndToEndLatency: common.Timer,
}

// NewStressHost creates an instance of stress host
func NewStressHost(engine *workflow.EngineImpl, instanceName string, config Configuration, reporter common.Reporter) *Host {
func NewStressHost(engine workflow.Engine, instanceName string, config Configuration, reporter common.Reporter) *Host {
h := &Host{
engine: engine,
hostName: instanceName,
Expand Down Expand Up @@ -93,6 +95,7 @@ func (s *Host) printMetric() {
select {
case <-ticker.C:
sr.PrintStressMetric()
sr.PrintFinalMetric()
if sr.IsProcessComplete() {
sr.PrintFinalMetric()
return
Expand Down
1 change: 1 addition & 0 deletions schema/drop_keyspace_stress_test.cql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP KEYSPACE workflow;
25 changes: 25 additions & 0 deletions scripts/install_stress_schema.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#!/bin/bash

# this script is used to setup the "cherami" keyspace
# and load all the tables in the .cql file within this keyspace,
# if cassandra is running
#
# this script is only intended to be used in test environments
# for updating prod schema, please refer to odp/cherami-tools and
# https://code.uberinternal.com/w/cherami/runbooks/

# the default cqlsh listen port is 9042
port=9042

# the default keyspace is workflow
# TODO: probably allow getting this from command line
workflow_keyspace="workflow"

# cmd/stress/stress -emitMetric=m3 -host="10.185.19.27,10.184.45.6,10.185.27.8,10.185.17.12,10.185.15.30"
for host in 10.185.19.27
do
echo Installing schema on cassandra cluster via $host
./cassandra/bin/cqlsh -f ./schema/drop_keyspace_stress_test.cql $host $port
./cassandra/bin/cqlsh -f ./schema/keyspace_prod.cql $host $port
./cassandra/bin/cqlsh -k $workflow_keyspace -f ./schema/workflow_test.cql $host $port
done
2 changes: 1 addition & 1 deletion test/flow/event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (wc *workflowContext) ScheduleActivityTask(parameters ExecuteActivityParame

wc.executeDecisions = append(wc.executeDecisions, decision)
wc.scheduledActivites[scheduleTaskAttr.GetActivityId()] = callback
wc.contextLogger.Debugf("Schedule ActivityTask: %s: %+v", scheduleTaskAttr.GetActivityId(), scheduleTaskAttr)
// wc.contextLogger.Debugf("Schedule ActivityTask: %s: %+v", scheduleTaskAttr.GetActivityId(), scheduleTaskAttr)
}

func (weh *workflowExecutionEventHandler) ProcessEvent(event *m.HistoryEvent) ([]*m.Decision, error) {
Expand Down
12 changes: 6 additions & 6 deletions test/flow/task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ func (wth *workflowTaskHandler) ProcessWorkflowTask(workflowTask *WorkflowTask)
return nil, fmt.Errorf("Nil workflowtask provided.")
}

wth.reporter.IncCounter(common.DecisionsTotalCounter, nil, 1)
wth.contextLogger.Debugf("Processing New Workflow Task: Type=%s, PreviousStartedEventId=%d",
workflowTask.task.GetWorkflowType().GetName(), workflowTask.task.GetPreviousStartedEventId())
// wth.reporter.IncCounter(common.DecisionsTotalCounter, nil, 1)
// wth.contextLogger.Debugf("Processing New Workflow Task: Type=%s, PreviousStartedEventId=%d",
// workflowTask.task.GetWorkflowType().GetName(), workflowTask.task.GetPreviousStartedEventId())

// Setup workflow Info
workflowInfo := &WorkflowInfo{
Expand Down Expand Up @@ -126,7 +126,7 @@ func (wth *workflowTaskHandler) ProcessWorkflowTask(workflowTask *WorkflowTask)

// Process events
for _, event := range history.Events {
wth.contextLogger.Debugf("ProcessWorkflowTask: Id=%d, Event=%+v", event.GetEventId(), event)
// wth.contextLogger.Debugf("ProcessWorkflowTask: Id=%d, Event=%+v", event.GetEventId(), event)
if event.GetEventType() == m.EventType_WorkflowExecutionStarted {
startTime = time.Unix(0, event.GetTimestamp())
}
Expand Down Expand Up @@ -195,8 +195,8 @@ func newActivityTaskHandler(taskListName string, identity string, factory Activi

// Execute executes an implementation of the activity.
func (ath *activityTaskHandler) Execute(context context.Context, activityTask *ActivityTask) (interface{}, error) {
ath.contextLogger.Debugf("activityTaskHandler::Execute: %+v", activityTask.task)
ath.reporter.IncCounter(common.ActivitiesTotalCounter, nil, 1)
//ath.contextLogger.Debugf("activityTaskHandler::Execute: %+v", activityTask.task)
//ath.reporter.IncCounter(common.ActivitiesTotalCounter, nil, 1)

activityExecutionContext := &activityExecutionContext{
taskToken: activityTask.task.TaskToken,
Expand Down
Loading

0 comments on commit 66b4cc6

Please sign in to comment.