Skip to content

Commit

Permalink
[code-coverage] Add more tests for service/history/decision package (c…
Browse files Browse the repository at this point in the history
…adence-workflow#5909)

* generate mock for history query registry and add more test cases in decision handler

* add more cases for handleBufferedQueries() and assert on logs

* remove unnecessary lines
  • Loading branch information
ketsiambaku authored Apr 18, 2024
1 parent 16c4775 commit fd9cf96
Show file tree
Hide file tree
Showing 3 changed files with 452 additions and 10 deletions.
204 changes: 194 additions & 10 deletions service/history/decision/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,24 @@ package decision
import (
"context"
"errors"
"fmt"
"reflect"
"testing"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/uber-go/tally"
"go.uber.org/zap"
"go.uber.org/zap/zaptest/observer"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/client"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/cluster"
commonConfig "github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/log/loggerimpl"
"github.com/uber/cadence/common/log/testlogger"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
Expand Down Expand Up @@ -390,6 +394,7 @@ func (s *DecisionHandlerSuite) TestHandleDecisionTaskStarted() {
mutablestate *persistence.WorkflowMutableState
expectCalls func(h *handlerImpl)
expectErr error
assertCalls func(response *types.RecordDecisionTaskStartedResponse)
}{
{
name: "fail to retrieve domain From ID",
Expand All @@ -399,6 +404,7 @@ func (s *DecisionHandlerSuite) TestHandleDecisionTaskStarted() {
mutablestate: &persistence.WorkflowMutableState{
ExecutionInfo: &persistence.WorkflowExecutionInfo{},
},
assertCalls: func(response *types.RecordDecisionTaskStartedResponse) {},
},
{
name: "failure - decision task already started",
Expand All @@ -410,6 +416,7 @@ func (s *DecisionHandlerSuite) TestHandleDecisionTaskStarted() {
mutablestate: &persistence.WorkflowMutableState{
ExecutionInfo: &persistence.WorkflowExecutionInfo{},
},
assertCalls: func(response *types.RecordDecisionTaskStartedResponse) {},
},
{
name: "failure - workflow completed",
Expand All @@ -423,6 +430,7 @@ func (s *DecisionHandlerSuite) TestHandleDecisionTaskStarted() {
State: 2, //2 == WorkflowStateCompleted
},
},
assertCalls: func(response *types.RecordDecisionTaskStartedResponse) {},
},
{
name: "failure - decision task already completed",
Expand All @@ -437,6 +445,7 @@ func (s *DecisionHandlerSuite) TestHandleDecisionTaskStarted() {
NextEventID: 2,
},
},
assertCalls: func(response *types.RecordDecisionTaskStartedResponse) {},
},
{
name: "failure - cached mutable state is stale",
Expand All @@ -452,6 +461,7 @@ func (s *DecisionHandlerSuite) TestHandleDecisionTaskStarted() {
DecisionScheduleID: 1,
},
},
assertCalls: func(response *types.RecordDecisionTaskStartedResponse) {},
},
{
name: "success",
Expand All @@ -462,11 +472,43 @@ func (s *DecisionHandlerSuite) TestHandleDecisionTaskStarted() {
expectErr: nil,
mutablestate: &persistence.WorkflowMutableState{
ExecutionInfo: &persistence.WorkflowExecutionInfo{
DecisionScheduleID: 0,
NextEventID: 3,
DecisionRequestID: "test-request-id",
NextEventID: 3,
DecisionRequestID: "test-request-id",
DecisionAttempt: 1,
},
},
assertCalls: func(resp *types.RecordDecisionTaskStartedResponse) {
// expect test.mutablestate.ExecutionInfo.DecisionAttempt
s.Equal(int64(1), resp.DecisionInfo.ScheduledEvent.DecisionTaskScheduledEventAttributes.Attempt)
},
},
{
name: "success - decision startedID is empty",
domainID: _testDomainUUID,
expectCalls: func(h *handlerImpl) {
h.shard.(*shard.MockContext).EXPECT().GetEventsCache().Times(1).Return(events.NewMockCache(s.controller))
h.shard.(*shard.MockContext).EXPECT().GenerateTransferTaskIDs(gomock.Any()).Times(1).Return([]int64{0}, nil)
h.shard.(*shard.MockContext).EXPECT().
AppendHistoryV2Events(gomock.Any(), gomock.Any(), _testDomainUUID, types.WorkflowExecution{WorkflowID: _testWorkflowID, RunID: _testRunID}).
Return(&persistence.AppendHistoryNodesResponse{}, nil)
h.shard.(*shard.MockContext).EXPECT().UpdateWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.UpdateWorkflowExecutionResponse{MutableStateUpdateSessionStats: &persistence.MutableStateUpdateSessionStats{}}, nil)
h.shard.(*shard.MockContext).EXPECT().GetShardID().Return(_testShardID)
engine := engine.NewMockEngine(s.controller)
h.shard.(*shard.MockContext).EXPECT().GetEngine().Times(3).Return(engine)
engine.EXPECT().NotifyNewHistoryEvent(gomock.Any())
engine.EXPECT().NotifyNewTransferTasks(gomock.Any())
engine.EXPECT().NotifyNewTimerTasks(gomock.Any())
engine.EXPECT().NotifyNewCrossClusterTasks(gomock.Any())
engine.EXPECT().NotifyNewReplicationTasks(gomock.Any())
},
expectErr: nil,
mutablestate: &persistence.WorkflowMutableState{
ExecutionInfo: &persistence.WorkflowExecutionInfo{
DecisionStartedID: -23,
NextEventID: 2,
},
},
assertCalls: func(resp *types.RecordDecisionTaskStartedResponse) {},
},
}

Expand All @@ -478,14 +520,10 @@ func (s *DecisionHandlerSuite) TestHandleDecisionTaskStarted() {
WorkflowID: _testWorkflowID,
RunID: _testRunID,
},
ScheduleID: 0,
TaskID: 0,
RequestID: "test-request-id",
RequestID: "test-request-id",
PollRequest: &types.PollForDecisionTaskRequest{
Domain: test.domainID,
TaskList: nil,
Identity: "",
BinaryChecksum: "",
Domain: test.domainID,
Identity: "test-identity",
},
}
shardContext := shard.NewMockContext(s.controller)
Expand All @@ -498,6 +536,74 @@ func (s *DecisionHandlerSuite) TestHandleDecisionTaskStarted() {
s.Equal(test.expectErr, err)
if err == nil {
s.NotNil(resp)
s.Equal(test.mutablestate.ExecutionInfo.DecisionScheduleID, resp.ScheduledEventID)
s.Equal(test.mutablestate.ExecutionInfo.DecisionStartedID, resp.StartedEventID)
s.Equal(test.mutablestate.ExecutionInfo.NextEventID, resp.NextEventID)
s.Equal(test.mutablestate.ExecutionInfo.TaskList, resp.WorkflowExecutionTaskList.Name)
}
test.assertCalls(resp)
})
}
}

