diff --git a/common/metrics/defs.go b/common/metrics/defs.go index e397545fca2..c7ca65afc37 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -723,6 +723,8 @@ const ( HistoryRespondActivityTaskCanceledScope // HistoryGetMutableStateScope tracks GetMutableStateScope API calls received by service HistoryGetMutableStateScope + // HistoryPollMutableStateScope tracks PollMutableStateScope API calls received by service + HistoryPollMutableStateScope // HistoryResetStickyTaskListScope tracks ResetStickyTaskListScope API calls received by service HistoryResetStickyTaskListScope // HistoryDescribeWorkflowExecutionScope tracks DescribeWorkflowExecution API calls received by service @@ -893,6 +895,7 @@ const ( ReplicationTaskFetcherScope // ReplicationTaskCleanupScope is scope used by all metrics emitted by ReplicationTaskProcessor cleanup ReplicationTaskCleanupScope + NumHistoryScopes ) @@ -1268,6 +1271,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{ HistoryRespondActivityTaskFailedScope: {operation: "RespondActivityTaskFailed"}, HistoryRespondActivityTaskCanceledScope: {operation: "RespondActivityTaskCanceled"}, HistoryGetMutableStateScope: {operation: "GetMutableState"}, + HistoryPollMutableStateScope: {operation: "PollMutableState"}, HistoryResetStickyTaskListScope: {operation: "ResetStickyTaskListScope"}, HistoryDescribeWorkflowExecutionScope: {operation: "DescribeWorkflowExecution"}, HistoryRecordDecisionTaskStartedScope: {operation: "RecordDecisionTaskStarted"}, diff --git a/common/persistence/client/bean.go b/common/persistence/client/bean.go index d171bc5a4f6..fa10b796d93 100644 --- a/common/persistence/client/bean.go +++ b/common/persistence/client/bean.go @@ -310,7 +310,7 @@ func (s *BeanImpl) Close() { s.metadataManager.Close() s.taskManager.Close() s.visibilityManager.Close() - s.domainReplicationQueue.Close() + s.domainReplicationQueue.Stop() s.shardManager.Close() s.historyManager.Close() s.executionManagerFactory.Close() diff 
--git a/common/persistence/dataInterfaces.go b/common/persistence/dataInterfaces.go index 17e70530b58..a2a71b233eb 100644 --- a/common/persistence/dataInterfaces.go +++ b/common/persistence/dataInterfaces.go @@ -29,7 +29,6 @@ import ( "github.com/pborman/uuid" - "github.com/uber/cadence/.gen/go/replicator" workflow "github.com/uber/cadence/.gen/go/shared" "github.com/uber/cadence/common" "github.com/uber/cadence/common/codec" @@ -1550,15 +1549,6 @@ type ( ListDomains(request *ListDomainsRequest) (*ListDomainsResponse, error) GetMetadata() (*GetMetadataResponse, error) } - - // DomainReplicationQueue is used to publish and list domain replication tasks - DomainReplicationQueue interface { - Closeable - Publish(message interface{}) error - GetReplicationMessages(lastMessageID int, maxCount int) ([]*replicator.ReplicationTask, int, error) - UpdateAckLevel(lastProcessedMessageID int, clusterName string) error - GetAckLevels() (map[string]int, error) - } ) func (e *InvalidPersistenceRequestError) Error() string { diff --git a/common/persistence/domainReplicationQueue.go b/common/persistence/domainReplicationQueue.go index 0905cb96614..47de22ea7a4 100644 --- a/common/persistence/domainReplicationQueue.go +++ b/common/persistence/domainReplicationQueue.go @@ -18,21 +18,27 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. 
+//go:generate mockgen -copyright_file ../../LICENSE -package $GOPACKAGE -source $GOFILE -destination domainReplicationQueue_mock.go -self_package github.com/uber/cadence/common/persistence
+
 package persistence
 
 import (
 	"errors"
 	"fmt"
+	"math"
+	"sync/atomic"
 	"time"
 
 	"github.com/uber/cadence/.gen/go/replicator"
+	"github.com/uber/cadence/common"
 	"github.com/uber/cadence/common/codec"
 	"github.com/uber/cadence/common/log"
+	"github.com/uber/cadence/common/log/tag"
 	"github.com/uber/cadence/common/metrics"
 )
 
 const (
-	purgeInterval = time.Minute
+	purgeInterval = 5 * time.Minute
 )
 
 var _ DomainReplicationQueue = (*domainReplicationQueueImpl)(nil)
@@ -52,6 +58,7 @@ func NewDomainReplicationQueue(
 		encoder:             codec.NewThriftRWEncoder(),
 		ackNotificationChan: make(chan bool),
 		done:                make(chan bool),
+		status:              common.DaemonStatusInitialized,
 	}
 }
 
@@ -62,11 +69,41 @@ type (
 		metricsClient       metrics.Client
 		logger              log.Logger
 		encoder             codec.BinaryEncoder
+		ackLevelUpdated     bool
 		ackNotificationChan chan bool
 		done                chan bool
+		status              int32
+	}
+
+	// DomainReplicationQueue is used to publish and list domain replication tasks
+	DomainReplicationQueue interface {
+		common.Daemon
+		Publish(message interface{}) error
+		GetReplicationMessages(lastMessageID int, maxCount int) ([]*replicator.ReplicationTask, int, error)
+		UpdateAckLevel(lastProcessedMessageID int, clusterName string) error
+		GetAckLevels() (map[string]int, error)
 	}
 )
 
