Skip to content

Commit

Permalink
Emit metric for various mutable state stats (cadence-workflow#1136)
Browse files Browse the repository at this point in the history
MutableState support for computing the following metrics:
1) Total size of mutableState
2) Execution info size within mutable state
3) Activity info size within mutable state
4) Timer info size within mutable state
5) Child info size within mutable state
6) Signal info size within mutable state
7) Buffered events size within mutable state
8) Buffered replication tasks size within mutable state
9) Number of activity infos within mutable state
10) Number of timers within mutable state
11) Number of child executions within mutable state
12) Number of signals within mutable state
13) Number of requestCancels within mutable state
14) Number of buffered events within mutable state
15) Number of buffered replication tasks within mutable state

Session update support for the following metrics:
1) TotalSize of mutable state
2) Execution info size for session update
3) Activity info size for session update
4) Timer info size for session update
5) Child info size for session update
6) Signal info size for session update
7) Buffered events size for session update
8) Buffered replication tasks size for session update
9) Number of updated activity infos for session update
10) Number of updated timer infos for session update
11) Number of updated child infos for session update
12) Number of updated signal infos for session update
13) Number of updated requestCancel infos for session update
14) Number of deleted activity infos for session update
15) Number of deleted timer infos for session update
16) Number of deleted child infos for session update
17) Number of deleted signal infos for session update
18) Number of deleted requestCancel infos for session update

Support for emitting metrics for all mutableState and sessionUpdate stats on each update through workflowContext.

Emit a metric on each AppendHistory call for the size of the update.

Support for keeping track of total history size for workflow execution as part of mutable state.
  • Loading branch information
samarabbas authored Sep 25, 2018
1 parent 593ac83 commit 5c93511
Show file tree
Hide file tree
Showing 21 changed files with 1,068 additions and 223 deletions.
58 changes: 58 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ const (
// ShardTagName is temporary until we can get all metric data removed for the service
ShardTagName = "shard"
CadenceRoleTagName = "cadence-role"
StatsTypeTagName = "stats-type"
)

// This package should hold all the metrics and tags for cadence
Expand All @@ -78,6 +79,9 @@ const (

HistoryRoleTagValue = "history"
MatchingRoleTagValue = "matching"

SizeStatsTypeTagValue = "size"
CountStatsTypeTagValue = "count"
)