func (s *DecisionHandlerSuite) TestCreateRecordDecisionTaskStartedResponse() {
tests := []struct {
name string
expectCalls func()
expectedErr error
indexes []string
}{
{
name: "success",
expectCalls: func() {
s.mockMutableState.EXPECT().GetWorkflowType().Return(&types.WorkflowType{})
s.mockMutableState.EXPECT().GetNextEventID().Return(int64(1))
s.mockMutableState.EXPECT().CreateTransientDecisionEvents(gomock.Any(), "test-identity").Return(&types.HistoryEvent{}, &types.HistoryEvent{})
s.mockMutableState.EXPECT().GetCurrentBranchToken().Return([]byte{}, nil)
registry := query.NewMockRegistry(s.controller)
s.mockMutableState.EXPECT().GetQueryRegistry().Return(registry)
registry.EXPECT().GetBufferedIDs().Return([]string{"test-id", "test-id1", "test-id2"})
registry.EXPECT().GetQueryInput(gomock.Any()).Return(&types.WorkflowQuery{}, nil).Times(2)
registry.EXPECT().GetQueryInput(gomock.Any()).Return(nil, &types.InternalServiceError{Message: "query does not exist"})
s.mockMutableState.EXPECT().GetHistorySize()
},
expectedErr: nil,
indexes: []string{"test-id", "test-id1"},
},
{
name: "failure",
expectCalls: func() {
s.mockMutableState.EXPECT().GetWorkflowType().Return(&types.WorkflowType{})
s.mockMutableState.EXPECT().GetNextEventID().Return(int64(1))
s.mockMutableState.EXPECT().CreateTransientDecisionEvents(gomock.Any(), "test-identity").Return(&types.HistoryEvent{}, &types.HistoryEvent{})
s.mockMutableState.EXPECT().GetCurrentBranchToken().Return([]byte{}, &types.BadRequestError{Message: fmt.Sprintf("getting branch index: %d, available branch count: %d", 0, 0)})
},
expectedErr: &types.BadRequestError{Message: fmt.Sprintf("getting branch index: %d, available branch count: %d", 0, 0)},
},
}

for _, test := range tests {
s.Run(test.name, func() {
decision := &execution.DecisionInfo{
ScheduleID: 1,
StartedID: 2,
RequestID: constants.TestRequestID,
TaskList: "test-tasklist",
Attempt: 1,
}
test.expectCalls()
resp, err := s.decisionHandler.createRecordDecisionTaskStartedResponse(constants.TestDomainID, s.mockMutableState, decision, "test-identity")
s.Equal(test.expectedErr, err)
if err != nil {
s.Nil(resp)
} else {
s.Equal(&types.HistoryEvent{}, resp.DecisionInfo.ScheduledEvent)
s.Equal(&types.HistoryEvent{}, resp.DecisionInfo.StartedEvent)
s.Equal([]byte{}, resp.BranchToken)
for _, index := range test.indexes {
_, ok := resp.Queries[index]
s.True(ok)
}
}
})
}
Expand Down Expand Up @@ -547,6 +653,84 @@ func (s *DecisionHandlerSuite) TestHandleBufferedQueries_QueryTooLarge() {
s.assertQueryCounts(s.queryRegistry, 0, 5, 0, 5)
}

func (s *DecisionHandlerSuite) TestHandleBufferedQueries_QueryRegistryFailures() {
tests := []struct {
name string
expectMockCalls func()
assertCalls func(logs *observer.ObservedLogs)
clientFeatureVersion string
queryResults map[string]*types.WorkflowQueryResult
}{
{
name: "no buffered queries",
expectMockCalls: func() {
queryRegistry := query.NewMockRegistry(s.controller)
s.mockMutableState.EXPECT().GetQueryRegistry().Return(queryRegistry)
queryRegistry.EXPECT().HasBufferedQuery().Return(false)
},
assertCalls: func(logs *observer.ObservedLogs) {},
},
{
name: "set query termination state failed - client unsupported",
expectMockCalls: func() {
queryRegistry := query.NewMockRegistry(s.controller)
s.mockMutableState.EXPECT().GetQueryRegistry().Return(queryRegistry)
queryRegistry.EXPECT().HasBufferedQuery().Return(true)
queryRegistry.EXPECT().GetBufferedIDs().Return([]string{"some-buffered-id"})
queryRegistry.EXPECT().SetTerminationState("some-buffered-id", gomock.Any()).Return(&types.InternalServiceError{Message: "query does not exist"})
},
assertCalls: func(logs *observer.ObservedLogs) {
s.Equal(1, logs.FilterMessage("failed to set query termination state to failed").Len())
},
clientFeatureVersion: "0.0.0",
},
{
name: "set query termination state failed - query too large",
expectMockCalls: func() {
queryRegistry := query.NewMockRegistry(s.controller)
s.mockMutableState.EXPECT().GetQueryRegistry().Return(queryRegistry)
queryRegistry.EXPECT().HasBufferedQuery().Return(true)
queryRegistry.EXPECT().GetBufferedIDs().Return([]string{"some-id"})
queryRegistry.EXPECT().SetTerminationState("some-id", gomock.Any()).Return(&types.InternalServiceError{Message: "query already in terminal state"}).Times(2)
},
queryResults: s.constructQueryResults([]string{"some-id"}, 10*1024*1024),
clientFeatureVersion: client.GoWorkerConsistentQueryVersion,
assertCalls: func(logs *observer.ObservedLogs) {
s.Equal(1, logs.FilterMessage("failed to set query termination state to failed").Len())
s.Equal(1, logs.FilterMessage("failed to set query termination state to unblocked").Len())
},
},
{
name: "set query termination state unblocked",
expectMockCalls: func() {
queryRegistry := query.NewMockRegistry(s.controller)
s.mockMutableState.EXPECT().GetQueryRegistry().Return(queryRegistry)
queryRegistry.EXPECT().HasBufferedQuery().Return(true)
queryRegistry.EXPECT().GetBufferedIDs().Return([]string{"some-buffered-id"})
queryRegistry.EXPECT().SetTerminationState("some-id", gomock.Any()).Return(&types.InternalServiceError{Message: "query does not exist"})
queryRegistry.EXPECT().SetTerminationState("some-buffered-id", gomock.Any()).Return(&types.InternalServiceError{Message: "query already in terminal state"})
},
clientFeatureVersion: client.GoWorkerConsistentQueryVersion,
queryResults: map[string]*types.WorkflowQueryResult{"some-id": &types.WorkflowQueryResult{}},
assertCalls: func(logs *observer.ObservedLogs) {
s.Equal(1, logs.FilterMessage("failed to set query termination state to completed").Len())
s.Equal(1, logs.FilterMessage("failed to set query termination state to unblocked").Len())
},
},
}
for _, test := range tests {
s.Run(test.name, func() {
core, observedLogs := observer.New(zap.ErrorLevel)
logger := zap.New(core)
s.decisionHandler.logger = loggerimpl.NewLogger(logger, loggerimpl.WithSampleFunc(func(int) bool { return true }))

test.expectMockCalls()
s.decisionHandler.handleBufferedQueries(s.mockMutableState, client.GoSDK, test.clientFeatureVersion, test.queryResults, false, constants.TestGlobalDomainEntry, false)
test.assertCalls(observedLogs)
})
}
}

func (s *DecisionHandlerSuite) constructQueryResults(ids []string, resultSize int) map[string]*types.WorkflowQueryResult {
results := make(map[string]*types.WorkflowQueryResult)
for _, id := range ids {
Expand Down
2 changes: 2 additions & 0 deletions service/history/query/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination registry_mock.go -self_package github.com/uber/cadence/service/history/query

package query

import (
Expand Down
Loading

0 comments on commit fd9cf96

Please sign in to comment.