Skip to content

Commit

Permalink
Auto-reset: store auto-reset points at decisionTaskCompleted (cadence…
Browse files Browse the repository at this point in the history
  • Loading branch information
longquanzheng authored Apr 30, 2019
1 parent 14fd1a1 commit b7552d0
Show file tree
Hide file tree
Showing 16 changed files with 236 additions and 105 deletions.
2 changes: 2 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1181,6 +1181,7 @@ const (
MultipleCompletionDecisionsCounter
FailedDecisionsCounter
StaleMutableStateCounter
AutoResetPointsLimitExceededCounter
ConcurrencyUpdateFailureCounter
CadenceErrEventAlreadyStartedCounter
CadenceErrShardOwnershipLostCounter
Expand Down Expand Up @@ -1416,6 +1417,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
MultipleCompletionDecisionsCounter: {metricName: "multiple_completion_decisions", metricType: Counter},
FailedDecisionsCounter: {metricName: "failed_decisions", metricType: Counter},
StaleMutableStateCounter: {metricName: "stale_mutable_state", metricType: Counter},
AutoResetPointsLimitExceededCounter: {metricName: "auto_reset_points_exceed_limit", metricType: Counter},
ConcurrencyUpdateFailureCounter: {metricName: "concurrency_update_failure", metricType: Counter},
CadenceErrShardOwnershipLostCounter: {metricName: "cadence_errors_shard_ownership_lost", metricType: Counter},
CadenceErrEventAlreadyStartedCounter: {metricName: "cadence_errors_event_already_started", metricType: Counter},
Expand Down
3 changes: 3 additions & 0 deletions common/service/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ var keys = map[Key]string{
HistoryVisibilityClosedMaxQPS: "history.historyVisibilityClosedMaxQPS",
HistoryLongPollExpirationInterval: "history.longPollExpirationInterval",
HistoryCacheInitialSize: "history.cacheInitialSize",
HistoryMaxAutoResetPoints: "history.historyMaxAutoResetPoints",
HistoryCacheMaxSize: "history.cacheMaxSize",
HistoryCacheTTL: "history.cacheTTL",
EventsCacheInitialSize: "history.eventsCacheInitialSize",
Expand Down Expand Up @@ -427,6 +428,8 @@ const (
EnableAdminProtection
// AdminOperationToken is the token to pass admin checking
AdminOperationToken
// HistoryMaxAutoResetPoints is the key for max number of auto reset points stored in mutableState
HistoryMaxAutoResetPoints

// EnableEventsV2 is whether to use eventsV2
EnableEventsV2
Expand Down
48 changes: 27 additions & 21 deletions service/history/MockMutableState.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,12 +335,12 @@ func (_m *mockMutableState) AddContinueAsNewEvent(_a0 int64, _a1 int64, _a2 *cac
}

// AddDecisionTaskCompletedEvent provides a mock function with given fields: _a0, _a1, _a2, _a3
func (_m *mockMutableState) AddDecisionTaskCompletedEvent(_a0 int64, _a1 int64, _a2 *shared.RespondDecisionTaskCompletedRequest) *shared.HistoryEvent {
ret := _m.Called(_a0, _a1, _a2)
func (_m *mockMutableState) AddDecisionTaskCompletedEvent(_a0 int64, _a1 int64, _a2 *shared.RespondDecisionTaskCompletedRequest, _a3 int) *shared.HistoryEvent {
ret := _m.Called(_a0, _a1, _a2, _a3)

var r0 *shared.HistoryEvent
if rf, ok := ret.Get(0).(func(int64, int64, *shared.RespondDecisionTaskCompletedRequest) *shared.HistoryEvent); ok {
r0 = rf(_a0, _a1, _a2)
if rf, ok := ret.Get(0).(func(int64, int64, *shared.RespondDecisionTaskCompletedRequest, int) *shared.HistoryEvent); ok {
r0 = rf(_a0, _a1, _a2, _a3)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*shared.HistoryEvent)
Expand Down Expand Up @@ -827,16 +827,6 @@ func (_m *mockMutableState) AddWorkflowExecutionTerminatedEvent(_a0 *shared.Term
return r0
}

// AfterAddDecisionTaskCompletedEvent provides a mock function with given fields: _a0
// It only forwards _a0 to _m.Called and has no return value to configure.
func (_m *mockMutableState) AfterAddDecisionTaskCompletedEvent(_a0 int64) {
_m.Called(_a0)
}

// BeforeAddDecisionTaskCompletedEvent provides a mock function with given fields:
// It takes no arguments, only invokes _m.Called, and has no return value to configure.
func (_m *mockMutableState) BeforeAddDecisionTaskCompletedEvent() {
_m.Called()
}

// BufferReplicationTask provides a mock function with given fields: _a0
func (_m *mockMutableState) BufferReplicationTask(_a0 *h.ReplicateEventsRequest) error {
ret := _m.Called(_a0)
Expand Down Expand Up @@ -879,6 +869,22 @@ func (_m *mockMutableState) CloseUpdateSession() (*mutableStateSessionUpdates, e
return r0, r1
}

// CheckResettable provides a mock function with given fields:
//
// The first configured return value is interpreted as follows: a func() error
// is invoked and its result returned; nil yields a nil error; anything else is
// type-asserted to error.
func (_m *mockMutableState) CheckResettable() error {
	args := _m.Called()

	first := args.Get(0)
	if fn, ok := first.(func() error); ok {
		return fn()
	}
	if first == nil {
		return nil
	}

	return first.(error)
}

// CopyToPersistence provides a mock function with given fields:
func (_m *mockMutableState) CopyToPersistence() *persistence.WorkflowMutableState {
ret := _m.Called()
Expand Down Expand Up @@ -1930,9 +1936,9 @@ func (_m *mockMutableState) ReplicateChildWorkflowExecutionTimedOutEvent(_a0 *sh
_m.Called(_a0)
}

// ReplicateDecisionTaskCompletedEvent provides a mock function with given fields: _a0, _a1
func (_m *mockMutableState) ReplicateDecisionTaskCompletedEvent(_a0 int64, _a1 int64) {
_m.Called(_a0, _a1)
// ReplicateDecisionTaskCompletedEvent provides a mock function with given fields: _a0
// It only forwards the event to _m.Called and has no return value to configure.
func (_m *mockMutableState) ReplicateDecisionTaskCompletedEvent(_a0 *shared.HistoryEvent) {
_m.Called(_a0)
}

// ReplicateDecisionTaskFailedEvent provides a mock function with given fields: _a0, _a1
Expand Down Expand Up @@ -2108,12 +2114,12 @@ func (_m *mockMutableState) ReplicateWorkflowExecutionCompletedEvent(_a0 int64,
}

// ReplicateWorkflowExecutionContinuedAsNewEvent provides a mock function with given fields: _a0, _a1, _a2, _a3, _a4, _a5, _a6, _a7, _a8
func (_m *mockMutableState) ReplicateWorkflowExecutionContinuedAsNewEvent(_a0 int64, _a1 string, _a2 string, _a3 *shared.HistoryEvent, _a4 *shared.HistoryEvent, _a5 *decisionInfo, _a6 mutableState, _a7 int32) error {
ret := _m.Called(_a0, _a1, _a2, _a3, _a4, _a5, _a6, _a7)
func (_m *mockMutableState) ReplicateWorkflowExecutionContinuedAsNewEvent(_a0 int64, _a1 string, _a2 string, _a3 *shared.HistoryEvent, _a4 *shared.HistoryEvent, _a5 *decisionInfo, _a6 mutableState, _a7 int32, _a8 int32) error {
ret := _m.Called(_a0, _a1, _a2, _a3, _a4, _a5, _a6, _a7, _a8)

var r0 error
if rf, ok := ret.Get(0).(func(int64, string, string, *shared.HistoryEvent, *shared.HistoryEvent, *decisionInfo, mutableState, int32) error); ok {
r0 = rf(_a0, _a1, _a2, _a3, _a4, _a5, _a6, _a7)
if rf, ok := ret.Get(0).(func(int64, string, string, *shared.HistoryEvent, *shared.HistoryEvent, *decisionInfo, mutableState, int32, int32) error); ok {
r0 = rf(_a0, _a1, _a2, _a3, _a4, _a5, _a6, _a7, _a8)
} else {
r0 = ret.Error(0)
}
Expand Down
16 changes: 15 additions & 1 deletion service/history/MockWorkflowResetor.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ var _ workflowResetor = (*mockWorkflowResetor)(nil)
func (_m *mockWorkflowResetor) ResetWorkflowExecution(ctx context.Context, resetRequest *workflow.ResetWorkflowExecutionRequest,
baseContext workflowExecutionContext, baseMutableState mutableState,
currContext workflowExecutionContext, currMutableState mutableState) (*workflow.ResetWorkflowExecutionResponse, error) {
ret := _m.Called(ctx, resetRequest)
ret := _m.Called(ctx, resetRequest, baseContext, baseMutableState, currContext, currMutableState)

var r0 *workflow.ResetWorkflowExecutionResponse
if rf, ok := ret.Get(0).(func(context.Context, *workflow.ResetWorkflowExecutionRequest,
Expand Down Expand Up @@ -76,3 +76,17 @@ func (_m *mockWorkflowResetor) ApplyResetEvent(ctx context.Context, request *h.R

return r0
}

// CheckResettable provides a mock function for CheckResettable.
//
// Both arguments are forwarded to _m.Called. If the first configured return
// value is a function with the same signature, it is invoked with the
// arguments and its result returned; otherwise the value is read as an error.
func (_m *mockWorkflowResetor) CheckResettable(ms mutableState, curr bool) error {
	args := _m.Called(ms, curr)

	if fn, ok := args.Get(0).(func(baseMutableState mutableState, curr bool) error); ok {
		return fn(ms, curr)
	}

	return args.Error(0)
}
16 changes: 12 additions & 4 deletions service/history/historyBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/persistence"
)

type (
Expand Down Expand Up @@ -71,8 +72,8 @@ func (b *historyBuilder) HasTransientEvents() bool {
}

func (b *historyBuilder) AddWorkflowExecutionStartedEvent(request *h.StartWorkflowExecutionRequest,
previousRunID *string) *workflow.HistoryEvent {
event := b.newWorkflowExecutionStartedEvent(request, previousRunID)
previousExecution *persistence.WorkflowExecutionInfo) *workflow.HistoryEvent {
event := b.newWorkflowExecutionStartedEvent(request, previousExecution)

return b.addEventToHistory(event)
}
Expand Down Expand Up @@ -452,7 +453,13 @@ func (b *historyBuilder) addTransientEvent(event *workflow.HistoryEvent) *workfl
}

func (b *historyBuilder) newWorkflowExecutionStartedEvent(
startRequest *h.StartWorkflowExecutionRequest, previousRunID *string) *workflow.HistoryEvent {
startRequest *h.StartWorkflowExecutionRequest, previousExecution *persistence.WorkflowExecutionInfo) *workflow.HistoryEvent {
var prevRunID *string
var resetPoints *workflow.ResetPoints
if previousExecution != nil {
prevRunID = common.StringPtr(previousExecution.RunID)
resetPoints = previousExecution.AutoResetPoints
}
request := startRequest.StartRequest
historyEvent := b.msBuilder.CreateNewHistoryEvent(workflow.EventTypeWorkflowExecutionStarted)
attributes := &workflow.WorkflowExecutionStartedEventAttributes{}
Expand All @@ -463,7 +470,8 @@ func (b *historyBuilder) newWorkflowExecutionStartedEvent(
attributes.ExecutionStartToCloseTimeoutSeconds = common.Int32Ptr(*request.ExecutionStartToCloseTimeoutSeconds)
attributes.TaskStartToCloseTimeoutSeconds = common.Int32Ptr(*request.TaskStartToCloseTimeoutSeconds)
attributes.ChildPolicy = request.ChildPolicy
attributes.ContinuedExecutionRunId = previousRunID
attributes.ContinuedExecutionRunId = prevRunID
attributes.PrevAutoResetPoints = resetPoints
attributes.Identity = common.StringPtr(common.StringDefault(request.Identity))
attributes.RetryPolicy = request.RetryPolicy
attributes.Attempt = common.Int32Ptr(startRequest.GetAttempt())
Expand Down
2 changes: 1 addition & 1 deletion service/history/historyBuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -705,7 +705,7 @@ func (s *historyBuilderSuite) addDecisionTaskCompletedEvent(scheduleID, startedI
e := s.msBuilder.AddDecisionTaskCompletedEvent(scheduleID, startedID, &workflow.RespondDecisionTaskCompletedRequest{
ExecutionContext: context,
Identity: common.StringPtr(identity),
})
}, defaultHistoryMaxAutoResetPoints)

return e
}
Expand Down
11 changes: 10 additions & 1 deletion service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1245,7 +1245,16 @@ Update_History_Loop:
}

startedID := di.StartedID
completedEvent := msBuilder.AddDecisionTaskCompletedEvent(scheduleID, startedID, request)
// Per-domain cap on the number of auto-reset points; it is also handed to
// AddDecisionTaskCompletedEvent right after this check.
maxResetPoints := e.config.MaxAutoResetPoints(domainEntry.GetInfo().Name)
// When the stored points have already reached the cap, emit a metric and a
// throttled warning.
if resetPoints := msBuilder.GetExecutionInfo().AutoResetPoints; resetPoints != nil && maxResetPoints == len(resetPoints.Points) {
	e.metricsClient.IncCounter(metrics.HistoryRespondDecisionTaskCompletedScope, metrics.AutoResetPointsLimitExceededCounter)
	e.throttledLogger.Warn("the number of auto-reset points is exceeding the limit, will do rotating.",
		tag.WorkflowDomainName(domainEntry.GetInfo().Name),
		tag.WorkflowID(workflowExecution.GetWorkflowId()),
		// The run ID gets its own tag; it was previously logged as a second
		// domain-name tag, which mislabelled it in the log entry.
		tag.WorkflowRunID(workflowExecution.GetRunId()),
		tag.Number(int64(maxResetPoints)))
}
completedEvent := msBuilder.AddDecisionTaskCompletedEvent(scheduleID, startedID, request, maxResetPoints)
if completedEvent == nil {
return nil, &workflow.InternalServiceError{Message: "Unable to add DecisionTaskCompleted event to history."}
}
Expand Down
2 changes: 1 addition & 1 deletion service/history/historyEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4832,7 +4832,7 @@ func addDecisionTaskCompletedEvent(builder mutableState, scheduleID, startedID i
e := builder.AddDecisionTaskCompletedEvent(scheduleID, startedID, &workflow.RespondDecisionTaskCompletedRequest{
ExecutionContext: context,
Identity: common.StringPtr(identity),
})
}, defaultHistoryMaxAutoResetPoints)

builder.FlushBufferedEvents()

Expand Down
9 changes: 4 additions & 5 deletions service/history/mutableState.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type (
AddChildWorkflowExecutionTimedOutEvent(int64, *workflow.WorkflowExecution, *workflow.WorkflowExecutionTimedOutEventAttributes) *workflow.HistoryEvent
AddCompletedWorkflowEvent(int64, *workflow.CompleteWorkflowExecutionDecisionAttributes) *workflow.HistoryEvent
AddContinueAsNewEvent(int64, int64, *cache.DomainCacheEntry, string, *workflow.ContinueAsNewWorkflowExecutionDecisionAttributes, int32) (*workflow.HistoryEvent, mutableState, error)
AddDecisionTaskCompletedEvent(int64, int64, *workflow.RespondDecisionTaskCompletedRequest) *workflow.HistoryEvent
AddDecisionTaskCompletedEvent(int64, int64, *workflow.RespondDecisionTaskCompletedRequest, int) *workflow.HistoryEvent
AddDecisionTaskFailedEvent(scheduleEventID int64, startedEventID int64, cause workflow.DecisionTaskFailedCause, details []byte, identity, reason, baseRunID, newRunID string, forkEventVersion int64) *workflow.HistoryEvent
AddDecisionTaskScheduleToStartTimeoutEvent(int64) *workflow.HistoryEvent
AddDecisionTaskScheduledEvent() *decisionInfo
Expand All @@ -90,11 +90,10 @@ type (
AddWorkflowExecutionSignaled(signalName string, input []byte, identity string) *workflow.HistoryEvent
AddWorkflowExecutionStartedEvent(workflow.WorkflowExecution, *h.StartWorkflowExecutionRequest) *workflow.HistoryEvent
AddWorkflowExecutionTerminatedEvent(*workflow.TerminateWorkflowExecutionRequest) *workflow.HistoryEvent
AfterAddDecisionTaskCompletedEvent(int64)
BeforeAddDecisionTaskCompletedEvent()
BufferReplicationTask(*h.ReplicateEventsRequest) error
ClearStickyness()
CloseUpdateSession() (*mutableStateSessionUpdates, error)
CheckResettable() error
CopyToPersistence() *persistence.WorkflowMutableState
CreateActivityRetryTimer(*persistence.ActivityInfo, string) persistence.Task
CreateNewHistoryEvent(eventType workflow.EventType) *workflow.HistoryEvent
Expand Down Expand Up @@ -171,7 +170,7 @@ type (
ReplicateChildWorkflowExecutionStartedEvent(*workflow.HistoryEvent) error
ReplicateChildWorkflowExecutionTerminatedEvent(*workflow.HistoryEvent)
ReplicateChildWorkflowExecutionTimedOutEvent(*workflow.HistoryEvent)
ReplicateDecisionTaskCompletedEvent(int64, int64)
ReplicateDecisionTaskCompletedEvent(*workflow.HistoryEvent)
ReplicateDecisionTaskFailedEvent()
ReplicateDecisionTaskScheduledEvent(int64, int64, string, int32, int64) *decisionInfo
ReplicateDecisionTaskStartedEvent(*decisionInfo, int64, int64, int64, string, int64) *decisionInfo
Expand All @@ -191,7 +190,7 @@ type (
ReplicateWorkflowExecutionCancelRequestedEvent(*workflow.HistoryEvent)
ReplicateWorkflowExecutionCanceledEvent(int64, *workflow.HistoryEvent)
ReplicateWorkflowExecutionCompletedEvent(int64, *workflow.HistoryEvent)
ReplicateWorkflowExecutionContinuedAsNewEvent(int64, string, string, *workflow.HistoryEvent, *workflow.HistoryEvent, *decisionInfo, mutableState, int32) error
ReplicateWorkflowExecutionContinuedAsNewEvent(int64, string, string, *workflow.HistoryEvent, *workflow.HistoryEvent, *decisionInfo, mutableState, int32, int32) error
ReplicateWorkflowExecutionFailedEvent(int64, *workflow.HistoryEvent)
ReplicateWorkflowExecutionSignaled(*workflow.HistoryEvent)
ReplicateWorkflowExecutionStartedEvent(string, *string, workflow.WorkflowExecution, string, *workflow.HistoryEvent)
Expand Down
Loading

0 comments on commit b7552d0

Please sign in to comment.