diff --git a/go.mod b/go.mod index bfc56339f44..bd135351b73 100644 --- a/go.mod +++ b/go.mod @@ -130,6 +130,7 @@ require ( github.com/xdg/stringprep v1.0.0 // indirect github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect go.uber.org/dig v1.10.0 // indirect + go.uber.org/goleak v1.0.0 go.uber.org/net/metrics v1.3.0 // indirect golang.org/x/crypto v0.16.0 // indirect golang.org/x/exp/typeparams v0.0.0-20220218215828-6cf2b201936e // indirect diff --git a/service/history/queue/timer_queue_processor_base.go b/service/history/queue/timer_queue_processor_base.go index 6cf68cede0e..52c67081a2c 100644 --- a/service/history/queue/timer_queue_processor_base.go +++ b/service/history/queue/timer_queue_processor_base.go @@ -174,6 +174,9 @@ func (t *timerQueueProcessorBase) Start() { go t.processorPump() } +// Edge Case: Stop doesn't stop TimerGate if timerQueueProcessorBase is only initiliazed without starting +// As a result, TimerGate needs to be stopped separately +// One way to fix this is to make sure TimerGate doesn't start daemon loop on initilization and requires explicit Start func (t *timerQueueProcessorBase) Stop() { if !atomic.CompareAndSwapInt32(&t.status, common.DaemonStatusStarted, common.DaemonStatusStopped) { return diff --git a/service/history/queue/timer_queue_processor_base_test.go b/service/history/queue/timer_queue_processor_base_test.go index 2d2ed0b9c1a..0ec893dc5f2 100644 --- a/service/history/queue/timer_queue_processor_base_test.go +++ b/service/history/queue/timer_queue_processor_base_test.go @@ -31,6 +31,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/uber-go/tally" + "go.uber.org/goleak" "github.com/uber/cadence/common/cluster" "github.com/uber/cadence/common/dynamicconfig" @@ -93,10 +94,12 @@ func (s *timerQueueProcessorBaseSuite) SetupTest() { func (s *timerQueueProcessorBaseSuite) TearDownTest() { s.controller.Finish() s.mockShard.Finish(s.T()) + goleak.VerifyNone(s.T()) } func (s *timerQueueProcessorBaseSuite) TestIsProcessNow() { - timerQueueProcessBase := s.newTestTimerQueueProcessorBase(nil, nil, nil, nil, nil) + timerQueueProcessBase, done := s.newTestTimerQueueProcessorBase(nil, nil, nil, nil, nil) + defer done() s.True(timerQueueProcessBase.isProcessNow(time.Time{})) now := s.mockShard.GetCurrentTime(s.clusterName) @@ -107,6 +110,7 @@ func (s *timerQueueProcessorBaseSuite) TestIsProcessNow() { timeAfter := now.Add(10 * time.Second) s.False(timerQueueProcessBase.isProcessNow(timeAfter)) + } func (s *timerQueueProcessorBaseSuite) TestGetTimerTasks_More() { @@ -141,7 +145,8 @@ func (s *timerQueueProcessorBaseSuite) TestGetTimerTasks_More() { mockExecutionMgr := s.mockShard.Resource.ExecutionMgr mockExecutionMgr.On("GetTimerIndexTasks", mock.Anything, request).Return(response, nil).Once() - timerQueueProcessBase := s.newTestTimerQueueProcessorBase(nil, nil, nil, nil, nil) + timerQueueProcessBase, done := s.newTestTimerQueueProcessorBase(nil, nil, nil, nil, nil) + defer done() got, err := timerQueueProcessBase.getTimerTasks(readLevel, maxReadLevel, request.NextPageToken, batchSize) s.Nil(err) s.Equal(response.Timers, got.Timers) @@ -180,7 +185,8 @@ func (s *timerQueueProcessorBaseSuite) TestGetTimerTasks_NoMore() { mockExecutionMgr := s.mockShard.Resource.ExecutionMgr mockExecutionMgr.On("GetTimerIndexTasks", mock.Anything, request).Return(response, nil).Once() - timerQueueProcessBase := s.newTestTimerQueueProcessorBase(nil, nil, nil, nil, nil) + timerQueueProcessBase, done := s.newTestTimerQueueProcessorBase(nil, nil, nil, nil, nil) + defer done() got, err := timerQueueProcessBase.getTimerTasks(readLevel, maxReadLevel, request.NextPageToken, batchSize) s.Nil(err) s.Equal(response.Timers, got.Timers) @@ -220,7 +226,8 @@ func (s *timerQueueProcessorBaseSuite) TestReadLookAheadTask() { mockExecutionMgr := s.mockShard.Resource.ExecutionMgr mockExecutionMgr.On("GetTimerIndexTasks", mock.Anything, request).Return(response, nil).Once() - timerQueueProcessBase := s.newTestTimerQueueProcessorBase(nil, nil, nil, nil, nil) + timerQueueProcessBase, done := s.newTestTimerQueueProcessorBase(nil, nil, nil, nil, nil) + defer done() lookAheadTask, err := timerQueueProcessBase.readLookAheadTask(readLevel, maxReadLevel) s.Nil(err) s.Equal(response.Timers[0], lookAheadTask) @@ -265,7 +272,8 @@ func (s *timerQueueProcessorBaseSuite) TestReadAndFilterTasks_NoLookAhead_NoNext mockExecutionMgr.On("GetTimerIndexTasks", mock.Anything, request).Return(response, nil).Once() mockExecutionMgr.On("GetTimerIndexTasks", mock.Anything, lookAheadRequest).Return(&persistence.GetTimerIndexTasksResponse{}, nil).Once() - timerQueueProcessBase := s.newTestTimerQueueProcessorBase(nil, nil, nil, nil, nil) + timerQueueProcessBase, done := s.newTestTimerQueueProcessorBase(nil, nil, nil, nil, nil) + defer done() got, err := timerQueueProcessBase.readAndFilterTasks(readLevel, maxReadLevel, request.NextPageToken) s.Nil(err) s.Equal(response.Timers, got.timerTasks) @@ -304,7 +312,8 @@ func (s *timerQueueProcessorBaseSuite) TestReadAndFilterTasks_NoLookAhead_HasNex mockExecutionMgr := s.mockShard.Resource.ExecutionMgr mockExecutionMgr.On("GetTimerIndexTasks", mock.Anything, request).Return(response, nil).Once() - timerQueueProcessBase := s.newTestTimerQueueProcessorBase(nil, nil, nil, nil, nil) + timerQueueProcessBase, done := s.newTestTimerQueueProcessorBase(nil, nil, nil, nil, nil) + defer done() got, err := timerQueueProcessBase.readAndFilterTasks(readLevel, maxReadLevel, request.NextPageToken) s.Nil(err) s.Equal(response.Timers, got.timerTasks) @@ -354,7 +363,8 @@ func (s *timerQueueProcessorBaseSuite) TestReadAndFilterTasks_HasLookAhead_NoNex mockExecutionMgr := s.mockShard.Resource.ExecutionMgr mockExecutionMgr.On("GetTimerIndexTasks", mock.Anything, request).Return(response, nil).Once() - timerQueueProcessBase := s.newTestTimerQueueProcessorBase(nil, nil, nil, nil, nil) + timerQueueProcessBase, done := s.newTestTimerQueueProcessorBase(nil, nil, nil, nil, nil) + defer done() got, err := timerQueueProcessBase.readAndFilterTasks(readLevel, maxReadLevel, request.NextPageToken) s.Nil(err) s.Equal([]*persistence.TimerTaskInfo{response.Timers[0]}, got.timerTasks) @@ -404,7 +414,8 @@ func (s *timerQueueProcessorBaseSuite) TestReadAndFilterTasks_HasLookAhead_HasNe mockExecutionMgr := s.mockShard.Resource.ExecutionMgr mockExecutionMgr.On("GetTimerIndexTasks", mock.Anything, request).Return(response, nil).Once() - timerQueueProcessBase := s.newTestTimerQueueProcessorBase(nil, nil, nil, nil, nil) + timerQueueProcessBase, done := s.newTestTimerQueueProcessorBase(nil, nil, nil, nil, nil) + defer done() got, err := timerQueueProcessBase.readAndFilterTasks(readLevel, maxReadLevel, request.NextPageToken) s.Nil(err) s.Equal([]*persistence.TimerTaskInfo{response.Timers[0]}, got.timerTasks) @@ -462,7 +473,8 @@ func (s *timerQueueProcessorBaseSuite) TestReadAndFilterTasks_LookAheadFailed_No mockExecutionMgr.On("GetTimerIndexTasks", mock.Anything, request).Return(response, nil).Once() mockExecutionMgr.On("GetTimerIndexTasks", mock.Anything, lookAheadRequest).Return(nil, errors.New("some random error")).Times(s.mockShard.GetConfig().TimerProcessorGetFailureRetryCount()) - timerQueueProcessBase := s.newTestTimerQueueProcessorBase(nil, nil, nil, nil, nil) + timerQueueProcessBase, done := s.newTestTimerQueueProcessorBase(nil, nil, nil, nil, nil) + defer done() got, err := timerQueueProcessBase.readAndFilterTasks(readLevel, maxReadLevel, request.NextPageToken) s.Nil(err) s.Equal(response.Timers, got.timerTasks) @@ -471,7 +483,8 @@ func (s *timerQueueProcessorBaseSuite) TestReadAndFilterTasks_LookAheadFailed_No } func (s *timerQueueProcessorBaseSuite) TestNotifyNewTimes() { - timerQueueProcessBase := s.newTestTimerQueueProcessorBase(nil, nil, nil, nil, nil) + timerQueueProcessBase, done := s.newTestTimerQueueProcessorBase(nil, nil, nil, nil, nil) + defer done() // assert the initial state s.True(timerQueueProcessBase.newTime.IsZero()) @@ -539,7 +552,8 @@ func (s *timerQueueProcessorBaseSuite) TestProcessQueueCollections_SkipRead() { return shardMaxReadLevel } - timerQueueProcessBase := s.newTestTimerQueueProcessorBase(processingQueueStates, updateMaxReadLevel, nil, nil, nil) + timerQueueProcessBase, done := s.newTestTimerQueueProcessorBase(processingQueueStates, updateMaxReadLevel, nil, nil, nil) + defer done() timerQueueProcessBase.processQueueCollections(map[int]struct{}{queueLevel: {}}) s.Len(timerQueueProcessBase.processingQueueCollections, 1) @@ -620,7 +634,8 @@ func (s *timerQueueProcessorBaseSuite) TestProcessBatch_HasNextPage() { s.mockTaskProcessor.EXPECT().TrySubmit(gomock.Any()).Return(true, nil).AnyTimes() - timerQueueProcessBase := s.newTestTimerQueueProcessorBase(processingQueueStates, updateMaxReadLevel, nil, nil, nil) + timerQueueProcessBase, done := s.newTestTimerQueueProcessorBase(processingQueueStates, updateMaxReadLevel, nil, nil, nil) + defer done() timerQueueProcessBase.processQueueCollections(map[int]struct{}{queueLevel: {}}) s.Len(timerQueueProcessBase.processingQueueCollections, 1) @@ -710,7 +725,8 @@ func (s *timerQueueProcessorBaseSuite) TestProcessBatch_NoNextPage_HasLookAhead( s.mockTaskProcessor.EXPECT().TrySubmit(gomock.Any()).Return(true, nil).AnyTimes() - timerQueueProcessBase := s.newTestTimerQueueProcessorBase(processingQueueStates, updateMaxReadLevel, nil, nil, nil) + timerQueueProcessBase, done := s.newTestTimerQueueProcessorBase(processingQueueStates, updateMaxReadLevel, nil, nil, nil) + defer done() timerQueueProcessBase.processingQueueReadProgress[0] = timeTaskReadProgress{ currentQueue: timerQueueProcessBase.processingQueueCollections[0].ActiveQueue(), readLevel: ackLevel, @@ -807,7 +823,8 @@ func (s *timerQueueProcessorBaseSuite) TestProcessBatch_NoNextPage_NoLookAhead() s.mockTaskProcessor.EXPECT().TrySubmit(gomock.Any()).Return(true, nil).AnyTimes() - timerQueueProcessBase := s.newTestTimerQueueProcessorBase(processingQueueStates, updateMaxReadLevel, nil, nil, nil) + timerQueueProcessBase, done := s.newTestTimerQueueProcessorBase(processingQueueStates, updateMaxReadLevel, nil, nil, nil) + defer done() timerQueueProcessBase.processingQueueReadProgress[0] = timeTaskReadProgress{ currentQueue: timerQueueProcessBase.processingQueueCollections[0].ActiveQueue(), readLevel: ackLevel, @@ -854,7 +871,7 @@ func (s *timerQueueProcessorBaseSuite) TestTimerProcessorPump_HandleAckLevelUpda return newTimerTaskKey(now, 0) } - timerQueueProcessBase := s.newTestTimerQueueProcessorBase(processingQueueStates, updateMaxReadLevel, nil, nil, nil) + timerQueueProcessBase, _ := s.newTestTimerQueueProcessorBase(processingQueueStates, updateMaxReadLevel, nil, nil, nil) timerQueueProcessBase.options.UpdateAckInterval = dynamicconfig.GetDurationPropertyFn(1 * time.Millisecond) updatedCh := make(chan struct{}, 1) timerQueueProcessBase.updateAckLevelFn = func() (bool, task.Key, error) { @@ -889,7 +906,7 @@ func (s *timerQueueProcessorBaseSuite) TestTimerProcessorPump_SplitQueue() { return newTimerTaskKey(now, 0) } - timerQueueProcessBase := s.newTestTimerQueueProcessorBase(processingQueueStates, updateMaxReadLevel, nil, nil, nil) + timerQueueProcessBase, _ := s.newTestTimerQueueProcessorBase(processingQueueStates, updateMaxReadLevel, nil, nil, nil) timerQueueProcessBase.options.SplitQueueInterval = dynamicconfig.GetDurationPropertyFn(1 * time.Millisecond) splittedCh := make(chan struct{}, 1) timerQueueProcessBase.splitProcessingQueueCollectionFn = func(splitPolicy ProcessingQueueSplitPolicy, upsertPollTimeFn func(int, time.Time)) { @@ -912,21 +929,25 @@ func (s *timerQueueProcessorBaseSuite) newTestTimerQueueProcessorBase( updateClusterAckLevel updateClusterAckLevelFn, updateProcessingQueueStates updateProcessingQueueStatesFn, queueShutdown queueShutdownFn, -) *timerQueueProcessorBase { +) (*timerQueueProcessorBase, func()) { + timerGate := NewLocalTimerGate(s.mockShard.GetTimeSource()) + return newTimerQueueProcessorBase( - s.clusterName, - s.mockShard, - processingQueueStates, - s.mockTaskProcessor, - NewLocalTimerGate(s.mockShard.GetTimeSource()), - newTimerQueueProcessorOptions(s.mockShard.GetConfig(), true, false), - updateMaxReadLevel, - updateClusterAckLevel, - updateProcessingQueueStates, - queueShutdown, - nil, - nil, - s.logger, - s.metricsClient, - ) + s.clusterName, + s.mockShard, + processingQueueStates, + s.mockTaskProcessor, + timerGate, + newTimerQueueProcessorOptions(s.mockShard.GetConfig(), true, false), + updateMaxReadLevel, + updateClusterAckLevel, + updateProcessingQueueStates, + queueShutdown, + nil, + nil, + s.logger, + s.metricsClient, + ), func() { + timerGate.Close() + } } diff --git a/service/history/queue/transfer_queue_processor_test.go b/service/history/queue/transfer_queue_processor_test.go new file mode 100644 index 00000000000..7dada053e44 --- /dev/null +++ b/service/history/queue/transfer_queue_processor_test.go @@ -0,0 +1,75 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package queue + +import ( + "testing" + + "github.com/golang/mock/gomock" + "go.uber.org/goleak" + + "github.com/uber/cadence/common/persistence" + "github.com/uber/cadence/common/reconciliation/invariant" + "github.com/uber/cadence/service/history/config" + "github.com/uber/cadence/service/history/execution" + "github.com/uber/cadence/service/history/reset" + "github.com/uber/cadence/service/history/shard" + "github.com/uber/cadence/service/history/task" + "github.com/uber/cadence/service/history/workflowcache" + "github.com/uber/cadence/service/worker/archiver" +) + +func TestTransferQueueProcessor_RequireStartStop(t *testing.T) { + // some goroutine leak not from this test + defer goleak.VerifyNone(t) + ctrl := gomock.NewController(t) + mockShard := shard.NewTestContext( + t, ctrl, &persistence.ShardInfo{ + ShardID: 10, + RangeID: 1, + TransferAckLevel: 0, + }, + config.NewForTest()) + defer mockShard.Finish(t) + + mockProcessor := task.NewMockProcessor(ctrl) + mockResetter := reset.NewMockWorkflowResetter(ctrl) + mockArchiver := &archiver.ClientMock{} + mockInvariant := invariant.NewMockInvariant(ctrl) + mockWorkflowCache := workflowcache.NewMockWFCache(ctrl) + ratelimit := func(domain string) bool { return false } + + // Create a new transferQueueProcessor + processor := NewTransferQueueProcessor( + mockShard, + mockShard.GetEngine(), + mockProcessor, + execution.NewCache(mockShard), + mockResetter, + mockArchiver, + mockInvariant, + mockWorkflowCache, + ratelimit) + processor.Start() + processor.Stop() +}