From 1be9b6d6a01c3b787678369e48480806abf00106 Mon Sep 17 00:00:00 2001 From: Vytautas Date: Wed, 24 Aug 2022 12:35:41 +0300 Subject: [PATCH] Replication cache for sharing hydrated messages (#4952) * Replication cache for sharing hydrated messages * Export ErrUnknownCluster * Tag shardID for TaskStore Co-authored-by: David Porter --- common/dynamicconfig/constants.go | 11 + common/log/tag/values.go | 1 + common/metrics/defs.go | 9 + service/history/config/config.go | 2 + service/history/historyEngine.go | 15 +- service/history/replication/cache.go | 150 ++++++++++++ service/history/replication/cache_test.go | 114 +++++++++ .../history/replication/task_ack_manager.go | 73 +----- .../replication/task_ack_manager_test.go | 33 +-- service/history/replication/task_store.go | 229 ++++++++++++++++++ .../history/replication/task_store_test.go | 168 +++++++++++++ 11 files changed, 720 insertions(+), 85 deletions(-) create mode 100644 service/history/replication/cache.go create mode 100644 service/history/replication/cache_test.go create mode 100644 service/history/replication/task_store.go create mode 100644 service/history/replication/task_store_test.go diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index 2c066adec6d..8c89187c8b6 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -951,6 +951,12 @@ const ( // Default value: 3 // Allowed filters: N/A ReplicatorReadTaskMaxRetryCount + // ReplicatorCacheCapacity is the capacity of replication cache in number of tasks + // KeyName: history.replicatorCacheCapacity + // Value type: Int + // Default value: 10000 + // Allowed filters: N/A + ReplicatorCacheCapacity // ExecutionMgrNumConns is persistence connections number for ExecutionManager // KeyName: history.executionMgrNumConns @@ -3066,6 +3072,11 @@ var IntKeys = map[IntKey]DynamicInt{ Description: "ReplicatorReadTaskMaxRetryCount is the number of read replication task retry time", DefaultValue: 3, }, + ReplicatorCacheCapacity: DynamicInt{ + KeyName: "history.replicatorCacheCapacity", + Description: "ReplicatorCacheCapacity is the capacity of replication cache in number of tasks", + DefaultValue: 10000, + }, ExecutionMgrNumConns: DynamicInt{ KeyName: "history.executionMgrNumConns", Description: "ExecutionMgrNumConns is persistence connections number for ExecutionManager", diff --git a/common/log/tag/values.go b/common/log/tag/values.go index 5ba5b1da920..de61ff6ae01 100644 --- a/common/log/tag/values.go +++ b/common/log/tag/values.go @@ -114,6 +114,7 @@ var ( ComponentReplicator = component("replicator") ComponentReplicationTaskProcessor = component("replication-task-processor") ComponentReplicationAckManager = component("replication-ack-manager") + ComponentReplicationCacheManager = component("replication-cache-manager") ComponentHistoryReplicator = component("history-replicator") ComponentHistoryResender = component("history-resender") ComponentIndexer = component("indexer") diff --git a/common/metrics/defs.go b/common/metrics/defs.go index 871bfcd671b..98f5a43778b 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -1089,6 +1089,8 @@ const ( HistoryEventNotificationScope // ReplicatorQueueProcessorScope is the scope used by all metric emitted by replicator queue processor ReplicatorQueueProcessorScope + // ReplicatorCacheManagerScope is the scope used by all metric emitted by replicator cache manager + ReplicatorCacheManagerScope // ReplicatorTaskHistoryScope is the scope used for history task processing by 
replicator queue processor ReplicatorTaskHistoryScope // ReplicatorTaskSyncActivityScope is the scope used for sync activity by replicator queue processor @@ -1705,6 +1707,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{ CrossClusterTargetTaskApplyParentClosePolicyScope: {operation: "CrossClusterTargetTaskTypeApplyParentClosePolicy"}, HistoryEventNotificationScope: {operation: "HistoryEventNotification"}, ReplicatorQueueProcessorScope: {operation: "ReplicatorQueueProcessor"}, + ReplicatorCacheManagerScope: {operation: "ReplicatorCacheManager"}, ReplicatorTaskHistoryScope: {operation: "ReplicatorTaskHistory"}, ReplicatorTaskSyncActivityScope: {operation: "ReplicatorTaskSyncActivity"}, ReplicateHistoryEventsScope: {operation: "ReplicateHistoryEvents"}, @@ -2082,10 +2085,13 @@ const ( UnbufferReplicationTaskTimer HistoryConflictsCounter CompleteTaskFailedCounter + CacheSize CacheRequests CacheFailures CacheLatency + CacheHitCounter CacheMissCounter + CacheFullCounter AcquireLockFailedCounter WorkflowContextCleared MutableStateSize @@ -2633,10 +2639,13 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{ UnbufferReplicationTaskTimer: {metricName: "unbuffer_replication_tasks", metricType: Timer}, HistoryConflictsCounter: {metricName: "history_conflicts", metricType: Counter}, CompleteTaskFailedCounter: {metricName: "complete_task_fail_count", metricType: Counter}, + CacheSize: {metricName: "cache_size", metricType: Gauge}, CacheRequests: {metricName: "cache_requests", metricType: Counter}, CacheFailures: {metricName: "cache_errors", metricType: Counter}, CacheLatency: {metricName: "cache_latency", metricType: Timer}, + CacheHitCounter: {metricName: "cache_hit", metricType: Counter}, CacheMissCounter: {metricName: "cache_miss", metricType: Counter}, + CacheFullCounter: {metricName: "cache_full", metricType: Counter}, AcquireLockFailedCounter: {metricName: "acquire_lock_failed", metricType: Counter}, WorkflowContextCleared: {metricName: "workflow_context_cleared", metricType: Counter}, MutableStateSize: {metricName: "mutable_state_size", metricType: Timer}, diff --git a/service/history/config/config.go b/service/history/config/config.go index e506991e181..ce348a66ebb 100644 --- a/service/history/config/config.go +++ b/service/history/config/config.go @@ -189,6 +189,7 @@ type Config struct { ReplicatorReadTaskMaxRetryCount dynamicconfig.IntPropertyFn ReplicatorProcessorFetchTasksBatchSize dynamicconfig.IntPropertyFnWithShardIDFilter ReplicatorUpperLatency dynamicconfig.DurationPropertyFn + ReplicatorCacheCapacity dynamicconfig.IntPropertyFn // Persistence settings ExecutionMgrNumConns dynamicconfig.IntPropertyFn @@ -458,6 +459,7 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, storeType string, isA ReplicatorReadTaskMaxRetryCount: dc.GetIntProperty(dynamicconfig.ReplicatorReadTaskMaxRetryCount), ReplicatorProcessorFetchTasksBatchSize: dc.GetIntPropertyFilteredByShardID(dynamicconfig.ReplicatorTaskBatchSize), ReplicatorUpperLatency: dc.GetDurationProperty(dynamicconfig.ReplicatorUpperLatency), + ReplicatorCacheCapacity: dc.GetIntProperty(dynamicconfig.ReplicatorCacheCapacity), ExecutionMgrNumConns: dc.GetIntProperty(dynamicconfig.ExecutionMgrNumConns), HistoryMgrNumConns: dc.GetIntProperty(dynamicconfig.HistoryMgrNumConns), diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 0c68ffc6e3d..60a292cdd06 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -113,6 +113,7 @@ type ( 
crossClusterTaskProcessors common.Daemon replicationTaskProcessors []replication.TaskProcessor replicationAckManager replication.TaskAckManager + replicationTaskStore *replication.TaskStore replicationHydrator replication.TaskHydrator publicClient workflowserviceclient.Interface eventsReapplier ndc.EventsReapplier @@ -159,6 +160,15 @@ func NewEngineWithShardContext( executionCache := execution.NewCache(shard) failoverMarkerNotifier := failover.NewMarkerNotifier(shard, config, failoverCoordinator) replicationHydrator := replication.NewDeferredTaskHydrator(shard.GetShardID(), historyV2Manager, executionCache, shard.GetDomainCache()) + replicationTaskStore := replication.NewTaskStore( + shard.GetShardID(), + shard.GetConfig(), + shard.GetClusterMetadata(), + shard.GetDomainCache(), + shard.GetMetricsClient(), + shard.GetLogger(), + replicationHydrator, + ) historyEngImpl := &historyEngineImpl{ currentClusterName: currentClusterName, shard: shard, @@ -210,13 +220,12 @@ func NewEngineWithShardContext( replicationAckManager: replication.NewTaskAckManager( shard.GetShardID(), shard, - shard.GetDomainCache(), shard.GetMetricsClient(), shard.GetLogger(), - shard.GetConfig(), replication.NewDynamicTaskReader(shard.GetShardID(), executionManager, shard.GetTimeSource(), config), - replicationHydrator, + replicationTaskStore, ), + replicationTaskStore: replicationTaskStore, } historyEngImpl.decisionHandler = decision.NewHandler( shard, diff --git a/service/history/replication/cache.go b/service/history/replication/cache.go new file mode 100644 index 00000000000..80421382162 --- /dev/null +++ b/service/history/replication/cache.go @@ -0,0 +1,150 @@ +// The MIT License (MIT) +// +// Copyright (c) 2022 Uber Technologies Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package replication + +import ( + "container/heap" + "errors" + "sync" + + "github.com/uber/cadence/common/dynamicconfig" + "github.com/uber/cadence/common/types" +) + +var ( + errCacheFull = errors.New("cache is full") + errAlreadyAcked = errors.New("already acked") +) + +// Cache is an in-memory implementation of a cache for storing hydrated replication messages. +// Messages can come out of order as long as their task ID is higher than last acknowledged message. +// Out of order is expected as different source clusters will share hydrated replication messages. +// +// Cache utilizes heap to keep replication messages in order. This is needed for efficient acknowledgements in O(log N). 
+//
+// Cache capacity can be increased dynamically. Decreasing it requires a restart to fully take effect: new tasks will not be accepted above the lower capacity, but memory will not be reclaimed either.
+//
+// Cache methods are thread safe. Writers and readers are expected to call from different goroutines.
+type Cache struct {
+	mu sync.RWMutex
+
+	capacity dynamicconfig.IntPropertyFn
+
+	order int64Heap
+	cache map[int64]*types.ReplicationTask
+
+	lastAck int64
+}
+
+// NewCache creates a new instance of replication cache
+func NewCache(capacity dynamicconfig.IntPropertyFn) *Cache {
+	initialCapacity := capacity()
+	return &Cache{
+		capacity: capacity,
+		order:    make(int64Heap, 0, initialCapacity),
+		cache:    make(map[int64]*types.ReplicationTask, initialCapacity),
+	}
+}
+
+// Size returns the current size of the cache
+func (c *Cache) Size() int {
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+
+	return len(c.order)
+}
+
+// Put stores a replication task in the cache.
+// - If the cache is full, it will return errCacheFull
+// - If the given task has an ID at or below the last acknowledged level, it will return errAlreadyAcked
+func (c *Cache) Put(task *types.ReplicationTask) error {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	// Check for full cache
+	if len(c.order) >= c.capacity() {
+		return errCacheFull
+	}
+
+	taskID := task.SourceTaskID
+
+	// Reject task as it was already acknowledged
+	if c.lastAck >= taskID {
+		return errAlreadyAcked
+	}
+
+	// Do not add duplicate tasks
+	if _, exists := c.cache[taskID]; exists {
+		return nil
+	}
+
+	c.cache[taskID] = task
+	heap.Push(&c.order, taskID)
+
+	return nil
+}
+
+// Get will return a stored task having a given taskID.
+// If the task is not cached, nil is returned.
+func (c *Cache) Get(taskID int64) *types.ReplicationTask {
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+
+	return c.cache[taskID]
+}
+
+// Ack acknowledges replication messages up to the given level,
+// meaning they will be removed from the cache.
+func (c *Cache) Ack(level int64) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	for c.order.Len() > 0 && c.order.Peek() <= level {
+		taskID := heap.Pop(&c.order).(int64)
+		delete(c.cache, taskID)
+	}
+
+	c.lastAck = level
+}
+
+type int64Heap []int64
+
+func (h int64Heap) Len() int           { return len(h) }
+func (h int64Heap) Less(i, j int) bool { return h[i] < h[j] }
+func (h int64Heap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
+
+func (h *int64Heap) Push(x interface{}) {
+	*h = append(*h, x.(int64))
+}
+
+func (h *int64Heap) Pop() interface{} {
+	old := *h
+	n := len(old)
+	x := old[n-1]
+	*h = old[0 : n-1]
+	return x
+}
+
+func (h *int64Heap) Peek() int64 {
+	return (*h)[0]
+}
diff --git a/service/history/replication/cache_test.go b/service/history/replication/cache_test.go
new file mode 100644
index 00000000000..ba774d3f32d
--- /dev/null
+++ b/service/history/replication/cache_test.go
@@ -0,0 +1,114 @@
+// The MIT License (MIT)
+//
+// Copyright (c) 2022 Uber Technologies Inc.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in all
+// copies or substantial portions of the Software.
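// Illustrative sketch (not part of the patch): intended Cache usage, assuming the
// replication package context above. Tasks may be Put out of order; Ack prunes every
// task at or below the given level. The helper name and the capacity of 3 are only
// for illustration.
func exampleCacheUsage() {
	c := NewCache(dynamicconfig.GetIntPropertyFn(3))
	_ = c.Put(&types.ReplicationTask{SourceTaskID: 200}) // out-of-order Put is fine
	_ = c.Put(&types.ReplicationTask{SourceTaskID: 100})
	_ = c.Get(100) // returns the stored pointer, or nil if absent
	c.Ack(100)     // drops task 100; task 200 stays cached
}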
+// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package replication + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/uber/cadence/common/dynamicconfig" + "github.com/uber/cadence/common/types" +) + +func TestCache(t *testing.T) { + task1 := &types.ReplicationTask{SourceTaskID: 100} + task2 := &types.ReplicationTask{SourceTaskID: 200} + task3 := &types.ReplicationTask{SourceTaskID: 300} + task4 := &types.ReplicationTask{SourceTaskID: 400} + + cache := NewCache(dynamicconfig.GetIntPropertyFn(3)) + + assert.Equal(t, 0, cache.Size()) + require.NoError(t, cache.Put(task2)) + assert.Equal(t, 1, cache.Size()) + require.NoError(t, cache.Put(task2)) + assert.Equal(t, 1, cache.Size()) + require.NoError(t, cache.Put(task3)) + assert.Equal(t, 2, cache.Size()) + require.NoError(t, cache.Put(task1)) + assert.Equal(t, 3, cache.Size()) + assert.Equal(t, errCacheFull, cache.Put(task4)) + assert.Equal(t, 3, cache.Size()) + + assert.Nil(t, cache.Get(0)) + assert.Nil(t, cache.Get(99)) + assert.Equal(t, task1, cache.Get(100)) + assert.Nil(t, cache.Get(101)) + assert.Equal(t, task2, cache.Get(200)) + assert.Equal(t, task3, cache.Get(300)) + + cache.Ack(0) + assert.Equal(t, 3, cache.Size()) + assert.Equal(t, task1, cache.Get(100)) + assert.Equal(t, task2, cache.Get(200)) + assert.Equal(t, task3, cache.Get(300)) + + cache.Ack(1) + assert.Equal(t, 3, cache.Size()) + assert.Equal(t, task1, cache.Get(100)) + assert.Equal(t, task2, cache.Get(200)) + assert.Equal(t, task3, cache.Get(300)) + + cache.Ack(99) + assert.Equal(t, 3, cache.Size()) + assert.Equal(t, task1, cache.Get(100)) + assert.Equal(t, task2, cache.Get(200)) + assert.Equal(t, task3, cache.Get(300)) + + cache.Ack(100) + assert.Equal(t, 2, cache.Size()) + assert.Nil(t, cache.Get(100)) + assert.Equal(t, task2, cache.Get(200)) + assert.Equal(t, task3, cache.Get(300)) + + assert.Equal(t, errAlreadyAcked, cache.Put(task1)) + + cache.Ack(301) + assert.Equal(t, 0, cache.Size()) + assert.Nil(t, cache.Get(100)) + assert.Nil(t, cache.Get(200)) + assert.Nil(t, cache.Get(300)) + + require.NoError(t, cache.Put(task4)) + assert.Equal(t, 1, cache.Size()) + assert.Nil(t, cache.Get(100)) + assert.Nil(t, cache.Get(200)) + assert.Nil(t, cache.Get(300)) + assert.Equal(t, task4, cache.Get(400)) +} + +func BenchmarkCache(b *testing.B) { + cache := NewCache(dynamicconfig.GetIntPropertyFn(10000)) + for i := 0; i < 5000; i++ { + cache.Put(&types.ReplicationTask{SourceTaskID: int64(i * 100)}) + } + + for n := 0; n < b.N; n++ { + readLevel := int64((n) * 100) + cache.Get(readLevel) + cache.Put(&types.ReplicationTask{SourceTaskID: int64(n * 100)}) + cache.Ack(readLevel) + } +} diff --git a/service/history/replication/task_ack_manager.go b/service/history/replication/task_ack_manager.go index ebba9c474ac..5dd1e82cb1a 100644 --- a/service/history/replication/task_ack_manager.go +++ b/service/history/replication/task_ack_manager.go @@ -28,30 +28,23 @@ import ( "time" "github.com/uber/cadence/common" - "github.com/uber/cadence/common/backoff" - 
"github.com/uber/cadence/common/cache" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/persistence" - "github.com/uber/cadence/common/quotas" "github.com/uber/cadence/common/types" - "github.com/uber/cadence/service/history/config" ) type ( // TaskAckManager is the ack manager for replication tasks TaskAckManager struct { - ackLevels ackLevelStore - domains domainCache - rateLimiter *quotas.DynamicRateLimiter - throttleRetry *backoff.ThrottleRetry + ackLevels ackLevelStore scope metrics.Scope logger log.Logger - reader taskReader - hydrator taskHydrator + reader taskReader + store *TaskStore } ackLevelStore interface { @@ -60,48 +53,30 @@ type ( GetClusterReplicationLevel(cluster string) int64 UpdateClusterReplicationLevel(cluster string, lastTaskID int64) error } - domainCache interface { - GetDomainByID(id string) (*cache.DomainCacheEntry, error) - } taskReader interface { Read(ctx context.Context, readLevel int64, maxReadLevel int64) ([]*persistence.ReplicationTaskInfo, bool, error) } - taskHydrator interface { - Hydrate(ctx context.Context, task persistence.ReplicationTaskInfo) (*types.ReplicationTask, error) - } ) // NewTaskAckManager initializes a new replication task ack manager func NewTaskAckManager( shardID int, ackLevels ackLevelStore, - domains domainCache, metricsClient metrics.Client, logger log.Logger, - config *config.Config, reader taskReader, - hydrator taskHydrator, + store *TaskStore, ) TaskAckManager { - retryPolicy := backoff.NewExponentialRetryPolicy(100 * time.Millisecond) - retryPolicy.SetMaximumAttempts(config.ReplicatorReadTaskMaxRetryCount()) - retryPolicy.SetBackoffCoefficient(1) - return TaskAckManager{ - ackLevels: ackLevels, - domains: domains, - rateLimiter: quotas.NewDynamicRateLimiter(config.ReplicationTaskGenerationQPS.AsFloat64()), - throttleRetry: backoff.NewThrottleRetry( - backoff.WithRetryPolicy(retryPolicy), - backoff.WithRetryableError(persistence.IsTransientError), - ), + ackLevels: ackLevels, scope: metricsClient.Scope( metrics.ReplicatorQueueProcessorScope, metrics.InstanceTag(strconv.Itoa(shardID)), ), - logger: logger.WithTags(tag.ComponentReplicationAckManager), - reader: reader, - hydrator: hydrator, + logger: logger.WithTags(tag.ComponentReplicationAckManager), + reader: reader, + store: store, } } @@ -121,25 +96,8 @@ func (t *TaskAckManager) GetTasks(ctx context.Context, pollingCluster string, la readLevel := lastReadTaskID TaskInfoLoop: for _, task := range tasks { - // filter task info by domain clusters. 
- domainEntity, err := t.domains.GetDomainByID(task.DomainID) - if err != nil { - return nil, err - } - if skipTask(pollingCluster, domainEntity) { - readLevel = task.TaskID - continue - } + replicationTask, err := t.store.Get(ctx, pollingCluster, *task) - // construct replication task from DB - _ = t.rateLimiter.Wait(ctx) - var replicationTask *types.ReplicationTask - op := func() error { - var err error - replicationTask, err = t.hydrator.Hydrate(ctx, *task) - return err - } - err = t.throttleRetry.Do(ctx, op) switch err.(type) { case nil: // No action @@ -166,6 +124,10 @@ TaskInfoLoop: t.logger.Error("error updating replication level for shard", tag.Error(err), tag.OperationFailed) } + if err := t.store.Ack(pollingCluster, lastReadTaskID); err != nil { + t.logger.Error("error updating replication level for hydrated task store", tag.Error(err), tag.OperationFailed) + } + t.logger.Debug("Get replication tasks", tag.SourceCluster(pollingCluster), tag.ShardReplicationAck(lastReadTaskID), tag.ReadLevel(readLevel)) return &types.ReplicationMessages{ ReplicationTasks: replicationTasks, @@ -173,12 +135,3 @@ TaskInfoLoop: LastRetrievedMessageID: readLevel, }, nil } - -func skipTask(pollingCluster string, domainEntity *cache.DomainCacheEntry) bool { - for _, cluster := range domainEntity.GetReplicationConfig().Clusters { - if cluster.ClusterName == pollingCluster { - return false - } - } - return true -} diff --git a/service/history/replication/task_ack_manager_test.go b/service/history/replication/task_ack_manager_test.go index 679dd08a4de..1704680ee9d 100644 --- a/service/history/replication/task_ack_manager_test.go +++ b/service/history/replication/task_ack_manager_test.go @@ -31,12 +31,10 @@ import ( "github.com/stretchr/testify/require" "github.com/uber/cadence/common/cache" - "github.com/uber/cadence/common/dynamicconfig" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/types" - "github.com/uber/cadence/service/history/config" ) var ( @@ -45,15 +43,16 @@ var ( testTask13 = persistence.ReplicationTaskInfo{TaskID: 13, DomainID: testDomainID} testTask14 = persistence.ReplicationTaskInfo{TaskID: 14, DomainID: testDomainID} - testHydratedTask11 = types.ReplicationTask{SourceTaskID: 11} - testHydratedTask12 = types.ReplicationTask{SourceTaskID: 12} - testHydratedTask14 = types.ReplicationTask{SourceTaskID: 14} + testHydratedTask11 = types.ReplicationTask{SourceTaskID: 11, HistoryTaskV2Attributes: &types.HistoryTaskV2Attributes{DomainID: testDomainID}} + testHydratedTask12 = types.ReplicationTask{SourceTaskID: 12, SyncActivityTaskAttributes: &types.SyncActivityTaskAttributes{DomainID: testDomainID}} + testHydratedTask14 = types.ReplicationTask{SourceTaskID: 14, FailoverMarkerAttributes: &types.FailoverMarkerAttributes{DomainID: testDomainID}} testHydratedTaskErrorRecoverable = types.ReplicationTask{SourceTaskID: -100} testHydratedTaskErrorNonRecoverable = types.ReplicationTask{SourceTaskID: -200} testClusterA = "cluster-A" testClusterB = "cluster-B" + testClusterC = "cluster-C" testDomainName = "test-domain-name" @@ -175,7 +174,7 @@ func TestTaskAckManager_GetTasks(t *testing.T) { expectErr: "error reading replication tasks", }, { - name: "failed to get domain - return error", + name: "failed to get domain - stops", ackLevels: &fakeAckLevelStore{ readLevel: 200, remote: map[string]int64{testClusterA: 2}, @@ -185,7 +184,10 @@ func TestTaskAckManager_GetTasks(t *testing.T) { hydrator: 
fakeTaskHydrator{}, pollingCluster: testClusterA, lastReadLevel: 5, - expectErr: "domain does not exist", + expectResult: &types.ReplicationMessages{ + LastRetrievedMessageID: 5, + HasMore: true, + }, }, { name: "failed to update ack level - no error, return response anyway", @@ -209,12 +211,8 @@ func TestTaskAckManager_GetTasks(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - config := config.Config{ - ReplicationTaskGenerationQPS: dynamicconfig.GetFloatPropertyFn(0), - ReplicatorReadTaskMaxRetryCount: dynamicconfig.GetIntPropertyFn(1), - } - - ackManager := NewTaskAckManager(testShardID, tt.ackLevels, tt.domains, metrics.NewNoopMetricsClient(), log.NewNoop(), &config, tt.reader, tt.hydrator) + taskStore := createTestTaskStore(tt.domains, tt.hydrator) + ackManager := NewTaskAckManager(testShardID, tt.ackLevels, metrics.NewNoopMetricsClient(), log.NewNoop(), tt.reader, taskStore) result, err := ackManager.GetTasks(context.Background(), tt.pollingCluster, tt.lastReadLevel) if tt.expectErr != "" { @@ -284,12 +282,3 @@ func (h fakeTaskHydrator) Hydrate(ctx context.Context, task persistence.Replicat } panic("fix the test, should not reach this") } - -type fakeDomainCache map[string]*cache.DomainCacheEntry - -func (cache fakeDomainCache) GetDomainByID(id string) (*cache.DomainCacheEntry, error) { - if entry, ok := cache[id]; ok { - return entry, nil - } - return nil, types.EntityNotExistsError{Message: "domain does not exist"} -} diff --git a/service/history/replication/task_store.go b/service/history/replication/task_store.go new file mode 100644 index 00000000000..4b985a1a680 --- /dev/null +++ b/service/history/replication/task_store.go @@ -0,0 +1,229 @@ +// The MIT License (MIT) +// +// Copyright (c) 2022 Uber Technologies Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. 
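// Illustrative sketch (not part of the patch): how a poll looks after the
// task_ack_manager.go change above. GetTasks now delegates hydration and caching to the
// shared TaskStore and acks the store at the caller-provided read level. The helper name
// and the polling cluster literal are assumed.
func pollReplicationMessages(ctx context.Context, ackManager TaskAckManager, lastReadTaskID int64) (int64, error) {
	msgs, err := ackManager.GetTasks(ctx, "cluster-A", lastReadTaskID)
	if err != nil {
		return lastReadTaskID, err
	}
	// msgs.ReplicationTasks are hydrated (served from cache when possible);
	// LastRetrievedMessageID becomes the next read level, HasMore signals a partial read.
	return msgs.LastRetrievedMessageID, nil
}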
+
+package replication
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"strconv"
+	"time"
+
+	"github.com/uber/cadence/common/backoff"
+	"github.com/uber/cadence/common/cache"
+	"github.com/uber/cadence/common/cluster"
+	"github.com/uber/cadence/common/log"
+	"github.com/uber/cadence/common/log/tag"
+	"github.com/uber/cadence/common/metrics"
+	"github.com/uber/cadence/common/persistence"
+	"github.com/uber/cadence/common/quotas"
+	"github.com/uber/cadence/common/types"
+	"github.com/uber/cadence/service/history/config"
+)
+
+// ErrUnknownCluster is returned when the given cluster is not defined in cluster metadata
+var ErrUnknownCluster = errors.New("unknown cluster")
+
+// TaskStore is a component that hydrates and caches replication messages so that they can be reused across several polling source clusters.
+// It also exposes a public Put method, which allows pre-storing already hydrated messages at the end of a successful transaction, saving a DB call to fetch history events.
+//
+// TaskStore uses a separate cache for each source cluster, allowing messages to be fetched at different rates.
+// Once a cache becomes full, it will not accept further messages for that cluster. Those messages will later be fetched from the DB and hydrated again.
+// A cache stores only a pointer to the message. It is hydrated once and shared across caches. A cluster acknowledging the message removes it from the corresponding cache.
+// Once all clusters acknowledge it, no more references are held, and GC will eventually pick it up.
+type TaskStore struct {
+	clusters      map[string]*Cache
+	domains       domainCache
+	hydrator      taskHydrator
+	rateLimiter   *quotas.DynamicRateLimiter
+	throttleRetry *backoff.ThrottleRetry
+
+	scope  metrics.Scope
+	logger log.Logger
+}
+
+type (
+	domainCache interface {
+		GetDomainByID(id string) (*cache.DomainCacheEntry, error)
+	}
+	taskHydrator interface {
+		Hydrate(ctx context.Context, task persistence.ReplicationTaskInfo) (*types.ReplicationTask, error)
+	}
+)
+
+// NewTaskStore creates a new instance of TaskStore
+func NewTaskStore(
+	shardID int,
+	config *config.Config,
+	clusterMetadata cluster.Metadata,
+	domains domainCache,
+	metricsClient metrics.Client,
+	logger log.Logger,
+	hydrator taskHydrator,
+) *TaskStore {
+
+	clusters := map[string]*Cache{}
+	for clusterName := range clusterMetadata.GetRemoteClusterInfo() {
+		clusters[clusterName] = NewCache(config.ReplicatorCacheCapacity)
+	}
+
+	retryPolicy := backoff.NewExponentialRetryPolicy(100 * time.Millisecond)
+	retryPolicy.SetMaximumAttempts(config.ReplicatorReadTaskMaxRetryCount())
+	retryPolicy.SetBackoffCoefficient(1)
+
+	return &TaskStore{
+		clusters: clusters,
+		domains:  domains,
+		hydrator: hydrator,
+		throttleRetry: backoff.NewThrottleRetry(
+			backoff.WithRetryPolicy(retryPolicy),
+			backoff.WithRetryableError(persistence.IsTransientError),
+		),
+		scope: metricsClient.Scope(
+			metrics.ReplicatorCacheManagerScope,
+			metrics.InstanceTag(strconv.Itoa(shardID)),
+		),
+		logger:      logger.WithTags(tag.ComponentReplicationCacheManager),
+		rateLimiter: quotas.NewDynamicRateLimiter(config.ReplicationTaskGenerationQPS.AsFloat64()),
+	}
+}
+
+// Get will return a hydrated replication message for a given cluster based on raw task info.
+// It will either return it immediately from the cache or hydrate it, store it in the cache and then return it.
+//
+// The returned task may be nil. This may be due to the domain not existing in a given cluster or the replication message no longer being relevant.
+// Either case is valid, and such a replication message should be ignored and not returned in the response.
+func (m *TaskStore) Get(ctx context.Context, cluster string, info persistence.ReplicationTaskInfo) (*types.ReplicationTask, error) {
+	cache, ok := m.clusters[cluster]
+	if !ok {
+		return nil, ErrUnknownCluster
+	}
+
+	domain, err := m.domains.GetDomainByID(info.DomainID)
+	if err != nil {
+		return nil, fmt.Errorf("resolving domain: %w", err)
+	}
+
+	// Domain does not exist in this cluster, do not replicate the task
+	if !domain.HasReplicationCluster(cluster) {
+		return nil, nil
+	}
+
+	scope := m.scope.Tagged(metrics.SourceClusterTag(cluster))
+
+	scope.IncCounter(metrics.CacheRequests)
+	sw := scope.StartTimer(metrics.CacheLatency)
+	defer sw.Stop()
+
+	task := cache.Get(info.TaskID)
+	if task != nil {
+		scope.IncCounter(metrics.CacheHitCounter)
+		return task, nil
+	}
+
+	m.scope.IncCounter(metrics.CacheMissCounter)
+
+	// Rate limit to avoid overloading the database
+	m.rateLimiter.Wait(ctx)
+
+	err = m.throttleRetry.Do(ctx, func() error {
+		var err error
+		task, err = m.hydrator.Hydrate(ctx, info)
+		return err
+	})
+
+	if err != nil {
+		m.scope.IncCounter(metrics.CacheFailures)
+		return nil, err
+	}
+
+	m.Put(task)
+
+	return task, nil
+}
+
+// Put will try to store a hydrated replication task in all cluster caches.
+// A task may not be relevant for some clusters, as the domain is not enabled there; it is skipped for those clusters.
+// Some clusters may already have a full cache; the task is skipped there and will be fetched and hydrated again later.
+// Some clusters may have already acknowledged such a task; it is skipped there, as it is no longer relevant for them.
+func (m *TaskStore) Put(task *types.ReplicationTask) {
+	// Do not store nil tasks
+	if task == nil {
+		return
+	}
+
+	domain, err := m.getDomain(task)
+	if err != nil {
+		m.logger.Error("failed to resolve domain", tag.Error(err))
+		return
+	}
+
+	for cluster, cache := range m.clusters {
+		if domain != nil && !domain.HasReplicationCluster(cluster) {
+			continue
+		}
+
+		scope := m.scope.Tagged(metrics.SourceClusterTag(cluster))
+
+		err := cache.Put(task)
+		switch err {
+		case errCacheFull:
+			scope.IncCounter(metrics.CacheFullCounter)
+		case errAlreadyAcked:
+			// No action, this is expected.
+			// Some cluster(s) may already be past this point, due to different fetch rates.
+		}
+
+		scope.UpdateGauge(metrics.CacheSize, float64(cache.Size()))
+	}
+}
+
+// Ack will acknowledge replication messages for a given cluster.
+// This will result in all messages up to the given lastTaskID being removed from the cache.
+func (m *TaskStore) Ack(cluster string, lastTaskID int64) error { + cache, ok := m.clusters[cluster] + if !ok { + return ErrUnknownCluster + } + + cache.Ack(lastTaskID) + + scope := m.scope.Tagged(metrics.SourceClusterTag(cluster)) + scope.UpdateGauge(metrics.CacheSize, float64(cache.Size())) + + return nil +} + +func (m *TaskStore) getDomain(task *types.ReplicationTask) (*cache.DomainCacheEntry, error) { + if domainID := task.GetHistoryTaskV2Attributes().GetDomainID(); domainID != "" { + return m.domains.GetDomainByID(domainID) + } + if domainID := task.GetSyncActivityTaskAttributes().GetDomainID(); domainID != "" { + return m.domains.GetDomainByID(domainID) + } + if domainID := task.GetFailoverMarkerAttributes().GetDomainID(); domainID != "" { + return m.domains.GetDomainByID(domainID) + } + + return nil, nil +} diff --git a/service/history/replication/task_store_test.go b/service/history/replication/task_store_test.go new file mode 100644 index 00000000000..21a6e174c48 --- /dev/null +++ b/service/history/replication/task_store_test.go @@ -0,0 +1,168 @@ +// The MIT License (MIT) +// +// Copyright (c) 2022 Uber Technologies Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. 
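// Illustrative sketch (not part of the patch): the exported Put is what allows a future
// caller to pre-store an already hydrated task (e.g. right after a successful
// transaction), so pollers can skip the history-events DB read. The helper name is assumed.
func preStoreHydratedTask(ctx context.Context, store *TaskStore, hydrator taskHydrator, info persistence.ReplicationTaskInfo) {
	if hydrated, err := hydrator.Hydrate(ctx, info); err == nil {
		store.Put(hydrated) // fans out to every cluster cache the domain replicates to
	}
}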
+ +package replication + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/uber/cadence/common/cache" + "github.com/uber/cadence/common/cluster" + "github.com/uber/cadence/common/config" + "github.com/uber/cadence/common/dynamicconfig" + "github.com/uber/cadence/common/log" + "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/types" + hconfig "github.com/uber/cadence/service/history/config" +) + +func TestTaskStore(t *testing.T) { + ctx := context.Background() + + t.Run("Get error on unknown cluster", func(t *testing.T) { + ts := createTestTaskStore(nil, nil) + _, err := ts.Get(ctx, "unknown cluster", testTask11) + assert.Equal(t, ErrUnknownCluster, err) + }) + + t.Run("Get error resolving domain", func(t *testing.T) { + ts := createTestTaskStore(fakeDomainCache{}, nil) + _, err := ts.Get(ctx, testClusterA, testTask11) + assert.EqualError(t, err, "resolving domain: domain does not exist") + }) + + t.Run("Get skips task for domains non belonging to polling cluster", func(t *testing.T) { + ts := createTestTaskStore(fakeDomainCache{testDomainID: testDomain}, nil) + task, err := ts.Get(ctx, testClusterB, testTask11) + assert.NoError(t, err) + assert.Nil(t, task) + }) + + t.Run("Get returns cached replication task", func(t *testing.T) { + ts := createTestTaskStore(fakeDomainCache{testDomainID: testDomain}, nil) + ts.Put(&testHydratedTask11) + task, err := ts.Get(ctx, testClusterA, testTask11) + assert.NoError(t, err) + assert.Equal(t, &testHydratedTask11, task) + }) + + t.Run("Get returns non-cached replication task by hydrating it", func(t *testing.T) { + ts := createTestTaskStore(fakeDomainCache{testDomainID: testDomain}, fakeTaskHydrator{testTask11.TaskID: testHydratedTask11}) + task, err := ts.Get(ctx, testClusterA, testTask11) + assert.NoError(t, err) + assert.Equal(t, &testHydratedTask11, task) + }) + + t.Run("Get fails to hydrate replication task", func(t *testing.T) { + ts := createTestTaskStore(fakeDomainCache{testDomainID: testDomain}, fakeTaskHydrator{testTask11.TaskID: testHydratedTaskErrorNonRecoverable}) + task, err := ts.Get(ctx, testClusterA, testTask11) + assert.EqualError(t, err, "error hydrating task") + assert.Nil(t, task) + }) + + t.Run("Put does not store nil task", func(t *testing.T) { + ts := createTestTaskStore(nil, nil) + ts.Put(nil) + for _, cache := range ts.clusters { + assert.Zero(t, cache.Size()) + } + }) + + t.Run("Put error resolving domain - does not store task", func(t *testing.T) { + ts := createTestTaskStore(fakeDomainCache{}, nil) + ts.Put(&testHydratedTask11) + ts.Put(&testHydratedTask12) + ts.Put(&testHydratedTask14) + for _, cache := range ts.clusters { + assert.Zero(t, cache.Size()) + } + }) + + t.Run("Put hydrated task into appropriate cache", func(t *testing.T) { + ts := createTestTaskStore(fakeDomainCache{testDomainID: testDomain}, nil) + ts.Put(&testHydratedTask11) + for _, cluster := range testDomain.GetReplicationConfig().Clusters { + assert.Equal(t, 1, ts.clusters[cluster.ClusterName].Size()) + } + }) + + t.Run("Put hydrated task without domain info - will put it to all caches", func(t *testing.T) { + ts := createTestTaskStore(fakeDomainCache{testDomainID: testDomain}, nil) + ts.Put(&types.ReplicationTask{SourceTaskID: 123}) + for _, cluster := range testDomain.GetReplicationConfig().Clusters { + assert.Equal(t, 1, ts.clusters[cluster.ClusterName].Size()) + } + }) + + t.Run("Put full cache error", func(t *testing.T) { + ts := createTestTaskStore(fakeDomainCache{testDomainID: 
testDomain}, nil) + ts.Put(&testHydratedTask11) + ts.Put(&testHydratedTask12) + ts.Put(&testHydratedTask14) + for _, cluster := range testDomain.GetReplicationConfig().Clusters { + assert.Equal(t, 2, ts.clusters[cluster.ClusterName].Size()) + } + }) + + t.Run("Put will not store acked task", func(t *testing.T) { + ts := createTestTaskStore(fakeDomainCache{testDomainID: testDomain}, nil) + for _, cluster := range testDomain.GetReplicationConfig().Clusters { + ts.Ack(cluster.ClusterName, testHydratedTask11.SourceTaskID) + ts.Put(&testHydratedTask11) + assert.Equal(t, 0, ts.clusters[cluster.ClusterName].Size()) + } + }) + + t.Run("Ack error on unknown cluster", func(t *testing.T) { + ts := createTestTaskStore(nil, nil) + err := ts.Ack("unknown cluster", 0) + assert.Equal(t, ErrUnknownCluster, err) + }) +} + +func createTestTaskStore(domains domainCache, hydrator taskHydrator) *TaskStore { + cfg := hconfig.Config{ + ReplicatorCacheCapacity: dynamicconfig.GetIntPropertyFn(2), + ReplicationTaskGenerationQPS: dynamicconfig.GetFloatPropertyFn(0), + ReplicatorReadTaskMaxRetryCount: dynamicconfig.GetIntPropertyFn(1), + } + + clusterMetadata := cluster.NewMetadata(0, testClusterC, testClusterC, map[string]config.ClusterInformation{ + testClusterA: {Enabled: true}, + testClusterB: {Enabled: true}, + testClusterC: {Enabled: true}, + }) + + return NewTaskStore(1, &cfg, clusterMetadata, domains, metrics.NewNoopMetricsClient(), log.NewNoop(), hydrator) +} + +type fakeDomainCache map[string]*cache.DomainCacheEntry + +func (cache fakeDomainCache) GetDomainByID(id string) (*cache.DomainCacheEntry, error) { + if entry, ok := cache[id]; ok { + return entry, nil + } + return nil, types.EntityNotExistsError{Message: "domain does not exist"} +}
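// Illustrative sketch (not part of the patch): a possible additional test built on the
// helpers above, asserting that a second Get for the same task is served from the cluster
// cache and shares the hydrated instance with the first one. The test name is assumed.
func TestTaskStoreServesSecondGetFromCache(t *testing.T) {
	ts := createTestTaskStore(fakeDomainCache{testDomainID: testDomain}, fakeTaskHydrator{testTask11.TaskID: testHydratedTask11})

	first, err := ts.Get(context.Background(), testClusterA, testTask11)
	assert.NoError(t, err) // cache miss: hydrated and stored

	second, err := ts.Get(context.Background(), testClusterA, testTask11)
	assert.NoError(t, err)
	assert.Same(t, first, second) // cache hit: hydrated once, shared afterwards
}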