This PR aims to refactor the existing replication task processing, as… (cadence-workflow#1611)

* This PR aims to refactor the existing replication task processing, as well as improve performance when applying a large number of replication tasks concurrently to one workflow.

* Implement the sequential task processing logic & UT
* Refactor the replication worker task processing logic
* Handle activity / history replication tasks using the sequential task queue & UT
* All retryable replication errors have an upper bound on attempts; once that limit is exceeded, the replication message is moved to the DLQ (see the sketch below)
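
A minimal sketch of the bounded-retry-then-DLQ behavior described above, assuming hypothetical apply and sendToDLQ helpers (this is an illustration, not the actual Cadence replicator code):

package main

import "errors"

type replicationTask struct{ taskID int64 }

var errRetryable = errors.New("retryable replication error")

// apply and sendToDLQ stand in for the real history/activity replication call
// and the DLQ write; both are hypothetical.
func apply(t replicationTask) error { return errRetryable }
func sendToDLQ(t replicationTask)   { /* persist the message for later inspection */ }

// process retries a task up to maxRetry times; once retries are exhausted
// (or the error is not retryable) the message is moved to the DLQ.
func process(t replicationTask, maxRetry int) {
	for attempt := 0; attempt < maxRetry; attempt++ {
		err := apply(t)
		if err == nil {
			return
		}
		if err != errRetryable {
			break // non-retryable error: give up immediately
		}
	}
	sendToDLQ(t)
}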
wxing1292 authored Apr 3, 2019
1 parent c2ddcd4 commit 9878a8f
Showing 22 changed files with 1,945 additions and 422 deletions.
15 changes: 9 additions & 6 deletions common/service/dynamicconfig/constants.go
@@ -164,8 +164,9 @@ var keys = map[Key]string{
HistoryThrottledLogRPS: "history.throttledLogRPS",

WorkerPersistenceMaxQPS: "worker.persistenceMaxQPS",
WorkerReplicatorConcurrency: "worker.replicatorConcurrency",
WorkerReplicatorActivityBufferRetryCount: "worker.replicatorActivityBufferRetryCount",
WorkerReplicatorMetaTaskConcurrency: "worker.replicatorMetaTaskConcurrency",
WorkerReplicatorTaskConcurrency: "worker.replicatorTaskConcurrency",
WorkerReplicatorMessageConcurrency: "worker.replicatorMessageConcurrency",
WorkerReplicatorHistoryBufferRetryCount: "worker.replicatorHistoryBufferRetryCount",
WorkerReplicationTaskMaxRetry: "worker.replicationTaskMaxRetry",
WorkerIndexerConcurrency: "worker.indexerConcurrency",
@@ -429,10 +430,12 @@ const (

// WorkerPersistenceMaxQPS is the max qps worker host can query DB
WorkerPersistenceMaxQPS
// WorkerReplicatorConcurrency is the max number of concurrent tasks to be processed at any given time
WorkerReplicatorConcurrency
// WorkerReplicatorActivityBufferRetryCount is the number of retry attempts when encountering a retryable error on activity replication
WorkerReplicatorActivityBufferRetryCount
// WorkerReplicatorMetaTaskConcurrency is the number of coroutines handling metadata-related tasks
WorkerReplicatorMetaTaskConcurrency
// WorkerReplicatorTaskConcurrency is the number of coroutines handling non-metadata-related tasks
WorkerReplicatorTaskConcurrency
// WorkerReplicatorMessageConcurrency is the max number of concurrent tasks provided by the messaging client
WorkerReplicatorMessageConcurrency
// WorkerReplicatorHistoryBufferRetryCount is the number of retry attempts when encountering a retryable error on history replication
WorkerReplicatorHistoryBufferRetryCount
// WorkerReplicationTaskMaxRetry is the max retry count for any replication task
65 changes: 65 additions & 0 deletions common/task/interface.go
@@ -0,0 +1,65 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package task

import (
"github.com/uber/cadence/common"
)

type (
// SequentialTaskProcessor is the generic goroutine pool interface
// which processes sequential tasks;
// for the definition of a sequential task, see SequentialTask
SequentialTaskProcessor interface {
common.Daemon
Submit(task SequentialTask) error
}

// Task is the generic task representation
Task interface {
// Execute processes this task
Execute() error
// HandleErr handles the error returned by Execute
HandleErr(err error) error
// RetryErr checks whether to retry after HandleErr(Execute())
RetryErr(err error) bool
// Ack marks the task as successfully completed
Ack()
// Nack marks the task as not successfully completed
Nack()
}

// SequentialTaskPartitionID is the interface representing the partition ID of a SequentialTask
SequentialTaskPartitionID interface {
PartitionID() interface{} // MUST be go primitive type or struct with primitive types
HashCode() uint32
}

// SequentialTask is the interface for tasks which should be executed sequentially;
// one common example is the workflow replication task (containing workflow history),
// which must be executed one by one, in order of the first event ID
SequentialTask interface {
Task
SequentialTaskPartitionID
// TaskID returns the ID of the task; this ID is used for sorting
TaskID() int64
}
)
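
For illustration, a minimal SequentialTask implementation keyed by workflow run ID might look like the sketch below; the exampleTask type, its fields, and the apply callback are hypothetical and not part of this PR:

package task

import "hash/fnv"

// exampleTask is a hypothetical SequentialTask: tasks sharing a runID are
// executed sequentially, ordered by firstEventID.
type exampleTask struct {
	runID        string
	firstEventID int64
	apply        func() error
}

func (t *exampleTask) Execute() error            { return t.apply() }
func (t *exampleTask) HandleErr(err error) error { return err }
func (t *exampleTask) RetryErr(err error) bool   { return false }
func (t *exampleTask) Ack()                      {}
func (t *exampleTask) Nack()                     {}
func (t *exampleTask) PartitionID() interface{}  { return t.runID }
func (t *exampleTask) TaskID() int64             { return t.firstEventID }

func (t *exampleTask) HashCode() uint32 {
	h := fnv.New32a()
	h.Write([]byte(t.runID)) // the same runID always hashes to the same queue
	return h.Sum32()
}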
194 changes: 194 additions & 0 deletions common/task/sequentialTaskProcessor.go
@@ -0,0 +1,194 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package task

import (
"sort"
"sync"
"sync/atomic"
"time"

"github.com/uber-common/bark"
"github.com/uber/cadence/common"
)

type (
sequentialTaskProcessorImpl struct {
status int32
shutdownChan chan struct{}
waitGroup sync.WaitGroup

coroutineSize int
taskBatchSize int
coroutineTaskQueues []chan SequentialTask
logger bark.Logger
}

// SequentialTasks is a slice of SequentialTask
SequentialTasks []SequentialTask
)

// NewSequentialTaskProcessor creates a new sequential task processor
func NewSequentialTaskProcessor(coroutineSize int, taskBatchSize int, logger bark.Logger) SequentialTaskProcessor {

coroutineTaskQueues := make([]chan SequentialTask, coroutineSize)
for i := 0; i < coroutineSize; i++ {
coroutineTaskQueues[i] = make(chan SequentialTask, taskBatchSize)
}

return &sequentialTaskProcessorImpl{
status: common.DaemonStatusInitialized,
shutdownChan: make(chan struct{}),
coroutineSize: coroutineSize,
taskBatchSize: taskBatchSize,
coroutineTaskQueues: coroutineTaskQueues,
logger: logger,
}
}

func (t *sequentialTaskProcessorImpl) Start() {
if !atomic.CompareAndSwapInt32(&t.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
return
}

t.waitGroup.Add(t.coroutineSize)
for i := 0; i < t.coroutineSize; i++ {
coroutineTaskQueue := t.coroutineTaskQueues[i]
go t.pollAndProcessTaskQueue(coroutineTaskQueue)
}
t.logger.Info("Task processor started.")
}

func (t *sequentialTaskProcessorImpl) Stop() {
if !atomic.CompareAndSwapInt32(&t.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
return
}

close(t.shutdownChan)
if success := common.AwaitWaitGroup(&t.waitGroup, time.Minute); !success {
t.logger.Warn("Task processor timeout trying to stop.")
}
t.logger.Info("Task processor stopped.")
}

func (t *sequentialTaskProcessorImpl) Submit(task SequentialTask) error {
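// tasks with the same hash code always map to the same queue, so all tasks
// for a given partition are handled by a single goroutine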
hashCode := int(task.HashCode()) % t.coroutineSize
taskQueue := t.coroutineTaskQueues[hashCode]
// dispatch the task to the selected queue, or bail out if shutting down
select {
case <-t.shutdownChan:
case taskQueue <- task:
}
return nil
}

func (t *sequentialTaskProcessorImpl) pollAndProcessTaskQueue(coroutineTaskQueue chan SequentialTask) {
defer t.waitGroup.Done()

for {
select {
case <-t.shutdownChan:
return
default:
t.batchPollTaskQueue(coroutineTaskQueue)
}
}
}

func (t *sequentialTaskProcessorImpl) batchPollTaskQueue(coroutineTaskQueue chan SequentialTask) {
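// block for the first task, then drain up to taskBatchSize-1 more without
// blocking, grouping everything by PartitionID so each partition's tasks can
// be sorted and executed in TaskID order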
bufferedSequentialTasks := make(map[interface{}][]SequentialTask)
indexTasks := func(task SequentialTask) {
sequentialTasks, ok := bufferedSequentialTasks[task.PartitionID()]
if ok {
sequentialTasks = append(sequentialTasks, task)
bufferedSequentialTasks[task.PartitionID()] = sequentialTasks
} else {
bufferedSequentialTasks[task.PartitionID()] = []SequentialTask{task}
}
}

select {
case <-t.shutdownChan:
return
case task := <-coroutineTaskQueue:
indexTasks(task)
BufferLoop:
for i := 0; i < t.taskBatchSize-1; i++ {
select {
case <-t.shutdownChan:
return
case task := <-coroutineTaskQueue:
indexTasks(task)
default:
// no more tasks currently buffered
break BufferLoop
}
}
}

for _, sequentialTasks := range bufferedSequentialTasks {
t.batchProcessingSequentialTasks(sequentialTasks)
}
}

func (t *sequentialTaskProcessorImpl) batchProcessingSequentialTasks(sequentialTasks []SequentialTask) {
sort.Sort(SequentialTasks(sequentialTasks))

for _, task := range sequentialTasks {
t.processTaskOnce(task)
}
}

func (t *sequentialTaskProcessorImpl) processTaskOnce(task SequentialTask) {
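// execute the task, retrying as long as HandleErr returns a retryable error;
// Ack on success, Nack otherwise (a shutdown signal exits without acking)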
var err error

TaskProcessingLoop:
for {
select {
case <-t.shutdownChan:
return
default:
err = task.Execute()
err = task.HandleErr(err)
if err == nil || !task.RetryErr(err) {
break TaskProcessingLoop
}
}
}

if err != nil {
task.Nack()
return
}
task.Ack()
}

func (tasks SequentialTasks) Len() int {
return len(tasks)
}

func (tasks SequentialTasks) Swap(i, j int) {
tasks[i], tasks[j] = tasks[j], tasks[i]
}

func (tasks SequentialTasks) Less(i, j int) bool {
return tasks[i].TaskID() < tasks[j].TaskID()
}
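
Putting the pieces together, typical usage of the processor might look like the following sketch; the concrete sizes, the logrus-backed bark logger, and the replicationTask value are assumptions for illustration:

processor := task.NewSequentialTaskProcessor(
	10,  // coroutineSize: number of worker goroutines / task queues
	100, // taskBatchSize: per-queue buffer size and drain batch size
	bark.NewLoggerFromLogrus(logrus.New()),
)
processor.Start()
defer processor.Stop()

// replicationTask is any value implementing SequentialTask
// (Execute, HandleErr, RetryErr, Ack, Nack, PartitionID, HashCode, TaskID).
if err := processor.Submit(replicationTask); err != nil {
	// handle the submission error
}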
54 changes: 54 additions & 0 deletions common/task/sequentialTaskProcessor_mock.go
@@ -0,0 +1,54 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package task

import (
mock "github.com/stretchr/testify/mock"
)

// MockSequentialTaskProcessor is an autogenerated mock type for the SequentialTaskProcessor type
type MockSequentialTaskProcessor struct {
mock.Mock
}

// Start provides a mock function with given fields:
func (_m *MockSequentialTaskProcessor) Start() {
_m.Called()
}

// Stop provides a mock function with given fields:
func (_m *MockSequentialTaskProcessor) Stop() {
_m.Called()
}

// Submit provides a mock function with given fields: _a0
func (_m *MockSequentialTaskProcessor) Submit(_a0 SequentialTask) error {
ret := _m.Called(_a0)

var r0 error
if rf, ok := ret.Get(0).(func(SequentialTask) error); ok {
r0 = rf(_a0)
} else {
r0 = ret.Error(0)
}

return r0
}
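
A sketch of how this mock might be used in a unit test (the test itself is an assumption, not part of this PR; imports of testing, testify/assert, and testify/mock are implied):

func TestSubmitReplicationTask(t *testing.T) {
	processor := &MockSequentialTaskProcessor{}
	processor.On("Submit", mock.Anything).Return(nil)

	// someSequentialTask: any value implementing SequentialTask
	err := processor.Submit(someSequentialTask)
	assert.NoError(t, err)
	processor.AssertExpectations(t)
}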