// Common service base metrics
Expand Down Expand Up @@ -458,6 +462,14 @@ const (
HistoryCacheGetOrCreateScope
// HistoryCacheGetCurrentExecutionScope is the scope used by history cache for getting current execution
HistoryCacheGetCurrentExecutionScope
// ExecutionSizeStatsScope is the scope used for emitting workflow execution size related stats
ExecutionSizeStatsScope
// ExecutionCountStatsScope is the scope used for emitting workflow execution count related stats
ExecutionCountStatsScope
// SessionSizeStatsScope is the scope used for emitting session update size related stats
SessionSizeStatsScope
// SessionCountStatsScope is the scope used for emitting session update count related stats
SessionCountStatsScope

NumHistoryScopes
)
Expand Down Expand Up @@ -676,6 +688,10 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
HistoryCacheGetAndCreateScope: {operation: "HistoryCacheGetAndCreate"},
HistoryCacheGetOrCreateScope: {operation: "HistoryCacheGetOrCreate"},
HistoryCacheGetCurrentExecutionScope: {operation: "HistoryCacheGetCurrentExecution"},
ExecutionSizeStatsScope: {operation: "ExecutionStats", tags: map[string]string{StatsTypeTagName: SizeStatsTypeTagValue}},
ExecutionCountStatsScope: {operation: "ExecutionStats", tags: map[string]string{StatsTypeTagName: CountStatsTypeTagValue}},
SessionSizeStatsScope: {operation: "SessionStats", tags: map[string]string{StatsTypeTagName: SizeStatsTypeTagValue}},
SessionCountStatsScope: {operation: "SessionStats", tags: map[string]string{StatsTypeTagName: CountStatsTypeTagValue}},
},
// Matching Scope Names
Matching: {
Expand Down Expand Up @@ -821,6 +837,27 @@ const (
CacheMissCounter
AcquireLockFailedCounter
WorkflowContextCleared
HistorySize
MutableStateSize
ExecutionInfoSize
ActivityInfoSize
TimerInfoSize
ChildInfoSize
SignalInfoSize
BufferedEventsSize
BufferedReplicationTasksSize
ActivityInfoCount
TimerInfoCount
ChildInfoCount
SignalInfoCount
RequestCancelInfoCount
BufferedEventsCount
BufferedReplicationTasksCount
DeleteActivityInfoCount
DeleteTimerInfoCount
DeleteChildInfoCount
DeleteSignalInfoCount
DeleteRequestCancelInfoCount

NumHistoryMetrics
)
Expand Down Expand Up @@ -969,6 +1006,27 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
CacheMissCounter: {metricName: "cache-miss", metricType: Counter},
AcquireLockFailedCounter: {metricName: "acquire-lock-failed", metricType: Counter},
WorkflowContextCleared: {metricName: "workflow-context-cleared", metricType: Counter},
HistorySize: {metricName: "history-size", metricType: Timer},
MutableStateSize: {metricName: "mutable-state-size", metricType: Timer},
ExecutionInfoSize: {metricName: "execution-info-size", metricType: Timer},
ActivityInfoSize: {metricName: "activity-info-size", metricType: Timer},
TimerInfoSize: {metricName: "timer-info-size", metricType: Timer},
ChildInfoSize: {metricName: "child-info-size", metricType: Timer},
SignalInfoSize: {metricName: "signal-info", metricType: Timer},
BufferedEventsSize: {metricName: "buffered-events-size", metricType: Timer},
BufferedReplicationTasksSize: {metricName: "buffered-replication-tasks-size", metricType: Timer},
ActivityInfoCount: {metricName: "activity-info-count", metricType: Timer},
TimerInfoCount: {metricName: "timer-info-count", metricType: Timer},
ChildInfoCount: {metricName: "child-info-count", metricType: Timer},
SignalInfoCount: {metricName: "signal-info-count", metricType: Timer},
RequestCancelInfoCount: {metricName: "request-cancel-info-count", metricType: Timer},
BufferedEventsCount: {metricName: "buffered-events-count", metricType: Timer},
BufferedReplicationTasksCount: {metricName: "buffered-replication-tasks-count", metricType: Timer},
DeleteActivityInfoCount: {metricName: "delete-activity-info", metricType: Timer},
DeleteTimerInfoCount: {metricName: "delete-timer-info", metricType: Timer},
DeleteChildInfoCount: {metricName: "delete-child-info", metricType: Timer},
DeleteSignalInfoCount: {metricName: "delete-signal-info", metricType: Timer},
DeleteRequestCancelInfoCount: {metricName: "delete-request-cancel-info", metricType: Timer},
},
Matching: {
PollSuccessCounter: {metricName: "poll.success"},
Expand Down
3 changes: 3 additions & 0 deletions common/persistence/cassandra/cassandraHistoryPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ func (h *cassandraHistoryPersistence) GetWorkflowExecutionHistory(request *p.Get
lastFirstEventID := common.EmptyEventID // first_event_id of last batch
eventBatch := p.SerializedHistoryEventBatch{}
history := &workflow.History{}
size := 0
for iter.Scan(nil, &eventBatchVersionPointer, &eventBatch.Data, &eventBatch.EncodingType, &eventBatch.Version) {
found = true

Expand Down Expand Up @@ -212,6 +213,7 @@ func (h *cassandraHistoryPersistence) GetWorkflowExecutionHistory(request *p.Get
history.Events = append(history.Events, historyBatch.Events...)
token.LastEventID = historyBatch.Events[len(historyBatch.Events)-1].GetEventId()
token.LastEventBatchVersion = eventBatchVersion
size += len(eventBatch.Data)
}

eventBatchVersionPointer = new(int64)
Expand Down Expand Up @@ -242,6 +244,7 @@ func (h *cassandraHistoryPersistence) GetWorkflowExecutionHistory(request *p.Get
NextPageToken: data,
History: history,
LastFirstEventID: lastFirstEventID,
Size: size,
}

return response, nil
Expand Down
8 changes: 8 additions & 0 deletions common/persistence/cassandra/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ const (
`start_time: ?, ` +
`last_updated_time: ?, ` +
`create_request_id: ?, ` +
`history_size: ?, ` +
`decision_version: ?, ` +
`decision_schedule_id: ?, ` +
`decision_started_id: ?, ` +
Expand Down Expand Up @@ -1241,6 +1242,7 @@ func (d *cassandraPersistence) CreateWorkflowExecutionWithinBatch(request *p.Cre
cqlNowTimestamp,
cqlNowTimestamp,
request.RequestID,
request.HistorySize,
request.DecisionVersion,
request.DecisionScheduleID,
request.DecisionStartedID,
Expand Down Expand Up @@ -1299,6 +1301,7 @@ func (d *cassandraPersistence) CreateWorkflowExecutionWithinBatch(request *p.Cre
cqlNowTimestamp,
cqlNowTimestamp,
request.RequestID,
request.HistorySize,
request.DecisionVersion,
request.DecisionScheduleID,
request.DecisionStartedID,
Expand Down Expand Up @@ -1465,6 +1468,7 @@ func (d *cassandraPersistence) UpdateWorkflowExecution(request *p.UpdateWorkflow
executionInfo.StartTimestamp,
cqlNowTimestamp,
executionInfo.CreateRequestID,
executionInfo.HistorySize,
executionInfo.DecisionVersion,
executionInfo.DecisionScheduleID,
executionInfo.DecisionStartedID,
Expand Down Expand Up @@ -1524,6 +1528,7 @@ func (d *cassandraPersistence) UpdateWorkflowExecution(request *p.UpdateWorkflow
executionInfo.StartTimestamp,
cqlNowTimestamp,
executionInfo.CreateRequestID,
executionInfo.HistorySize,
executionInfo.DecisionVersion,
executionInfo.DecisionScheduleID,
executionInfo.DecisionStartedID,
Expand Down Expand Up @@ -1756,6 +1761,7 @@ func (d *cassandraPersistence) ResetMutableState(request *p.ResetMutableStateReq
executionInfo.StartTimestamp,
cqlNowTimestamp,
executionInfo.CreateRequestID,
executionInfo.HistorySize,
executionInfo.DecisionVersion,
executionInfo.DecisionScheduleID,
executionInfo.DecisionStartedID,
Expand Down Expand Up @@ -3288,6 +3294,8 @@ func createWorkflowExecutionInfo(result map[string]interface{}) *p.WorkflowExecu
info.LastUpdatedTimestamp = v.(time.Time)
case "create_request_id":
info.CreateRequestID = v.(gocql.UUID).String()
case "history_size":
info.HistorySize = v.(int64)
case "decision_version":
info.DecisionVersion = v.(int64)
case "decision_schedule_id":
Expand Down
4 changes: 4 additions & 0 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ type (
StartTimestamp time.Time
LastUpdatedTimestamp time.Time
CreateRequestID string
HistorySize int64
DecisionVersion int64
DecisionScheduleID int64
DecisionStartedID int64
Expand Down Expand Up @@ -582,6 +583,7 @@ type (
ExecutionContext []byte
NextEventID int64
LastProcessedEvent int64
HistorySize int64
TransferTasks []Task
ReplicationTasks []Task
TimerTasks []Task
Expand Down Expand Up @@ -868,6 +870,8 @@ type (
NextPageToken []byte
// the first_event_id of last loaded batch
LastFirstEventID int64
// Size of history read from store
Size int
}

// DeleteWorkflowExecutionHistoryRequest is used to delete workflow execution history
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ func (s *MatchingPersistenceSuite) TestUpdateWorkflow() {
s.Empty(info0.ClientLibraryVersion)
s.Empty(info0.ClientFeatureVersion)
s.Empty(info0.ClientImpl)
s.Equal(int64(0), info0.HistorySize)

log.Infof("Workflow execution last updated: %v", info0.LastUpdatedTimestamp)

Expand All @@ -274,6 +275,7 @@ func (s *MatchingPersistenceSuite) TestUpdateWorkflow() {
updatedInfo.ClientLibraryVersion = "random client library version"
updatedInfo.ClientFeatureVersion = "random client feature version"
updatedInfo.ClientImpl = "random client impl"
updatedInfo.HistorySize = int64(109)
err2 := s.UpdateWorkflowExecution(updatedInfo, []int64{int64(4)}, nil, int64(3), nil, nil, nil, nil, nil, nil)
s.NoError(err2)

Expand Down Expand Up @@ -305,6 +307,7 @@ func (s *MatchingPersistenceSuite) TestUpdateWorkflow() {
s.Equal(updatedInfo.ClientLibraryVersion, info1.ClientLibraryVersion)
s.Equal(updatedInfo.ClientFeatureVersion, info1.ClientFeatureVersion)
s.Equal(updatedInfo.ClientImpl, info1.ClientImpl)
s.Equal(int64(109), info1.HistorySize)

log.Infof("Workflow execution last updated: %v", info1.LastUpdatedTimestamp)

Expand Down Expand Up @@ -335,6 +338,7 @@ func (s *MatchingPersistenceSuite) TestUpdateWorkflow() {
s.Equal(int32(1), info2.DecisionTimeout)
s.Equal(int64(123), info2.DecisionAttempt)
s.Equal(int64(321), info2.DecisionTimestamp)
s.Equal(int64(109), info2.HistorySize)

log.Infof("Workflow execution last updated: %v", info2.LastUpdatedTimestamp)

Expand Down Expand Up @@ -2718,6 +2722,7 @@ func (s *MatchingPersistenceSuite) TestResetMutableStateCurrentIsNotSelf() {
DecisionTimeoutValue: 14,
State: p.WorkflowStateRunning,
NextEventID: 123,
HistorySize: int64(1024),
CreateRequestID: uuid.New(),
DecisionVersion: common.EmptyVersion,
DecisionScheduleID: 111,
Expand Down Expand Up @@ -2750,6 +2755,7 @@ func (s *MatchingPersistenceSuite) TestResetMutableStateCurrentIsNotSelf() {
continueAsNewInfo.State = p.WorkflowStateCompleted
continueAsNewInfo.NextEventID += 3
continueAsNewInfo.LastProcessedEvent += 2
s.Equal(int64(1024), info.HistorySize)

workflowExecutionCurrent2 := gen.WorkflowExecution{
WorkflowId: common.StringPtr(workflowID),
Expand Down
1 change: 1 addition & 0 deletions schema/cassandra/cadence/schema.cql
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ CREATE TYPE workflow_execution (
expiration_time timestamp, -- retry expiration time
max_attempts int, -- max number of attempts including initial non-retry attempt
non_retriable_errors list<text>,
history_size bigint,
);

-- Replication information for each cluster
Expand Down
1 change: 1 addition & 0 deletions schema/cassandra/cadence/versioned/v0.11/history_size.cql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TYPE workflow_execution ADD history_size bigint;
5 changes: 3 additions & 2 deletions schema/cassandra/cadence/versioned/v0.11/manifest.json
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
{
"CurrVersion": "0.11",
"MinCompatibleVersion": "0.11",
"Description": "Adding server side workflow retry",
"Description": "Mutable state support for server-side retries and history size",
"SchemaUpdateCqlFiles": [
"workflow_retry.cql"
"workflow_retry.cql",
"history_size.cql"
]
}
24 changes: 24 additions & 0 deletions service/history/MockMutableState.go
Original file line number Diff line number Diff line change
Expand Up @@ -1597,6 +1597,20 @@ func (_m *mockMutableState) GetSignalInfo(_a0 int64) (*persistence.SignalInfo, b
return r0, r1
}

// GetStats provides a mock function with given fields:
//
// The call is forwarded to _m.Called() and the first configured return value
// is used as the result. That value may be either a *mutableStateStats or a
// func() *mutableStateStats; in the latter case the function is invoked to
// produce the result. A configured nil return value yields a nil result.
func (_m *mockMutableState) GetStats() *mutableStateStats {
	ret := _m.Called()

	var r0 *mutableStateStats
	if rf, ok := ret.Get(0).(func() *mutableStateStats); ok {
		r0 = rf()
	} else if ret.Get(0) != nil {
		// Guard the type assertion: asserting on a nil interface value (as
		// produced by Return(nil)) would panic instead of returning nil.
		r0 = ret.Get(0).(*mutableStateStats)
	}

	return r0
}

// GetStartVersion provides a mock function with given fields:
func (_m *mockMutableState) GetStartVersion() int64 {
ret := _m.Called()
Expand Down Expand Up @@ -1720,6 +1734,11 @@ func (_m *mockMutableState) HasPendingDecisionTask() bool {
return r0
}

// IncrementHistorySize provides a mock function with given fields: appendSize
//
// The argument is handed to _m.Called; the method has no result of its own,
// so whatever Called returns is explicitly discarded.
func (_m *mockMutableState) IncrementHistorySize(appendSize int) {
	_ = _m.Called(appendSize)
}

// IsCancelRequested provides a mock function with given fields:
func (_m *mockMutableState) IsCancelRequested() (bool, string) {
ret := _m.Called()
Expand Down Expand Up @@ -2116,6 +2135,11 @@ func (_m *mockMutableState) SetHistoryBuilder(hBuilder *historyBuilder) {
_m.Called(hBuilder)
}

// SetNewRunSize provides a mock function with given fields: size
//
// The argument is handed to _m.Called; the method has no result of its own,
// so whatever Called returns is explicitly discarded.
func (_m *mockMutableState) SetNewRunSize(size int) {
	_ = _m.Called(size)
}

// UpdateActivity provides a mock function with given fields: _a0
func (_m *mockMutableState) UpdateActivity(_a0 *persistence.ActivityInfo) error {
ret := _m.Called(_a0)
Expand Down
17 changes: 9 additions & 8 deletions service/history/conflictResolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,21 +64,21 @@ func (r *conflictResolverImpl) reset(prevRunID string, requestID string, replayE
execution := r.context.workflowExecution
replayNextEventID := replayEventID + 1
var nextPageToken []byte
var history *shared.History
var err error
var resetMutableStateBuilder *mutableStateBuilder
var sBuilder stateBuilder
var lastFirstEventID int64
var lastEvent *shared.HistoryEvent
eventsToApply := replayNextEventID - common.FirstEventID
for hasMore := true; hasMore; hasMore = len(nextPageToken) > 0 {
history, nextPageToken, lastFirstEventID, err = r.getHistory(domainID, execution, common.FirstEventID,
replayNextEventID, nextPageToken)
response, err := r.getHistory(domainID, execution, common.FirstEventID, replayNextEventID, nextPageToken)
if err != nil {
r.logError("Conflict resolution err getting history.", err)
return nil, err
}

history := response.History
lastFirstEventID := response.LastFirstEventID
nextPageToken = response.NextPageToken

batchSize := int64(len(history.Events))
// NextEventID could be in the middle of the batch. Trim the history events so that there are no more events than
// what needs to be applied
Expand Down Expand Up @@ -111,6 +111,7 @@ func (r *conflictResolverImpl) reset(prevRunID string, requestID string, replayE
return nil, err
}
resetMutableStateBuilder.executionInfo.LastFirstEventID = lastFirstEventID
resetMutableStateBuilder.IncrementHistorySize(response.Size)
}

// Applying events to mutableState does not move the nextEventID. Explicitly set nextEventID to new value
Expand All @@ -131,7 +132,7 @@ func (r *conflictResolverImpl) reset(prevRunID string, requestID string, replayE
}

func (r *conflictResolverImpl) getHistory(domainID string, execution shared.WorkflowExecution, firstEventID,
nextEventID int64, nextPageToken []byte) (*shared.History, []byte, int64, error) {
nextEventID int64, nextPageToken []byte) (*persistence.GetWorkflowExecutionHistoryResponse, error) {

response, err := r.historyMgr.GetWorkflowExecutionHistory(&persistence.GetWorkflowExecutionHistoryRequest{
DomainID: domainID,
Expand All @@ -143,10 +144,10 @@ func (r *conflictResolverImpl) getHistory(domainID string, execution shared.Work
})

if err != nil {
return nil, nil, common.EmptyEventID, err
return nil, err
}

return response.History, response.NextPageToken, response.LastFirstEventID, nil
return response, nil
}

func (r *conflictResolverImpl) logError(msg string, err error) {
Expand Down
6 changes: 4 additions & 2 deletions service/history/conflictResolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ func (s *conflictResolverSuite) TestGetHistory() {
NextPageToken: pageToken,
LastFirstEventID: event1.GetEventId(),
}, nil)
history, token, firstEventID, err := s.conflictResolver.getHistory(domainID, execution, common.FirstEventID, nextEventID, nil)
response, err := s.conflictResolver.getHistory(domainID, execution, common.FirstEventID, nextEventID, nil)
history, token, firstEventID := response.History, response.NextPageToken, response.LastFirstEventID
s.Nil(err)
s.Equal(history.Events, []*shared.HistoryEvent{event1, event2})
s.Equal(pageToken, token)
Expand All @@ -174,7 +175,8 @@ func (s *conflictResolverSuite) TestGetHistory() {
NextPageToken: nil,
LastFirstEventID: event4.GetEventId(),
}, nil)
history, token, firstEventID, err = s.conflictResolver.getHistory(domainID, execution, common.FirstEventID, nextEventID, token)
response, err = s.conflictResolver.getHistory(domainID, execution, common.FirstEventID, nextEventID, token)
history, token, firstEventID = response.History, response.NextPageToken, response.LastFirstEventID
s.Nil(err)
s.Equal(history.Events, []*shared.HistoryEvent{event3, event4, event5})
s.Empty(token)
Expand Down
Loading

0 comments on commit 5c93511

Please sign in to comment.