+// Start launches the background purge loop. It is a no-op unless the queue is
+// still in the initialized state, so repeated calls start at most one goroutine.
+func (q *domainReplicationQueueImpl) Start() {
+	if !atomic.CompareAndSwapInt32(&q.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
+		return
+	}
+	go q.purgeProcessor()
+}
+
+// Stop signals the purge loop to exit by closing the done channel. Only a
+// started queue is moved to the stopped state, so the channel is closed at most
+// once and a queue that was never started is left untouched.
+func (q *domainReplicationQueueImpl) Stop() {
+	if !atomic.CompareAndSwapInt32(&q.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
+		return
+	}
+	close(q.done)
+}
+
 func (q *domainReplicationQueueImpl) Publish(message interface{}) error {
 	task, ok := message.(*replicator.ReplicationTask)
 	if !ok {
@@ -84,6 +121,7 @@ func (q *domainReplicationQueueImpl) GetReplicationMessages(
 	lastMessageID int,
 	maxCount int,
 ) ([]*replicator.ReplicationTask, int, error) {
+
 	messages, err := q.queue.ReadMessages(lastMessageID, maxCount)
 	if err != nil {
 		return nil, lastMessageID, err
@@ -122,6 +160,60 @@ func (q *domainReplicationQueueImpl) GetAckLevels() (map[string]int, error) {
 	return q.queue.GetAckLevels()
 }
 
-func (q *domainReplicationQueueImpl) Close() {
-	close(q.done)
+// purgeAckedMessages takes the lowest ack level among the entries returned by
+// GetAckLevels, asks the queue to delete the messages before that level and
+// reports the level as a gauge. It returns nil without deleting anything when
+// no ack levels are recorded.
+func (q *domainReplicationQueueImpl) purgeAckedMessages() error {
+	ackLevelByCluster, err := q.GetAckLevels()
+	if err != nil {
+		return fmt.Errorf("failed to purge messages: %v", err)
+	}
+
+	if len(ackLevelByCluster) == 0 {
+		return nil
+	}
+
+	minAckLevel := math.MaxInt64
+	for _, ackLevel := range ackLevelByCluster {
+		if ackLevel < minAckLevel {
+			minAckLevel = ackLevel
+		}
+	}
+
+	err = q.queue.DeleteMessagesBefore(minAckLevel)
+	if err != nil {
+		return fmt.Errorf("failed to purge messages: %v", err)
+	}
+
+	q.metricsClient.
+		Scope(metrics.FrontendDomainReplicationQueueScope).
+		UpdateGauge(metrics.DomainReplicationTaskAckLevel, float64(minAckLevel))
+	return nil
+}
+
+// purgeProcessor runs until the done channel is closed. On every purgeInterval
+// tick it purges acked messages, but only if an ack notification arrived since
+// the last successful purge; a failed purge is logged and retried on a later tick.
+func (q *domainReplicationQueueImpl) purgeProcessor() {
+	ticker := time.NewTicker(purgeInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-q.done:
+			return
+		case <-ticker.C:
+			if q.ackLevelUpdated {
+				err := q.purgeAckedMessages()
+				if err != nil {
+					q.logger.Warn("Failed to purge acked domain replication messages.", tag.Error(err))
+				} else {
+					q.ackLevelUpdated = false
+				}
+			}
+		case <-q.ackNotificationChan:
+			q.ackLevelUpdated = true
+		}
+	}
 }
diff --git a/common/persistence/domainReplicationQueue_mock.go b/common/persistence/domainReplicationQueue_mock.go
new file mode 100644
index 00000000000..2c623d9e645
--- /dev/null
+++ b/common/persistence/domainReplicationQueue_mock.go
@@ -0,0 +1,142 @@
+// The MIT License (MIT)
+//
+// Copyright (c) 2019 Uber Technologies, Inc.
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. +// + +// Code generated by MockGen. DO NOT EDIT. +// Source: domainReplicationQueue.go + +// Package persistence is a generated GoMock package. 
+package persistence + +import ( + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + + replicator "github.com/uber/cadence/.gen/go/replicator" +) + +// MockDomainReplicationQueue is a mock of DomainReplicationQueue interface +type MockDomainReplicationQueue struct { + ctrl *gomock.Controller + recorder *MockDomainReplicationQueueMockRecorder +} + +// MockDomainReplicationQueueMockRecorder is the mock recorder for MockDomainReplicationQueue +type MockDomainReplicationQueueMockRecorder struct { + mock *MockDomainReplicationQueue +} + +// NewMockDomainReplicationQueue creates a new mock instance +func NewMockDomainReplicationQueue(ctrl *gomock.Controller) *MockDomainReplicationQueue { + mock := &MockDomainReplicationQueue{ctrl: ctrl} + mock.recorder = &MockDomainReplicationQueueMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockDomainReplicationQueue) EXPECT() *MockDomainReplicationQueueMockRecorder { + return m.recorder +} + +// Start mocks base method +func (m *MockDomainReplicationQueue) Start() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Start") +} + +// Start indicates an expected call of Start +func (mr *MockDomainReplicationQueueMockRecorder) Start() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockDomainReplicationQueue)(nil).Start)) +} + +// Stop mocks base method +func (m *MockDomainReplicationQueue) Stop() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Stop") +} + +// Stop indicates an expected call of Stop +func (mr *MockDomainReplicationQueueMockRecorder) Stop() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockDomainReplicationQueue)(nil).Stop)) +} + +// Publish mocks base method +func (m *MockDomainReplicationQueue) Publish(message interface{}) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Publish", message) + ret0, 
_ := ret[0].(error) + return ret0 +} + +// Publish indicates an expected call of Publish +func (mr *MockDomainReplicationQueueMockRecorder) Publish(message interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Publish", reflect.TypeOf((*MockDomainReplicationQueue)(nil).Publish), message) +} + +// GetReplicationMessages mocks base method +func (m *MockDomainReplicationQueue) GetReplicationMessages(lastMessageID, maxCount int) ([]*replicator.ReplicationTask, int, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetReplicationMessages", lastMessageID, maxCount) + ret0, _ := ret[0].([]*replicator.ReplicationTask) + ret1, _ := ret[1].(int) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// GetReplicationMessages indicates an expected call of GetReplicationMessages +func (mr *MockDomainReplicationQueueMockRecorder) GetReplicationMessages(lastMessageID, maxCount interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetReplicationMessages", reflect.TypeOf((*MockDomainReplicationQueue)(nil).GetReplicationMessages), lastMessageID, maxCount) +} + +// UpdateAckLevel mocks base method +func (m *MockDomainReplicationQueue) UpdateAckLevel(lastProcessedMessageID int, clusterName string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UpdateAckLevel", lastProcessedMessageID, clusterName) + ret0, _ := ret[0].(error) + return ret0 +} + +// UpdateAckLevel indicates an expected call of UpdateAckLevel +func (mr *MockDomainReplicationQueueMockRecorder) UpdateAckLevel(lastProcessedMessageID, clusterName interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateAckLevel", reflect.TypeOf((*MockDomainReplicationQueue)(nil).UpdateAckLevel), lastProcessedMessageID, clusterName) +} + +// GetAckLevels mocks base method +func (m *MockDomainReplicationQueue) GetAckLevels() (map[string]int, error) { + 
m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetAckLevels") + ret0, _ := ret[0].(map[string]int) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetAckLevels indicates an expected call of GetAckLevels +func (mr *MockDomainReplicationQueueMockRecorder) GetAckLevels() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAckLevels", reflect.TypeOf((*MockDomainReplicationQueue)(nil).GetAckLevels)) +} diff --git a/common/resource/resourceTest.go b/common/resource/resourceTest.go index 5b0029c4dd6..bcf82a6a81f 100644 --- a/common/resource/resourceTest.go +++ b/common/resource/resourceTest.go @@ -143,6 +143,9 @@ func NewTest( shardMgr := &mocks.ShardManager{} historyMgr := &mocks.HistoryV2Manager{} executionMgr := &mocks.ExecutionManager{} + domainReplicationQueue := persistence.NewMockDomainReplicationQueue(controller) + domainReplicationQueue.EXPECT().Start().AnyTimes() + domainReplicationQueue.EXPECT().Stop().AnyTimes() persistenceBean := persistenceClient.NewMockBean(controller) persistenceBean.EXPECT().GetMetadataManager().Return(metadataMgr).AnyTimes() persistenceBean.EXPECT().GetTaskManager().Return(taskMgr).AnyTimes() @@ -150,6 +153,7 @@ func NewTest( persistenceBean.EXPECT().GetHistoryManager().Return(historyMgr).AnyTimes() persistenceBean.EXPECT().GetShardManager().Return(shardMgr).AnyTimes() persistenceBean.EXPECT().GetExecutionManager(gomock.Any()).Return(executionMgr, nil).AnyTimes() + persistenceBean.EXPECT().GetDomainReplicationQueue().Return(domainReplicationQueue).AnyTimes() membershipMonitor := membership.NewMockMonitor(controller) frontendServiceResolver := membership.NewMockServiceResolver(controller) @@ -199,7 +203,7 @@ func NewTest( MetadataMgr: metadataMgr, TaskMgr: taskMgr, VisibilityMgr: visibilityMgr, - DomainReplicationQueue: nil, + DomainReplicationQueue: domainReplicationQueue, ShardMgr: shardMgr, HistoryMgr: historyMgr, ExecutionMgr: executionMgr, @@ -385,7 +389,7 @@ func (s *Test) 
GetVisibilityManager() persistence.VisibilityManager { // GetDomainReplicationQueue for testing func (s *Test) GetDomainReplicationQueue() persistence.DomainReplicationQueue { // user should implement this method for test - return nil + return s.DomainReplicationQueue } // GetShardManager for testing diff --git a/go.mod b/go.mod index 0bcc571fec7..681a5b2f22b 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/Shopify/sarama v1.23.0 github.com/apache/thrift v0.0.0-20161221203622-b2a4d4ae21c7 github.com/benbjohnson/clock v0.0.0-20161215174838-7dc76406b6d3 // indirect + github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 github.com/bsm/sarama-cluster v2.1.13+incompatible github.com/cactus/go-statsd-client v3.1.1+incompatible github.com/cch123/elasticsql v0.0.0-20190321073543-a1a440758eb9 diff --git a/service/frontend/adminHandler.go b/service/frontend/adminHandler.go index 438b2909b4b..c391c5d87d1 100644 --- a/service/frontend/adminHandler.go +++ b/service/frontend/adminHandler.go @@ -97,10 +97,13 @@ func (adh *AdminHandler) RegisterHandler() { // Start starts the handler func (adh *AdminHandler) Start() { + // Start domain replication queue cleanup + adh.Resource.GetDomainReplicationQueue().Start() } // Stop stops the handler func (adh *AdminHandler) Stop() { + adh.Resource.GetDomainReplicationQueue().Stop() } // AddSearchAttribute add search attribute to whitelist diff --git a/service/history/handler.go b/service/history/handler.go index 8626c00dd24..1b4fe8f2699 100644 --- a/service/history/handler.go +++ b/service/history/handler.go @@ -746,7 +746,7 @@ func (h *Handler) PollMutableState( defer log.CapturePanic(h.GetLogger(), &retError) h.startWG.Wait() - scope := metrics.HistoryClientPollMutableStateScope + scope := metrics.HistoryPollMutableStateScope h.GetMetricsClient().IncCounter(scope, metrics.CadenceRequests) sw := h.GetMetricsClient().StartTimer(scope, metrics.CadenceLatency) defer sw.Stop()