Skip to content

Commit

Permalink
Revert cadence-workflow#1611 for replication task solution which has …
Browse files Browse the repository at this point in the history
…better performance (cadence-workflow#1753)

Compared to cadence-workflow#1611, this PR shows ~60% reduction in replication task processing mean latency.

Revert "This PR aims to refactor the existing replication task processing, as… (cadence-workflow#1611)"
* This reverts commit 9878a8f.

This PR aims to refactor the existing replication task processing, as well as improve performance when applying large number of replication tasks concurrently to one workflow.

* Implement generic priority queue & UT
* Implement generic concurrent priority queue
* Move collection interface into one file
* Implement generic multi-sequential-task-queue processing logic & UT & metrics.
* Refactor replication worker task processing logic
* Handle activity / history replication task using sequential task queue & UT
* All replication retry-able errors have upper attempt & duration, when exceeding the attempt | duration, replication message will be moved to DLQ
  • Loading branch information
wxing1292 authored May 10, 2019
1 parent a33c480 commit a58f183
Show file tree
Hide file tree
Showing 20 changed files with 1,210 additions and 484 deletions.
79 changes: 79 additions & 0 deletions common/collection/concurrentPriorityQueue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package collection

import (
"sync"
)

type (
concurrentPriorityQueueImpl struct {
lock sync.Mutex
priorityQueue Queue
}
)

// NewConcurrentPriorityQueue create a new concurrent priority queue
func NewConcurrentPriorityQueue(compareLess func(this interface{}, other interface{}) bool) Queue {
return &concurrentPriorityQueueImpl{
priorityQueue: NewPriorityQueue(compareLess),
}
}

// Peek returns the top item of the priority queue
func (pq *concurrentPriorityQueueImpl) Peek() interface{} {
pq.lock.Lock()
defer pq.lock.Unlock()

return pq.priorityQueue.Peek()
}

// Add push an item to priority queue
func (pq *concurrentPriorityQueueImpl) Add(item interface{}) {
pq.lock.Lock()
defer pq.lock.Unlock()

pq.priorityQueue.Add(item)
}

// Remove pop an item from priority queue
func (pq *concurrentPriorityQueueImpl) Remove() interface{} {
pq.lock.Lock()
defer pq.lock.Unlock()

return pq.priorityQueue.Remove()
}

// IsEmpty indicate if the priority queue is empty
func (pq *concurrentPriorityQueueImpl) IsEmpty() bool {
pq.lock.Lock()
defer pq.lock.Unlock()

return pq.priorityQueue.IsEmpty()
}

// Len return the size of the queue
func (pq *concurrentPriorityQueueImpl) Len() int {
pq.lock.Lock()
defer pq.lock.Unlock()

return pq.priorityQueue.Len()
}
60 changes: 2 additions & 58 deletions common/collection/concurrent_tx_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,62 +34,6 @@ const (
)

type (
// HashFunc represents a hash function for string
HashFunc func(interface{}) uint32

// ActionFunc take a key and value, do calulation and return err
ActionFunc func(key interface{}, value interface{}) error
// PredicateFunc take a key and value, do calulation and return boolean
PredicateFunc func(key interface{}, value interface{}) bool

// ConcurrentTxMap is a generic interface for any
// implementation of a dictionary or a key value
// lookup table that is thread safe, and
ConcurrentTxMap interface {
// Get returns the value for the given key
Get(key interface{}) (interface{}, bool)
// Contains returns true if the key exist and false otherwise
Contains(key interface{}) bool
// Put records the mapping from given key to value
Put(key interface{}, value interface{})
// PutIfNotExist records the key value mapping only
// if the mapping does not already exist
PutIfNotExist(key interface{}, value interface{}) bool
// Remove deletes the key from the map
Remove(key interface{})
// GetAndDo returns the value corresponding to the key, and apply fn to key value before return value
// return (value, value exist or not, error when evaluation fn)
GetAndDo(key interface{}, fn ActionFunc) (interface{}, bool, error)
// PutOrDo put the key value in the map, if key does not exists, otherwise, call fn with existing key and value
// return (value, fn evaluated or not, error when evaluation fn)
PutOrDo(key interface{}, value interface{}, fn ActionFunc) (interface{}, bool, error)
// RemoveIf deletes the given key from the map if fn return true
// return whether the key is removed or not
RemoveIf(key interface{}, fn PredicateFunc) bool
// Iter returns an iterator to the map
Iter() MapIterator
// Size returns the number of items in the map
Size() int
}

// MapIterator represents the interface for
// map iterators
MapIterator interface {
// Close closes the iterator
// and releases any allocated resources
Close()
// Entries returns a channel of MapEntry
// objects that can be used in a range loop
Entries() <-chan *MapEntry
}

// MapEntry represents a key-value entry within the map
MapEntry struct {
// Key represents the key
Key interface{}
// Value represents the value
Value interface{}
}

// ShardedConcurrentTxMap is an implementation of
// ConcurrentMap that internally uses multiple
Expand Down Expand Up @@ -289,8 +233,8 @@ func (cmap *ShardedConcurrentTxMap) Iter() MapIterator {
return iterator
}

// Size returns the number of items in the map
func (cmap *ShardedConcurrentTxMap) Size() int {
// Len returns the number of items in the map
func (cmap *ShardedConcurrentTxMap) Len() int {
return int(atomic.LoadInt32(&cmap.size))
}

Expand Down
24 changes: 12 additions & 12 deletions common/collection/concurrent_tx_map_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,28 +49,28 @@ func (s *ConcurrentTxMapSuite) SetupTest() {
s.Assertions = require.New(s.T()) // Have to define our overridden assertions in the test setup. If we did it earlier, s.T() will return nil
}

func (s *ConcurrentTxMapSuite) TestSize() {
func (s *ConcurrentTxMapSuite) TestLen() {
testMap := NewShardedConcurrentTxMap(1, UUIDHashCode)

key1 := "0001"
testMap.Put(key1, boolType(true))
s.Equal(1, testMap.Size(), "Wrong concurrent map size")
s.Equal(1, testMap.Len(), "Wrong concurrent map size")

testMap.Put(key1, boolType(false))
s.Equal(1, testMap.Size(), "Wrong concurrent map size")
s.Equal(1, testMap.Len(), "Wrong concurrent map size")

key2 := "0002"
testMap.Put(key2, boolType(false))
s.Equal(2, testMap.Size(), "Wrong concurrent map size")
s.Equal(2, testMap.Len(), "Wrong concurrent map size")

testMap.PutIfNotExist(key2, boolType(false))
s.Equal(2, testMap.Size(), "Wrong concurrent map size")
s.Equal(2, testMap.Len(), "Wrong concurrent map size")

testMap.Remove(key2)
s.Equal(1, testMap.Size(), "Wrong concurrent map size")
s.Equal(1, testMap.Len(), "Wrong concurrent map size")

testMap.Remove(key2)
s.Equal(1, testMap.Size(), "Wrong concurrent map size")
s.Equal(1, testMap.Len(), "Wrong concurrent map size")
}

func (s *ConcurrentTxMapSuite) TestGetAndDo() {
Expand Down Expand Up @@ -149,7 +149,7 @@ func (s *ConcurrentTxMapSuite) TestRemoveIf() {
}
return false
})
s.Equal(1, testMap.Size(), "TestRemoveIf should only entry if condition is met")
s.Equal(1, testMap.Len(), "TestRemoveIf should only entry if condition is met")
s.False(removed, "TestRemoveIf should return false if key is not deleted")

removed = testMap.RemoveIf(key, func(key interface{}, value interface{}) bool {
Expand All @@ -159,7 +159,7 @@ func (s *ConcurrentTxMapSuite) TestRemoveIf() {
}
return false
})
s.Equal(0, testMap.Size(), "TestRemoveIf should only entry if condition is met")
s.Equal(0, testMap.Len(), "TestRemoveIf should only entry if condition is met")
s.True(removed, "TestRemoveIf should return true if key is deleted")
}

Expand All @@ -181,7 +181,7 @@ func (s *ConcurrentTxMapSuite) TestGetAfterPut() {
s.True(bool(boolValue), "Wrong value returned from map")
}

s.Equal(len(countMap), testMap.Size(), "Size() returned wrong value")
s.Equal(len(countMap), testMap.Len(), "Size() returned wrong value")

it := testMap.Iter()
for entry := range it.Entries() {
Expand All @@ -197,7 +197,7 @@ func (s *ConcurrentTxMapSuite) TestGetAfterPut() {
testMap.Remove(k)
}

s.Equal(0, testMap.Size(), "Map returned non-zero size after deleting all entries")
s.Equal(0, testMap.Len(), "Map returned non-zero size after deleting all entries")
}

func (s *ConcurrentTxMapSuite) TestPutIfNotExist() {
Expand Down Expand Up @@ -244,7 +244,7 @@ func (s *ConcurrentTxMapSuite) TestMapConcurrency() {
startWG.Done()
doneWG.Wait()

s.Equal(nKeys, testMap.Size(), "Wrong concurrent map size")
s.Equal(nKeys, testMap.Len(), "Wrong concurrent map size")

var gotTotal int32
for i := 0; i < nKeys; i++ {
Expand Down
26 changes: 0 additions & 26 deletions common/collection/defs.go

This file was deleted.

98 changes: 98 additions & 0 deletions common/collection/initerface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package collection

type (
// Queue is the interface for queue
Queue interface {
// Peek returns the first item of the queue
Peek() interface{}
// Add push an item to the queue
Add(item interface{})
// Remove pop an item from the queue
Remove() interface{}
// IsEmpty indicate if the queue is empty
IsEmpty() bool
// Len return the size of the queue
Len() int
}

// HashFunc represents a hash function for string
HashFunc func(interface{}) uint32

// ActionFunc take a key and value, do calculation and return err
ActionFunc func(key interface{}, value interface{}) error
// PredicateFunc take a key and value, do calculation and return boolean
PredicateFunc func(key interface{}, value interface{}) bool

// ConcurrentTxMap is a generic interface for any implementation of a dictionary
// or a key value lookup table that is thread safe, and providing functionality
// to modify key / value pair inside within a transaction
ConcurrentTxMap interface {
// Get returns the value for the given key
Get(key interface{}) (interface{}, bool)
// Contains returns true if the key exist and false otherwise
Contains(key interface{}) bool
// Put records the mapping from given key to value
Put(key interface{}, value interface{})
// PutIfNotExist records the key value mapping only
// if the mapping does not already exist
PutIfNotExist(key interface{}, value interface{}) bool
// Remove deletes the key from the map
Remove(key interface{})
// GetAndDo returns the value corresponding to the key, and apply fn to key value before return value
// return (value, value exist or not, error when evaluation fn)
GetAndDo(key interface{}, fn ActionFunc) (interface{}, bool, error)
// PutOrDo put the key value in the map, if key does not exists, otherwise, call fn with existing key and value
// return (value, fn evaluated or not, error when evaluation fn)
PutOrDo(key interface{}, value interface{}, fn ActionFunc) (interface{}, bool, error)
// RemoveIf deletes the given key from the map if fn return true
// return whether the key is removed or not
RemoveIf(key interface{}, fn PredicateFunc) bool
// Iter returns an iterator to the map
Iter() MapIterator
// Len returns the number of items in the map
Len() int
}

// MapIterator represents the interface for map iterators
MapIterator interface {
// Close closes the iterator
// and releases any allocated resources
Close()
// Entries returns a channel of MapEntry
// objects that can be used in a range loop
Entries() <-chan *MapEntry
}

// MapEntry represents a key-value entry within the map
MapEntry struct {
// Key represents the key
Key interface{}
// Value represents the value
Value interface{}
}
)

const (
// UUIDStringLength is the length of an UUID represented as a hex string
UUIDStringLength = 36 // xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
)
Loading

0 comments on commit a58f183

Please sign in to comment.