Skip to content

Commit

Permalink
Wire up domain replication queue cleanup coroutine (cadence-workflow#…
Browse files Browse the repository at this point in the history
…2963)

* Wire up domain replication queue cleanup routine during queue start
  • Loading branch information
yux0 authored Jan 9, 2020
1 parent d695915 commit afe9717
Show file tree
Hide file tree
Showing 9 changed files with 241 additions and 17 deletions.
4 changes: 4 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,8 @@ const (
HistoryRespondActivityTaskCanceledScope
// HistoryGetMutableStateScope tracks GetMutableStateScope API calls received by service
HistoryGetMutableStateScope
// HistoryPollMutableStateScope tracks PollMutableStateScope API calls received by service
HistoryPollMutableStateScope
// HistoryResetStickyTaskListScope tracks ResetStickyTaskListScope API calls received by service
HistoryResetStickyTaskListScope
// HistoryDescribeWorkflowExecutionScope tracks DescribeWorkflowExecution API calls received by service
Expand Down Expand Up @@ -893,6 +895,7 @@ const (
ReplicationTaskFetcherScope
// ReplicationTaskCleanupScope is scope used by all metrics emitted by ReplicationTaskProcessor cleanup
ReplicationTaskCleanupScope

NumHistoryScopes
)

Expand Down Expand Up @@ -1268,6 +1271,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
HistoryRespondActivityTaskFailedScope: {operation: "RespondActivityTaskFailed"},
HistoryRespondActivityTaskCanceledScope: {operation: "RespondActivityTaskCanceled"},
HistoryGetMutableStateScope: {operation: "GetMutableState"},
HistoryPollMutableStateScope: {operation: "PollMutableState"},
HistoryResetStickyTaskListScope: {operation: "ResetStickyTaskListScope"},
HistoryDescribeWorkflowExecutionScope: {operation: "DescribeWorkflowExecution"},
HistoryRecordDecisionTaskStartedScope: {operation: "RecordDecisionTaskStarted"},
Expand Down
2 changes: 1 addition & 1 deletion common/persistence/client/bean.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ func (s *BeanImpl) Close() {
s.metadataManager.Close()
s.taskManager.Close()
s.visibilityManager.Close()
s.domainReplicationQueue.Close()
s.domainReplicationQueue.Stop()
s.shardManager.Close()
s.historyManager.Close()
s.executionManagerFactory.Close()
Expand Down
10 changes: 0 additions & 10 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (

"github.com/pborman/uuid"

"github.com/uber/cadence/.gen/go/replicator"
workflow "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/codec"
Expand Down Expand Up @@ -1550,15 +1549,6 @@ type (
ListDomains(request *ListDomainsRequest) (*ListDomainsResponse, error)
GetMetadata() (*GetMetadataResponse, error)
}

// DomainReplicationQueue is used to publish and list domain replication tasks
DomainReplicationQueue interface {
Closeable
Publish(message interface{}) error
GetReplicationMessages(lastMessageID int, maxCount int) ([]*replicator.ReplicationTask, int, error)
UpdateAckLevel(lastProcessedMessageID int, clusterName string) error
GetAckLevels() (map[string]int, error)
}
)

func (e *InvalidPersistenceRequestError) Error() string {
Expand Down
86 changes: 83 additions & 3 deletions common/persistence/domainReplicationQueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,27 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

//go:generate mockgen -copyright_file ../../LICENSE -package $GOPACKAGE -source $GOFILE -destination domainReplicationQueue_mock.go -self_package github.com/uber/cadence/common/persistence

package persistence

import (
"errors"
"fmt"
"math"
"sync/atomic"
"time"

"github.com/uber/cadence/.gen/go/replicator"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/codec"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
)

const (
	// purgeInterval is the period of the ticker that drives the background
	// purge loop; acknowledged messages are deleted at most once per interval.
	purgeInterval = 5 * time.Minute
)

var _ DomainReplicationQueue = (*domainReplicationQueueImpl)(nil)
Expand All @@ -52,6 +58,7 @@ func NewDomainReplicationQueue(
encoder: codec.NewThriftRWEncoder(),
ackNotificationChan: make(chan bool),
done: make(chan bool),
status: common.DaemonStatusInitialized,
}
}

Expand All @@ -62,11 +69,36 @@ type (
metricsClient metrics.Client
logger log.Logger
encoder codec.BinaryEncoder
ackLevelUpdated bool
ackNotificationChan chan bool
done chan bool
status int32
}

// DomainReplicationQueue is used to publish and list domain replication tasks
DomainReplicationQueue interface {
// Daemon supplies the Start/Stop lifecycle; the implementation in this file
// starts its background purge goroutine in Start.
common.Daemon
// Publish enqueues a message. The implementation in this file type-asserts
// the message to *replicator.ReplicationTask.
Publish(message interface{}) error
// GetReplicationMessages reads up to maxCount messages starting from
// lastMessageID. On a read error the implementation returns nil tasks and
// lastMessageID unchanged.
// NOTE(review): the int result is presumably the ID of the last message
// returned — confirm against the implementation.
GetReplicationMessages(lastMessageID int, maxCount int) ([]*replicator.ReplicationTask, int, error)
// UpdateAckLevel reports lastProcessedMessageID for clusterName.
// NOTE(review): presumably persists the per-cluster ack level and notifies
// the purge loop — implementation not visible here, confirm.
UpdateAckLevel(lastProcessedMessageID int, clusterName string) error
// GetAckLevels returns the ack levels reported by the underlying queue; the
// purge logic treats the map as ack level by cluster.
GetAckLevels() (map[string]int, error)
}
)

// Start launches the background purge goroutine. Only the call that moves the
// queue from the initialized state to the started state spawns the goroutine;
// every other call is a no-op.
func (q *domainReplicationQueueImpl) Start() {
	transitioned := atomic.CompareAndSwapInt32(
		&q.status,
		common.DaemonStatusInitialized,
		common.DaemonStatusStarted,
	)
	if transitioned {
		go q.purgeProcessor()
	}
}

// Stop terminates the background purge goroutine by closing the done channel.
//
// Only a queue that is currently started is stopped: the status is moved from
// started to stopped atomically, so the done channel is closed exactly once
// and repeated calls (or a call on a queue that was never started) are no-ops.
func (q *domainReplicationQueueImpl) Stop() {
	// The transition must be Started -> Stopped. Reusing Start's
	// Initialized -> Started transition here would make Stop a no-op for every
	// running queue and leak the purge goroutine.
	if !atomic.CompareAndSwapInt32(&q.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
		return
	}
	close(q.done)
}

func (q *domainReplicationQueueImpl) Publish(message interface{}) error {
task, ok := message.(*replicator.ReplicationTask)
if !ok {
Expand All @@ -84,6 +116,7 @@ func (q *domainReplicationQueueImpl) GetReplicationMessages(
lastMessageID int,
maxCount int,
) ([]*replicator.ReplicationTask, int, error) {

messages, err := q.queue.ReadMessages(lastMessageID, maxCount)
if err != nil {
return nil, lastMessageID, err
Expand Down Expand Up @@ -122,6 +155,53 @@ func (q *domainReplicationQueueImpl) GetAckLevels() (map[string]int, error) {
return q.queue.GetAckLevels()
}

func (q *domainReplicationQueueImpl) Close() {
close(q.done)
// purgeAckedMessages deletes every queued message below the lowest ack level
// reported across clusters, then publishes that level as a gauge.
//
// It returns nil without deleting anything when no ack levels are known, and
// a wrapped error when reading the ack levels or deleting messages fails.
func (q *domainReplicationQueueImpl) purgeAckedMessages() error {
	levels, err := q.GetAckLevels()
	if err != nil {
		return fmt.Errorf("failed to purge messages: %v", err)
	}
	if len(levels) == 0 {
		return nil
	}

	// Only messages acknowledged by every cluster are safe to delete, so
	// purge up to the smallest ack level.
	lowest := math.MaxInt64
	for cluster := range levels {
		if level := levels[cluster]; level < lowest {
			lowest = level
		}
	}

	if deleteErr := q.queue.DeleteMessagesBefore(lowest); deleteErr != nil {
		return fmt.Errorf("failed to purge messages: %v", deleteErr)
	}

	scope := q.metricsClient.Scope(metrics.FrontendDomainReplicationQueueScope)
	scope.UpdateGauge(metrics.DomainReplicationTaskAckLevel, float64(lowest))
	return nil
}

// purgeProcessor is the background loop of the queue. It runs until the done
// channel is closed and reacts to two other events:
//   - an ack notification marks the ack levels as updated;
//   - a ticker fire (every purgeInterval) purges acknowledged messages, but
//     only if the ack levels were updated since the last successful purge.
//
// A failed purge is logged and retried on the next tick because the updated
// flag is only cleared on success.
func (q *domainReplicationQueueImpl) purgeProcessor() {
	purgeTicker := time.NewTicker(purgeInterval)
	defer purgeTicker.Stop()

	for {
		select {
		case <-q.done:
			return
		case <-q.ackNotificationChan:
			q.ackLevelUpdated = true
		case <-purgeTicker.C:
			if !q.ackLevelUpdated {
				continue
			}
			if purgeErr := q.purgeAckedMessages(); purgeErr != nil {
				q.logger.Warn("Failed to purge acked domain replication messages.", tag.Error(purgeErr))
				continue
			}
			q.ackLevelUpdated = false
		}
	}
}
142 changes: 142 additions & 0 deletions common/persistence/domainReplicationQueue_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 6 additions & 2 deletions common/resource/resourceTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,17 @@ func NewTest(
shardMgr := &mocks.ShardManager{}
historyMgr := &mocks.HistoryV2Manager{}
executionMgr := &mocks.ExecutionManager{}
domainReplicationQueue := persistence.NewMockDomainReplicationQueue(controller)
domainReplicationQueue.EXPECT().Start().AnyTimes()
domainReplicationQueue.EXPECT().Stop().AnyTimes()
persistenceBean := persistenceClient.NewMockBean(controller)
persistenceBean.EXPECT().GetMetadataManager().Return(metadataMgr).AnyTimes()
persistenceBean.EXPECT().GetTaskManager().Return(taskMgr).AnyTimes()
persistenceBean.EXPECT().GetVisibilityManager().Return(visibilityMgr).AnyTimes()
persistenceBean.EXPECT().GetHistoryManager().Return(historyMgr).AnyTimes()
persistenceBean.EXPECT().GetShardManager().Return(shardMgr).AnyTimes()
persistenceBean.EXPECT().GetExecutionManager(gomock.Any()).Return(executionMgr, nil).AnyTimes()
persistenceBean.EXPECT().GetDomainReplicationQueue().Return(domainReplicationQueue).AnyTimes()

membershipMonitor := membership.NewMockMonitor(controller)
frontendServiceResolver := membership.NewMockServiceResolver(controller)
Expand Down Expand Up @@ -199,7 +203,7 @@ func NewTest(
MetadataMgr: metadataMgr,
TaskMgr: taskMgr,
VisibilityMgr: visibilityMgr,
DomainReplicationQueue: nil,
DomainReplicationQueue: domainReplicationQueue,
ShardMgr: shardMgr,
HistoryMgr: historyMgr,
ExecutionMgr: executionMgr,
Expand Down Expand Up @@ -385,7 +389,7 @@ func (s *Test) GetVisibilityManager() persistence.VisibilityManager {
// GetDomainReplicationQueue for testing; it returns the queue stored in the
// DomainReplicationQueue field of the test resource.
func (s *Test) GetDomainReplicationQueue() persistence.DomainReplicationQueue {
	return s.DomainReplicationQueue
}

// GetShardManager for testing
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/Shopify/sarama v1.23.0
github.com/apache/thrift v0.0.0-20161221203622-b2a4d4ae21c7
github.com/benbjohnson/clock v0.0.0-20161215174838-7dc76406b6d3 // indirect
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869
github.com/bsm/sarama-cluster v2.1.13+incompatible
github.com/cactus/go-statsd-client v3.1.1+incompatible
github.com/cch123/elasticsql v0.0.0-20190321073543-a1a440758eb9
Expand Down
3 changes: 3 additions & 0 deletions service/frontend/adminHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,13 @@ func (adh *AdminHandler) RegisterHandler() {

// Start starts the handler by kicking off the domain replication queue,
// which runs the queue cleanup in the background
func (adh *AdminHandler) Start() {
	replicationQueue := adh.Resource.GetDomainReplicationQueue()
	replicationQueue.Start()
}

// Stop stops the handler by stopping the domain replication queue
func (adh *AdminHandler) Stop() {
	replicationQueue := adh.Resource.GetDomainReplicationQueue()
	replicationQueue.Stop()
}

// AddSearchAttribute add search attribute to whitelist
Expand Down
Loading

0 comments on commit afe9717

Please sign in to comment.