Skip to content

Commit

Permalink
Fix query task before first decision task completed (cadence-workflow…
Browse files Browse the repository at this point in the history
  • Loading branch information
yiminc authored Dec 6, 2018
1 parent 349f26c commit ac9b856
Show file tree
Hide file tree
Showing 15 changed files with 305 additions and 56 deletions.
4 changes: 2 additions & 2 deletions .gen/go/history/idl.go

Large diffs are not rendered by default.

43 changes: 41 additions & 2 deletions .gen/go/history/types.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

133 changes: 133 additions & 0 deletions host/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2838,6 +2838,139 @@ func (s *integrationSuite) TestQueryWorkflow_NonSticky() {
s.Equal("unknown-query-type", queryFailError.Message)
}

func (s *integrationSuite) TestQueryWorkflow_BeforeFirstDecision() {
id := "interation-test-query-workflow-before-first-decision"
wt := "interation-test-query-workflow-before-first-decision-type"
tl := "interation-test-query-workflow-before-first-decision-tasklist"
identity := "worker1"
activityName := "activity_type1"
queryType := "test-query"

workflowType := &workflow.WorkflowType{}
workflowType.Name = common.StringPtr(wt)

taskList := &workflow.TaskList{}
taskList.Name = common.StringPtr(tl)

// Start workflow execution
request := &workflow.StartWorkflowExecutionRequest{
RequestId: common.StringPtr(uuid.New()),
Domain: common.StringPtr(s.domainName),
WorkflowId: common.StringPtr(id),
WorkflowType: workflowType,
TaskList: taskList,
Input: nil,
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(100),
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(1),
Identity: common.StringPtr(identity),
}

we, err0 := s.engine.StartWorkflowExecution(createContext(), request)
s.Nil(err0)

//decider logic
activityScheduled := false
activityData := int32(1)
dtHandler := func(execution *workflow.WorkflowExecution, wt *workflow.WorkflowType,
previousStartedEventID, startedEventID int64, history *workflow.History) ([]byte, []*workflow.Decision, error) {

if !activityScheduled {
activityScheduled = true
buf := new(bytes.Buffer)
s.Nil(binary.Write(buf, binary.LittleEndian, activityData))

return nil, []*workflow.Decision{{
DecisionType: common.DecisionTypePtr(workflow.DecisionTypeScheduleActivityTask),
ScheduleActivityTaskDecisionAttributes: &workflow.ScheduleActivityTaskDecisionAttributes{
ActivityId: common.StringPtr(strconv.Itoa(int(1))),
ActivityType: &workflow.ActivityType{Name: common.StringPtr(activityName)},
TaskList: &workflow.TaskList{Name: &tl},
Input: buf.Bytes(),
ScheduleToCloseTimeoutSeconds: common.Int32Ptr(100),
ScheduleToStartTimeoutSeconds: common.Int32Ptr(2),
StartToCloseTimeoutSeconds: common.Int32Ptr(50),
HeartbeatTimeoutSeconds: common.Int32Ptr(5),
},
}}, nil
}

return nil, []*workflow.Decision{{
DecisionType: common.DecisionTypePtr(workflow.DecisionTypeCompleteWorkflowExecution),
CompleteWorkflowExecutionDecisionAttributes: &workflow.CompleteWorkflowExecutionDecisionAttributes{
Result: []byte("Done."),
},
}}, nil
}

queryTaskHandled := false
queryHandler := func(task *workflow.PollForDecisionTaskResponse) ([]byte, error) {
s.NotNil(task.Query)
s.NotNil(task.Query.QueryType)
s.True(task.GetPreviousStartedEventId() > 0)
queryTaskHandled = true
if *task.Query.QueryType == queryType {
return []byte("query-result"), nil
}

return nil, errors.New("unknown-query-type")
}

poller := &TaskPoller{
Engine: s.engine,
Domain: s.domainName,
TaskList: taskList,
Identity: identity,
DecisionHandler: dtHandler,
QueryHandler: queryHandler,
Logger: s.logger,
T: s.T(),
}

workflowExecution := &workflow.WorkflowExecution{
WorkflowId: common.StringPtr(id),
RunId: common.StringPtr(*we.RunId),
}

type QueryResult struct {
Resp *workflow.QueryWorkflowResponse
Err error
}
queryResultCh := make(chan QueryResult)
queryWorkflowFn := func(queryType string) {
queryResp, err := s.engine.QueryWorkflow(createContext(), &workflow.QueryWorkflowRequest{
Domain: common.StringPtr(s.domainName),
Execution: workflowExecution,
Query: &workflow.WorkflowQuery{
QueryType: common.StringPtr(queryType),
},
})
queryResultCh <- QueryResult{Resp: queryResp, Err: err}
}

// drop first decision task
poller.PollAndProcessDecisionTask(false, true /* drop first decision task */)

// call QueryWorkflow before first decision task completed
go queryWorkflowFn(queryType)

for {
// loop until process the query task
isQueryTask, errInner := poller.PollAndProcessDecisionTask(false, false)
s.Nil(errInner)
if isQueryTask {
break
}
} // wait until query result is ready
s.True(queryTaskHandled)

queryResult := <-queryResultCh
s.NoError(queryResult.Err)
s.NotNil(queryResult.Resp)
s.NotNil(queryResult.Resp.QueryResult)
queryResultString := string(queryResult.Resp.QueryResult)
s.Equal("query-result", queryResultString)
}

func (s *integrationSuite) TestDescribeWorkflowExecution() {
id := "interation-describe-wfe-test"
wt := "interation-describe-wfe-test-type"
Expand Down
2 changes: 1 addition & 1 deletion host/taskPoller.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ Loop:
lastDecisionScheduleEvent = e
}
}
if lastDecisionScheduleEvent != nil {
if lastDecisionScheduleEvent != nil && decisionAttempt > 0 {
require.Equal(p.T, decisionAttempt, lastDecisionScheduleEvent.DecisionTaskScheduledEventAttributes.GetAttempt())
}

Expand Down
1 change: 1 addition & 0 deletions idl/github.com/uber/cadence/history.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ struct GetMutableStateResponse {
10: optional shared.WorkflowExecution execution
20: optional shared.WorkflowType workflowType
30: optional i64 (js.type = "Long") NextEventId
35: optional i64 (js.type = "Long") PreviousStartedEventId
40: optional i64 (js.type = "Long") LastFirstEventId
50: optional shared.TaskList taskList
60: optional shared.TaskList stickyTaskList
Expand Down
14 changes: 14 additions & 0 deletions service/history/MockMutableState.go
Original file line number Diff line number Diff line change
Expand Up @@ -1416,6 +1416,20 @@ func (_m *mockMutableState) GetNextEventID() int64 {
return r0
}

// GetPreviousStartedEventID returns last started decision task event ID
func (_m *mockMutableState) GetPreviousStartedEventID() int64 {
ret := _m.Called()

var r0 int64
if rf, ok := ret.Get(0).(func() int64); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(int64)
}

return r0
}

// GetPendingActivityInfos provides a mock function with given fields:
func (_m *mockMutableState) GetPendingActivityInfos() map[int64]*persistence.ActivityInfo {
ret := _m.Called()
Expand Down
2 changes: 2 additions & 0 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,7 @@ func (e *historyEngineImpl) GetMutableState(ctx context.Context,
response.LastFirstEventId = common.Int64Ptr(event.lastFirstEventID)
response.NextEventId = common.Int64Ptr(event.nextEventID)
response.IsWorkflowRunning = common.BoolPtr(event.isWorkflowRunning)
response.PreviousStartedEventId = common.Int64Ptr(event.previousStartedEventID)
if expectedNextEventID < response.GetNextEventId() || !response.GetIsWorkflowRunning() {
return response, nil
}
Expand Down Expand Up @@ -663,6 +664,7 @@ func (e *historyEngineImpl) getMutableState(ctx context.Context,
WorkflowType: &workflow.WorkflowType{Name: common.StringPtr(executionInfo.WorkflowTypeName)},
LastFirstEventId: common.Int64Ptr(msBuilder.GetLastFirstEventID()),
NextEventId: common.Int64Ptr(msBuilder.GetNextEventID()),
PreviousStartedEventId: common.Int64Ptr(msBuilder.GetPreviousStartedEventID()),
TaskList: &workflow.TaskList{Name: common.StringPtr(executionInfo.TaskList)},
StickyTaskList: &workflow.TaskList{Name: common.StringPtr(executionInfo.StickyTaskList)},
ClientLibraryVersion: common.StringPtr(executionInfo.ClientLibraryVersion),
Expand Down
9 changes: 5 additions & 4 deletions service/history/historyEngineInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@ type (

historyEventNotification struct {
workflowIdentifier
lastFirstEventID int64
nextEventID int64
isWorkflowRunning bool
timestamp time.Time
lastFirstEventID int64
nextEventID int64
previousStartedEventID int64
isWorkflowRunning bool
timestamp time.Time
}

// Engine represents an interface for managing workflow execution history.
Expand Down
9 changes: 5 additions & 4 deletions service/history/historyEventNotifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,17 @@ type (
var _ historyEventNotifier = (*historyEventNotifierImpl)(nil)

func newHistoryEventNotification(domainID string, workflowExecution *gen.WorkflowExecution,
lastFirstEventID int64, nextEventID int64, isWorkflowRunning bool) *historyEventNotification {
lastFirstEventID int64, nextEventID int64, previousStartedEventID int64, isWorkflowRunning bool) *historyEventNotification {
return &historyEventNotification{
workflowIdentifier: workflowIdentifier{
domainID: domainID,
workflowID: *workflowExecution.WorkflowId,
runID: *workflowExecution.RunId,
},
lastFirstEventID: lastFirstEventID,
nextEventID: nextEventID,
isWorkflowRunning: isWorkflowRunning,
lastFirstEventID: lastFirstEventID,
nextEventID: nextEventID,
previousStartedEventID: previousStartedEventID,
isWorkflowRunning: isWorkflowRunning,
}
}

Expand Down
6 changes: 4 additions & 2 deletions service/history/historyEventNotifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,10 @@ func (s *historyEventNotifierSuite) TestSingleSubscriberWatchingEvents() {
RunId: common.StringPtr("run ID"),
}
var lastFirstEventID int64 = 3
var previousStartedEventID int64 = 5
var nextEventID int64 = 18
isRunning := true
historyEvent := newHistoryEventNotification(domainID, execution, lastFirstEventID, nextEventID, isRunning)
historyEvent := newHistoryEventNotification(domainID, execution, lastFirstEventID, nextEventID, previousStartedEventID, isRunning)
timerChan := time.NewTimer(time.Second * 2).C

subscriberID, channel, err := s.historyEventNotifier.WatchHistoryEvent(newWorkflowIdentifier(domainID, execution))
Expand Down Expand Up @@ -112,9 +113,10 @@ func (s *historyEventNotifierSuite) TestMultipleSubscriberWatchingEvents() {
}

var lastFirstEventID int64 = 3
var previousStartedEventID int64 = 5
var nextEventID int64 = 18
isRunning := true
historyEvent := newHistoryEventNotification(domainID, execution, lastFirstEventID, nextEventID, isRunning)
historyEvent := newHistoryEventNotification(domainID, execution, lastFirstEventID, nextEventID, previousStartedEventID, isRunning)
timerChan := time.NewTimer(time.Second * 5).C

subscriberCount := 100
Expand Down
2 changes: 2 additions & 0 deletions service/history/historyReplicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1403,6 +1403,7 @@ func (s *historyReplicatorSuite) TestApplyOtherEvents_IncomingGreaterThanCurrent

// these does not matter, but will be used by ms builder change notification
msBuilder.On("GetLastFirstEventID").Return(currentNextEventID - 4)
msBuilder.On("GetPreviousStartedEventID").Return(currentNextEventID - 4)
msBuilder.On("IsWorkflowExecutionRunning").Return(true)

s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.MatchedBy(func(input *persistence.UpdateWorkflowExecutionRequest) bool {
Expand Down Expand Up @@ -1585,6 +1586,7 @@ func (s *historyReplicatorSuite) TestApplyOtherEvents_IncomingGreaterThanCurrent
msBuilder.On("GetLastWriteVersion").Return(currentVersion)
msBuilder.On("GetNextEventID").Return(currentNextEventID)
msBuilder.On("GetEventStoreVersion").Return(int32(0))
msBuilder.On("GetPreviousStartedEventID").Return(currentNextEventID - 4)
msBuilder.On("GetReplicationState").Return(replicationState)
msBuilder.On("BufferReplicationTask", request).Return(nil).Once()
msBuilder.On("IncrementHistorySize", mock.Anything).Return().Once()
Expand Down
1 change: 1 addition & 0 deletions service/history/mutableState.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ type (
GetLastUpdatedTimestamp() int64
GetLastWriteVersion() int64
GetNextEventID() int64
GetPreviousStartedEventID() int64
GetPendingDecision(int64) (*decisionInfo, bool)
GetPendingActivityInfos() map[int64]*persistence.ActivityInfo
GetPendingTimerInfos() map[string]*persistence.TimerInfo
Expand Down
5 changes: 5 additions & 0 deletions service/history/mutableStateBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1102,6 +1102,11 @@ func (e *mutableStateBuilder) GetNextEventID() int64 {
return e.executionInfo.NextEventID
}

// GetPreviousStartedEventID returns last started decision task event ID
func (e *mutableStateBuilder) GetPreviousStartedEventID() int64 {
return e.executionInfo.LastProcessedEvent
}

func (e *mutableStateBuilder) IsWorkflowExecutionRunning() bool {
return e.executionInfo.State != persistence.WorkflowStateCompleted
}
Expand Down
Loading

0 comments on commit ac9b856

Please sign in to comment.