Skip to content

Commit

Permalink
Fix flacky integration tests (cadence-workflow#4094)
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll authored Mar 30, 2021
1 parent dfc42d3 commit 5a10dc4
Showing 1 changed file with 16 additions and 12 deletions.
28 changes: 16 additions & 12 deletions host/queryworkflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"context"
"encoding/binary"
"errors"
"sync/atomic"
"time"

"github.com/pborman/uuid"
Expand Down Expand Up @@ -588,7 +589,8 @@ func (s *IntegrationSuite) TestQueryWorkflow_Consistent_PiggybackQuery() {
// decider logic
activityScheduled := false
activityData := int32(1)
handledSignal := false
var handledSignal atomic.Value
handledSignal.Store(false)
dtHandler := func(execution *types.WorkflowExecution, wt *types.WorkflowType,
previousStartedEventID, startedEventID int64, history *types.History) ([]byte, []*types.Decision, error) {

Expand All @@ -613,7 +615,7 @@ func (s *IntegrationSuite) TestQueryWorkflow_Consistent_PiggybackQuery() {
} else if previousStartedEventID > 0 {
for _, event := range history.Events[previousStartedEventID:] {
if *event.EventType == types.EventTypeWorkflowExecutionSignaled {
handledSignal = true
handledSignal.Store(true)
return nil, []*types.Decision{}, nil
}
}
Expand Down Expand Up @@ -668,7 +670,7 @@ func (s *IntegrationSuite) TestQueryWorkflow_Consistent_PiggybackQuery() {
queryWorkflowFn := func(queryType string, rejectCondition *types.QueryRejectCondition) {
// before the query is answer the signal is not handled because the decision task is not dispatched
// to the worker yet
s.False(handledSignal)
s.False(handledSignal.Load().(bool))
queryResp, err := s.engine.QueryWorkflow(createContext(), &types.QueryWorkflowRequest{
Domain: s.domainName,
Execution: &types.WorkflowExecution{
Expand All @@ -683,7 +685,7 @@ func (s *IntegrationSuite) TestQueryWorkflow_Consistent_PiggybackQuery() {
})
// after the query is answered the signal is handled because query is consistent and since
// signal came before query signal must be handled by the time query returns
s.True(handledSignal)
s.True(handledSignal.Load().(bool))
queryResultCh <- QueryResult{Resp: queryResp, Err: err}
}

Expand Down Expand Up @@ -934,7 +936,8 @@ func (s *IntegrationSuite) TestQueryWorkflow_Consistent_BlockedByStarted_NonStic
// decider logic
activityScheduled := false
activityData := int32(1)
handledSignal := false
var handledSignal atomic.Value
handledSignal.Store(false)
dtHandler := func(execution *types.WorkflowExecution, wt *types.WorkflowType,
previousStartedEventID, startedEventID int64, history *types.History) ([]byte, []*types.Decision, error) {

Expand All @@ -961,7 +964,7 @@ func (s *IntegrationSuite) TestQueryWorkflow_Consistent_BlockedByStarted_NonStic
if *event.EventType == types.EventTypeWorkflowExecutionSignaled {
// wait for some time to force decision task to stay in started state while query is issued
<-time.After(5 * time.Second)
handledSignal = true
handledSignal.Store(true)
return nil, []*types.Decision{}, nil
}
}
Expand Down Expand Up @@ -1014,7 +1017,7 @@ func (s *IntegrationSuite) TestQueryWorkflow_Consistent_BlockedByStarted_NonStic
}
queryResultCh := make(chan QueryResult)
queryWorkflowFn := func(queryType string, rejectCondition *types.QueryRejectCondition) {
s.False(handledSignal)
s.False(handledSignal.Load().(bool))
queryResp, err := s.engine.QueryWorkflow(createContext(), &types.QueryWorkflowRequest{
Domain: s.domainName,
Execution: &types.WorkflowExecution{
Expand All @@ -1027,7 +1030,7 @@ func (s *IntegrationSuite) TestQueryWorkflow_Consistent_BlockedByStarted_NonStic
QueryRejectCondition: rejectCondition,
QueryConsistencyLevel: types.QueryConsistencyLevelStrong.Ptr(),
})
s.True(handledSignal)
s.True(handledSignal.Load().(bool))
queryResultCh <- QueryResult{Resp: queryResp, Err: err}
}

Expand Down Expand Up @@ -1123,7 +1126,8 @@ func (s *IntegrationSuite) TestQueryWorkflow_Consistent_NewDecisionTask_Sticky()
// decider logic
activityScheduled := false
activityData := int32(1)
handledSignal := false
var handledSignal atomic.Value
handledSignal.Store(false)
dtHandler := func(execution *types.WorkflowExecution, wt *types.WorkflowType,
previousStartedEventID, startedEventID int64, history *types.History) ([]byte, []*types.Decision, error) {
if !activityScheduled {
Expand All @@ -1149,7 +1153,7 @@ func (s *IntegrationSuite) TestQueryWorkflow_Consistent_NewDecisionTask_Sticky()
if *event.EventType == types.EventTypeWorkflowExecutionSignaled {
// wait for some time to force decision task to stay in started state while query is issued
<-time.After(5 * time.Second)
handledSignal = true
handledSignal.Store(true)
return nil, []*types.Decision{}, nil
}
}
Expand Down Expand Up @@ -1204,7 +1208,7 @@ func (s *IntegrationSuite) TestQueryWorkflow_Consistent_NewDecisionTask_Sticky()
}
queryResultCh := make(chan QueryResult)
queryWorkflowFn := func(queryType string, rejectCondition *types.QueryRejectCondition) {
s.False(handledSignal)
s.False(handledSignal.Load().(bool))
queryResp, err := s.engine.QueryWorkflow(createContext(), &types.QueryWorkflowRequest{
Domain: s.domainName,
Execution: &types.WorkflowExecution{
Expand All @@ -1217,7 +1221,7 @@ func (s *IntegrationSuite) TestQueryWorkflow_Consistent_NewDecisionTask_Sticky()
QueryRejectCondition: rejectCondition,
QueryConsistencyLevel: types.QueryConsistencyLevelStrong.Ptr(),
})
s.True(handledSignal)
s.True(handledSignal.Load().(bool))
queryResultCh <- QueryResult{Resp: queryResp, Err: err}
}

Expand Down

0 comments on commit 5a10dc4

Please sign in to comment.