Skip to content

Commit

Permalink
Introduce in memory decision task state machine (cadence-workflow#2597)
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewjdawson2016 authored Sep 27, 2019
1 parent 4a63a82 commit 5fa1faf
Show file tree
Hide file tree
Showing 12 changed files with 580 additions and 207 deletions.
27 changes: 16 additions & 11 deletions common/log/tag/values.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ var (
WorkflowActionDecisionTaskTimedOut = workflowAction("add-decisiontask-timedout-event")
WorkflowActionDecisionTaskFailed = workflowAction("add-decisiontask-failed-event")

// in memory decision
WorkflowActionInMemoryDecisionTaskScheduled = workflowAction("add-in-memory-decisiontask-scheduled")
WorkflowActionInMemoryDecisionTaskStarted = workflowAction("add-in-memory-decisiontask-started")

// activity
WorkflowActionActivityTaskScheduled = workflowAction("add-activitytask-scheduled-event")
WorkflowActionActivityTaskStarted = workflowAction("add-activitytask-started-event")
Expand Down Expand Up @@ -134,17 +138,18 @@ var (

// Pre-defined values for SysErrorType
var (
ErrorTypeInvalidHistoryAction = errorType("InvalidHistoryAction")
ErrorTypeInvalidQueryTask = errorType("InvalidQueryTask")
ErrorTypeQueryTaskFailed = errorType("QueryTaskFailed")
ErrorTypePersistentStoreError = errorType("PersistentStoreError")
ErrorTypeHistorySerializationError = errorType("HistorySerializationError")
ErrorTypeHistoryDeserializationError = errorType("HistoryDeserializationError")
ErrorTypeDuplicateTask = errorType("DuplicateTask")
ErrorTypeMultipleCompletionDecisions = errorType("MultipleCompletionDecisions")
ErrorTypeDuplicateTransferTask = errorType("DuplicateTransferTask")
ErrorTypeDecisionFailed = errorType("DecisionFailed")
ErrorTypeInvalidMutableStateAction = errorType("InvalidMutableStateAction")
ErrorTypeInvalidHistoryAction = errorType("InvalidHistoryAction")
ErrorTypeInvalidQueryTask = errorType("InvalidQueryTask")
ErrorTypeQueryTaskFailed = errorType("QueryTaskFailed")
ErrorTypePersistentStoreError = errorType("PersistentStoreError")
ErrorTypeHistorySerializationError = errorType("HistorySerializationError")
ErrorTypeHistoryDeserializationError = errorType("HistoryDeserializationError")
ErrorTypeDuplicateTask = errorType("DuplicateTask")
ErrorTypeMultipleCompletionDecisions = errorType("MultipleCompletionDecisions")
ErrorTypeDuplicateTransferTask = errorType("DuplicateTransferTask")
ErrorTypeDecisionFailed = errorType("DecisionFailed")
ErrorTypeInvalidMutableStateAction = errorType("InvalidMutableStateAction")
ErrorTypeInvalidMemDecisionTaskAction = errorType("InvalidMemDecisionTaskAction")
)

// Pre-defined values for SysShardUpdate
Expand Down
85 changes: 85 additions & 0 deletions service/history/MockMutableState.go
Original file line number Diff line number Diff line change
Expand Up @@ -2889,3 +2889,88 @@ func (_m *mockMutableState) CloseTransactionAsSnapshot(_a0 time.Time, _a1 transa

return r0, r1, r2
}

// AddInMemoryDecisionTaskScheduled provides a mock function with given fields: _a0
func (_m *mockMutableState) AddInMemoryDecisionTaskScheduled(_a0 time.Duration) error {
ret := _m.Called(_a0)

var r0 error
if rf, ok := ret.Get(0).(func(time.Duration) error); ok {
r0 = rf(_a0)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(error)
}
}

return r0
}

// AddInMemoryDecisionTaskStarted provides a mock function with given fields:
func (_m *mockMutableState) AddInMemoryDecisionTaskStarted() error {
ret := _m.Called()

var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(error)
}
}

return r0
}

// DeleteInMemoryDecisionTask provides a mock function with given fields:
func (_m *mockMutableState) DeleteInMemoryDecisionTask() {
_m.Called()
}

