From 4384e4ccc9ee560b2277dae6856a2dbc39a90d5d Mon Sep 17 00:00:00 2001 From: Yichao Yang Date: Tue, 6 Jul 2021 13:59:08 -0700 Subject: [PATCH] Target cluster cross cluster task processor (#4292) --- common/log/tag/values.go | 59 +-- common/metrics/defs.go | 5 +- service/history/task/cross_cluster_task.go | 43 +- .../task/cross_cluster_task_processor.go | 448 ++++++++++++++++++ .../task/cross_cluster_task_processor_test.go | 303 ++++++++++++ service/history/task/fetcher.go | 50 +- service/history/task/fetcher_test.go | 87 +++- 7 files changed, 944 insertions(+), 51 deletions(-) create mode 100644 service/history/task/cross_cluster_task_processor.go create mode 100644 service/history/task/cross_cluster_task_processor_test.go diff --git a/common/log/tag/values.go b/common/log/tag/values.go index 8c505a88239..58e8f41d757 100644 --- a/common/log/tag/values.go +++ b/common/log/tag/values.go @@ -97,35 +97,36 @@ var ( // Pre-defined values for TagSysComponent var ( - ComponentTaskList = component("tasklist") - ComponentHistoryEngine = component("history-engine") - ComponentHistoryCache = component("history-cache") - ComponentDecisionHandler = component("decision-handler") - ComponentEventsCache = component("events-cache") - ComponentTransferQueue = component("transfer-queue-processor") - ComponentTimerQueue = component("timer-queue-processor") - ComponentTimerBuilder = component("timer-builder") - ComponentReplicatorQueue = component("replicator-queue-processor") - ComponentShardController = component("shard-controller") - ComponentShard = component("shard") - ComponentShardItem = component("shard-item") - ComponentShardEngine = component("shard-engine") - ComponentMatchingEngine = component("matching-engine") - ComponentReplicator = component("replicator") - ComponentReplicationTaskProcessor = component("replication-task-processor") - ComponentReplicationAckManager = component("replication-ack-manager") - ComponentHistoryReplicator = component("history-replicator") - ComponentHistoryResender = component("history-resender") - ComponentIndexer = component("indexer") - ComponentIndexerProcessor = component("indexer-processor") - ComponentIndexerESProcessor = component("indexer-es-processor") - ComponentESVisibilityManager = component("es-visibility-manager") - ComponentArchiver = component("archiver") - ComponentBatcher = component("batcher") - ComponentWorker = component("worker") - ComponentServiceResolver = component("service-resolver") - ComponentFailoverCoordinator = component("failover-coordinator") - ComponentFailoverMarkerNotifier = component("failover-marker-notifier") + ComponentTaskList = component("tasklist") + ComponentHistoryEngine = component("history-engine") + ComponentHistoryCache = component("history-cache") + ComponentDecisionHandler = component("decision-handler") + ComponentEventsCache = component("events-cache") + ComponentTransferQueue = component("transfer-queue-processor") + ComponentTimerQueue = component("timer-queue-processor") + ComponentTimerBuilder = component("timer-builder") + ComponentReplicatorQueue = component("replicator-queue-processor") + ComponentShardController = component("shard-controller") + ComponentShard = component("shard") + ComponentShardItem = component("shard-item") + ComponentShardEngine = component("shard-engine") + ComponentMatchingEngine = component("matching-engine") + ComponentReplicator = component("replicator") + ComponentReplicationTaskProcessor = component("replication-task-processor") + ComponentReplicationAckManager = 
component("replication-ack-manager") + ComponentHistoryReplicator = component("history-replicator") + ComponentHistoryResender = component("history-resender") + ComponentIndexer = component("indexer") + ComponentIndexerProcessor = component("indexer-processor") + ComponentIndexerESProcessor = component("indexer-es-processor") + ComponentESVisibilityManager = component("es-visibility-manager") + ComponentArchiver = component("archiver") + ComponentBatcher = component("batcher") + ComponentWorker = component("worker") + ComponentServiceResolver = component("service-resolver") + ComponentFailoverCoordinator = component("failover-coordinator") + ComponentFailoverMarkerNotifier = component("failover-marker-notifier") + ComponentCrossClusterTaskProcessor = component("cross-cluster-task-processor") ) // Pre-defined values for TagSysLifecycle diff --git a/common/metrics/defs.go b/common/metrics/defs.go index a93d0e706ef..fd3dac8493d 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -990,8 +990,10 @@ const ( TimerStandbyTaskDeleteHistoryEventScope // TimerStandbyTaskWorkflowBackoffTimerScope is the scope used by metric emitted by timer queue processor for processing retry task. TimerStandbyTaskWorkflowBackoffTimerScope - // CrossClusterQueueProcessorScope is the scope used by all metric emitted by cross cluster queue processor + // CrossClusterQueueProcessorScope is the scope used by all metric emitted by cross cluster queue processor in the source cluster CrossClusterQueueProcessorScope + // CrossClusterTaskProcessorScope is the scope used by all metric emitted by cross cluster task processor in the target cluster + CrossClusterTaskProcessorScope // CrossClusterTaskStartChildExecutionScope is the scope used by metric emitted by cross cluster queue processor for processing start child workflow task. CrossClusterTaskStartChildExecutionScope // CrossClusterTaskCancelExecutionScope is the scope used by metric emitted by cross cluster queue processor for processing cancel workflow task. 
@@ -1566,6 +1568,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
 	TimerStandbyTaskWorkflowBackoffTimerScope:                 {operation: "TimerStandbyTaskWorkflowBackoffTimer"},
 	TimerStandbyTaskDeleteHistoryEventScope:                   {operation: "TimerStandbyTaskDeleteHistoryEvent"},
 	CrossClusterQueueProcessorScope:                           {operation: "CrossClusterQueueProcessor"},
+	CrossClusterTaskProcessorScope:                            {operation: "CrossClusterTaskProcessor"},
 	CrossClusterTaskStartChildExecutionScope:                  {operation: "CrossClusterTaskStartChildExecution"},
 	CrossClusterTaskCancelExecutionScope:                      {operation: "CrossClusterTaskCancelExecution"},
 	CrossClusterTaskTypeSignalExecutionScope:                  {operation: "CrossClusterTaskTypeSignalExecution"},
diff --git a/service/history/task/cross_cluster_task.go b/service/history/task/cross_cluster_task.go
index d57b8a9e262..e7f9ff52713 100644
--- a/service/history/task/cross_cluster_task.go
+++ b/service/history/task/cross_cluster_task.go
@@ -26,9 +26,12 @@ import (
 
 	"github.com/uber/cadence/common/clock"
 	"github.com/uber/cadence/common/dynamicconfig"
+	"github.com/uber/cadence/common/future"
 	"github.com/uber/cadence/common/log"
 	"github.com/uber/cadence/common/metrics"
+	"github.com/uber/cadence/common/persistence"
 	ctask "github.com/uber/cadence/common/task"
+	"github.com/uber/cadence/common/types"
 	"github.com/uber/cadence/service/history/shard"
 )
 
@@ -73,9 +76,35 @@ type (
 		taskProcessor Processor       //TODO: wire up the dependency
 		redispatchFn  func(task Task) //TODO: wire up the dependency
 		maxRetryCount dynamicconfig.IntPropertyFn
+		settable      future.Settable
 	}
 )
 
+// NewCrossClusterTaskForTargetCluster creates a CrossClusterTask
+// for the processing logic at the target cluster.
+// The returned Future will be unblocked after the task
+// is processed. The future value has type types.CrossClusterTaskResponse
+// and no error will be returned for this future. All errors will
+// be recorded by the FailedCause field in the response.
+func NewCrossClusterTaskForTargetCluster( + shard shard.Context, + taskRequest *types.CrossClusterTaskRequest, + logger log.Logger, + maxRetryCount dynamicconfig.IntPropertyFn, +) (CrossClusterTask, future.Future) { + // TODO: create CrossClusterTasks based on request + // for now create a dummy CrossClusterTask for testing + future, settable := future.NewFuture() + return &crossClusterSignalWorkflowTask{ + crossClusterTaskBase: &crossClusterTaskBase{ + Info: &persistence.CrossClusterTaskInfo{ + TaskID: taskRequest.TaskInfo.TaskID, + }, + settable: settable, + }, + }, future +} + // NewCrossClusterSignalWorkflowTask initialize cross cluster signal workflow task and task future func NewCrossClusterSignalWorkflowTask( shard shard.Context, @@ -169,13 +198,17 @@ func (c *crossClusterSignalWorkflowTask) Execute() error { } func (c *crossClusterSignalWorkflowTask) Ack() { - panic("Not implement") - + // TODO: rewrite the implementation + // current impl is just for testing purpose + if c.settable != nil { + c.settable.Set(types.CrossClusterTaskResponse{ + TaskID: c.Info.GetTaskID(), + }, nil) + } } func (c *crossClusterSignalWorkflowTask) Nack() { panic("Not implement") - } func (c *crossClusterSignalWorkflowTask) HandleErr( @@ -202,7 +235,6 @@ func (c *crossClusterCancelWorkflowTask) Execute() error { func (c *crossClusterCancelWorkflowTask) Ack() { panic("Not implement") - } func (c *crossClusterCancelWorkflowTask) Nack() { @@ -301,6 +333,9 @@ func (c *crossClusterTaskBase) GetQueueType() QueueType { } func (c *crossClusterTaskBase) IsReadyForPoll() bool { + c.Lock() + defer c.Unlock() + return c.state == ctask.TaskStatePending && (c.processingState == processingStateInitialed || c.processingState == processingStateResponseRecorded) } diff --git a/service/history/task/cross_cluster_task_processor.go b/service/history/task/cross_cluster_task_processor.go new file mode 100644 index 00000000000..99c8f8f1403 --- /dev/null +++ b/service/history/task/cross_cluster_task_processor.go @@ -0,0 +1,448 @@ +// Copyright (c) 2021 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
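The new processor file below consumes the Future/Settable handshake established above: NewCrossClusterTaskForTargetCluster keeps the Settable on the task and hands the Future to the caller, and Ack() resolves that Future with a types.CrossClusterTaskResponse and a nil error. A standalone sketch of that contract follows (not part of the patch; it assumes only future.NewFuture, Settable.Set, Future.Get, and types.CrossClusterTaskResponse as used in this diff).

package main

import (
	"context"
	"fmt"

	"github.com/uber/cadence/common/future"
	"github.com/uber/cadence/common/types"
)

func main() {
	// the task constructor keeps the Settable, the processor keeps the Future
	f, settable := future.NewFuture()

	// stands in for crossClusterSignalWorkflowTask.Ack() resolving the future
	go settable.Set(types.CrossClusterTaskResponse{TaskID: 123}, nil)

	// the processor blocks on the future until the task has been processed
	var resp types.CrossClusterTaskResponse
	if err := f.Get(context.Background(), &resp); err != nil {
		fmt.Println("unexpected error:", err)
		return
	}
	fmt.Println("responded task:", resp.TaskID)
}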
+ +package task + +import ( + "context" + "sync" + "sync/atomic" + "time" + + "github.com/uber/cadence/common" + "github.com/uber/cadence/common/backoff" + "github.com/uber/cadence/common/dynamicconfig" + "github.com/uber/cadence/common/future" + "github.com/uber/cadence/common/log" + "github.com/uber/cadence/common/log/tag" + "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/types" + "github.com/uber/cadence/service/history/shard" +) + +const ( + respondCrossClusterTaskTimeout = 5 * time.Second +) + +type ( + // CrossClusterTaskProcessorOptions configures crossClusterTaskProcessor + CrossClusterTaskProcessorOptions struct { + MaxPendingTasks dynamicconfig.IntPropertyFn + TaskMaxRetryCount dynamicconfig.IntPropertyFn + TaskRedispatchInterval dynamicconfig.DurationPropertyFn + TaskWaitInterval dynamicconfig.DurationPropertyFn + ServiceBusyBackoffInterval dynamicconfig.DurationPropertyFn + TimerJitterCoefficient dynamicconfig.FloatPropertyFn + } + + crossClusterTaskProcessors []*crossClusterTaskProcessor + + crossClusterTaskProcessor struct { + shard shard.Context + taskProcessor Processor + redispatcher Redispatcher + taskFetcher Fetcher + options *CrossClusterTaskProcessorOptions + retryPolicy backoff.RetryPolicy + logger log.Logger + metricsScope metrics.Scope + + status int32 + shutdownCh chan struct{} + shutdownWG sync.WaitGroup + + taskLock sync.Mutex + pendingTasks map[int64]future.Future + } +) + +// NewCrossClusterTaskProcessors creates a list of crossClusterTaskProcessors +// for processing cross cluster tasks at target cluster. +// One processor per source cluster per shard +func NewCrossClusterTaskProcessors( + shard shard.Context, + taskProcessor Processor, + taskFetchers Fetchers, + options *CrossClusterTaskProcessorOptions, +) common.Daemon { + processors := make(crossClusterTaskProcessors, 0, len(taskFetchers)) + for _, fetcher := range taskFetchers { + processor := newCrossClusterTaskProcessor( + shard, + taskProcessor, + fetcher, + options, + ) + processors = append(processors, processor) + } + return processors +} + +func (processors crossClusterTaskProcessors) Start() { + for _, processor := range processors { + processor.Start() + } +} + +func (processors crossClusterTaskProcessors) Stop() { + for _, processor := range processors { + processor.Stop() + } +} + +func newCrossClusterTaskProcessor( + shard shard.Context, + taskProcessor Processor, + taskFetcher Fetcher, + options *CrossClusterTaskProcessorOptions, +) *crossClusterTaskProcessor { + logger := shard.GetLogger().WithTags( + tag.ComponentCrossClusterTaskProcessor, + tag.SourceCluster(taskFetcher.GetSourceCluster()), + ) + metricsScope := shard.GetMetricsClient().Scope(metrics.CrossClusterTaskProcessorScope) + retryPolicy := backoff.NewExponentialRetryPolicy(time.Millisecond * 100) + retryPolicy.SetMaximumInterval(time.Second) + retryPolicy.SetExpirationInterval(options.TaskWaitInterval()) + return &crossClusterTaskProcessor{ + shard: shard, + taskProcessor: taskProcessor, + taskFetcher: taskFetcher, + redispatcher: NewRedispatcher( + taskProcessor, + &RedispatcherOptions{ + TaskRedispatchInterval: options.TaskRedispatchInterval, + TaskRedispatchIntervalJitterCoefficient: options.TimerJitterCoefficient, + }, + logger, + metricsScope, + ), + options: options, + retryPolicy: retryPolicy, + logger: logger, + metricsScope: metricsScope, + + status: common.DaemonStatusInitialized, + shutdownCh: make(chan struct{}), + + pendingTasks: make(map[int64]future.Future), + } +} + +func (p 
*crossClusterTaskProcessor) Start() { + if !atomic.CompareAndSwapInt32(&p.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) { + return + } + + p.redispatcher.Start() + + p.shutdownWG.Add(2) + go p.processLoop() + go p.respondPendingTaskLoop() + + p.logger.Info("Task processor started.", tag.LifeCycleStarted) +} + +func (p *crossClusterTaskProcessor) Stop() { + if !atomic.CompareAndSwapInt32(&p.status, common.DaemonStatusStarted, common.DaemonStatusStopped) { + return + } + + close(p.shutdownCh) + p.redispatcher.Stop() + + if success := common.AwaitWaitGroup(&p.shutdownWG, time.Minute); !success { + p.logger.Warn("Task processor timedout on shutdown.", tag.LifeCycleStopTimedout) + } + p.logger.Info("Task processor stopped.", tag.LifeCycleStopped) +} + +func (p *crossClusterTaskProcessor) processLoop() { + defer p.shutdownWG.Done() + + for { + if p.hasShutdown() { + return + } + + if p.numPendingTasks() > p.options.MaxPendingTasks() { + time.Sleep(backoff.JitDuration( + p.options.TaskWaitInterval(), + p.options.TimerJitterCoefficient(), + )) + continue + } + + // this will submit the fetching request to the host level task fetcher for batching + fetchFuture := p.taskFetcher.Fetch(p.shard.GetShardID()) + + var taskRequests []*types.CrossClusterTaskRequest + if err := fetchFuture.Get(context.Background(), &taskRequests); err != nil { + p.logger.Error("Unable to fetch cross cluster tasks", tag.Error(err)) + if common.IsServiceBusyError(err) { + time.Sleep(backoff.JitDuration( + p.options.ServiceBusyBackoffInterval(), + p.options.TimerJitterCoefficient(), + )) + } + continue + } + + p.processTaskRequests(taskRequests) + } +} + +func (p *crossClusterTaskProcessor) processTaskRequests( + taskRequests []*types.CrossClusterTaskRequest, +) { + taskRequests = p.dedupTaskRequests(taskRequests) + // it's ok to drop task requests, + // the same request will be sent by the source cluster again upon next fetch + for len(taskRequests) != 0 && !p.hasShutdown() && p.numPendingTasks() < p.options.MaxPendingTasks() { + + taskFutures := make(map[int64]future.Future, len(taskRequests)) + for _, taskRequest := range taskRequests { + crossClusterTask, future := NewCrossClusterTaskForTargetCluster( + p.shard, + taskRequest, + p.logger, + p.options.TaskMaxRetryCount, + ) + taskFutures[taskRequest.TaskInfo.GetTaskID()] = future + + if err := p.submitTask(crossClusterTask); err != nil { + return + } + } + + respondRequest := &types.RespondCrossClusterTasksCompletedRequest{ + ShardID: int32(p.shard.GetShardID()), + TargetCluster: p.shard.GetClusterMetadata().GetCurrentClusterName(), + FetchNewTasks: p.numPendingTasks() < p.options.MaxPendingTasks(), + } + taskWaitContext, cancel := context.WithTimeout(context.Background(), p.options.TaskWaitInterval()) + deadlineExceeded := false + for taskID, taskFuture := range taskFutures { + if deadlineExceeded && !taskFuture.IsReady() { + continue + } + + var taskResponse types.CrossClusterTaskResponse + if err := taskFuture.Get(taskWaitContext, &taskResponse); err != nil { + if err == context.DeadlineExceeded { + // switch to a valid context here, otherwise Get() will always return an error. 
+ // using context.Background() is fine since we will only be calling Get() with it + // when the future is ready + taskWaitContext = context.Background() + deadlineExceeded = true + continue + } + + // this case should not happen, + // task failure should be converted to FailCause in the response by the processing logic + taskResponse = types.CrossClusterTaskResponse{ + TaskID: taskID, + FailedCause: types.CrossClusterTaskFailedCauseUncategorized.Ptr(), + } + p.logger.Error("Encountered uncategorized error from cross cluster task future", tag.Error(err)) + } + respondRequest.TaskResponses = append(respondRequest.TaskResponses, &taskResponse) + } + cancel() + + successfullyRespondedTaskIDs := make(map[int64]struct{}) + var respondResponse *types.RespondCrossClusterTasksCompletedResponse + var respondErr error + respondResponse, respondErr = p.respondTaskCompletedWithRetry(respondRequest) + if respondErr == nil { + for _, response := range respondRequest.TaskResponses { + successfullyRespondedTaskIDs[response.GetTaskID()] = struct{}{} + } + } + + // move tasks that are still running or failed to respond to pendingTasks map + // so that the respond can be done later + p.taskLock.Lock() + for taskID, future := range taskFutures { + if _, ok := successfullyRespondedTaskIDs[taskID]; ok { + continue + } + p.pendingTasks[taskID] = future + } + p.taskLock.Unlock() + + if respondErr != nil { + return + } + taskRequests = p.dedupTaskRequests(respondResponse.Tasks) + } +} + +func (p *crossClusterTaskProcessor) respondPendingTaskLoop() { + defer p.shutdownWG.Done() + + respondTimer := time.NewTimer(backoff.JitDuration( + p.options.TaskWaitInterval(), + p.options.TimerJitterCoefficient(), + )) + + for { + select { + case <-p.shutdownCh: + return + case <-respondTimer.C: + // reset the timer first so that if respond task API call retried for some time + // we won't add an additional TaskWaitInterval before checking the status of + // pending tasks again + respondTimer.Reset(backoff.JitDuration( + p.options.TaskWaitInterval(), + p.options.TimerJitterCoefficient(), + )) + p.taskLock.Lock() + respondRequest := &types.RespondCrossClusterTasksCompletedRequest{ + ShardID: int32(p.shard.GetShardID()), + TargetCluster: p.shard.GetClusterMetadata().GetCurrentClusterName(), + FetchNewTasks: false, + } + for taskID, taskFuture := range p.pendingTasks { + if taskFuture.IsReady() { + var taskResponse types.CrossClusterTaskResponse + if err := taskFuture.Get(context.Background(), &taskResponse); err != nil { + // this case should not happen, + // task failure should be converted to FailCause in the response by the processing logic + taskResponse = types.CrossClusterTaskResponse{ + TaskID: taskID, + FailedCause: types.CrossClusterTaskFailedCauseUncategorized.Ptr(), + } + p.logger.Error("Encountered uncategorized error from cross cluster task future", tag.Error(err)) + } + respondRequest.TaskResponses = append(respondRequest.TaskResponses, &taskResponse) + } + } + p.taskLock.Unlock() + if len(respondRequest.TaskResponses) == 0 { + continue + } + + _, err := p.respondTaskCompletedWithRetry(respondRequest) + if err == nil { + // we can be sure that source cluster has received the response + p.taskLock.Lock() + for _, response := range respondRequest.TaskResponses { + taskID := response.GetTaskID() + delete(p.pendingTasks, taskID) + } + p.taskLock.Unlock() + } + + if common.IsServiceBusyError(err) { + respondTimer.Reset(backoff.JitDuration( + p.options.ServiceBusyBackoffInterval(), + p.options.TimerJitterCoefficient(), + 
))
+			}
+		}
+	}
+}
+
+func (p *crossClusterTaskProcessor) dedupTaskRequests(
+	taskRequests []*types.CrossClusterTaskRequest,
+) []*types.CrossClusterTaskRequest {
+	// NOTE: this is only a best-effort dedup for reducing the number of unnecessary task executions.
+	// it's possible that a task is removed from the pendingTasks map before this dedup logic
+	// is executed for that task. In that case, that task will be executed multiple times. This
+	// is fine as all task processing logic is supposed to be idempotent.
+	dedupedRequests := make([]*types.CrossClusterTaskRequest, 0, len(taskRequests))
+
+	p.taskLock.Lock()
+	defer p.taskLock.Unlock()
+
+	for _, taskRequest := range taskRequests {
+		taskID := taskRequest.TaskInfo.GetTaskID()
+		if _, ok := p.pendingTasks[taskID]; ok {
+			continue
+		}
+		dedupedRequests = append(dedupedRequests, taskRequest)
+	}
+
+	return dedupedRequests
+}
+
+func (p *crossClusterTaskProcessor) respondTaskCompletedWithRetry(
+	request *types.RespondCrossClusterTasksCompletedRequest,
+) (*types.RespondCrossClusterTasksCompletedResponse, error) {
+	var response *types.RespondCrossClusterTasksCompletedResponse
+	err := backoff.Retry(
+		func() error {
+			ctx, cancel := context.WithTimeout(context.Background(), respondCrossClusterTaskTimeout)
+			defer cancel()
+
+			var err error
+			response, err = p.shard.GetService().GetHistoryRawClient().RespondCrossClusterTasksCompleted(ctx, request)
+			return err
+		},
+		p.retryPolicy,
+		func(err error) bool {
+			if common.IsServiceBusyError(err) {
+				return false
+			}
+			return common.IsServiceTransientError(err)
+		},
+	)
+
+	return response, err
+}
+
+// submitTask submits the task to the host level task processor,
+// or to the redispatch queue if the submission fails (task ch full, or other errors)
+// so that the submission can be retried later.
+// An error is returned by this function only when the shard has been shut down.
+func (p *crossClusterTaskProcessor) submitTask(
+	task CrossClusterTask,
+) error {
+	submitted, err := p.taskProcessor.TrySubmit(task)
+	if err != nil {
+		if p.hasShutdown() {
+			return err
+		}
+		p.logger.Error("Failed to submit task", tag.Error(err))
+	}
+
+	if err != nil || !submitted {
+		p.redispatcher.AddTask(task)
+	}
+	return nil
+}
+
+func (p *crossClusterTaskProcessor) numPendingTasks() int {
+	p.taskLock.Lock()
+	defer p.taskLock.Unlock()
+
+	return len(p.pendingTasks)
+}
+
+func (p *crossClusterTaskProcessor) hasShutdown() bool {
+	select {
+	case <-p.shutdownCh:
+		return true
+	default:
+		return false
+	}
+}
diff --git a/service/history/task/cross_cluster_task_processor_test.go b/service/history/task/cross_cluster_task_processor_test.go
new file mode 100644
index 00000000000..17d6f84e60c
--- /dev/null
+++ b/service/history/task/cross_cluster_task_processor_test.go
@@ -0,0 +1,303 @@
+// Copyright (c) 2021 Uber Technologies, Inc.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package task + +import ( + "context" + "errors" + "math/rand" + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + + "github.com/uber/cadence/common/backoff" + "github.com/uber/cadence/common/cluster" + "github.com/uber/cadence/common/dynamicconfig" + "github.com/uber/cadence/common/future" + "github.com/uber/cadence/common/persistence" + "github.com/uber/cadence/common/types" + "github.com/uber/cadence/service/history/config" + "github.com/uber/cadence/service/history/constants" + "github.com/uber/cadence/service/history/shard" +) + +type ( + crossClusterTaskProcessorSuite struct { + suite.Suite + *require.Assertions + + controller *gomock.Controller + mockShard *shard.TestContext + mockProcessor *MockProcessor + + processorOptions *CrossClusterTaskProcessorOptions + fetcherOptions *FetcherOptions + retryPolicy *backoff.ExponentialRetryPolicy + } +) + +func TestCrossClusterTaskProcessSuite(t *testing.T) { + s := new(crossClusterTaskProcessorSuite) + suite.Run(t, s) +} + +func (s *crossClusterTaskProcessorSuite) SetupTest() { + s.Assertions = require.New(s.T()) + + s.controller = gomock.NewController(s.T()) + s.mockShard = shard.NewTestContext( + s.controller, + &persistence.ShardInfo{ + ShardID: 0, + RangeID: 1, + }, + config.NewForTest(), + ) + s.mockProcessor = NewMockProcessor(s.controller) + s.mockShard.Resource.ClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes() + + s.processorOptions = &CrossClusterTaskProcessorOptions{ + MaxPendingTasks: dynamicconfig.GetIntPropertyFn(100), + TaskMaxRetryCount: dynamicconfig.GetIntPropertyFn(100), + TaskRedispatchInterval: dynamicconfig.GetDurationPropertyFn(time.Hour), + TaskWaitInterval: dynamicconfig.GetDurationPropertyFn(time.Millisecond * 100), + ServiceBusyBackoffInterval: dynamicconfig.GetDurationPropertyFn(time.Millisecond * 200), + TimerJitterCoefficient: dynamicconfig.GetFloatPropertyFn(0.1), + } + s.fetcherOptions = &FetcherOptions{ + Parallelism: dynamicconfig.GetIntPropertyFn(3), + AggregationInterval: dynamicconfig.GetDurationPropertyFn(time.Millisecond * 100), + TimerJitterCoefficient: dynamicconfig.GetFloatPropertyFn(0.5), + } + s.retryPolicy = backoff.NewExponentialRetryPolicy(time.Millisecond * 50) + s.retryPolicy.SetMaximumInterval(time.Millisecond * 100) + s.retryPolicy.SetMaximumAttempts(3) +} + +func (s *crossClusterTaskProcessorSuite) TestCrossClusterTaskProcessorStartStop() { + taskFetchers := NewCrossClusterTaskFetchers( + constants.TestClusterMetadata, + s.mockShard.Resource.GetClientBean(), + s.fetcherOptions, + s.mockShard.Resource.GetLogger(), + ) + taskProcessors := NewCrossClusterTaskProcessors(s.mockShard, s.mockProcessor, taskFetchers, s.processorOptions) + s.Len(taskProcessors, len(taskFetchers)) + + s.mockShard.Resource.RemoteAdminClient.EXPECT().GetCrossClusterTasks(gomock.Any(), gomock.Any()).Return( + &types.GetCrossClusterTasksResponse{ + TasksByShard: 
map[int32][]*types.CrossClusterTaskRequest{int32(s.mockShard.GetShardID()): {}}, + }, nil, + ).AnyTimes() + + taskFetchers.Start() + taskProcessors.Start() + taskProcessors.Stop() + taskFetchers.Stop() +} + +func (s *crossClusterTaskProcessorSuite) TestRespondPendingTasks_Failed() { + s.testRespondPendingTasks(true) +} + +func (s *crossClusterTaskProcessorSuite) TestRespondPendingTasks_Success() { + s.testRespondPendingTasks(false) +} + +func (s *crossClusterTaskProcessorSuite) testRespondPendingTasks(failedToRespond bool) { + fetcher := newTaskFetcher( + cluster.TestAlternativeClusterName, + cluster.TestCurrentClusterName, + nil, nil, nil, nil, + ) + processor := newCrossClusterTaskProcessor(s.mockShard, s.mockProcessor, fetcher, s.processorOptions) + numPendingTasks := 10 + completedTasks := 6 + futureSettables := make([]future.Settable, numPendingTasks) + for i := 0; i != numPendingTasks; i++ { + taskFuture, settable := future.NewFuture() + processor.pendingTasks[int64(i)] = taskFuture + futureSettables[i] = settable + } + + for idx := range rand.Perm(numPendingTasks)[:completedTasks] { + futureSettables[idx].Set(types.CrossClusterTaskResponse{TaskID: int64(idx)}, nil) + } + + s.mockShard.Resource.HistoryClient.EXPECT().RespondCrossClusterTasksCompleted(gomock.Any(), gomock.Any()).DoAndReturn( + func( + _ context.Context, + request *types.RespondCrossClusterTasksCompletedRequest, + ) (*types.RespondCrossClusterTasksCompletedResponse, error) { + select { + case <-processor.shutdownCh: + default: + close(processor.shutdownCh) + } + + s.Len(request.TaskResponses, completedTasks) + s.Equal(s.mockShard.Resource.GetClusterMetadata().GetCurrentClusterName(), request.TargetCluster) + s.Equal(s.mockShard.GetShardID(), int(request.GetShardID())) + s.False(request.GetFetchNewTasks()) + if failedToRespond { + return nil, errors.New("some random error") + } + return &types.RespondCrossClusterTasksCompletedResponse{}, nil + }, + ).AnyTimes() + + processor.shutdownWG.Add(1) + go processor.respondPendingTaskLoop() + processor.shutdownWG.Wait() + + if failedToRespond { + s.Len(processor.pendingTasks, numPendingTasks) + } else { + s.Len(processor.pendingTasks, numPendingTasks-completedTasks) + } +} + +func (s *crossClusterTaskProcessorSuite) TestProcessTaskRequests_RespondFailed() { + s.testProcessTaskRequests(true) +} + +func (s *crossClusterTaskProcessorSuite) TestProcessTaskRequests_RespondSuccess() { + s.testProcessTaskRequests(false) +} + +func (s *crossClusterTaskProcessorSuite) testProcessTaskRequests(failedToRespond bool) { + fetcher := newTaskFetcher( + cluster.TestAlternativeClusterName, + cluster.TestCurrentClusterName, + nil, nil, nil, nil, + ) + processor := newCrossClusterTaskProcessor(s.mockShard, s.mockProcessor, fetcher, s.processorOptions) + pendingTaskID := 0 + future, _ := future.NewFuture() + processor.pendingTasks[int64(pendingTaskID)] = future + + numTasks := 10 + var tasksRequests []*types.CrossClusterTaskRequest + for id := pendingTaskID; id != pendingTaskID+numTasks; id++ { + tasksRequests = append(tasksRequests, &types.CrossClusterTaskRequest{ + TaskInfo: &types.CrossClusterTaskInfo{ + TaskID: int64(id), + }, + }) + } + + completedTasks := 0 + s.mockProcessor.EXPECT().TrySubmit(gomock.Any()).DoAndReturn( + func(t Task) (bool, error) { + submitted := rand.Intn(2) == 0 + if submitted { + completedTasks++ + t.Ack() + } + // redispatcher interval is set to 1hr, basically disabled + // so that it won't re-submit tasks and we can easily count + // how many task submits are attempted 
+ return submitted, nil + }, + ).Times(numTasks - 1) // -1 since there's a duplicate task + + s.mockShard.Resource.HistoryClient.EXPECT().RespondCrossClusterTasksCompleted(gomock.Any(), gomock.Any()).DoAndReturn( + func( + _ context.Context, + request *types.RespondCrossClusterTasksCompletedRequest, + ) (*types.RespondCrossClusterTasksCompletedResponse, error) { + s.Len(request.TaskResponses, completedTasks) + s.Equal(s.mockShard.Resource.GetClusterMetadata().GetCurrentClusterName(), request.TargetCluster) + s.Equal(s.mockShard.GetShardID(), int(request.GetShardID())) + s.True(request.GetFetchNewTasks()) + if failedToRespond { + return nil, errors.New("some random error") + } + return &types.RespondCrossClusterTasksCompletedResponse{ + Tasks: []*types.CrossClusterTaskRequest{ + { + TaskInfo: &types.CrossClusterTaskInfo{TaskID: int64(pendingTaskID)}, + }, + }, + }, nil + }, + ).AnyTimes() + + processor.processTaskRequests(tasksRequests) + + if failedToRespond { + s.Len(processor.pendingTasks, numTasks) + } else { + s.Len(processor.pendingTasks, numTasks-completedTasks) + } +} + +func (s *crossClusterTaskProcessorSuite) TestProcessLoop() { + crossClusterTaskFetchers := NewCrossClusterTaskFetchers( + constants.TestClusterMetadata, + s.mockShard.Resource.GetClientBean(), + &FetcherOptions{ + Parallelism: dynamicconfig.GetIntPropertyFn(3), + AggregationInterval: dynamicconfig.GetDurationPropertyFn(time.Millisecond * 100), + TimerJitterCoefficient: dynamicconfig.GetFloatPropertyFn(0.5), + }, + s.mockShard.GetLogger(), + ) + var fetcher Fetcher + for _, f := range crossClusterTaskFetchers { + if f.GetSourceCluster() == cluster.TestAlternativeClusterName { + s.Nil(fetcher) + fetcher = f + } + } + s.NotNil(fetcher) + processor := newCrossClusterTaskProcessor(s.mockShard, s.mockProcessor, fetcher, s.processorOptions) + + totalGetRequests := 3 + numGetRequests := 0 + s.mockShard.Resource.RemoteAdminClient.EXPECT().GetCrossClusterTasks(gomock.Any(), gomock.Any()).DoAndReturn( + func( + _ context.Context, + request *types.GetCrossClusterTasksRequest, + ) (*types.GetCrossClusterTasksResponse, error) { + numGetRequests++ + if numGetRequests == totalGetRequests { + close(processor.shutdownCh) + } + + s.Len(request.ShardIDs, 1) + s.Equal(int32(s.mockShard.GetShardID()), request.ShardIDs[0]) + s.Equal(cluster.TestCurrentClusterName, request.GetTargetCluster()) + return &types.GetCrossClusterTasksResponse{ + TasksByShard: map[int32][]*types.CrossClusterTaskRequest{int32(s.mockShard.GetShardID()): {}}, + }, nil + }, + ).AnyTimes() + + fetcher.Start() + processor.shutdownWG.Add(1) + go processor.processLoop() + processor.shutdownWG.Wait() + fetcher.Stop() +} diff --git a/service/history/task/fetcher.go b/service/history/task/fetcher.go index e47e9350df3..1cbc238a946 100644 --- a/service/history/task/fetcher.go +++ b/service/history/task/fetcher.go @@ -21,6 +21,7 @@ package task import ( + "context" "errors" "sync" "sync/atomic" @@ -34,6 +35,7 @@ import ( "github.com/uber/cadence/common/future" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" + "github.com/uber/cadence/common/types" ) type ( @@ -47,16 +49,17 @@ type ( } fetchRequest struct { - shardID int + shardID int32 params []interface{} settable future.Settable } fetchTaskFunc func( clientBean client.Bean, + sourceCluster string, currentCluster string, - requestByShard map[int]fetchRequest, - ) (map[int]interface{}, error) + requestByShard map[int32]fetchRequest, + ) (map[int32]interface{}, error) fetcherImpl struct { status 
int32 @@ -87,6 +90,7 @@ var ( // NewCrossClusterTaskFetchers creates a set of task fetchers, // one for each source cluster +// The future returned by Fetcher.Get() will have value type []*types.CrossClusterTaskRequest func NewCrossClusterTaskFetchers( clusterMetadata cluster.Metadata, clientBean client.Bean, @@ -104,12 +108,34 @@ func NewCrossClusterTaskFetchers( func crossClusterTaskFetchFn( clientBean client.Bean, + sourceCluster string, currentCluster string, - requestByShard map[int]fetchRequest, -) (map[int]interface{}, error) { - // TODO: implement the fetch func after the - // API for fetching tasks is created. - return nil, errors.New("not implemented") + requestByShard map[int32]fetchRequest, +) (map[int32]interface{}, error) { + adminClient := clientBean.GetRemoteAdminClient(sourceCluster) + shardIDs := make([]int32, 0, len(requestByShard)) + for shardID := range requestByShard { + shardIDs = append(shardIDs, shardID) + } + // number of tasks returned will be controlled by source cluster. + // if there are lots of tasks in the source cluster, they will be + // returned in batches. + request := &types.GetCrossClusterTasksRequest{ + ShardIDs: shardIDs, + TargetCluster: currentCluster, + } + ctx, cancel := context.WithTimeout(context.Background(), defaultFetchTimeout) + defer cancel() + resp, err := adminClient.GetCrossClusterTasks(ctx, request) + if err != nil { + return nil, err + } + + responseByShard := make(map[int32]interface{}, len(resp.TasksByShard)) + for shardID, tasks := range resp.TasksByShard { + responseByShard[shardID] = tasks + } + return responseByShard, nil } func newTaskFetchers( @@ -214,7 +240,7 @@ func (f *fetcherImpl) Fetch( future, settable := future.NewFuture() f.requestChan <- fetchRequest{ - shardID: shardID, + shardID: int32(shardID), params: fetchParams, settable: settable, } @@ -236,7 +262,7 @@ func (f *fetcherImpl) aggregator() { f.options.TimerJitterCoefficient(), )) - outstandingRequests := make(map[int]fetchRequest) + outstandingRequests := make(map[int32]fetchRequest) for { select { @@ -273,13 +299,13 @@ func (f *fetcherImpl) aggregator() { } func (f *fetcherImpl) fetch( - outstandingRequests map[int]fetchRequest, + outstandingRequests map[int32]fetchRequest, ) error { if len(outstandingRequests) == 0 { return nil } - tasksByShard, err := f.fetchTaskFunc(f.clientBean, f.currentCluster, outstandingRequests) + tasksByShard, err := f.fetchTaskFunc(f.clientBean, f.sourceCluster, f.currentCluster, outstandingRequests) if err != nil { f.logger.Error("Failed to fetch tasks", tag.Error(err)) return err diff --git a/service/history/task/fetcher_test.go b/service/history/task/fetcher_test.go index 9be95971e2d..5ff1cd8d2a1 100644 --- a/service/history/task/fetcher_test.go +++ b/service/history/task/fetcher_test.go @@ -22,9 +22,11 @@ package task import ( "context" + "math/rand" "testing" "time" + "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -34,6 +36,9 @@ import ( "github.com/uber/cadence/common/future" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/loggerimpl" + "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/resource" + "github.com/uber/cadence/common/types" "github.com/uber/cadence/service/history/constants" ) @@ -42,6 +47,8 @@ type ( suite.Suite *require.Assertions + controller *gomock.Controller + options *FetcherOptions logger log.Logger } @@ -59,6 +66,8 @@ func TestFetcherSuite(t *testing.T) { func (s *fetcherSuite) SetupTest() { 
s.Assertions = require.New(s.T()) + s.controller = gomock.NewController(s.T()) + s.options = &FetcherOptions{ Parallelism: dynamicconfig.GetIntPropertyFn(3), AggregationInterval: dynamicconfig.GetDurationPropertyFn(time.Millisecond * 100), @@ -67,8 +76,75 @@ func (s *fetcherSuite) SetupTest() { s.logger = loggerimpl.NewLoggerForTest(s.Suite) } -func (s *fetcherSuite) TestCrossClusterTaskFetchFn() { - // TODO: add test for crossClusterTaskFetchFn +func (s *fetcherSuite) TearDownTest() { + s.controller.Finish() +} + +func (s *fetcherSuite) TestCrossClusterTaskFetchers() { + s.options.Parallelism = dynamicconfig.GetIntPropertyFn(1) + sourceCluster := cluster.TestAlternativeClusterName + currentCluster := cluster.TestCurrentClusterName + + shardIDs := []int32{1, 10, 123} + tasksByShard := make(map[int32][]*types.CrossClusterTaskRequest) + + mockResource := resource.NewTest(s.controller, metrics.History) + mockResource.RemoteAdminClient.EXPECT().GetCrossClusterTasks(gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, request *types.GetCrossClusterTasksRequest) (*types.GetCrossClusterTasksResponse, error) { + s.Equal(currentCluster, request.GetTargetCluster()) + resp := &types.GetCrossClusterTasksResponse{ + TasksByShard: make(map[int32][]*types.CrossClusterTaskRequest), + } + for _, shardID := range request.GetShardIDs() { + _, ok := tasksByShard[shardID] + s.False(ok) + + numTasks := rand.Intn(10) + taskRequests := make([]*types.CrossClusterTaskRequest, numTasks) + for i := 0; i != numTasks; i++ { + taskRequests[i] = &types.CrossClusterTaskRequest{ + TaskInfo: &types.CrossClusterTaskInfo{ + TaskID: rand.Int63n(10000), + }, + } + } + resp.TasksByShard[shardID] = taskRequests + tasksByShard[shardID] = taskRequests + } + return resp, nil + }, + ).AnyTimes() + + crossClusterTaskFetchers := NewCrossClusterTaskFetchers( + constants.TestClusterMetadata, + mockResource.GetClientBean(), + s.options, + s.logger, + ) + var fetcher Fetcher + for _, f := range crossClusterTaskFetchers { + if f.GetSourceCluster() == sourceCluster { + s.Nil(fetcher) + fetcher = f + } + } + s.NotNil(fetcher) + + fetcher.Start() + futures := make(map[int32]future.Future, len(shardIDs)) + for _, shardID := range shardIDs { + futures[shardID] = fetcher.Fetch(int(shardID)) + } + + for shardID, future := range futures { + var taskRequests []*types.CrossClusterTaskRequest + err := future.Get(context.Background(), &taskRequests) + s.NoError(err) + s.Equal(tasksByShard[shardID], taskRequests) + } + + fetcher.Stop() + mockResource.Finish(s.T()) } func (s *fetcherSuite) TestNewTaskFetchers() { @@ -141,10 +217,11 @@ func (s *fetcherSuite) TestAggregator() { func (s *fetcherSuite) testFetchTaskFn( clientBean client.Bean, + sourceCluster string, currentCluster string, - tokenByShard map[int]fetchRequest, -) (map[int]interface{}, error) { - results := make(map[int]interface{}, len(tokenByShard)) + tokenByShard map[int32]fetchRequest, +) (map[int32]interface{}, error) { + results := make(map[int32]interface{}, len(tokenByShard)) for shardID, request := range tokenByShard { results[shardID] = testFetchResult{ fetchParams: request.params,