diff --git a/common/collection/concurrentPriorityQueue.go b/common/collection/concurrentPriorityQueue.go new file mode 100644 index 00000000000..9708a2e2061 --- /dev/null +++ b/common/collection/concurrentPriorityQueue.go @@ -0,0 +1,79 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package collection + +import ( + "sync" +) + +type ( + concurrentPriorityQueueImpl struct { + lock sync.Mutex + priorityQueue Queue + } +) + +// NewConcurrentPriorityQueue create a new concurrent priority queue +func NewConcurrentPriorityQueue(compareLess func(this interface{}, other interface{}) bool) Queue { + return &concurrentPriorityQueueImpl{ + priorityQueue: NewPriorityQueue(compareLess), + } +} + +// Peek returns the top item of the priority queue +func (pq *concurrentPriorityQueueImpl) Peek() interface{} { + pq.lock.Lock() + defer pq.lock.Unlock() + + return pq.priorityQueue.Peek() +} + +// Add push an item to priority queue +func (pq *concurrentPriorityQueueImpl) Add(item interface{}) { + pq.lock.Lock() + defer pq.lock.Unlock() + + pq.priorityQueue.Add(item) +} + +// Remove pop an item from priority queue +func (pq *concurrentPriorityQueueImpl) Remove() interface{} { + pq.lock.Lock() + defer pq.lock.Unlock() + + return pq.priorityQueue.Remove() +} + +// IsEmpty indicate if the priority queue is empty +func (pq *concurrentPriorityQueueImpl) IsEmpty() bool { + pq.lock.Lock() + defer pq.lock.Unlock() + + return pq.priorityQueue.IsEmpty() +} + +// Len return the size of the queue +func (pq *concurrentPriorityQueueImpl) Len() int { + pq.lock.Lock() + defer pq.lock.Unlock() + + return pq.priorityQueue.Len() +} diff --git a/common/collection/concurrent_tx_map.go b/common/collection/concurrent_tx_map.go index c605e673698..8f4e00f4253 100644 --- a/common/collection/concurrent_tx_map.go +++ b/common/collection/concurrent_tx_map.go @@ -34,62 +34,6 @@ const ( ) type ( - // HashFunc represents a hash function for string - HashFunc func(interface{}) uint32 - - // ActionFunc take a key and value, do calulation and return err - ActionFunc func(key interface{}, value interface{}) error - // PredicateFunc take a key and value, do calulation and return boolean - PredicateFunc func(key interface{}, value interface{}) bool - - // ConcurrentTxMap is a generic interface for any - // implementation of a dictionary or a key value - // lookup table that is thread safe, and - ConcurrentTxMap interface { - // Get returns the value for the given key - 
Get(key interface{}) (interface{}, bool) - // Contains returns true if the key exist and false otherwise - Contains(key interface{}) bool - // Put records the mapping from given key to value - Put(key interface{}, value interface{}) - // PutIfNotExist records the key value mapping only - // if the mapping does not already exist - PutIfNotExist(key interface{}, value interface{}) bool - // Remove deletes the key from the map - Remove(key interface{}) - // GetAndDo returns the value corresponding to the key, and apply fn to key value before return value - // return (value, value exist or not, error when evaluation fn) - GetAndDo(key interface{}, fn ActionFunc) (interface{}, bool, error) - // PutOrDo put the key value in the map, if key does not exists, otherwise, call fn with existing key and value - // return (value, fn evaluated or not, error when evaluation fn) - PutOrDo(key interface{}, value interface{}, fn ActionFunc) (interface{}, bool, error) - // RemoveIf deletes the given key from the map if fn return true - // return whether the key is removed or not - RemoveIf(key interface{}, fn PredicateFunc) bool - // Iter returns an iterator to the map - Iter() MapIterator - // Size returns the number of items in the map - Size() int - } - - // MapIterator represents the interface for - // map iterators - MapIterator interface { - // Close closes the iterator - // and releases any allocated resources - Close() - // Entries returns a channel of MapEntry - // objects that can be used in a range loop - Entries() <-chan *MapEntry - } - - // MapEntry represents a key-value entry within the map - MapEntry struct { - // Key represents the key - Key interface{} - // Value represents the value - Value interface{} - } // ShardedConcurrentTxMap is an implementation of // ConcurrentMap that internally uses multiple @@ -289,8 +233,8 @@ func (cmap *ShardedConcurrentTxMap) Iter() MapIterator { return iterator } -// Size returns the number of items in the map -func (cmap *ShardedConcurrentTxMap) Size() int { +// Len returns the number of items in the map +func (cmap *ShardedConcurrentTxMap) Len() int { return int(atomic.LoadInt32(&cmap.size)) } diff --git a/common/collection/concurrent_tx_map_test.go b/common/collection/concurrent_tx_map_test.go index 2b95fb1c7b8..e636e40cdd1 100644 --- a/common/collection/concurrent_tx_map_test.go +++ b/common/collection/concurrent_tx_map_test.go @@ -49,28 +49,28 @@ func (s *ConcurrentTxMapSuite) SetupTest() { s.Assertions = require.New(s.T()) // Have to define our overridden assertions in the test setup. 
If we did it earlier, s.T() will return nil } -func (s *ConcurrentTxMapSuite) TestSize() { +func (s *ConcurrentTxMapSuite) TestLen() { testMap := NewShardedConcurrentTxMap(1, UUIDHashCode) key1 := "0001" testMap.Put(key1, boolType(true)) - s.Equal(1, testMap.Size(), "Wrong concurrent map size") + s.Equal(1, testMap.Len(), "Wrong concurrent map size") testMap.Put(key1, boolType(false)) - s.Equal(1, testMap.Size(), "Wrong concurrent map size") + s.Equal(1, testMap.Len(), "Wrong concurrent map size") key2 := "0002" testMap.Put(key2, boolType(false)) - s.Equal(2, testMap.Size(), "Wrong concurrent map size") + s.Equal(2, testMap.Len(), "Wrong concurrent map size") testMap.PutIfNotExist(key2, boolType(false)) - s.Equal(2, testMap.Size(), "Wrong concurrent map size") + s.Equal(2, testMap.Len(), "Wrong concurrent map size") testMap.Remove(key2) - s.Equal(1, testMap.Size(), "Wrong concurrent map size") + s.Equal(1, testMap.Len(), "Wrong concurrent map size") testMap.Remove(key2) - s.Equal(1, testMap.Size(), "Wrong concurrent map size") + s.Equal(1, testMap.Len(), "Wrong concurrent map size") } func (s *ConcurrentTxMapSuite) TestGetAndDo() { @@ -149,7 +149,7 @@ func (s *ConcurrentTxMapSuite) TestRemoveIf() { } return false }) - s.Equal(1, testMap.Size(), "TestRemoveIf should only entry if condition is met") + s.Equal(1, testMap.Len(), "TestRemoveIf should only entry if condition is met") s.False(removed, "TestRemoveIf should return false if key is not deleted") removed = testMap.RemoveIf(key, func(key interface{}, value interface{}) bool { @@ -159,7 +159,7 @@ func (s *ConcurrentTxMapSuite) TestRemoveIf() { } return false }) - s.Equal(0, testMap.Size(), "TestRemoveIf should only entry if condition is met") + s.Equal(0, testMap.Len(), "TestRemoveIf should only entry if condition is met") s.True(removed, "TestRemoveIf should return true if key is deleted") } @@ -181,7 +181,7 @@ func (s *ConcurrentTxMapSuite) TestGetAfterPut() { s.True(bool(boolValue), "Wrong value returned from map") } - s.Equal(len(countMap), testMap.Size(), "Size() returned wrong value") + s.Equal(len(countMap), testMap.Len(), "Size() returned wrong value") it := testMap.Iter() for entry := range it.Entries() { @@ -197,7 +197,7 @@ func (s *ConcurrentTxMapSuite) TestGetAfterPut() { testMap.Remove(k) } - s.Equal(0, testMap.Size(), "Map returned non-zero size after deleting all entries") + s.Equal(0, testMap.Len(), "Map returned non-zero size after deleting all entries") } func (s *ConcurrentTxMapSuite) TestPutIfNotExist() { @@ -244,7 +244,7 @@ func (s *ConcurrentTxMapSuite) TestMapConcurrency() { startWG.Done() doneWG.Wait() - s.Equal(nKeys, testMap.Size(), "Wrong concurrent map size") + s.Equal(nKeys, testMap.Len(), "Wrong concurrent map size") var gotTotal int32 for i := 0; i < nKeys; i++ { diff --git a/common/collection/defs.go b/common/collection/defs.go deleted file mode 100644 index d3d04c6cad3..00000000000 --- a/common/collection/defs.go +++ /dev/null @@ -1,26 +0,0 @@ -// Copyright (c) 2017 Uber Technologies, Inc. 
-// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package collection - -const ( - // UUIDStringLength is the length of an UUID represented as a hex string - UUIDStringLength = 36 // xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx -) diff --git a/common/collection/initerface.go b/common/collection/initerface.go new file mode 100644 index 00000000000..beb50f73c1f --- /dev/null +++ b/common/collection/initerface.go @@ -0,0 +1,98 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package collection + +type ( + // Queue is the interface for a queue + Queue interface { + // Peek returns the first item of the queue + Peek() interface{} + // Add pushes an item to the queue + Add(item interface{}) + // Remove pops an item from the queue + Remove() interface{} + // IsEmpty indicates whether the queue is empty + IsEmpty() bool + // Len returns the size of the queue + Len() int + } + + // HashFunc represents a hash function for string + HashFunc func(interface{}) uint32 + + // ActionFunc takes a key and value, does a calculation and returns an error + ActionFunc func(key interface{}, value interface{}) error + // PredicateFunc takes a key and value, does a calculation and returns a boolean + PredicateFunc func(key interface{}, value interface{}) bool + + // ConcurrentTxMap is a generic interface for any implementation of a dictionary + // or a key value lookup table that is thread safe and provides functionality + // to modify a key / value pair within a transaction + ConcurrentTxMap interface { + // Get returns the value for the given key + Get(key interface{}) (interface{}, bool) + // Contains returns true if the key exists and false otherwise + Contains(key interface{}) bool + // Put records the mapping from given key to value + Put(key interface{}, value interface{}) + // PutIfNotExist records the key value mapping only + // if the mapping does not already exist + PutIfNotExist(key interface{}, value interface{}) bool + // Remove deletes the key from the map + Remove(key interface{}) + // GetAndDo returns the value corresponding to the key, applying fn to the key and value before returning + // returns (value, whether the value exists, error from evaluating fn) + GetAndDo(key interface{}, fn ActionFunc) (interface{}, bool, error) + // PutOrDo puts the key value into the map if the key does not exist; otherwise, it calls fn with the existing key and value + // returns (value, whether fn was evaluated, error from evaluating fn) + PutOrDo(key interface{}, value interface{}, fn ActionFunc) (interface{}, bool, error) + // RemoveIf deletes the given key from the map if fn returns true + // returns whether the key was removed + RemoveIf(key interface{}, fn PredicateFunc) bool + // Iter returns an iterator to the map + Iter() MapIterator + // Len returns the number of items in the map + Len() int + } + + // MapIterator represents the interface for map iterators + MapIterator interface { + // Close closes the iterator + // and releases any allocated resources + Close() + // Entries returns a channel of MapEntry + // objects that can be used in a range loop + Entries() <-chan *MapEntry + } + + // MapEntry represents a key-value entry within the map + MapEntry struct { + // Key represents the key + Key interface{} + // Value represents the value + Value interface{} + } +) + +const ( + // UUIDStringLength is the length of an UUID represented as a hex string + UUIDStringLength = 36 // xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx +) diff --git a/common/collection/priorityQueue.go b/common/collection/priorityQueue.go new file mode 100644 index 00000000000..e337c00797e --- /dev/null +++ b/common/collection/priorityQueue.go @@ -0,0 +1,91 @@ +// Copyright (c) 2017 Uber Technologies, Inc.
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package collection + +import ( + "container/heap" +) + +type ( + priorityQueueImpl struct { + compareLess func(this interface{}, other interface{}) bool + items []interface{} + } +) + +// NewPriorityQueue create a new priority queue +func NewPriorityQueue(compareLess func(this interface{}, other interface{}) bool) Queue { + return &priorityQueueImpl{ + compareLess: compareLess, + } +} + +// Peek returns the top item of the priority queue +func (pq *priorityQueueImpl) Peek() interface{} { + if pq.IsEmpty() { + panic("Cannot peek item because priority queue is empty") + } + return pq.items[0] +} + +// Add push an item to priority queue +func (pq *priorityQueueImpl) Add(item interface{}) { + heap.Push(pq, item) +} + +// Remove pop an item from priority queue +func (pq *priorityQueueImpl) Remove() interface{} { + return heap.Pop(pq) +} + +// IsEmpty indicate if the priority queue is empty +func (pq *priorityQueueImpl) IsEmpty() bool { + return pq.Len() == 0 +} + +// below are the functions used by heap.Interface and go internal heap implementation + +// Len implements sort.Interface +func (pq *priorityQueueImpl) Len() int { + return len(pq.items) +} + +// Less implements sort.Interface +func (pq *priorityQueueImpl) Less(i, j int) bool { + return pq.compareLess(pq.items[i], pq.items[j]) +} + +// Swap implements sort.Interface +func (pq *priorityQueueImpl) Swap(i, j int) { + pq.items[i], pq.items[j] = pq.items[j], pq.items[i] +} + +// Push push an item to priority queue, used by go internal heap implementation +func (pq *priorityQueueImpl) Push(item interface{}) { + pq.items = append(pq.items, item) +} + +// Pop pop an item from priority queue, used by go internal heap implementation +func (pq *priorityQueueImpl) Pop() interface{} { + pqItem := pq.items[pq.Len()-1] + pq.items = pq.items[0 : pq.Len()-1] + return pqItem +} diff --git a/common/collection/priorityQueue_test.go b/common/collection/priorityQueue_test.go new file mode 100644 index 00000000000..9fcfbeed5d9 --- /dev/null +++ b/common/collection/priorityQueue_test.go @@ -0,0 +1,109 @@ +// Copyright (c) 2017 Uber Technologies, Inc. 
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package collection + +import ( + "math/rand" + "sort" + "testing" + + "github.com/stretchr/testify/suite" +) + +type ( + PriorityQueueSuite struct { + suite.Suite + pq Queue + } + + testPriorityQueueItem struct { + value int + } +) + +func testPriorityQueueItemCompareLess(this interface{}, that interface{}) bool { + return this.(*testPriorityQueueItem).value < that.(*testPriorityQueueItem).value +} + +func TestPriorityQueueSuite(t *testing.T) { + suite.Run(t, new(PriorityQueueSuite)) +} + +func (s *PriorityQueueSuite) SetupTest() { + s.pq = NewPriorityQueue(testPriorityQueueItemCompareLess) +} + +func (s *PriorityQueueSuite) TestInsertAndPop() { + s.pq.Add(&testPriorityQueueItem{10}) + s.pq.Add(&testPriorityQueueItem{3}) + s.pq.Add(&testPriorityQueueItem{5}) + s.pq.Add(&testPriorityQueueItem{4}) + s.pq.Add(&testPriorityQueueItem{1}) + s.pq.Add(&testPriorityQueueItem{16}) + s.pq.Add(&testPriorityQueueItem{-10}) + + expected := []int{-10, 1, 3, 4, 5, 10, 16} + result := []int{} + + for !s.pq.IsEmpty() { + result = append(result, s.pq.Remove().(*testPriorityQueueItem).value) + } + s.Equal(expected, result) + + s.pq.Add(&testPriorityQueueItem{1000}) + s.pq.Add(&testPriorityQueueItem{1233}) + s.pq.Remove() // remove 1000 + s.pq.Add(&testPriorityQueueItem{4}) + s.pq.Add(&testPriorityQueueItem{18}) + s.pq.Add(&testPriorityQueueItem{192}) + s.pq.Add(&testPriorityQueueItem{255}) + s.pq.Remove() // remove 4 + s.pq.Remove() // remove 18 + s.pq.Add(&testPriorityQueueItem{59}) + s.pq.Add(&testPriorityQueueItem{727}) + + expected = []int{59, 192, 255, 727, 1233} + result = []int{} + + for !s.pq.IsEmpty() { + result = append(result, s.pq.Remove().(*testPriorityQueueItem).value) + } + s.Equal(expected, result) +} + +func (s *PriorityQueueSuite) TestRandomNumber() { + for round := 0; round < 1000; round++ { + + expected := []int{} + result := []int{} + for i := 0; i < 1000; i++ { + num := rand.Int() + s.pq.Add(&testPriorityQueueItem{num}) + expected = append(expected, num) + } + sort.Ints(expected) + + for !s.pq.IsEmpty() { + result = append(result, s.pq.Remove().(*testPriorityQueueItem).value) + } + s.Equal(expected, result) + } +} diff --git a/common/metrics/defs.go b/common/metrics/defs.go index ed2768fc659..b413f77b213 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -437,6 +437,9 @@ const ( // ElasticsearchCountWorkflowExecutionsScope tracks CountWorkflowExecutions calls made by service to persistence layer 
ElasticsearchCountWorkflowExecutionsScope + // SequentialTaskProcessingScope is used by sequential task processing logic + SequentialTaskProcessingScope + NumCommonScopes ) @@ -939,6 +942,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{ ElasticsearchListWorkflowExecutionsScope: {operation: "ListWorkflowExecutions"}, ElasticsearchScanWorkflowExecutionsScope: {operation: "ScanWorkflowExecutions"}, ElasticsearchCountWorkflowExecutionsScope: {operation: "CountWorkflowExecutions"}, + SequentialTaskProcessingScope: {operation: "SequentialTaskProcessing"}, }, // Frontend Scope Names Frontend: { @@ -1161,6 +1165,14 @@ const ( ElasticsearchErrBadRequestCounter ElasticsearchErrBusyCounter + SequentialTaskSubmitRequest + SequentialTaskSubmitRequestTaskQueueExist + SequentialTaskSubmitRequestTaskQueueMissing + SequentialTaskSubmitLatency + SequentialTaskQueueSize + SequentialTaskQueueProcessingLatency + SequentialTaskTaskProcessingLatency + NumCommonMetrics // Needs to be last on this list for iota numbering ) @@ -1400,6 +1412,13 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{ ElasticsearchLatency: {metricName: "elasticsearch_latency", metricType: Timer}, ElasticsearchErrBadRequestCounter: {metricName: "elasticsearch_errors_bad_request", metricType: Counter}, ElasticsearchErrBusyCounter: {metricName: "elasticsearch_errors_busy", metricType: Counter}, + SequentialTaskSubmitRequest: {metricName: "sequentialtask_submit_request", metricType: Counter}, + SequentialTaskSubmitRequestTaskQueueExist: {metricName: "sequentialtask_submit_request_taskqueue_exist", metricType: Counter}, + SequentialTaskSubmitRequestTaskQueueMissing: {metricName: "sequentialtask_submit_request_taskqueue_missing", metricType: Counter}, + SequentialTaskSubmitLatency: {metricName: "sequentialtask_submit_latency", metricType: Timer}, + SequentialTaskQueueSize: {metricName: "sequentialtask_queue_size", metricType: Timer}, + SequentialTaskQueueProcessingLatency: {metricName: "sequentialtask_queue_processing_latency", metricType: Timer}, + SequentialTaskTaskProcessingLatency: {metricName: "sequentialtask_task_processing_latency", metricType: Timer}, }, Frontend: {}, History: { diff --git a/common/service/dynamicconfig/constants.go b/common/service/dynamicconfig/constants.go index 89d8f5b72cb..78152baa882 100644 --- a/common/service/dynamicconfig/constants.go +++ b/common/service/dynamicconfig/constants.go @@ -172,8 +172,10 @@ var keys = map[Key]string{ WorkerReplicatorMetaTaskConcurrency: "worker.replicatorMetaTaskConcurrency", WorkerReplicatorTaskConcurrency: "worker.replicatorTaskConcurrency", WorkerReplicatorMessageConcurrency: "worker.replicatorMessageConcurrency", + WorkerReplicatorActivityBufferRetryCount: "worker.replicatorActivityBufferRetryCount", WorkerReplicatorHistoryBufferRetryCount: "worker.replicatorHistoryBufferRetryCount", - WorkerReplicationTaskMaxRetry: "worker.replicationTaskMaxRetry", + WorkerReplicationTaskMaxRetryCount: "worker.replicationTaskMaxRetryCount", + WorkerReplicationTaskMaxRetryDuration: "worker.replicationTaskMaxRetryDuration", WorkerIndexerConcurrency: "worker.indexerConcurrency", WorkerESProcessorNumOfWorkers: "worker.ESProcessorNumOfWorkers", WorkerESProcessorBulkActions: "worker.ESProcessorBulkActions", @@ -450,10 +452,14 @@ const ( WorkerReplicatorTaskConcurrency // WorkerReplicatorMessageConcurrency is the max concurrent tasks provided by messaging client WorkerReplicatorMessageConcurrency + // WorkerReplicatorActivityBufferRetryCount is the retry attempt when encounter 
retry error on activity + WorkerReplicatorActivityBufferRetryCount // WorkerReplicatorHistoryBufferRetryCount is the retry attempt when encounter retry error on history WorkerReplicatorHistoryBufferRetryCount - // WorkerReplicationTaskMaxRetry is the max retry for any task - WorkerReplicationTaskMaxRetry + // WorkerReplicationTaskMaxRetryCount is the max retry count for any task + WorkerReplicationTaskMaxRetryCount + // WorkerReplicationTaskMaxRetryDuration is the max retry duration for any task + WorkerReplicationTaskMaxRetryDuration // WorkerIndexerConcurrency is the max concurrent messages to be processed at any given time WorkerIndexerConcurrency // WorkerESProcessorNumOfWorkers is num of workers for esProcessor diff --git a/common/task/interface.go b/common/task/interface.go index d0250147ef3..c67914957e0 100644 --- a/common/task/interface.go +++ b/common/task/interface.go @@ -25,16 +25,15 @@ import ( ) type ( - // SequentialTaskProcessor is the generic goroutines interface + // SequentialTaskProcessor is the generic coroutine pool interface which process sequential task - // for the definition of sequential task, see SequentialTask SequentialTaskProcessor interface { common.Daemon Submit(task SequentialTask) error } - // Task is the generic task representation - Task interface { + // SequentialTask is the interface for tasks which should be executed sequentially + SequentialTask interface { // Execute process this task Execute() error // HandleErr handle the error returned by Execute @@ -47,19 +46,22 @@ type ( Nack() } - // SequentialTaskPartitionID is the interface representing the ID of SequentialTask - SequentialTaskPartitionID interface { - PartitionID() interface{} // MUST be go primitive type or struct with primitive types - HashCode() uint32 - } + // SequentialTaskQueueFactory is the function which generates a new SequentialTaskQueue + // for a given SequentialTask + SequentialTaskQueueFactory func(task SequentialTask) SequentialTaskQueue - // SequentialTask is the interface for tasks which should be executed sequentially - // one common example is the workflow replication task (containing workflow history), - // which must be executed one by one, in the order of the first event ID) - SequentialTask interface { - Task - SequentialTaskPartitionID - // TaskID return the ID of the task, this task ID is used for sorting - TaskID() int64 + // SequentialTaskQueue is the generic task queue interface which groups + // sequential tasks to be executed one by one + SequentialTaskQueue interface { + // QueueID returns the ID of the queue, which is also the ID shared by all tasks inside it + QueueID() interface{} + // Add pushes a task to the queue + Add(task SequentialTask) + // Remove pops a task from the queue + Remove() SequentialTask + // IsEmpty indicates whether the queue is empty + IsEmpty() bool + // Len returns the size of the queue + Len() int } ) diff --git a/common/task/sequentialTaskProcessor.go b/common/task/sequentialTaskProcessor.go index ba70e21a99a..54b66526997 100644 --- a/common/task/sequentialTaskProcessor.go +++ b/common/task/sequentialTaskProcessor.go @@ -21,12 +21,14 @@ package task import ( - "sort" "sync" "sync/atomic" "time" + "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common" + "github.com/uber/cadence/common/collection" "github.com/uber/cadence/common/log" ) @@ -36,31 +38,32 @@ type ( shutdownChan chan struct{} waitGroup sync.WaitGroup - coroutineSize int - taskBatchSize int - coroutineTaskQueues []chan SequentialTask - logger log.Logger - } +
coroutineSize int + taskqueues collection.ConcurrentTxMap + taskQueueFactory SequentialTaskQueueFactory + taskqueueChan chan SequentialTaskQueue - // SequentialTasks slice of SequentialTask - SequentialTasks []SequentialTask + metricsScope int + metricsClient metrics.Client + logger log.Logger + } ) // NewSequentialTaskProcessor create a new sequential tasks processor -func NewSequentialTaskProcessor(coroutineSize int, taskBatchSize int, logger log.Logger) SequentialTaskProcessor { - - coroutineTaskQueues := make([]chan SequentialTask, coroutineSize) - for i := 0; i < coroutineSize; i++ { - coroutineTaskQueues[i] = make(chan SequentialTask, taskBatchSize) - } +func NewSequentialTaskProcessor(coroutineSize int, taskQueueHashFn collection.HashFunc, taskQueueFactory SequentialTaskQueueFactory, + metricsClient metrics.Client, logger log.Logger) SequentialTaskProcessor { return &sequentialTaskProcessorImpl{ - status: common.DaemonStatusInitialized, - shutdownChan: make(chan struct{}), - coroutineSize: coroutineSize, - taskBatchSize: taskBatchSize, - coroutineTaskQueues: coroutineTaskQueues, - logger: logger, + status: common.DaemonStatusInitialized, + shutdownChan: make(chan struct{}), + coroutineSize: coroutineSize, + taskqueues: collection.NewShardedConcurrentTxMap(1024, taskQueueHashFn), + taskQueueFactory: taskQueueFactory, + taskqueueChan: make(chan SequentialTaskQueue, coroutineSize), + + metricsScope: metrics.SequentialTaskProcessingScope, + metricsClient: metricsClient, + logger: logger, } } @@ -71,8 +74,7 @@ func (t *sequentialTaskProcessorImpl) Start() { t.waitGroup.Add(t.coroutineSize) for i := 0; i < t.coroutineSize; i++ { - coroutineTaskQueue := t.coroutineTaskQueues[i] - go t.pollAndProcessTaskQueue(coroutineTaskQueue) + go t.pollAndProcessTaskQueue() } t.logger.Info("Task processor started.") } @@ -90,105 +92,98 @@ func (t *sequentialTaskProcessorImpl) Stop() { } func (t *sequentialTaskProcessorImpl) Submit(task SequentialTask) error { - hashCode := int(task.HashCode()) % t.coroutineSize - taskQueue := t.coroutineTaskQueues[hashCode] + + t.metricsClient.IncCounter(t.metricsScope, metrics.SequentialTaskSubmitRequest) + metricsTimer := t.metricsClient.StartTimer(t.metricsScope, metrics.SequentialTaskSubmitLatency) + defer metricsTimer.Stop() + + taskqueue := t.taskQueueFactory(task) + taskqueue.Add(task) + + _, fnEvaluated, err := t.taskqueues.PutOrDo( + taskqueue.QueueID(), + taskqueue, + func(key interface{}, value interface{}) error { + value.(SequentialTaskQueue).Add(task) + return nil + }, + ) + if err != nil { + return err + } + + // if function evaluated, meaning that the task set is + // already dispatched + if fnEvaluated { + t.metricsClient.IncCounter(t.metricsScope, metrics.SequentialTaskSubmitRequestTaskQueueExist) + return nil + } + // need to dispatch this task set + t.metricsClient.IncCounter(t.metricsScope, metrics.SequentialTaskSubmitRequestTaskQueueMissing) select { case <-t.shutdownChan: - case taskQueue <- task: + case t.taskqueueChan <- taskqueue: } return nil + } -func (t *sequentialTaskProcessorImpl) pollAndProcessTaskQueue(coroutineTaskQueue chan SequentialTask) { +func (t *sequentialTaskProcessorImpl) pollAndProcessTaskQueue() { defer t.waitGroup.Done() for { select { case <-t.shutdownChan: return - default: - t.batchPollTaskQueue(coroutineTaskQueue) - } - } -} - -func (t *sequentialTaskProcessorImpl) batchPollTaskQueue(coroutineTaskQueue chan SequentialTask) { - bufferedSequentialTasks := make(map[interface{}][]SequentialTask) - indexTasks := func(task 
SequentialTask) { - sequentialTasks, ok := bufferedSequentialTasks[task.PartitionID()] - if ok { - sequentialTasks = append(sequentialTasks, task) - bufferedSequentialTasks[task.PartitionID()] = sequentialTasks - } else { - bufferedSequentialTasks[task.PartitionID()] = []SequentialTask{task} - } - } - - select { - case <-t.shutdownChan: - return - case task := <-coroutineTaskQueue: - indexTasks(task) - BufferLoop: - for i := 0; i < t.taskBatchSize-1; i++ { - select { - case <-t.shutdownChan: - return - case task := <-coroutineTaskQueue: - indexTasks(task) - default: - // currently no more task - break BufferLoop - } + case taskqueue := <-t.taskqueueChan: + metricsTimer := t.metricsClient.StartTimer(t.metricsScope, metrics.SequentialTaskQueueProcessingLatency) + t.processTaskQueue(taskqueue) + metricsTimer.Stop() } } - - for _, sequentialTasks := range bufferedSequentialTasks { - t.batchProcessingSequentialTasks(sequentialTasks) - } } -func (t *sequentialTaskProcessorImpl) batchProcessingSequentialTasks(sequentialTasks []SequentialTask) { - sort.Sort(SequentialTasks(sequentialTasks)) - - for _, task := range sequentialTasks { - t.processTaskOnce(task) - } -} - -func (t *sequentialTaskProcessorImpl) processTaskOnce(task SequentialTask) { - var err error - -TaskProcessingLoop: +func (t *sequentialTaskProcessorImpl) processTaskQueue(taskqueue SequentialTaskQueue) { for { select { case <-t.shutdownChan: return default: - err = task.Execute() - err = task.HandleErr(err) - if err == nil || !task.RetryErr(err) { - break TaskProcessingLoop + queueSize := taskqueue.Len() + t.metricsClient.RecordTimer(t.metricsScope, metrics.SequentialTaskQueueSize, time.Duration(queueSize)) + if queueSize > 0 { + t.processTaskOnce(taskqueue) + } else { + deleted := t.taskqueues.RemoveIf(taskqueue.QueueID(), func(key interface{}, value interface{}) bool { + return value.(SequentialTaskQueue).IsEmpty() + }) + if deleted { + return + } + + // if deletion failed, meaning that task queue is offered with new task + // continue execution } } } - - if err != nil { - task.Nack() - return - } - task.Ack() } -func (tasks SequentialTasks) Len() int { - return len(tasks) -} +func (t *sequentialTaskProcessorImpl) processTaskOnce(taskqueue SequentialTaskQueue) { + metricsTimer := t.metricsClient.StartTimer(t.metricsScope, metrics.SequentialTaskTaskProcessingLatency) + defer metricsTimer.Stop() -func (tasks SequentialTasks) Swap(i, j int) { - tasks[i], tasks[j] = tasks[j], tasks[i] -} + task := taskqueue.Remove() + err := task.Execute() + err = task.HandleErr(err) -func (tasks SequentialTasks) Less(i, j int) bool { - return tasks[i].TaskID() < tasks[j].TaskID() + if err != nil { + if task.RetryErr(err) { + taskqueue.Add(task) + } else { + task.Nack() + } + } else { + task.Ack() + } } diff --git a/common/task/sequentialTaskProcessor_test.go b/common/task/sequentialTaskProcessor_test.go index 2828078daec..bbb64000be0 100644 --- a/common/task/sequentialTaskProcessor_test.go +++ b/common/task/sequentialTaskProcessor_test.go @@ -26,22 +26,29 @@ import ( "sync" "testing" + "github.com/uber-go/tally" + "github.com/uber/cadence/common/metrics" + "github.com/stretchr/testify/suite" + "github.com/uber/cadence/common/collection" "github.com/uber/cadence/common/log/loggerimpl" - "go.uber.org/zap" ) type ( SequentialTaskProcessorSuite struct { suite.Suite - coroutineSize int - processor SequentialTaskProcessor + processor SequentialTaskProcessor + } + + testSequentialTaskQueueImpl struct { + id uint32 + taskQueue collection.Queue } 
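+
+	// testSequentialTaskImpl is the SequentialTask implementation used by this suite;
+	// it counts Ack / Nack calls so the tests below can assert how each task was processed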
testSequentialTaskImpl struct { - waitgroup *sync.WaitGroup - partitionID uint32 - taskID int64 + waitgroup *sync.WaitGroup + queueID uint32 + taskID uint32 lock sync.Mutex acked int @@ -54,87 +61,110 @@ func TestSequentialTaskProcessorSuite(t *testing.T) { } func (s *SequentialTaskProcessorSuite) SetupTest() { - s.coroutineSize = 20 - zapLogger, err := zap.NewDevelopment() - s.Require().NoError(err) - logger := loggerimpl.NewLogger(zapLogger) + logger, err := loggerimpl.NewDevelopment() + s.Nil(err) s.processor = NewSequentialTaskProcessor( - s.coroutineSize, - 1000, + 20, + func(key interface{}) uint32 { + return key.(uint32) + }, + func(task SequentialTask) SequentialTaskQueue { + taskQueue := collection.NewConcurrentPriorityQueue(func(this interface{}, other interface{}) bool { + return this.(*testSequentialTaskImpl).taskID < other.(*testSequentialTaskImpl).taskID + }) + + return &testSequentialTaskQueueImpl{ + id: task.(*testSequentialTaskImpl).queueID, + taskQueue: taskQueue, + } + }, + metrics.NewClient(tally.NoopScope, metrics.Common), logger, ) } -func (s *SequentialTaskProcessorSuite) TestSubmit() { +func (s *SequentialTaskProcessorSuite) TestSubmit_NoPriorTask() { waitgroup := &sync.WaitGroup{} waitgroup.Add(1) - task := newTestSequentialTaskImpl(waitgroup, 4, int64(1)) + task := newTestSequentialTaskImpl(waitgroup, 4, uint32(1)) // do not start the processor s.Nil(s.processor.Submit(task)) - taskQueue := s.processor.(*sequentialTaskProcessorImpl).coroutineTaskQueues[int(task.HashCode())%s.coroutineSize] - tasks := []SequentialTask{} -Loop: - for { - select { - case task := <-taskQueue: - tasks = append(tasks, task) - default: - break Loop - } - } - s.Equal(1, len(tasks)) - s.Equal(task, tasks[0]) + sequentialTaskQueue := <-s.processor.(*sequentialTaskProcessorImpl).taskqueueChan + sequentialTask := sequentialTaskQueue.Remove() + s.True(sequentialTaskQueue.IsEmpty()) + s.Equal(task, sequentialTask) } -func (s *SequentialTaskProcessorSuite) TestPollAndProcessTaskQueue_ShutDown() { +func (s *SequentialTaskProcessorSuite) TestSubmit_HasPriorTask() { waitgroup := &sync.WaitGroup{} - waitgroup.Add(1) - task := newTestSequentialTaskImpl(waitgroup, 4, int64(1)) - taskQueue := make(chan SequentialTask, 1) - taskQueue <- task + task1 := newTestSequentialTaskImpl(waitgroup, 4, uint32(1)) + task2 := newTestSequentialTaskImpl(waitgroup, 4, uint32(2)) + + // do not start the processor + s.Nil(s.processor.Submit(task1)) + s.Nil(s.processor.Submit(task2)) + sequentialTaskQueue := <-s.processor.(*sequentialTaskProcessorImpl).taskqueueChan + sequentialTask1 := sequentialTaskQueue.Remove() + sequentialTask2 := sequentialTaskQueue.Remove() + s.True(sequentialTaskQueue.IsEmpty()) + s.Equal(task1, sequentialTask1) + s.Equal(task2, sequentialTask2) +} + +func (s *SequentialTaskProcessorSuite) TestProcessTaskQueue_ShutDown() { + waitgroup := &sync.WaitGroup{} + waitgroup.Add(2) + task1 := newTestSequentialTaskImpl(waitgroup, 4, uint32(1)) + task2 := newTestSequentialTaskImpl(waitgroup, 4, uint32(2)) + + // do not start the processor + s.Nil(s.processor.Submit(task1)) + s.Nil(s.processor.Submit(task2)) + sequentialTaskQueue := <-s.processor.(*sequentialTaskProcessorImpl).taskqueueChan s.processor.Start() s.processor.Stop() + s.processor.(*sequentialTaskProcessorImpl).processTaskQueue(sequentialTaskQueue) - s.processor.(*sequentialTaskProcessorImpl).waitGroup.Add(1) - s.processor.(*sequentialTaskProcessorImpl).pollAndProcessTaskQueue(taskQueue) - select { - case taskInQueue := <-taskQueue: - 
s.Equal(task, taskInQueue) - default: - s.Fail("there should be one task in task queue when task processing logic is shutdown") - } - s.Equal(0, task.NumAcked()) - s.Equal(0, task.NumNcked()) + s.Equal(0, task1.NumAcked()) + s.Equal(0, task1.NumNcked()) + s.Equal(0, task2.NumAcked()) + s.Equal(0, task2.NumNcked()) + s.Equal(1, s.processor.(*sequentialTaskProcessorImpl).taskqueues.Len()) + s.Equal(2, sequentialTaskQueue.Len()) } func (s *SequentialTaskProcessorSuite) TestProcessTaskQueue() { waitgroup := &sync.WaitGroup{} waitgroup.Add(2) - task1 := newTestSequentialTaskImpl(waitgroup, 4, int64(1)) - task2 := newTestSequentialTaskImpl(waitgroup, 4, int64(2)) + task1 := newTestSequentialTaskImpl(waitgroup, 4, uint32(1)) + task2 := newTestSequentialTaskImpl(waitgroup, 4, uint32(2)) - s.processor.Start() + // do not start the processor s.Nil(s.processor.Submit(task1)) s.Nil(s.processor.Submit(task2)) + sequentialTaskQueue := <-s.processor.(*sequentialTaskProcessorImpl).taskqueueChan + + s.processor.(*sequentialTaskProcessorImpl).processTaskQueue(sequentialTaskQueue) waitgroup.Wait() - s.processor.Stop() s.Equal(1, task1.NumAcked()) s.Equal(0, task1.NumNcked()) s.Equal(1, task2.NumAcked()) s.Equal(0, task2.NumNcked()) + s.Equal(0, s.processor.(*sequentialTaskProcessorImpl).taskqueues.Len()) + s.Equal(0, sequentialTaskQueue.Len()) } -func (s *SequentialTaskProcessorSuite) TestTaskProcessing_UniqueQueueID() { +func (s *SequentialTaskProcessorSuite) TestSequentialTaskProcessing() { numTasks := 100 waitgroup := &sync.WaitGroup{} waitgroup.Add(numTasks) tasks := []*testSequentialTaskImpl{} for i := 0; i < numTasks; i++ { - tasks = append(tasks, newTestSequentialTaskImpl(waitgroup, 4, int64(i))) + tasks = append(tasks, newTestSequentialTaskImpl(waitgroup, 4, uint32(i))) } s.processor.Start() @@ -148,9 +178,10 @@ func (s *SequentialTaskProcessorSuite) TestTaskProcessing_UniqueQueueID() { s.Equal(1, task.NumAcked()) s.Equal(0, task.NumNcked()) } + s.Equal(0, s.processor.(*sequentialTaskProcessorImpl).taskqueues.Len()) } -func (s *SequentialTaskProcessorSuite) TestTaskProcessing_RandomizedQueueID() { +func (s *SequentialTaskProcessorSuite) TestRandomizedTaskProcessing() { numQueues := 100 numTasks := 1000 waitgroup := &sync.WaitGroup{} @@ -161,7 +192,7 @@ func (s *SequentialTaskProcessorSuite) TestTaskProcessing_RandomizedQueueID() { tasks[i] = make([]*testSequentialTaskImpl, numTasks) for j := 0; j < numTasks; j++ { - tasks[i][j] = newTestSequentialTaskImpl(waitgroup, uint32(i), int64(j)) + tasks[i][j] = newTestSequentialTaskImpl(waitgroup, uint32(i), uint32(j)) } randomize(tasks[i]) @@ -189,6 +220,7 @@ func (s *SequentialTaskProcessorSuite) TestTaskProcessing_RandomizedQueueID() { s.Equal(0, task.NumNcked()) } } + s.Equal(0, s.processor.(*sequentialTaskProcessorImpl).taskqueues.Len()) } func randomize(array []*testSequentialTaskImpl) { @@ -198,11 +230,11 @@ func randomize(array []*testSequentialTaskImpl) { } } -func newTestSequentialTaskImpl(waitgroup *sync.WaitGroup, partitionID uint32, taskID int64) *testSequentialTaskImpl { +func newTestSequentialTaskImpl(waitgroup *sync.WaitGroup, queueID uint32, taskID uint32) *testSequentialTaskImpl { return &testSequentialTaskImpl{ - waitgroup: waitgroup, - partitionID: partitionID, - taskID: taskID, + waitgroup: waitgroup, + queueID: queueID, + taskID: taskID, } } @@ -252,14 +284,22 @@ func (t *testSequentialTaskImpl) NumNcked() int { return t.nacked } -func (t *testSequentialTaskImpl) PartitionID() interface{} { - return t.partitionID +func (t 
*testSequentialTaskQueueImpl) QueueID() interface{} { + return t.id +} + +func (t *testSequentialTaskQueueImpl) Add(task SequentialTask) { + t.taskQueue.Add(task) +} + +func (t *testSequentialTaskQueueImpl) Remove() SequentialTask { + return t.taskQueue.Remove().(SequentialTask) } -func (t *testSequentialTaskImpl) TaskID() int64 { - return t.taskID +func (t *testSequentialTaskQueueImpl) IsEmpty() bool { + return t.taskQueue.IsEmpty() } -func (t *testSequentialTaskImpl) HashCode() uint32 { - return t.partitionID +func (t *testSequentialTaskQueueImpl) Len() int { + return t.taskQueue.Len() } diff --git a/service/worker/indexer/esProcessor_test.go b/service/worker/indexer/esProcessor_test.go index e3415f12785..b824cb68ee7 100644 --- a/service/worker/indexer/esProcessor_test.go +++ b/service/worker/indexer/esProcessor_test.go @@ -23,6 +23,10 @@ package indexer import ( "encoding/json" "errors" + "sync" + "testing" + "time" + "github.com/olivere/elastic" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" @@ -36,9 +40,6 @@ import ( "github.com/uber/cadence/common/service/dynamicconfig" "github.com/uber/cadence/service/worker/indexer/mocks" "go.uber.org/zap" - "sync" - "testing" - "time" ) type esProcessorSuite struct { @@ -135,17 +136,17 @@ func (s *esProcessorSuite) TestAdd() { request := elastic.NewBulkIndexRequest() mockKafkaMsg := &msgMocks.Message{} key := "test-key" - s.Equal(0, s.esProcessor.mapToKafkaMsg.Size()) + s.Equal(0, s.esProcessor.mapToKafkaMsg.Len()) s.mockBulkProcessor.On("Add", request).Return().Once() s.esProcessor.Add(request, key, mockKafkaMsg) - s.Equal(1, s.esProcessor.mapToKafkaMsg.Size()) + s.Equal(1, s.esProcessor.mapToKafkaMsg.Len()) mockKafkaMsg.AssertExpectations(s.T()) // handle duplicate mockKafkaMsg.On("Ack").Return(nil).Once() s.esProcessor.Add(request, key, mockKafkaMsg) - s.Equal(1, s.esProcessor.mapToKafkaMsg.Size()) + s.Equal(1, s.esProcessor.mapToKafkaMsg.Len()) mockKafkaMsg.AssertExpectations(s.T()) } @@ -276,12 +277,12 @@ func (s *esProcessorSuite) TestAckKafkaMsg() { mockKafkaMsg := &msgMocks.Message{} s.mockBulkProcessor.On("Add", request).Return().Once() s.esProcessor.Add(request, key, mockKafkaMsg) - s.Equal(1, s.esProcessor.mapToKafkaMsg.Size()) + s.Equal(1, s.esProcessor.mapToKafkaMsg.Len()) mockKafkaMsg.On("Ack").Return(nil).Once() s.esProcessor.ackKafkaMsg(key) mockKafkaMsg.AssertExpectations(s.T()) - s.Equal(0, s.esProcessor.mapToKafkaMsg.Size()) + s.Equal(0, s.esProcessor.mapToKafkaMsg.Len()) } func (s *esProcessorSuite) TestNackKafkaMsg() { @@ -293,12 +294,12 @@ func (s *esProcessorSuite) TestNackKafkaMsg() { mockKafkaMsg := &msgMocks.Message{} s.mockBulkProcessor.On("Add", request).Return().Once() s.esProcessor.Add(request, key, mockKafkaMsg) - s.Equal(1, s.esProcessor.mapToKafkaMsg.Size()) + s.Equal(1, s.esProcessor.mapToKafkaMsg.Len()) mockKafkaMsg.On("Nack").Return(nil).Once() s.esProcessor.nackKafkaMsg(key) mockKafkaMsg.AssertExpectations(s.T()) - s.Equal(0, s.esProcessor.mapToKafkaMsg.Size()) + s.Equal(0, s.esProcessor.mapToKafkaMsg.Len()) } func (s *esProcessorSuite) TestHashFn() { diff --git a/service/worker/replicator/processor.go b/service/worker/replicator/processor.go index 8c0d51f90b0..4e94cceb7b2 100644 --- a/service/worker/replicator/processor.go +++ b/service/worker/replicator/processor.go @@ -26,6 +26,8 @@ import ( "sync/atomic" "time" + "github.com/uber/cadence/common/clock" + h "github.com/uber/cadence/.gen/go/history" "github.com/uber/cadence/.gen/go/replicator" "github.com/uber/cadence/.gen/go/shared" @@ 
-60,6 +62,7 @@ type ( historyRereplicator xdc.HistoryRereplicator historyClient history.Client msgEncoder codec.BinaryEncoder + timeSource clock.TimeSource sequentialTaskProcessor task.SequentialTaskProcessor } ) @@ -98,6 +101,7 @@ func newReplicationTaskProcessor(currentCluster, sourceCluster, consumer string, historyRereplicator: historyRereplicator, historyClient: retryableHistoryClient, msgEncoder: codec.NewThriftRWEncoder(), + timeSource: clock.NewRealTimeSource(), sequentialTaskProcessor: sequentialTaskProcessor, } } @@ -270,30 +274,37 @@ func (p *replicationTaskProcessor) decodeAndValidateMsg(msg messaging.Message, l return &replicationTask, nil } -func (p *replicationTaskProcessor) handleDomainReplicationTask(task *replicator.ReplicationTask, msg messaging.Message, logger log.Logger) error { +func (p *replicationTaskProcessor) handleDomainReplicationTask(task *replicator.ReplicationTask, msg messaging.Message, logger log.Logger) (retError error) { p.metricsClient.IncCounter(metrics.DomainReplicationTaskScope, metrics.ReplicatorMessages) sw := p.metricsClient.StartTimer(metrics.DomainReplicationTaskScope, metrics.ReplicatorLatency) defer sw.Stop() - logger.Debug("Received domain replication task") + defer func() { + if retError == nil { + p.ackMsg(msg, logger) + } + }() + err := p.domainReplicator.HandleReceivingTask(task.DomainTaskAttributes) if err != nil { return err } - p.ackMsg(msg, logger) return nil } -func (p *replicationTaskProcessor) handleSyncShardTask(task *replicator.ReplicationTask, msg messaging.Message, logger log.Logger) error { +func (p *replicationTaskProcessor) handleSyncShardTask(task *replicator.ReplicationTask, msg messaging.Message, logger log.Logger) (retError error) { p.metricsClient.IncCounter(metrics.SyncShardTaskScope, metrics.ReplicatorMessages) sw := p.metricsClient.StartTimer(metrics.SyncShardTaskScope, metrics.ReplicatorLatency) defer sw.Stop() - attr := task.SyncShardStatusTaskAttributes - logger.Debug("Received sync shard task.") + defer func() { + if retError == nil { + p.ackMsg(msg, logger) + } + }() + attr := task.SyncShardStatusTaskAttributes if time.Now().Sub(time.Unix(0, attr.GetTimestamp())) > dropSyncShardTaskTimeThreshold { - p.ackMsg(msg, logger) return nil } @@ -304,29 +315,24 @@ func (p *replicationTaskProcessor) handleSyncShardTask(task *replicator.Replicat } ctx, cancel := context.WithTimeout(context.Background(), replicationTimeout) defer cancel() - err := p.historyClient.SyncShardStatus(ctx, req) - if err != nil { - return err - } - p.ackMsg(msg, logger) - return nil + return p.historyClient.SyncShardStatus(ctx, req) } func (p *replicationTaskProcessor) handleActivityTask(task *replicator.ReplicationTask, msg messaging.Message, logger log.Logger) error { activityReplicationTask := newActivityReplicationTask(task, msg, logger, - p.config, p.historyClient, p.metricsClient, p.historyRereplicator) + p.config, p.timeSource, p.historyClient, p.metricsClient, p.historyRereplicator) return p.sequentialTaskProcessor.Submit(activityReplicationTask) } func (p *replicationTaskProcessor) handleHistoryReplicationTask(task *replicator.ReplicationTask, msg messaging.Message, logger log.Logger) error { historyReplicationTask := newHistoryReplicationTask(task, msg, p.sourceCluster, logger, - p.config, p.historyClient, p.metricsClient, p.historyRereplicator) + p.config, p.timeSource, p.historyClient, p.metricsClient, p.historyRereplicator) return p.sequentialTaskProcessor.Submit(historyReplicationTask) } func (p *replicationTaskProcessor) 
handleHistoryMetadataReplicationTask(task *replicator.ReplicationTask, msg messaging.Message, logger log.Logger) error { historyMetadataReplicationTask := newHistoryMetadataReplicationTask(task, msg, p.sourceCluster, logger, - p.config, p.historyClient, p.metricsClient, p.historyRereplicator) + p.config, p.timeSource, p.historyClient, p.metricsClient, p.historyRereplicator) return p.sequentialTaskProcessor.Submit(historyMetadataReplicationTask) } diff --git a/service/worker/replicator/replicationTask.go b/service/worker/replicator/replicationTask.go index 004e37346ac..65b980b9211 100644 --- a/service/worker/replicator/replicationTask.go +++ b/service/worker/replicator/replicationTask.go @@ -24,7 +24,8 @@ import ( "context" "time" - "github.com/dgryski/go-farm" + "github.com/uber/cadence/common/clock" + h "github.com/uber/cadence/.gen/go/history" "github.com/uber/cadence/.gen/go/replicator" "github.com/uber/cadence/.gen/go/shared" @@ -36,6 +37,7 @@ import ( "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/messaging" "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/task" "github.com/uber/cadence/common/xdc" ) @@ -43,13 +45,14 @@ type ( workflowReplicationTask struct { metricsScope int startTime time.Time - partitionID definition.WorkflowIdentifier + queueID definition.WorkflowIdentifier taskID int64 attempt int kafkaMsg messaging.Message logger log.Logger config *Config + timeSource clock.TimeSource historyClient history.Client metricsClient metrics.Client historyRereplicator xdc.HistoryRereplicator @@ -74,12 +77,16 @@ type ( } ) +var _ task.SequentialTask = (*activityReplicationTask)(nil) +var _ task.SequentialTask = (*historyReplicationTask)(nil) +var _ task.SequentialTask = (*historyMetadataReplicationTask)(nil) + const ( replicationTaskRetryDelay = 500 * time.Microsecond ) func newActivityReplicationTask(task *replicator.ReplicationTask, msg messaging.Message, logger log.Logger, - config *Config, historyClient history.Client, metricsClient metrics.Client, + config *Config, timeSource clock.TimeSource, historyClient history.Client, metricsClient metrics.Client, historyRereplicator xdc.HistoryRereplicator) *activityReplicationTask { attr := task.SyncActicvityTaskAttributes @@ -92,8 +99,8 @@ func newActivityReplicationTask(task *replicator.ReplicationTask, msg messaging. return &activityReplicationTask{ workflowReplicationTask: workflowReplicationTask{ metricsScope: metrics.SyncActivityTaskScope, - startTime: time.Now(), - partitionID: definition.NewWorkflowIdentifier( + startTime: timeSource.Now(), + queueID: definition.NewWorkflowIdentifier( attr.GetDomainId(), attr.GetWorkflowId(), attr.GetRunId(), ), taskID: attr.GetScheduledId(), @@ -101,6 +108,7 @@ func newActivityReplicationTask(task *replicator.ReplicationTask, msg messaging. kafkaMsg: msg, logger: logger, config: config, + timeSource: timeSource, historyClient: historyClient, metricsClient: metricsClient, historyRereplicator: historyRereplicator, @@ -122,7 +130,7 @@ func newActivityReplicationTask(task *replicator.ReplicationTask, msg messaging. 
} func newHistoryReplicationTask(task *replicator.ReplicationTask, msg messaging.Message, sourceCluster string, logger log.Logger, - config *Config, historyClient history.Client, metricsClient metrics.Client, + config *Config, timeSource clock.TimeSource, historyClient history.Client, metricsClient metrics.Client, historyRereplicator xdc.HistoryRereplicator) *historyReplicationTask { attr := task.HistoryTaskAttributes @@ -135,8 +143,8 @@ func newHistoryReplicationTask(task *replicator.ReplicationTask, msg messaging.M return &historyReplicationTask{ workflowReplicationTask: workflowReplicationTask{ metricsScope: metrics.HistoryReplicationTaskScope, - startTime: time.Now(), - partitionID: definition.NewWorkflowIdentifier( + startTime: timeSource.Now(), + queueID: definition.NewWorkflowIdentifier( attr.GetDomainId(), attr.GetWorkflowId(), attr.GetRunId(), ), taskID: attr.GetFirstEventId(), @@ -144,6 +152,7 @@ func newHistoryReplicationTask(task *replicator.ReplicationTask, msg messaging.M kafkaMsg: msg, logger: logger, config: config, + timeSource: timeSource, historyClient: historyClient, metricsClient: metricsClient, historyRereplicator: historyRereplicator, @@ -170,7 +179,7 @@ func newHistoryReplicationTask(task *replicator.ReplicationTask, msg messaging.M } func newHistoryMetadataReplicationTask(task *replicator.ReplicationTask, msg messaging.Message, sourceCluster string, logger log.Logger, - config *Config, historyClient history.Client, metricsClient metrics.Client, + config *Config, timeSource clock.TimeSource, historyClient history.Client, metricsClient metrics.Client, historyRereplicator xdc.HistoryRereplicator) *historyMetadataReplicationTask { attr := task.HistoryMetadataTaskAttributes @@ -181,9 +190,9 @@ func newHistoryMetadataReplicationTask(task *replicator.ReplicationTask, msg mes tag.WorkflowNextEventID(attr.GetNextEventId())) return &historyMetadataReplicationTask{ workflowReplicationTask: workflowReplicationTask{ - metricsScope: metrics.HistoryReplicationTaskScope, - startTime: time.Now(), - partitionID: definition.NewWorkflowIdentifier( + metricsScope: metrics.HistoryMetadataReplicationTaskScope, + startTime: timeSource.Now(), + queueID: definition.NewWorkflowIdentifier( attr.GetDomainId(), attr.GetWorkflowId(), attr.GetRunId(), ), taskID: attr.GetFirstEventId(), @@ -191,6 +200,7 @@ func newHistoryMetadataReplicationTask(task *replicator.ReplicationTask, msg mes kafkaMsg: msg, logger: logger, config: config, + timeSource: timeSource, historyClient: historyClient, metricsClient: metricsClient, historyRereplicator: historyRereplicator, @@ -208,6 +218,10 @@ func (t *activityReplicationTask) Execute() error { } func (t *activityReplicationTask) HandleErr(err error) error { + if t.attempt < t.config.ReplicatorActivityBufferRetryCount() { + return err + } + retryErr, ok := t.convertRetryTaskError(err) if !ok || retryErr.GetRunId() == "" { return err @@ -220,10 +234,10 @@ func (t *activityReplicationTask) HandleErr(err error) error { // this is the retry error beginRunID := retryErr.GetRunId() beginEventID := retryErr.GetNextEventId() - endRunID := t.partitionID.RunID + endRunID := t.queueID.RunID endEventID := t.taskID + 1 // the next event ID should be at activity schedule ID + 1 resendErr := t.historyRereplicator.SendMultiWorkflowHistory( - t.partitionID.DomainID, t.partitionID.WorkflowID, + t.queueID.DomainID, t.queueID.WorkflowID, beginRunID, beginEventID, endRunID, endEventID, ) @@ -236,16 +250,6 @@ func (t *activityReplicationTask) HandleErr(err error) error { return 
t.Execute() } -func (t *activityReplicationTask) RetryErr(err error) bool { - t.attempt++ - - if t.attempt <= t.config.ReplicationTaskMaxRetry() && isTransientRetryableError(err) { - time.Sleep(replicationTaskRetryDelay) - return true - } - return false -} - func (t *historyReplicationTask) Execute() error { ctx, cancel := context.WithTimeout(context.Background(), replicationTimeout) defer cancel() @@ -253,6 +257,10 @@ func (t *historyReplicationTask) Execute() error { } func (t *historyReplicationTask) HandleErr(err error) error { + if t.attempt < t.config.ReplicatorHistoryBufferRetryCount() { + return err + } + retryErr, ok := t.convertRetryTaskError(err) if !ok || retryErr.GetRunId() == "" { return err @@ -265,10 +273,10 @@ func (t *historyReplicationTask) HandleErr(err error) error { // this is the retry error beginRunID := retryErr.GetRunId() beginEventID := retryErr.GetNextEventId() - endRunID := t.partitionID.RunID + endRunID := t.queueID.RunID endEventID := t.taskID resendErr := t.historyRereplicator.SendMultiWorkflowHistory( - t.partitionID.DomainID, t.partitionID.WorkflowID, + t.queueID.DomainID, t.queueID.WorkflowID, beginRunID, beginEventID, endRunID, endEventID, ) if resendErr != nil { @@ -280,25 +288,15 @@ func (t *historyReplicationTask) HandleErr(err error) error { return t.Execute() } -func (t *historyReplicationTask) RetryErr(err error) bool { - t.attempt++ - - if t.attempt <= t.config.ReplicationTaskMaxRetry() && isTransientRetryableError(err) { - time.Sleep(replicationTaskRetryDelay) - return true - } - return false -} - func (t *historyMetadataReplicationTask) Execute() error { t.metricsClient.IncCounter(metrics.HistoryRereplicationByHistoryMetadataReplicationScope, metrics.CadenceClientRequests) stopwatch := t.metricsClient.StartTimer(metrics.HistoryRereplicationByHistoryMetadataReplicationScope, metrics.CadenceClientLatency) defer stopwatch.Stop() return t.historyRereplicator.SendMultiWorkflowHistory( - t.partitionID.DomainID, t.partitionID.WorkflowID, - t.partitionID.RunID, t.firstEventID, - t.partitionID.RunID, t.nextEventID, + t.queueID.DomainID, t.queueID.WorkflowID, + t.queueID.RunID, t.firstEventID, + t.queueID.RunID, t.nextEventID, ) } @@ -315,10 +313,10 @@ func (t *historyMetadataReplicationTask) HandleErr(err error) error { // this is the retry error beginRunID := retryErr.GetRunId() beginEventID := retryErr.GetNextEventId() - endRunID := t.partitionID.RunID + endRunID := t.queueID.RunID endEventID := t.taskID resendErr := t.historyRereplicator.SendMultiWorkflowHistory( - t.partitionID.DomainID, t.partitionID.WorkflowID, + t.queueID.DomainID, t.queueID.WorkflowID, beginRunID, beginEventID, endRunID, endEventID, ) if resendErr != nil { @@ -330,31 +328,22 @@ func (t *historyMetadataReplicationTask) HandleErr(err error) error { return t.Execute() } -func (t *historyMetadataReplicationTask) RetryErr(err error) bool { +func (t *workflowReplicationTask) RetryErr(err error) bool { t.attempt++ - if t.attempt <= t.config.ReplicationTaskMaxRetry() && isTransientRetryableError(err) { + if t.attempt <= t.config.ReplicationTaskMaxRetryCount() && + t.timeSource.Now().Sub(t.startTime) <= t.config.ReplicationTaskMaxRetryDuration() && + isTransientRetryableError(err) { + time.Sleep(replicationTaskRetryDelay) return true } return false } -func (t *workflowReplicationTask) PartitionID() interface{} { - return t.partitionID -} - -func (t *workflowReplicationTask) TaskID() int64 { - return t.taskID -} - -func (t *workflowReplicationTask) HashCode() uint32 { - return 
farm.Fingerprint32([]byte(t.partitionID.WorkflowID)) -} - func (t *workflowReplicationTask) Ack() { t.metricsClient.IncCounter(t.metricsScope, metrics.ReplicatorMessages) - t.metricsClient.RecordTimer(t.metricsScope, metrics.ReplicatorLatency, time.Now().Sub(t.startTime)) + t.metricsClient.RecordTimer(t.metricsScope, metrics.ReplicatorLatency, t.timeSource.Now().Sub(t.startTime)) // the underlying implementation will not return anything other than nil // do logging just in case @@ -366,7 +355,7 @@ func (t *workflowReplicationTask) Ack() { func (t *workflowReplicationTask) Nack() { t.metricsClient.IncCounter(t.metricsScope, metrics.ReplicatorMessages) - t.metricsClient.RecordTimer(t.metricsScope, metrics.ReplicatorLatency, time.Now().Sub(t.startTime)) + t.metricsClient.RecordTimer(t.metricsScope, metrics.ReplicatorLatency, t.timeSource.Now().Sub(t.startTime)) // the underlying implementation will not return anything other than nil // do logging just in case diff --git a/service/worker/replicator/replicationTaskQueue.go b/service/worker/replicator/replicationTaskQueue.go new file mode 100644 index 00000000000..d7f0ea883d7 --- /dev/null +++ b/service/worker/replicator/replicationTaskQueue.go @@ -0,0 +1,101 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
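[Reviewer note, not part of the patch] The new replicationTaskQueue.go that follows keys each sequential task queue by WorkflowIdentifier and orders queued tasks by ascending taskID via replicationSequentialTaskQueueCompareLess, so replication tasks for one workflow drain in event-ID order even if they arrive out of order. A minimal standalone sketch of that ordering semantics using the standard library heap; the task type and names here are illustrative stand-ins, not the Cadence types:

```go
package main

import (
	"container/heap"
	"fmt"
)

// fakeTask stands in for the replication task types; only taskID matters for
// ordering, mirroring replicationSequentialTaskQueueCompareLess.
type fakeTask struct{ taskID int64 }

type taskHeap []fakeTask

func (h taskHeap) Len() int            { return len(h) }
func (h taskHeap) Less(i, j int) bool  { return h[i].taskID < h[j].taskID } // ascending taskID
func (h taskHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *taskHeap) Push(x interface{}) { *h = append(*h, x.(fakeTask)) }
func (h *taskHeap) Pop() interface{} {
	old := *h
	n := len(old)
	item := old[n-1]
	*h = old[:n-1]
	return item
}

func main() {
	h := &taskHeap{}
	// Tasks for the same workflow arriving out of order...
	for _, id := range []int64{7, 3, 5} {
		heap.Push(h, fakeTask{taskID: id})
	}
	// ...drain in ascending taskID order: 3, 5, 7.
	for h.Len() > 0 {
		fmt.Println(heap.Pop(h).(fakeTask).taskID)
	}
}
```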
+ +package replicator + +import ( + "github.com/dgryski/go-farm" + "github.com/uber/cadence/common/collection" + "github.com/uber/cadence/common/definition" + "github.com/uber/cadence/common/task" +) + +type ( + replicationSequentialTaskQueue struct { + id definition.WorkflowIdentifier + taskQueue collection.Queue + } +) + +func newReplicationSequentialTaskQueue(task task.SequentialTask) task.SequentialTaskQueue { + var id definition.WorkflowIdentifier + switch t := task.(type) { + case *historyMetadataReplicationTask: + id = t.queueID + case *historyReplicationTask: + id = t.queueID + case *activityReplicationTask: + id = t.queueID + default: + panic("Unknown replication task type") + } + + return &replicationSequentialTaskQueue{ + id: id, + taskQueue: collection.NewConcurrentPriorityQueue( + replicationSequentialTaskQueueCompareLess, + ), + } +} + +func (q *replicationSequentialTaskQueue) QueueID() interface{} { + return q.id +} + +func (q *replicationSequentialTaskQueue) Add(task task.SequentialTask) { + q.taskQueue.Add(task) +} + +func (q *replicationSequentialTaskQueue) Remove() task.SequentialTask { + return q.taskQueue.Remove().(task.SequentialTask) +} + +func (q *replicationSequentialTaskQueue) IsEmpty() bool { + return q.taskQueue.IsEmpty() +} + +func (q *replicationSequentialTaskQueue) Len() int { + return q.taskQueue.Len() +} + +func replicationSequentialTaskQueueHashFn(key interface{}) uint32 { + queue, ok := key.(*replicationSequentialTaskQueue) + if !ok { + return 0 + } + return farm.Fingerprint32([]byte(queue.id.WorkflowID)) +} + +func replicationSequentialTaskQueueCompareLess(this interface{}, that interface{}) bool { + fnGetTaskID := func(object interface{}) int64 { + switch task := object.(type) { + case *activityReplicationTask: + return task.taskID + case *historyReplicationTask: + return task.taskID + case *historyMetadataReplicationTask: + return task.taskID + default: + panic("unknown task type") + } + } + + return fnGetTaskID(this) < fnGetTaskID(that) +} diff --git a/service/worker/replicator/replicationTaskQueue_test.go b/service/worker/replicator/replicationTaskQueue_test.go new file mode 100644 index 00000000000..5384b4e1db1 --- /dev/null +++ b/service/worker/replicator/replicationTaskQueue_test.go @@ -0,0 +1,273 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
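[Reviewer note, not part of the patch] For context on replicationSequentialTaskQueueHashFn defined above: hashing on the queue's WorkflowID means every task queue for a given workflow maps to the same shard of the sequential task processor, so per-workflow ordering is preserved while unrelated workflows proceed in parallel. A rough sketch of that sharding idea under assumed names; the shard count and helper are illustrative only:

```go
package main

import (
	"fmt"

	"github.com/dgryski/go-farm"
)

// shardFor picks a processing shard for a workflow's task queue. Using the
// workflow ID as the hash key keeps all tasks of one workflow on one shard.
func shardFor(workflowID string, numShards uint32) uint32 {
	return farm.Fingerprint32([]byte(workflowID)) % numShards
}

func main() {
	const numShards = 16 // illustrative shard count, not a Cadence default
	fmt.Println(shardFor("workflow-A", numShards))
	fmt.Println(shardFor("workflow-A", numShards)) // same shard as the line above
	fmt.Println(shardFor("workflow-B", numShards)) // may land on a different shard
}
```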
+ +package replicator + +import ( + "testing" + + "github.com/uber/cadence/common/collection" + + "github.com/dgryski/go-farm" + "github.com/stretchr/testify/suite" + "github.com/uber/cadence/common/definition" +) + +type ( + replicationSequentialTaskQueueSuite struct { + suite.Suite + + queueID definition.WorkflowIdentifier + queue *replicationSequentialTaskQueue + } +) + +func TestReplicationSequentialTaskQueueSuite(t *testing.T) { + s := new(replicationSequentialTaskQueueSuite) + suite.Run(t, s) +} + +func (s *replicationSequentialTaskQueueSuite) SetupSuite() { + +} + +func (s *replicationSequentialTaskQueueSuite) TearDownSuite() { + +} + +func (s *replicationSequentialTaskQueueSuite) SetupTest() { + s.queueID = definition.NewWorkflowIdentifier( + "some random domain ID", + "some random workflow ID", + "some random run ID", + ) + s.queue = &replicationSequentialTaskQueue{ + id: s.queueID, + taskQueue: collection.NewConcurrentPriorityQueue( + replicationSequentialTaskQueueCompareLess, + ), + } +} + +func (s *replicationSequentialTaskQueueSuite) TearDownTest() { + +} + +func (s *replicationSequentialTaskQueueSuite) TestNewTaskQueue() { + activityTask := s.generateActivityTask(0) + activityTaskQueue := newReplicationSequentialTaskQueue(s.generateActivityTask(0)) + s.Equal(0, activityTaskQueue.Len()) + s.Equal(activityTask.queueID, activityTaskQueue.QueueID()) + + historyTask := s.generateHistoryTask(0) + historyTaskQueue := newReplicationSequentialTaskQueue(s.generateActivityTask(0)) + s.Equal(0, historyTaskQueue.Len()) + s.Equal(historyTask.queueID, historyTaskQueue.QueueID()) + + historyMetadataTask := s.generateHistoryMetadataTask(0) + historyMetadataTaskQueue := newReplicationSequentialTaskQueue(s.generateActivityTask(0)) + s.Equal(0, historyMetadataTaskQueue.Len()) + s.Equal(historyMetadataTask.queueID, historyMetadataTaskQueue.QueueID()) +} + +func (s *replicationSequentialTaskQueueSuite) TestQueueID() { + s.Equal(s.queueID, s.queue.QueueID()) +} + +func (s *replicationSequentialTaskQueueSuite) TestAddRemoveIsEmptyLen() { + taskID := int64(0) + + s.Equal(0, s.queue.Len()) + s.True(s.queue.IsEmpty()) + + testTask1 := s.generateActivityTask(taskID) + taskID++ + + s.queue.Add(testTask1) + s.Equal(1, s.queue.Len()) + s.False(s.queue.IsEmpty()) + + testTask2 := s.generateHistoryTask(taskID) + taskID++ + + s.queue.Add(testTask2) + s.Equal(2, s.queue.Len()) + s.False(s.queue.IsEmpty()) + + testTask := s.queue.Remove() + s.Equal(1, s.queue.Len()) + s.False(s.queue.IsEmpty()) + s.Equal(testTask1, testTask) + + testTask3 := s.generateHistoryTask(taskID) + taskID++ + + s.queue.Add(testTask3) + s.Equal(2, s.queue.Len()) + s.False(s.queue.IsEmpty()) + + testTask = s.queue.Remove() + s.Equal(1, s.queue.Len()) + s.False(s.queue.IsEmpty()) + s.Equal(testTask2, testTask) + + testTask = s.queue.Remove() + s.Equal(0, s.queue.Len()) + s.True(s.queue.IsEmpty()) + s.Equal(testTask3, testTask) + + testTask4 := s.generateActivityTask(taskID) + taskID++ + + s.queue.Add(testTask4) + s.Equal(1, s.queue.Len()) + s.False(s.queue.IsEmpty()) + + testTask = s.queue.Remove() + s.Equal(0, s.queue.Len()) + s.True(s.queue.IsEmpty()) + s.Equal(testTask4, testTask) +} + +func (s *replicationSequentialTaskQueueSuite) TestHashFn() { + s.Equal( + farm.Fingerprint32([]byte(s.queueID.WorkflowID)), + replicationSequentialTaskQueueHashFn(s.queue), + ) +} + +func (s *replicationSequentialTaskQueueSuite) TestCompareLess() { + s.True(replicationSequentialTaskQueueCompareLess( + s.generateActivityTask(1), + 
s.generateActivityTask(2), + )) + + s.True(replicationSequentialTaskQueueCompareLess( + s.generateActivityTask(1), + s.generateHistoryMetadataTask(2), + )) + + s.True(replicationSequentialTaskQueueCompareLess( + s.generateActivityTask(1), + s.generateHistoryTask(2), + )) + + s.True(replicationSequentialTaskQueueCompareLess( + s.generateHistoryMetadataTask(1), + s.generateActivityTask(2), + )) + + s.True(replicationSequentialTaskQueueCompareLess( + s.generateHistoryMetadataTask(1), + s.generateHistoryMetadataTask(2), + )) + + s.True(replicationSequentialTaskQueueCompareLess( + s.generateHistoryMetadataTask(1), + s.generateHistoryTask(2), + )) + + s.True(replicationSequentialTaskQueueCompareLess( + s.generateHistoryTask(1), + s.generateActivityTask(2), + )) + + s.True(replicationSequentialTaskQueueCompareLess( + s.generateHistoryTask(1), + s.generateHistoryMetadataTask(2), + )) + + s.True(replicationSequentialTaskQueueCompareLess( + s.generateHistoryTask(1), + s.generateHistoryTask(2), + )) + + s.False(replicationSequentialTaskQueueCompareLess( + s.generateActivityTask(10), + s.generateActivityTask(2), + )) + + s.False(replicationSequentialTaskQueueCompareLess( + s.generateActivityTask(10), + s.generateHistoryMetadataTask(2), + )) + + s.False(replicationSequentialTaskQueueCompareLess( + s.generateActivityTask(10), + s.generateHistoryTask(2), + )) + + s.False(replicationSequentialTaskQueueCompareLess( + s.generateHistoryMetadataTask(10), + s.generateActivityTask(2), + )) + + s.False(replicationSequentialTaskQueueCompareLess( + s.generateHistoryMetadataTask(10), + s.generateHistoryMetadataTask(2), + )) + + s.False(replicationSequentialTaskQueueCompareLess( + s.generateHistoryMetadataTask(10), + s.generateHistoryTask(2), + )) + + s.False(replicationSequentialTaskQueueCompareLess( + s.generateHistoryTask(10), + s.generateActivityTask(2), + )) + + s.False(replicationSequentialTaskQueueCompareLess( + s.generateHistoryTask(10), + s.generateHistoryMetadataTask(2), + )) + + s.False(replicationSequentialTaskQueueCompareLess( + s.generateHistoryTask(10), + s.generateHistoryTask(2), + )) +} + +func (s *replicationSequentialTaskQueueSuite) generateActivityTask(taskID int64) *activityReplicationTask { + return &activityReplicationTask{ + workflowReplicationTask: workflowReplicationTask{ + queueID: s.queueID, + taskID: taskID, + }, + } +} + +func (s *replicationSequentialTaskQueueSuite) generateHistoryTask(taskID int64) *historyReplicationTask { + return &historyReplicationTask{ + workflowReplicationTask: workflowReplicationTask{ + queueID: s.queueID, + taskID: taskID, + }, + } +} + +func (s *replicationSequentialTaskQueueSuite) generateHistoryMetadataTask(taskID int64) *historyMetadataReplicationTask { + return &historyMetadataReplicationTask{ + workflowReplicationTask: workflowReplicationTask{ + queueID: s.queueID, + taskID: taskID, + }, + } +} diff --git a/service/worker/replicator/replicationTask_test.go b/service/worker/replicator/replicationTask_test.go index 31d80d136d8..1cdd44604be 100644 --- a/service/worker/replicator/replicationTask_test.go +++ b/service/worker/replicator/replicationTask_test.go @@ -25,6 +25,8 @@ import ( "testing" "time" + "github.com/uber/cadence/common/clock" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" "github.com/uber-go/tally" @@ -51,6 +53,7 @@ type ( logger log.Logger metricsClient metrics.Client + mockTimeSource *clock.EventTimeSource mockMsg *messageMocks.Message mockHistoryClient *mocks.HistoryClient mockRereplicator 
*xdc.MockHistoryRereplicator @@ -63,6 +66,7 @@ type ( metricsClient metrics.Client sourceCluster string + mockTimeSource *clock.EventTimeSource mockMsg *messageMocks.Message mockHistoryClient *mocks.HistoryClient mockRereplicator *xdc.MockHistoryRereplicator @@ -75,6 +79,7 @@ type ( metricsClient metrics.Client sourceCluster string + mockTimeSource *clock.EventTimeSource mockMsg *messageMocks.Message mockHistoryClient *mocks.HistoryClient mockRereplicator *xdc.MockHistoryRereplicator @@ -108,10 +113,13 @@ func (s *activityReplicationTaskSuite) SetupTest() { s.Require().NoError(err) s.logger = loggerimpl.NewLogger(zapLogger) s.config = &Config{ - ReplicationTaskMaxRetry: dynamicconfig.GetIntPropertyFn(10), + ReplicatorActivityBufferRetryCount: dynamicconfig.GetIntPropertyFn(2), + ReplicationTaskMaxRetryCount: dynamicconfig.GetIntPropertyFn(10), + ReplicationTaskMaxRetryDuration: dynamicconfig.GetDurationPropertyFn(time.Minute), } s.metricsClient = metrics.NewClient(tally.NoopScope, metrics.Worker) + s.mockTimeSource = clock.NewEventTimeSource() s.mockMsg = &messageMocks.Message{} s.mockHistoryClient = &mocks.HistoryClient{} s.mockRereplicator = &xdc.MockHistoryRereplicator{} @@ -136,11 +144,13 @@ func (s *historyReplicationTaskSuite) SetupTest() { s.logger = loggerimpl.NewLogger(zapLogger) s.config = &Config{ ReplicatorHistoryBufferRetryCount: dynamicconfig.GetIntPropertyFn(2), - ReplicationTaskMaxRetry: dynamicconfig.GetIntPropertyFn(10), + ReplicationTaskMaxRetryCount: dynamicconfig.GetIntPropertyFn(10), + ReplicationTaskMaxRetryDuration: dynamicconfig.GetDurationPropertyFn(time.Minute), } s.metricsClient = metrics.NewClient(tally.NoopScope, metrics.Worker) s.sourceCluster = cluster.TestAlternativeClusterName + s.mockTimeSource = clock.NewEventTimeSource() s.mockMsg = &messageMocks.Message{} s.mockHistoryClient = &mocks.HistoryClient{} s.mockRereplicator = &xdc.MockHistoryRereplicator{} @@ -164,11 +174,13 @@ func (s *historyMetadataReplicationTaskSuite) SetupTest() { s.Require().NoError(err) s.logger = loggerimpl.NewLogger(zapLogger) s.config = &Config{ - ReplicationTaskMaxRetry: dynamicconfig.GetIntPropertyFn(10), + ReplicationTaskMaxRetryCount: dynamicconfig.GetIntPropertyFn(10), + ReplicationTaskMaxRetryDuration: dynamicconfig.GetDurationPropertyFn(time.Minute), } s.metricsClient = metrics.NewClient(tally.NoopScope, metrics.Worker) s.sourceCluster = cluster.TestAlternativeClusterName + s.mockTimeSource = clock.NewEventTimeSource() s.mockMsg = &messageMocks.Message{} s.mockHistoryClient = &mocks.HistoryClient{} s.mockRereplicator = &xdc.MockHistoryRereplicator{} @@ -185,15 +197,16 @@ func (s *activityReplicationTaskSuite) TestNewActivityReplicationTask() { replicationAttr := replicationTask.SyncActicvityTaskAttributes task := newActivityReplicationTask(replicationTask, s.mockMsg, s.logger, - s.config, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) + s.config, s.mockTimeSource, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) // overwrite the logger for easy comparison task.logger = s.logger + s.Equal( &activityReplicationTask{ workflowReplicationTask: workflowReplicationTask{ metricsScope: metrics.SyncActivityTaskScope, startTime: task.startTime, - partitionID: definition.NewWorkflowIdentifier( + queueID: definition.NewWorkflowIdentifier( replicationAttr.GetDomainId(), replicationAttr.GetWorkflowId(), replicationAttr.GetRunId(), @@ -202,6 +215,7 @@ func (s *activityReplicationTaskSuite) TestNewActivityReplicationTask() { attempt: 0, kafkaMsg: s.mockMsg, logger: 
s.logger, + timeSource: s.mockTimeSource, config: s.config, historyClient: s.mockHistoryClient, metricsClient: s.metricsClient, @@ -227,7 +241,7 @@ func (s *activityReplicationTaskSuite) TestNewActivityReplicationTask() { func (s *activityReplicationTaskSuite) TestExecute() { task := newActivityReplicationTask(s.getActivityReplicationTask(), s.mockMsg, s.logger, - s.config, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) + s.config, s.mockTimeSource, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) randomErr := errors.New("some random error") s.mockHistoryClient.On("SyncActivity", mock.Anything, task.req).Return(randomErr).Once() @@ -237,44 +251,46 @@ func (s *activityReplicationTaskSuite) TestExecute() { func (s *activityReplicationTaskSuite) TestHandleErr_NotEnoughAttempt() { task := newActivityReplicationTask(s.getActivityReplicationTask(), s.mockMsg, s.logger, - s.config, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) + s.config, s.mockTimeSource, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) randomErr := errors.New("some random error") err := task.HandleErr(randomErr) s.Equal(randomErr, err) } -func (s *activityReplicationTaskSuite) TestHandleErr_NotRetryErr() { +func (s *activityReplicationTaskSuite) TestHandleErr_EnoughAttempt_NotRetryErr() { task := newActivityReplicationTask(s.getActivityReplicationTask(), s.mockMsg, s.logger, - s.config, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) + s.config, s.mockTimeSource, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) + task.attempt = s.config.ReplicatorActivityBufferRetryCount() + 1 randomErr := errors.New("some random error") err := task.HandleErr(randomErr) s.Equal(randomErr, err) } -func (s *activityReplicationTaskSuite) TestHandleErr_RetryErr() { +func (s *activityReplicationTaskSuite) TestHandleErr_EnoughAttempt_RetryErr() { task := newActivityReplicationTask(s.getActivityReplicationTask(), s.mockMsg, s.logger, - s.config, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) + s.config, s.mockTimeSource, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) + task.attempt = s.config.ReplicatorActivityBufferRetryCount() + 1 retryErr := &shared.RetryTaskError{ - DomainId: common.StringPtr(task.partitionID.DomainID), - WorkflowId: common.StringPtr(task.partitionID.WorkflowID), + DomainId: common.StringPtr(task.queueID.DomainID), + WorkflowId: common.StringPtr(task.queueID.WorkflowID), RunId: common.StringPtr("other random run ID"), NextEventId: common.Int64Ptr(447), } s.mockRereplicator.On("SendMultiWorkflowHistory", - task.partitionID.DomainID, task.partitionID.WorkflowID, + task.queueID.DomainID, task.queueID.WorkflowID, retryErr.GetRunId(), retryErr.GetNextEventId(), - task.partitionID.RunID, task.taskID+1, + task.queueID.RunID, task.taskID+1, ).Return(errors.New("some random error")).Once() err := task.HandleErr(retryErr) s.Equal(retryErr, err) s.mockRereplicator.On("SendMultiWorkflowHistory", - task.partitionID.DomainID, task.partitionID.WorkflowID, + task.queueID.DomainID, task.queueID.WorkflowID, retryErr.GetRunId(), retryErr.GetNextEventId(), - task.partitionID.RunID, task.taskID+1, + task.queueID.RunID, task.taskID+1, ).Return(nil).Once() s.mockHistoryClient.On("SyncActivity", mock.Anything, task.req).Return(nil).Once() err = task.HandleErr(retryErr) @@ -284,14 +300,14 @@ func (s *activityReplicationTaskSuite) TestHandleErr_RetryErr() { func (s *activityReplicationTaskSuite) TestRetryErr_NonRetryable() { err := &shared.BadRequestError{} task := 
newActivityReplicationTask(s.getActivityReplicationTask(), s.mockMsg, s.logger, - s.config, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) + s.config, s.mockTimeSource, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) s.False(task.RetryErr(err)) } func (s *activityReplicationTaskSuite) TestRetryErr_Retryable() { err := &shared.InternalServiceError{} task := newActivityReplicationTask(s.getActivityReplicationTask(), s.mockMsg, s.logger, - s.config, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) + s.config, s.mockTimeSource, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) task.attempt = 0 s.True(task.RetryErr(err)) } @@ -299,14 +315,22 @@ func (s *activityReplicationTaskSuite) TestRetryErr_Retryable() { func (s *activityReplicationTaskSuite) TestRetryErr_Retryable_ExceedAttempt() { err := &shared.InternalServiceError{} task := newActivityReplicationTask(s.getActivityReplicationTask(), s.mockMsg, s.logger, - s.config, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) - task.attempt = s.config.ReplicationTaskMaxRetry() + 100 + s.config, s.mockTimeSource, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) + task.attempt = s.config.ReplicationTaskMaxRetryCount() + 100 + s.False(task.RetryErr(err)) +} + +func (s *activityReplicationTaskSuite) TestRetryErr_Retryable_ExceedDuration() { + err := &shared.InternalServiceError{} + task := newActivityReplicationTask(s.getActivityReplicationTask(), s.mockMsg, s.logger, + s.config, s.mockTimeSource, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) + task.startTime = s.mockTimeSource.Now().Add(-2 * s.config.ReplicationTaskMaxRetryDuration()) s.False(task.RetryErr(err)) } func (s *activityReplicationTaskSuite) TestAck() { task := newActivityReplicationTask(s.getActivityReplicationTask(), s.mockMsg, s.logger, - s.config, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) + s.config, s.mockTimeSource, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) s.mockMsg.On("Ack").Return(nil).Once() task.Ack() @@ -314,32 +338,18 @@ func (s *activityReplicationTaskSuite) TestAck() { func (s *activityReplicationTaskSuite) TestNack() { task := newActivityReplicationTask(s.getActivityReplicationTask(), s.mockMsg, s.logger, - s.config, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) + s.config, s.mockTimeSource, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) s.mockMsg.On("Nack").Return(nil).Once() task.Nack() } -func (s *activityReplicationTaskSuite) TestQueueID() { - task := newActivityReplicationTask(s.getActivityReplicationTask(), s.mockMsg, s.logger, - s.config, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) - - s.Equal(task.partitionID, task.PartitionID()) -} - -func (s *activityReplicationTaskSuite) TestTaskID() { - task := newActivityReplicationTask(s.getActivityReplicationTask(), s.mockMsg, s.logger, - s.config, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) - - s.Equal(task.taskID, task.TaskID()) -} - func (s *historyReplicationTaskSuite) TestNewHistoryReplicationTask() { replicationTask := s.getHistoryReplicationTask() replicationAttr := replicationTask.HistoryTaskAttributes task := newHistoryReplicationTask(replicationTask, s.mockMsg, s.sourceCluster, s.logger, - s.config, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) + s.config, s.mockTimeSource, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) // overwrite the logger for easy comparison task.logger = s.logger s.Equal( @@ -347,7 +357,7 @@ func (s 
*historyReplicationTaskSuite) TestNewHistoryReplicationTask() { workflowReplicationTask: workflowReplicationTask{ metricsScope: metrics.HistoryReplicationTaskScope, startTime: task.startTime, - partitionID: definition.NewWorkflowIdentifier( + queueID: definition.NewWorkflowIdentifier( replicationAttr.GetDomainId(), replicationAttr.GetWorkflowId(), replicationAttr.GetRunId(), @@ -356,6 +366,7 @@ func (s *historyReplicationTaskSuite) TestNewHistoryReplicationTask() { attempt: 0, kafkaMsg: s.mockMsg, logger: s.logger, + timeSource: s.mockTimeSource, config: s.config, historyClient: s.mockHistoryClient, metricsClient: s.metricsClient, @@ -386,7 +397,7 @@ func (s *historyReplicationTaskSuite) TestNewHistoryReplicationTask() { func (s *historyReplicationTaskSuite) TestExecute() { task := newHistoryReplicationTask(s.getHistoryReplicationTask(), s.mockMsg, s.sourceCluster, s.logger, - s.config, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) + s.config, s.mockTimeSource, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) randomErr := errors.New("some random error") s.mockHistoryClient.On("ReplicateEvents", mock.Anything, task.req).Return(randomErr).Once() @@ -396,44 +407,46 @@ func (s *historyReplicationTaskSuite) TestExecute() { func (s *historyReplicationTaskSuite) TestHandleErr_NotEnoughAttempt() { task := newHistoryReplicationTask(s.getHistoryReplicationTask(), s.mockMsg, s.sourceCluster, s.logger, - s.config, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) + s.config, s.mockTimeSource, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) randomErr := errors.New("some random error") err := task.HandleErr(randomErr) s.Equal(randomErr, err) } -func (s *historyReplicationTaskSuite) TestHandleErr_NotRetryErr() { +func (s *historyReplicationTaskSuite) TestHandleErr_EnoughAttempt_NotRetryErr() { task := newHistoryReplicationTask(s.getHistoryReplicationTask(), s.mockMsg, s.sourceCluster, s.logger, - s.config, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) + s.config, s.mockTimeSource, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) + task.attempt = s.config.ReplicatorHistoryBufferRetryCount() + 1 randomErr := errors.New("some random error") err := task.HandleErr(randomErr) s.Equal(randomErr, err) } -func (s *historyReplicationTaskSuite) TestHandleErr_RetryErr() { +func (s *historyReplicationTaskSuite) TestHandleErr_EnoughAttempt_RetryErr() { task := newHistoryReplicationTask(s.getHistoryReplicationTask(), s.mockMsg, s.sourceCluster, s.logger, - s.config, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) + s.config, s.mockTimeSource, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) + task.attempt = s.config.ReplicatorHistoryBufferRetryCount() + 1 retryErr := &shared.RetryTaskError{ - DomainId: common.StringPtr(task.partitionID.DomainID), - WorkflowId: common.StringPtr(task.partitionID.WorkflowID), + DomainId: common.StringPtr(task.queueID.DomainID), + WorkflowId: common.StringPtr(task.queueID.WorkflowID), RunId: common.StringPtr("other random run ID"), NextEventId: common.Int64Ptr(447), } s.mockRereplicator.On("SendMultiWorkflowHistory", - task.partitionID.DomainID, task.partitionID.WorkflowID, + task.queueID.DomainID, task.queueID.WorkflowID, retryErr.GetRunId(), retryErr.GetNextEventId(), - task.partitionID.RunID, task.taskID, + task.queueID.RunID, task.taskID, ).Return(errors.New("some random error")).Once() err := task.HandleErr(retryErr) s.Equal(retryErr, err) s.mockRereplicator.On("SendMultiWorkflowHistory", - 
task.partitionID.DomainID, task.partitionID.WorkflowID, + task.queueID.DomainID, task.queueID.WorkflowID, retryErr.GetRunId(), retryErr.GetNextEventId(), - task.partitionID.RunID, task.taskID, + task.queueID.RunID, task.taskID, ).Return(nil).Once() s.mockHistoryClient.On("ReplicateEvents", mock.Anything, task.req).Return(nil).Once() err = task.HandleErr(retryErr) @@ -443,14 +456,14 @@ func (s *historyReplicationTaskSuite) TestHandleErr_RetryErr() { func (s *historyReplicationTaskSuite) TestRetryErr_NonRetryable() { err := &shared.BadRequestError{} task := newHistoryReplicationTask(s.getHistoryReplicationTask(), s.mockMsg, s.sourceCluster, s.logger, - s.config, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) + s.config, s.mockTimeSource, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) s.False(task.RetryErr(err)) } func (s *historyReplicationTaskSuite) TestRetryErr_Retryable() { err := &shared.InternalServiceError{} task := newHistoryReplicationTask(s.getHistoryReplicationTask(), s.mockMsg, s.sourceCluster, s.logger, - s.config, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) + s.config, s.mockTimeSource, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) task.attempt = 0 s.True(task.RetryErr(err)) s.False(task.req.GetForceBufferEvents()) @@ -459,14 +472,22 @@ func (s *historyReplicationTaskSuite) TestRetryErr_Retryable() { func (s *historyReplicationTaskSuite) TestRetryErr_Retryable_ExceedAttempt() { err := &shared.InternalServiceError{} task := newHistoryReplicationTask(s.getHistoryReplicationTask(), s.mockMsg, s.sourceCluster, s.logger, - s.config, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) - task.attempt = s.config.ReplicationTaskMaxRetry() + 100 + s.config, s.mockTimeSource, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) + task.attempt = s.config.ReplicationTaskMaxRetryCount() + 100 + s.False(task.RetryErr(err)) +} + +func (s *historyReplicationTaskSuite) TestRetryErr_Retryable_ExceedDuration() { + err := &shared.InternalServiceError{} + task := newHistoryReplicationTask(s.getHistoryReplicationTask(), s.mockMsg, s.sourceCluster, s.logger, + s.config, s.mockTimeSource, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) + task.startTime = s.mockTimeSource.Now().Add(-2 * s.config.ReplicationTaskMaxRetryDuration()) s.False(task.RetryErr(err)) } func (s *historyReplicationTaskSuite) TestAck() { task := newHistoryReplicationTask(s.getHistoryReplicationTask(), s.mockMsg, s.sourceCluster, s.logger, - s.config, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) + s.config, s.mockTimeSource, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) s.mockMsg.On("Ack").Return(nil).Once() task.Ack() @@ -474,40 +495,26 @@ func (s *historyReplicationTaskSuite) TestAck() { func (s *historyReplicationTaskSuite) TestNack() { task := newHistoryReplicationTask(s.getHistoryReplicationTask(), s.mockMsg, s.sourceCluster, s.logger, - s.config, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) + s.config, s.mockTimeSource, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) s.mockMsg.On("Nack").Return(nil).Once() task.Nack() } -func (s *historyReplicationTaskSuite) TestQueueID() { - task := newHistoryReplicationTask(s.getHistoryReplicationTask(), s.mockMsg, s.sourceCluster, s.logger, - s.config, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) - - s.Equal(task.partitionID, task.PartitionID()) -} - -func (s *historyReplicationTaskSuite) TestTaskID() { - task := 
newHistoryReplicationTask(s.getHistoryReplicationTask(), s.mockMsg, s.sourceCluster, s.logger, - s.config, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) - - s.Equal(task.taskID, task.TaskID()) -} - func (s *historyMetadataReplicationTaskSuite) TestNewHistoryMetadataReplicationTask() { replicationTask := s.getHistoryMetadataReplicationTask() replicationAttr := replicationTask.HistoryMetadataTaskAttributes task := newHistoryMetadataReplicationTask(replicationTask, s.mockMsg, s.sourceCluster, s.logger, - s.config, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) + s.config, s.mockTimeSource, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) // overwrite the logger for easy comparison task.logger = s.logger s.Equal( &historyMetadataReplicationTask{ workflowReplicationTask: workflowReplicationTask{ - metricsScope: metrics.HistoryReplicationTaskScope, + metricsScope: metrics.HistoryMetadataReplicationTaskScope, startTime: task.startTime, - partitionID: definition.NewWorkflowIdentifier( + queueID: definition.NewWorkflowIdentifier( replicationAttr.GetDomainId(), replicationAttr.GetWorkflowId(), replicationAttr.GetRunId(), @@ -516,6 +523,7 @@ func (s *historyMetadataReplicationTaskSuite) TestNewHistoryMetadataReplicationT attempt: 0, kafkaMsg: s.mockMsg, logger: s.logger, + timeSource: s.mockTimeSource, config: s.config, historyClient: s.mockHistoryClient, metricsClient: s.metricsClient, @@ -531,31 +539,22 @@ func (s *historyMetadataReplicationTaskSuite) TestNewHistoryMetadataReplicationT func (s *historyMetadataReplicationTaskSuite) TestExecute() { task := newHistoryMetadataReplicationTask(s.getHistoryMetadataReplicationTask(), s.mockMsg, s.sourceCluster, s.logger, - s.config, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) + s.config, s.mockTimeSource, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) randomErr := errors.New("some random error") s.mockRereplicator.On("SendMultiWorkflowHistory", - task.partitionID.DomainID, task.partitionID.WorkflowID, - task.partitionID.RunID, task.firstEventID, - task.partitionID.RunID, task.nextEventID, + task.queueID.DomainID, task.queueID.WorkflowID, + task.queueID.RunID, task.firstEventID, + task.queueID.RunID, task.nextEventID, ).Return(randomErr).Once() err := task.Execute() s.Equal(randomErr, err) } -func (s *historyMetadataReplicationTaskSuite) TestHandleErr_NotEnoughAttempt() { - task := newHistoryMetadataReplicationTask(s.getHistoryMetadataReplicationTask(), s.mockMsg, s.sourceCluster, s.logger, - s.config, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) - randomErr := errors.New("some random error") - - err := task.HandleErr(randomErr) - s.Equal(randomErr, err) -} - func (s *historyMetadataReplicationTaskSuite) TestHandleErr_NotRetryErr() { task := newHistoryMetadataReplicationTask(s.getHistoryMetadataReplicationTask(), s.mockMsg, s.sourceCluster, s.logger, - s.config, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) + s.config, s.mockTimeSource, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) randomErr := errors.New("some random error") err := task.HandleErr(randomErr) @@ -564,31 +563,31 @@ func (s *historyMetadataReplicationTaskSuite) TestHandleErr_NotRetryErr() { func (s *historyMetadataReplicationTaskSuite) TestHandleErr_RetryErr() { task := newHistoryMetadataReplicationTask(s.getHistoryMetadataReplicationTask(), s.mockMsg, s.sourceCluster, s.logger, - s.config, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) + s.config, s.mockTimeSource, s.mockHistoryClient, 
s.metricsClient, s.mockRereplicator) retryErr := &shared.RetryTaskError{ - DomainId: common.StringPtr(task.partitionID.DomainID), - WorkflowId: common.StringPtr(task.partitionID.WorkflowID), + DomainId: common.StringPtr(task.queueID.DomainID), + WorkflowId: common.StringPtr(task.queueID.WorkflowID), RunId: common.StringPtr("other random run ID"), NextEventId: common.Int64Ptr(447), } s.mockRereplicator.On("SendMultiWorkflowHistory", - task.partitionID.DomainID, task.partitionID.WorkflowID, + task.queueID.DomainID, task.queueID.WorkflowID, retryErr.GetRunId(), retryErr.GetNextEventId(), - task.partitionID.RunID, task.taskID, + task.queueID.RunID, task.taskID, ).Return(errors.New("some random error")).Once() err := task.HandleErr(retryErr) s.Equal(retryErr, err) s.mockRereplicator.On("SendMultiWorkflowHistory", - task.partitionID.DomainID, task.partitionID.WorkflowID, + task.queueID.DomainID, task.queueID.WorkflowID, retryErr.GetRunId(), retryErr.GetNextEventId(), - task.partitionID.RunID, task.taskID, + task.queueID.RunID, task.taskID, ).Return(nil).Once() s.mockRereplicator.On("SendMultiWorkflowHistory", - task.partitionID.DomainID, task.partitionID.WorkflowID, - task.partitionID.RunID, task.firstEventID, - task.partitionID.RunID, task.nextEventID, + task.queueID.DomainID, task.queueID.WorkflowID, + task.queueID.RunID, task.firstEventID, + task.queueID.RunID, task.nextEventID, ).Return(nil).Once() err = task.HandleErr(retryErr) s.Nil(err) @@ -597,14 +596,14 @@ func (s *historyMetadataReplicationTaskSuite) TestHandleErr_RetryErr() { func (s *historyMetadataReplicationTaskSuite) TestRetryErr_NonRetryable() { err := &shared.BadRequestError{} task := newHistoryMetadataReplicationTask(s.getHistoryMetadataReplicationTask(), s.mockMsg, s.sourceCluster, s.logger, - s.config, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) + s.config, s.mockTimeSource, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) s.False(task.RetryErr(err)) } func (s *historyMetadataReplicationTaskSuite) TestRetryErr_Retryable() { err := &shared.InternalServiceError{} task := newHistoryMetadataReplicationTask(s.getHistoryMetadataReplicationTask(), s.mockMsg, s.sourceCluster, s.logger, - s.config, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) + s.config, s.mockTimeSource, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) task.attempt = 0 s.True(task.RetryErr(err)) } @@ -612,14 +611,22 @@ func (s *historyMetadataReplicationTaskSuite) TestRetryErr_Retryable() { func (s *historyMetadataReplicationTaskSuite) TestRetryErr_Retryable_ExceedAttempt() { err := &shared.InternalServiceError{} task := newHistoryMetadataReplicationTask(s.getHistoryMetadataReplicationTask(), s.mockMsg, s.sourceCluster, s.logger, - s.config, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) - task.attempt = s.config.ReplicationTaskMaxRetry() + 100 + s.config, s.mockTimeSource, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) + task.attempt = s.config.ReplicationTaskMaxRetryCount() + 100 + s.False(task.RetryErr(err)) +} + +func (s *historyMetadataReplicationTaskSuite) TestRetryErr_Retryable_ExceedDuration() { + err := &shared.InternalServiceError{} + task := newHistoryMetadataReplicationTask(s.getHistoryMetadataReplicationTask(), s.mockMsg, s.sourceCluster, s.logger, + s.config, s.mockTimeSource, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) + task.startTime = s.mockTimeSource.Now().Add(-2 * s.config.ReplicationTaskMaxRetryDuration()) s.False(task.RetryErr(err)) } func (s 
*historyMetadataReplicationTaskSuite) TestAck() { task := newHistoryMetadataReplicationTask(s.getHistoryMetadataReplicationTask(), s.mockMsg, s.sourceCluster, s.logger, - s.config, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) + s.config, s.mockTimeSource, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) s.mockMsg.On("Ack").Return(nil).Once() task.Ack() @@ -627,26 +634,12 @@ func (s *historyMetadataReplicationTaskSuite) TestAck() { func (s *historyMetadataReplicationTaskSuite) TestNack() { task := newHistoryMetadataReplicationTask(s.getHistoryMetadataReplicationTask(), s.mockMsg, s.sourceCluster, s.logger, - s.config, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) + s.config, s.mockTimeSource, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) s.mockMsg.On("Nack").Return(nil).Once() task.Nack() } -func (s *historyMetadataReplicationTaskSuite) TestQueueID() { - task := newHistoryMetadataReplicationTask(s.getHistoryMetadataReplicationTask(), s.mockMsg, s.sourceCluster, s.logger, - s.config, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) - - s.Equal(task.partitionID, task.PartitionID()) -} - -func (s *historyMetadataReplicationTaskSuite) TestTaskID() { - task := newHistoryMetadataReplicationTask(s.getHistoryMetadataReplicationTask(), s.mockMsg, s.sourceCluster, s.logger, - s.config, s.mockHistoryClient, s.metricsClient, s.mockRereplicator) - - s.Equal(task.taskID, task.TaskID()) -} - func (s *activityReplicationTaskSuite) getActivityReplicationTask() *replicator.ReplicationTask { replicationAttr := &replicator.SyncActicvityTaskAttributes{ DomainId: common.StringPtr("some random domain ID"), diff --git a/service/worker/replicator/replicator.go b/service/worker/replicator/replicator.go index 0b3aa90e955..e45b6161e88 100644 --- a/service/worker/replicator/replicator.go +++ b/service/worker/replicator/replicator.go @@ -60,12 +60,14 @@ type ( // Config contains all the replication config for worker Config struct { - PersistenceMaxQPS dynamicconfig.IntPropertyFn - ReplicatorMetaTaskConcurrency dynamicconfig.IntPropertyFn - ReplicatorTaskConcurrency dynamicconfig.IntPropertyFn - ReplicatorMessageConcurrency dynamicconfig.IntPropertyFn - ReplicatorHistoryBufferRetryCount dynamicconfig.IntPropertyFn - ReplicationTaskMaxRetry dynamicconfig.IntPropertyFn + PersistenceMaxQPS dynamicconfig.IntPropertyFn + ReplicatorMetaTaskConcurrency dynamicconfig.IntPropertyFn + ReplicatorTaskConcurrency dynamicconfig.IntPropertyFn + ReplicatorMessageConcurrency dynamicconfig.IntPropertyFn + ReplicatorActivityBufferRetryCount dynamicconfig.IntPropertyFn + ReplicatorHistoryBufferRetryCount dynamicconfig.IntPropertyFn + ReplicationTaskMaxRetryCount dynamicconfig.IntPropertyFn + ReplicationTaskMaxRetryDuration dynamicconfig.DurationPropertyFn } ) @@ -131,7 +133,9 @@ func (r *Replicator) Start() error { historyRereplicator, r.historyClient, task.NewSequentialTaskProcessor( r.config.ReplicatorTaskConcurrency(), - r.config.ReplicatorMessageConcurrency(), + replicationSequentialTaskQueueHashFn, + newReplicationSequentialTaskQueue, + r.metricsClient, logger, ), )) diff --git a/service/worker/service.go b/service/worker/service.go index 05385490634..1c98d2f7419 100644 --- a/service/worker/service.go +++ b/service/worker/service.go @@ -86,12 +86,14 @@ func NewConfig(params *service.BootstrapParams) *Config { dc := dynamicconfig.NewCollection(params.DynamicConfig, params.Logger) return &Config{ ReplicationCfg: &replicator.Config{ - PersistenceMaxQPS: 
dc.GetIntProperty(dynamicconfig.WorkerPersistenceMaxQPS, 500), - ReplicatorMetaTaskConcurrency: dc.GetIntProperty(dynamicconfig.WorkerReplicatorMetaTaskConcurrency, 64), - ReplicatorTaskConcurrency: dc.GetIntProperty(dynamicconfig.WorkerReplicatorTaskConcurrency, 256), - ReplicatorMessageConcurrency: dc.GetIntProperty(dynamicconfig.WorkerReplicatorMessageConcurrency, 2048), - ReplicatorHistoryBufferRetryCount: dc.GetIntProperty(dynamicconfig.WorkerReplicatorHistoryBufferRetryCount, 8), - ReplicationTaskMaxRetry: dc.GetIntProperty(dynamicconfig.WorkerReplicationTaskMaxRetry, 400), + PersistenceMaxQPS: dc.GetIntProperty(dynamicconfig.WorkerPersistenceMaxQPS, 500), + ReplicatorMetaTaskConcurrency: dc.GetIntProperty(dynamicconfig.WorkerReplicatorMetaTaskConcurrency, 64), + ReplicatorTaskConcurrency: dc.GetIntProperty(dynamicconfig.WorkerReplicatorTaskConcurrency, 256), + ReplicatorMessageConcurrency: dc.GetIntProperty(dynamicconfig.WorkerReplicatorMessageConcurrency, 2048), + ReplicatorActivityBufferRetryCount: dc.GetIntProperty(dynamicconfig.WorkerReplicatorActivityBufferRetryCount, 8), + ReplicatorHistoryBufferRetryCount: dc.GetIntProperty(dynamicconfig.WorkerReplicatorHistoryBufferRetryCount, 8), + ReplicationTaskMaxRetryCount: dc.GetIntProperty(dynamicconfig.WorkerReplicationTaskMaxRetryCount, 400), + ReplicationTaskMaxRetryDuration: dc.GetDurationProperty(dynamicconfig.WorkerReplicationTaskMaxRetryDuration, 15*time.Minute), }, ArchiverConfig: &archiver.Config{ EnableArchivalCompression: dc.GetBoolPropertyFnWithDomainFilter(dynamicconfig.EnableArchivalCompression, true),
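[Reviewer note, not part of the patch] The consolidated workflowReplicationTask.RetryErr now gates retries on both the attempt cap (ReplicationTaskMaxRetryCount, default 400) and a wall-clock cap (ReplicationTaskMaxRetryDuration, default 15 minutes) measured against the injected clock.TimeSource, which is why the tests swap in clock.EventTimeSource. A minimal standalone sketch of that gating logic; the types and function names below are illustrative, not the Cadence implementation:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// retryPolicy mirrors the two caps added in this diff: a maximum number of
// attempts and a maximum total retry duration measured from the task's start.
type retryPolicy struct {
	maxRetryCount    int
	maxRetryDuration time.Duration
}

// shouldRetry reports whether another attempt is allowed. `now` stands in for
// timeSource.Now(), so a test can control the clock the same way the
// replicator tests do with an event time source.
func shouldRetry(p retryPolicy, attempt int, startTime, now time.Time, err error, transient func(error) bool) bool {
	return attempt <= p.maxRetryCount &&
		now.Sub(startTime) <= p.maxRetryDuration &&
		transient(err)
}

func main() {
	p := retryPolicy{maxRetryCount: 400, maxRetryDuration: 15 * time.Minute}
	start := time.Now()
	transient := func(err error) bool { return err != nil } // stand-in for isTransientRetryableError

	// Within both caps: another retry is allowed.
	fmt.Println(shouldRetry(p, 3, start, start.Add(time.Minute), errors.New("busy"), transient)) // true

	// Past the duration cap: retry is denied even though attempts remain.
	fmt.Println(shouldRetry(p, 3, start, start.Add(16*time.Minute), errors.New("busy"), transient)) // false
}
```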