// HasScheduledInMemoryDecisionTask provides a mock function with given fields:
func (_m *mockMutableState) HasScheduledInMemoryDecisionTask() bool {
ret := _m.Called()

var r0 bool
if rf, ok := ret.Get(0).(func() bool); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(bool)
}
}

return r0
}

// HasStartedInMemoryDecisionTask provides a mock function with given fields:
func (_m *mockMutableState) HasStartedInMemoryDecisionTask() bool {
ret := _m.Called()

var r0 bool
if rf, ok := ret.Get(0).(func() bool); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(bool)
}
}

return r0
}

// HasInMemoryDecisionTask provides a mock function with given fields:
func (_m *mockMutableState) HasInMemoryDecisionTask() bool {
ret := _m.Called()

var r0 bool
if rf, ok := ret.Get(0).(func() bool); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(bool)
}
}

return r0
}
100 changes: 78 additions & 22 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,44 +585,100 @@ func (e *historyEngineImpl) GetMutableState(
func (e *historyEngineImpl) QueryWorkflow(
ctx ctx.Context,
request *h.QueryWorkflowRequest,
) (*h.QueryWorkflowResponse, error) {
) (retResp *h.QueryWorkflowResponse, retErr error) {
domainCache, err := e.shard.GetDomainCache().GetDomainByID(request.GetDomainUUID())
if err != nil {
return nil, err
}
context, release, err := e.historyCache.getOrCreateWorkflowExecution(ctx, request.GetDomainUUID(), *request.GetExecution())
if err != nil {
return nil, err
}
ms, err := context.loadWorkflowExecution()
msBuilder, err := context.loadWorkflowExecution()
if err != nil {
release(err)
return nil, err
}
queryRegistry := ms.GetQueryRegistry()
queryRegistry := msBuilder.GetQueryRegistry()
release(nil)
queryID, _, queryTermCh := queryRegistry.bufferQuery(request.GetQuery())
defer queryRegistry.removeQuery(queryID)
domainCache, err := e.shard.GetDomainCache().GetDomainByID(request.GetDomainUUID())
if err != nil {
return nil, err

ttl := e.shard.GetConfig().LongPollExpirationInterval(domainCache.GetInfo().Name)
timer := time.NewTimer(ttl)

// after query is finished removed query from query registry and remove any potentially created in memory decision task
defer func() {
timer.Stop()
queryRegistry.removeQuery(queryID)
context, release, err := e.historyCache.getOrCreateWorkflowExecution(ctx, request.GetDomainUUID(), *request.GetExecution())
if err != nil {
retResp = nil
retErr = err
return
}
msBuilder, err := context.loadWorkflowExecution()
if err != nil {
release(err)
retResp = nil
retErr = err
return
}
msBuilder.DeleteInMemoryDecisionTask()
release(nil)
}()

// ensure decision task exists to dispatch query on
retryLoop:
for i := 0; i < conditionalRetryCount; i++ {
context, release, err = e.historyCache.getOrCreateWorkflowExecution(ctx, request.GetDomainUUID(), *request.GetExecution())
if err != nil {
return nil, err
}
msBuilder, err := context.loadWorkflowExecution()
if err != nil {
release(err)
return nil, err
}
// a scheduled decision task already exists - no need to schedule an in memory decision task
if (msBuilder.HasPendingDecision() && !msBuilder.HasInFlightDecision()) || msBuilder.HasScheduledInMemoryDecisionTask() {
release(nil)
break retryLoop
}
// there exists an inflight decision task - try again to schedule decision task
if msBuilder.HasInFlightDecision() {
release(nil)
// wait for currently inflight decision task to complete
select {
case <-timer.C:
return nil, ErrQueryTimeout
case <-ctx.Done():
return nil, ctx.Err()
case <-time.After(100 * time.Millisecond):
continue retryLoop
}
}
// there is no scheduled or started decision task - schedule in memory decision task
if err := msBuilder.AddInMemoryDecisionTaskScheduled(ttl); err != nil {
release(err)
return nil, err
}
}
timer := time.NewTimer(e.shard.GetConfig().LongPollExpirationInterval(domainCache.GetInfo().Name))
defer timer.Stop()

// at this point query has been buffered and there is a pending decision task to dispatch the query on
select {
case <-queryTermCh:
querySnapshot, err := queryRegistry.getQuerySnapshot(queryID)
if err != nil {
return nil, err
}
switch querySnapshot.state {
case queryStateCompleted:
result := querySnapshot.queryResult
switch result.GetResultType() {
case workflow.QueryResultTypeAnswered:
return &h.QueryWorkflowResponse{
QueryResult: result.GetAnswer(),
}, nil
case workflow.QueryResultTypeFailed:
return nil, &workflow.QueryFailedError{Message: fmt.Sprintf("%v: %v", result.GetErrorReason(), result.GetErrorDetails())}
}
case queryStateExpired:
return nil, ErrQueryTimeout
result := querySnapshot.queryResult
switch result.GetResultType() {
case workflow.QueryResultTypeAnswered:
return &h.QueryWorkflowResponse{
QueryResult: result.GetAnswer(),
}, nil
case workflow.QueryResultTypeFailed:
return nil, &workflow.QueryFailedError{Message: fmt.Sprintf("%v: %v", result.GetErrorReason(), result.GetErrorDetails())}
}
case <-timer.C:
return nil, ErrQueryTimeout
Expand Down
7 changes: 7 additions & 0 deletions service/history/mutableState.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ type (
}

mutableState interface {
AddInMemoryDecisionTaskScheduled(time.Duration) error
AddInMemoryDecisionTaskStarted() error
DeleteInMemoryDecisionTask()
HasScheduledInMemoryDecisionTask() bool
HasStartedInMemoryDecisionTask() bool
HasInMemoryDecisionTask() bool

AddActivityTaskCancelRequestedEvent(int64, string, string) (*workflow.HistoryEvent, *persistence.ActivityInfo, error)
AddActivityTaskCanceledEvent(int64, int64, int64, []uint8, string) (*workflow.HistoryEvent, error)
AddActivityTaskCompletedEvent(int64, int64, *workflow.RespondActivityTaskCompletedRequest) (*workflow.HistoryEvent, error)
Expand Down
32 changes: 32 additions & 0 deletions service/history/mutableStateBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,30 @@ func newMutableStateBuilderWithReplicationState(
return s
}

func (e *mutableStateBuilder) AddInMemoryDecisionTaskScheduled(ttl time.Duration) error {
return e.decisionTaskManager.AddInMemoryDecisionTaskScheduled(ttl)
}

func (e *mutableStateBuilder) AddInMemoryDecisionTaskStarted() error {
return e.decisionTaskManager.AddInMemoryDecisionTaskStarted()
}

func (e *mutableStateBuilder) DeleteInMemoryDecisionTask() {
e.decisionTaskManager.DeleteInMemoryDecisionTask()
}

func (e *mutableStateBuilder) HasScheduledInMemoryDecisionTask() bool {
return e.decisionTaskManager.HasScheduledInMemoryDecisionTask()
}

func (e *mutableStateBuilder) HasStartedInMemoryDecisionTask() bool {
return e.decisionTaskManager.HasStartedInMemoryDecisionTask()
}

func (e *mutableStateBuilder) HasInMemoryDecisionTask() bool {
return e.decisionTaskManager.HasInMemoryDecisionTask()
}

func (e *mutableStateBuilder) CopyToPersistence() *persistence.WorkflowMutableState {
state := &persistence.WorkflowMutableState{}

Expand Down Expand Up @@ -1505,12 +1529,20 @@ func (e *mutableStateBuilder) ReplicateWorkflowExecutionStartedEvent(
func (e *mutableStateBuilder) AddFirstDecisionTaskScheduled(
startEvent *workflow.HistoryEvent,
) error {
opTag := tag.WorkflowActionDecisionTaskScheduled
if err := e.checkMutability(opTag); err != nil {
return err
}
return e.decisionTaskManager.AddFirstDecisionTaskScheduled(startEvent)
}

func (e *mutableStateBuilder) AddDecisionTaskScheduledEvent(
bypassTaskGeneration bool,
) (*decisionInfo, error) {
opTag := tag.WorkflowActionDecisionTaskScheduled
if err := e.checkMutability(opTag); err != nil {
return nil, err
}
return e.decisionTaskManager.AddDecisionTaskScheduledEvent(bypassTaskGeneration)
}

Expand Down
Loading

0 comments on commit 5fa1faf

Please sign in to comment.