Replication cache for sharing hydrated messages (cadence-workflow#4952)
* Replication cache for sharing hydrated messages

* Export ErrUnknownCluster

* Tag shardID for TaskStore

Co-authored-by: David Porter <[email protected]>
vytautas-karpavicius and davidporter-id-au authored Aug 24, 2022
1 parent e597b87 commit 1be9b6d
Showing 11 changed files with 720 additions and 85 deletions.
11 changes: 11 additions & 0 deletions common/dynamicconfig/constants.go
@@ -951,6 +951,12 @@ const (
// Default value: 3
// Allowed filters: N/A
ReplicatorReadTaskMaxRetryCount
// ReplicatorCacheCapacity is the capacity of replication cache in number of tasks
// KeyName: history.replicatorCacheCapacity
// Value type: Int
// Default value: 10000
// Allowed filters: N/A
ReplicatorCacheCapacity

// ExecutionMgrNumConns is persistence connections number for ExecutionManager
// KeyName: history.executionMgrNumConns
@@ -3066,6 +3072,11 @@ var IntKeys = map[IntKey]DynamicInt{
Description: "ReplicatorReadTaskMaxRetryCount is the number of read replication task retry time",
DefaultValue: 3,
},
ReplicatorCacheCapacity: DynamicInt{
KeyName: "history.replicatorCacheCapacity",
Description: "ReplicatorCacheCapacity is the capacity of replication cache in number of tasks",
DefaultValue: 10000,
},
ExecutionMgrNumConns: DynamicInt{
KeyName: "history.executionMgrNumConns",
Description: "ExecutionMgrNumConns is persistence connections number for ExecutionManager",
1 change: 1 addition & 0 deletions common/log/tag/values.go
@@ -114,6 +114,7 @@ var (
ComponentReplicator = component("replicator")
ComponentReplicationTaskProcessor = component("replication-task-processor")
ComponentReplicationAckManager = component("replication-ack-manager")
ComponentReplicationCacheManager = component("replication-cache-manager")
ComponentHistoryReplicator = component("history-replicator")
ComponentHistoryResender = component("history-resender")
ComponentIndexer = component("indexer")
9 changes: 9 additions & 0 deletions common/metrics/defs.go
@@ -1089,6 +1089,8 @@ const (
HistoryEventNotificationScope
// ReplicatorQueueProcessorScope is the scope used by all metric emitted by replicator queue processor
ReplicatorQueueProcessorScope
// ReplicatorCacheManagerScope is the scope used by all metric emitted by replicator cache manager
ReplicatorCacheManagerScope
// ReplicatorTaskHistoryScope is the scope used for history task processing by replicator queue processor
ReplicatorTaskHistoryScope
// ReplicatorTaskSyncActivityScope is the scope used for sync activity by replicator queue processor
@@ -1705,6 +1707,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
CrossClusterTargetTaskApplyParentClosePolicyScope: {operation: "CrossClusterTargetTaskTypeApplyParentClosePolicy"},
HistoryEventNotificationScope: {operation: "HistoryEventNotification"},
ReplicatorQueueProcessorScope: {operation: "ReplicatorQueueProcessor"},
ReplicatorCacheManagerScope: {operation: "ReplicatorCacheManager"},
ReplicatorTaskHistoryScope: {operation: "ReplicatorTaskHistory"},
ReplicatorTaskSyncActivityScope: {operation: "ReplicatorTaskSyncActivity"},
ReplicateHistoryEventsScope: {operation: "ReplicateHistoryEvents"},
@@ -2082,10 +2085,13 @@ const (
UnbufferReplicationTaskTimer
HistoryConflictsCounter
CompleteTaskFailedCounter
CacheSize
CacheRequests
CacheFailures
CacheLatency
CacheHitCounter
CacheMissCounter
CacheFullCounter
AcquireLockFailedCounter
WorkflowContextCleared
MutableStateSize
@@ -2633,10 +2639,13 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
UnbufferReplicationTaskTimer: {metricName: "unbuffer_replication_tasks", metricType: Timer},
HistoryConflictsCounter: {metricName: "history_conflicts", metricType: Counter},
CompleteTaskFailedCounter: {metricName: "complete_task_fail_count", metricType: Counter},
CacheSize: {metricName: "cache_size", metricType: Gauge},
CacheRequests: {metricName: "cache_requests", metricType: Counter},
CacheFailures: {metricName: "cache_errors", metricType: Counter},
CacheLatency: {metricName: "cache_latency", metricType: Timer},
CacheHitCounter: {metricName: "cache_hit", metricType: Counter},
CacheMissCounter: {metricName: "cache_miss", metricType: Counter},
CacheFullCounter: {metricName: "cache_full", metricType: Counter},
AcquireLockFailedCounter: {metricName: "acquire_lock_failed", metricType: Counter},
WorkflowContextCleared: {metricName: "workflow_context_cleared", metricType: Counter},
MutableStateSize: {metricName: "mutable_state_size", metricType: Timer},
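For illustration, here is a minimal sketch of how a caller might emit the new ReplicatorCacheManagerScope metrics through the existing metrics.Client interface. The recordCachePut helper is hypothetical and not part of this commit; it assumes only the standard IncCounter and UpdateGauge client methods and would conceptually live in the replication package, since it references Cache and errCacheFull.

import "github.com/uber/cadence/common/metrics"

// recordCachePut is a hypothetical helper illustrating the scope/metric pairing above.
func recordCachePut(client metrics.Client, c *Cache, err error) {
	// Track overall traffic and current occupancy of the cache.
	client.IncCounter(metrics.ReplicatorCacheManagerScope, metrics.CacheRequests)
	client.UpdateGauge(metrics.ReplicatorCacheManagerScope, metrics.CacheSize, float64(c.Size()))

	// Distinguish a full cache from other failures.
	if err == errCacheFull {
		client.IncCounter(metrics.ReplicatorCacheManagerScope, metrics.CacheFullCounter)
	} else if err != nil {
		client.IncCounter(metrics.ReplicatorCacheManagerScope, metrics.CacheFailures)
	}
}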
2 changes: 2 additions & 0 deletions service/history/config/config.go
@@ -189,6 +189,7 @@ type Config struct {
ReplicatorReadTaskMaxRetryCount dynamicconfig.IntPropertyFn
ReplicatorProcessorFetchTasksBatchSize dynamicconfig.IntPropertyFnWithShardIDFilter
ReplicatorUpperLatency dynamicconfig.DurationPropertyFn
ReplicatorCacheCapacity dynamicconfig.IntPropertyFn

// Persistence settings
ExecutionMgrNumConns dynamicconfig.IntPropertyFn
@@ -458,6 +459,7 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, storeType string, isA
ReplicatorReadTaskMaxRetryCount: dc.GetIntProperty(dynamicconfig.ReplicatorReadTaskMaxRetryCount),
ReplicatorProcessorFetchTasksBatchSize: dc.GetIntPropertyFilteredByShardID(dynamicconfig.ReplicatorTaskBatchSize),
ReplicatorUpperLatency: dc.GetDurationProperty(dynamicconfig.ReplicatorUpperLatency),
ReplicatorCacheCapacity: dc.GetIntProperty(dynamicconfig.ReplicatorCacheCapacity),

ExecutionMgrNumConns: dc.GetIntProperty(dynamicconfig.ExecutionMgrNumConns),
HistoryMgrNumConns: dc.GetIntProperty(dynamicconfig.HistoryMgrNumConns),
15 changes: 12 additions & 3 deletions service/history/historyEngine.go
@@ -113,6 +113,7 @@ type (
crossClusterTaskProcessors common.Daemon
replicationTaskProcessors []replication.TaskProcessor
replicationAckManager replication.TaskAckManager
replicationTaskStore *replication.TaskStore
replicationHydrator replication.TaskHydrator
publicClient workflowserviceclient.Interface
eventsReapplier ndc.EventsReapplier
@@ -159,6 +160,15 @@ func NewEngineWithShardContext(
executionCache := execution.NewCache(shard)
failoverMarkerNotifier := failover.NewMarkerNotifier(shard, config, failoverCoordinator)
replicationHydrator := replication.NewDeferredTaskHydrator(shard.GetShardID(), historyV2Manager, executionCache, shard.GetDomainCache())
replicationTaskStore := replication.NewTaskStore(
shard.GetShardID(),
shard.GetConfig(),
shard.GetClusterMetadata(),
shard.GetDomainCache(),
shard.GetMetricsClient(),
shard.GetLogger(),
replicationHydrator,
)
historyEngImpl := &historyEngineImpl{
currentClusterName: currentClusterName,
shard: shard,
@@ -210,13 +220,12 @@
replicationAckManager: replication.NewTaskAckManager(
shard.GetShardID(),
shard,
shard.GetDomainCache(),
shard.GetMetricsClient(),
shard.GetLogger(),
shard.GetConfig(),
replication.NewDynamicTaskReader(shard.GetShardID(), executionManager, shard.GetTimeSource(), config),
replicationHydrator,
replicationTaskStore,
),
replicationTaskStore: replicationTaskStore,
}
historyEngImpl.decisionHandler = decision.NewHandler(
shard,
150 changes: 150 additions & 0 deletions service/history/replication/cache.go
@@ -0,0 +1,150 @@
// The MIT License (MIT)
//
// Copyright (c) 2022 Uber Technologies Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package replication

import (
"container/heap"
"errors"
"sync"

"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/types"
)

var (
errCacheFull = errors.New("cache is full")
errAlreadyAcked = errors.New("already acked")
)

// Cache is an in-memory implementation of a cache for storing hydrated replication messages.
// Messages can arrive out of order, as long as their task ID is higher than the last acknowledged message.
// Out-of-order delivery is expected, as different source clusters will share hydrated replication messages.
//
// The cache uses a heap to keep replication messages ordered, which allows acknowledgements in O(log N).
//
// Cache capacity can be increased dynamically. Decreasing it requires a restart: new tasks will not be accepted, but memory will not be reclaimed either.
//
// Cache methods are thread safe. Writers and readers are expected to run in different goroutines.
type Cache struct {
mu sync.RWMutex

capacity dynamicconfig.IntPropertyFn

order int64Heap
cache map[int64]*types.ReplicationTask

lastAck int64
}

// NewCache creates a new instance of the replication cache
func NewCache(capacity dynamicconfig.IntPropertyFn) *Cache {
initialCapacity := capacity()
return &Cache{
capacity: capacity,
order: make(int64Heap, 0, initialCapacity),
cache: make(map[int64]*types.ReplicationTask, initialCapacity),
}
}

// Size returns current size of the cache
func (c *Cache) Size() int {
c.mu.RLock()
defer c.mu.RUnlock()

return len(c.order)
}

// Put stores a replication task in the cache.
// - If the cache is full, it returns errCacheFull
// - If the task ID is at or below the last acknowledged task, it returns errAlreadyAcked
func (c *Cache) Put(task *types.ReplicationTask) error {
c.mu.Lock()
defer c.mu.Unlock()

// Check for full cache
if len(c.order) >= c.capacity() {
return errCacheFull
}

taskID := task.SourceTaskID

// Reject task as it was already acknowledged
if c.lastAck >= taskID {
return errAlreadyAcked
}

// Do not add duplicate tasks
if _, exists := c.cache[taskID]; exists {
return nil
}

c.cache[taskID] = task
heap.Push(&c.order, taskID)

return nil
}

// Get returns the stored task with the given taskID.
// If the task is not cached, nil is returned.
func (c *Cache) Get(taskID int64) *types.ReplicationTask {
c.mu.RLock()
defer c.mu.RUnlock()

return c.cache[taskID]
}

// Ack acknowledges replication messages up to the given level.
// Acknowledged messages are removed from the cache.
func (c *Cache) Ack(level int64) {
c.mu.Lock()
defer c.mu.Unlock()

for c.order.Len() > 0 && c.order.Peek() <= level {
taskID := heap.Pop(&c.order).(int64)
delete(c.cache, taskID)
}

c.lastAck = level
}

type int64Heap []int64

func (h int64Heap) Len() int { return len(h) }
func (h int64Heap) Less(i, j int) bool { return h[i] < h[j] }
func (h int64Heap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }

func (h *int64Heap) Push(x interface{}) {
*h = append(*h, x.(int64))
}

func (h *int64Heap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}

func (h *int64Heap) Peek() int64 {
return (*h)[0]
}
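To make the intended usage concrete, here is a small standalone sketch against the exported API above; the capacity value and task IDs are illustrative only.

package main

import (
	"fmt"

	"github.com/uber/cadence/common/dynamicconfig"
	"github.com/uber/cadence/common/types"
	"github.com/uber/cadence/service/history/replication"
)

func main() {
	// Capacity is normally driven by dynamic config; a fixed value is used here.
	cache := replication.NewCache(dynamicconfig.GetIntPropertyFn(1000))

	// Hydrated tasks may arrive out of order from different consumers.
	_ = cache.Put(&types.ReplicationTask{SourceTaskID: 200})
	_ = cache.Put(&types.ReplicationTask{SourceTaskID: 100})

	// A later request for task 100 is served from memory, skipping re-hydration.
	fmt.Println(cache.Get(100) != nil) // true

	// Acknowledging level 100 evicts task 100 but keeps task 200.
	cache.Ack(100)
	fmt.Println(cache.Get(100), cache.Size()) // <nil> 1

	// Anything at or below the acknowledged level is rejected.
	fmt.Println(cache.Put(&types.ReplicationTask{SourceTaskID: 50})) // already acked
}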
114 changes: 114 additions & 0 deletions service/history/replication/cache_test.go
@@ -0,0 +1,114 @@
// The MIT License (MIT)
//
// Copyright (c) 2022 Uber Technologies Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package replication

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/types"
)

func TestCache(t *testing.T) {
task1 := &types.ReplicationTask{SourceTaskID: 100}
task2 := &types.ReplicationTask{SourceTaskID: 200}
task3 := &types.ReplicationTask{SourceTaskID: 300}
task4 := &types.ReplicationTask{SourceTaskID: 400}

cache := NewCache(dynamicconfig.GetIntPropertyFn(3))

assert.Equal(t, 0, cache.Size())
require.NoError(t, cache.Put(task2))
assert.Equal(t, 1, cache.Size())
require.NoError(t, cache.Put(task2))
assert.Equal(t, 1, cache.Size())
require.NoError(t, cache.Put(task3))
assert.Equal(t, 2, cache.Size())
require.NoError(t, cache.Put(task1))
assert.Equal(t, 3, cache.Size())
assert.Equal(t, errCacheFull, cache.Put(task4))
assert.Equal(t, 3, cache.Size())

assert.Nil(t, cache.Get(0))
assert.Nil(t, cache.Get(99))
assert.Equal(t, task1, cache.Get(100))
assert.Nil(t, cache.Get(101))
assert.Equal(t, task2, cache.Get(200))
assert.Equal(t, task3, cache.Get(300))

cache.Ack(0)
assert.Equal(t, 3, cache.Size())
assert.Equal(t, task1, cache.Get(100))
assert.Equal(t, task2, cache.Get(200))
assert.Equal(t, task3, cache.Get(300))

cache.Ack(1)
assert.Equal(t, 3, cache.Size())
assert.Equal(t, task1, cache.Get(100))
assert.Equal(t, task2, cache.Get(200))
assert.Equal(t, task3, cache.Get(300))

cache.Ack(99)
assert.Equal(t, 3, cache.Size())
assert.Equal(t, task1, cache.Get(100))
assert.Equal(t, task2, cache.Get(200))
assert.Equal(t, task3, cache.Get(300))

cache.Ack(100)
assert.Equal(t, 2, cache.Size())
assert.Nil(t, cache.Get(100))
assert.Equal(t, task2, cache.Get(200))
assert.Equal(t, task3, cache.Get(300))

assert.Equal(t, errAlreadyAcked, cache.Put(task1))

cache.Ack(301)
assert.Equal(t, 0, cache.Size())
assert.Nil(t, cache.Get(100))
assert.Nil(t, cache.Get(200))
assert.Nil(t, cache.Get(300))

require.NoError(t, cache.Put(task4))
assert.Equal(t, 1, cache.Size())
assert.Nil(t, cache.Get(100))
assert.Nil(t, cache.Get(200))
assert.Nil(t, cache.Get(300))
assert.Equal(t, task4, cache.Get(400))
}

func BenchmarkCache(b *testing.B) {
cache := NewCache(dynamicconfig.GetIntPropertyFn(10000))
for i := 0; i < 5000; i++ {
cache.Put(&types.ReplicationTask{SourceTaskID: int64(i * 100)})
}

for n := 0; n < b.N; n++ {
readLevel := int64((n) * 100)
cache.Get(readLevel)
cache.Put(&types.ReplicationTask{SourceTaskID: int64(n * 100)})
cache.Ack(readLevel)
}
}
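Assuming the standard Go toolchain, the new test and benchmark can be exercised in isolation with something like:

go test ./service/history/replication/ -run TestCache -v
go test ./service/history/replication/ -run '^$' -bench BenchmarkCache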
(Diffs for the remaining 4 changed files are not shown here.)
