Add host level task worker pool (cadence-workflow#3331)
yycptt authored Jun 12, 2020
1 parent efb2e7c commit c1cf2ca
Showing 13 changed files with 325 additions and 228 deletions.
13 changes: 11 additions & 2 deletions common/service/dynamicconfig/constants.go
@@ -162,7 +162,10 @@ var keys = map[Key]string{
TaskProcessRPS: "history.taskProcessRPS",
TaskSchedulerType: "history.taskSchedulerType",
TaskSchedulerWorkerCount: "history.taskSchedulerWorkerCount",
TaskSchedulerShardWorkerCount: "history.taskSchedulerShardWorkerCount",
TaskSchedulerQueueSize: "history.taskSchedulerQueueSize",
TaskSchedulerShardQueueSize: "history.taskSchedulerShardQueueSize",
TaskSchedulerDispatcherCount: "history.taskSchedulerDispatcherCount",
TaskSchedulerRoundRobinWeights: "history.taskSchedulerRoundRobinWeight",
QueueProcessorEnableSplit: "history.queueProcessorEnableSplit",
QueueProcessorSplitMaxLevel: "history.queueProcessorSplitMaxLevel",
@@ -542,10 +545,16 @@ const (
TaskProcessRPS
// TaskSchedulerType is the task scheduler type for priority task processor
TaskSchedulerType
// TaskSchedulerWorkerCount is the number of workers per shard in task scheduler
// TaskSchedulerWorkerCount is the number of workers per host in task scheduler
TaskSchedulerWorkerCount
// TaskSchedulerQueueSize is the size of task channel size in task scheduler
// TaskSchedulerShardWorkerCount is the number of workers per shard in task scheduler
TaskSchedulerShardWorkerCount
// TaskSchedulerQueueSize is the size of task channel for host level task scheduler
TaskSchedulerQueueSize
// TaskSchedulerShardQueueSize is the size of task channel for shard level task scheduler
TaskSchedulerShardQueueSize
// TaskSchedulerDispatcherCount is the number of task dispatchers in task scheduler (only applies to host level task scheduler)
TaskSchedulerDispatcherCount
// TaskSchedulerRoundRobinWeights is the priority weight for weighted round robin task scheduler
TaskSchedulerRoundRobinWeights
// QueueProcessorEnableSplit indicates whether processing queue split policy should be enabled
15 changes: 10 additions & 5 deletions common/task/fifoTaskScheduler.go
@@ -35,15 +35,17 @@ import (
type (
// FIFOTaskSchedulerOptions configs FIFO task scheduler
FIFOTaskSchedulerOptions struct {
QueueSize int
WorkerCount int
RetryPolicy backoff.RetryPolicy
QueueSize int
WorkerCount int
DispatcherCount int
RetryPolicy backoff.RetryPolicy
}

fifoTaskSchedulerImpl struct {
status int32
logger log.Logger
metricsScope metrics.Scope
options *FIFOTaskSchedulerOptions
dispatcherWG sync.WaitGroup
taskCh chan PriorityTask
shutdownCh chan struct{}
@@ -65,6 +67,7 @@ func NewFIFOTaskScheduler(
status: common.DaemonStatusInitialized,
logger: logger,
metricsScope: metricsClient.Scope(metrics.TaskSchedulerScope),
options: options,
taskCh: make(chan PriorityTask, options.QueueSize),
shutdownCh: make(chan struct{}),
processor: NewParallelTaskProcessor(
@@ -86,8 +89,10 @@ func (f *fifoTaskSchedulerImpl) Start() {

f.processor.Start()

f.dispatcherWG.Add(1)
go f.dispatcher()
f.dispatcherWG.Add(f.options.DispatcherCount)
for i := 0; i != f.options.DispatcherCount; i++ {
go f.dispatcher()
}

f.logger.Info("FIFO task scheduler started.")
}
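The dispatcher() body itself is unchanged by this commit and not shown above. For context, a rough, simplified sketch of the loop that each of the DispatcherCount goroutines runs; the real loop is not part of this diff and likely also handles metrics, retries, and draining on shutdown, which are omitted here:

// Illustrative sketch only, not the actual implementation.
func (f *fifoTaskSchedulerImpl) dispatcherSketch() {
	defer f.dispatcherWG.Done()

	for {
		select {
		case task := <-f.taskCh:
			// hand the task to the shared parallel task processor
			if err := f.processor.Submit(task); err != nil {
				f.logger.Error("Failed to submit task to task processor.")
			}
		case <-f.shutdownCh:
			return
		}
	}
}

Fanning this loop out to DispatcherCount goroutines matters once a single host-level scheduler is fed by every shard on the host: a single goroutine moving tasks from taskCh to the worker pool would otherwise become a serialization point.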
9 changes: 5 additions & 4 deletions common/task/fifoTaskScheduler_test.go
@@ -25,7 +25,7 @@ import (
"testing"
"time"

gomock "github.com/golang/mock/gomock"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/uber-go/tally"
@@ -65,9 +65,10 @@ func (s *fifoTaskSchedulerSuite) SetupTest() {
loggerimpl.NewDevelopmentForTest(s.Suite),
metrics.NewClient(tally.NoopScope, metrics.Common),
&FIFOTaskSchedulerOptions{
QueueSize: s.queueSize,
WorkerCount: 1,
RetryPolicy: backoff.NewExponentialRetryPolicy(time.Millisecond),
QueueSize: s.queueSize,
WorkerCount: 1,
DispatcherCount: 1,
RetryPolicy: backoff.NewExponentialRetryPolicy(time.Millisecond),
},
).(*fifoTaskSchedulerImpl)
}
15 changes: 9 additions & 6 deletions common/task/weightedRoundRobinTaskScheduler.go
@@ -38,10 +38,11 @@ import (
type (
// WeightedRoundRobinTaskSchedulerOptions configs WRR task scheduler
WeightedRoundRobinTaskSchedulerOptions struct {
Weights dynamicconfig.MapPropertyFn
QueueSize int
WorkerCount int
RetryPolicy backoff.RetryPolicy
Weights dynamicconfig.MapPropertyFn
QueueSize int
WorkerCount int
DispatcherCount int
RetryPolicy backoff.RetryPolicy
}

weightedRoundRobinTaskSchedulerImpl struct {
@@ -116,8 +117,10 @@ func (w *weightedRoundRobinTaskSchedulerImpl) Start() {

w.processor.Start()

w.dispatcherWG.Add(1)
go w.dispatcher()
w.dispatcherWG.Add(w.options.DispatcherCount)
for i := 0; i != w.options.DispatcherCount; i++ {
go w.dispatcher()
}
go w.updateWeights()

w.logger.Info("Weighted round robin task scheduler started.")
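The weighted round-robin selection itself is not touched by this diff. Conceptually, each dispatcher repeatedly sweeps the per-priority task channels and takes up to the configured weight's worth of tasks from each priority before moving on; the sketch below is illustrative only, and the parameter layout (priority-keyed maps, a submit callback) is an assumption rather than the actual fields of weightedRoundRobinTaskSchedulerImpl:

// Illustrative only: one weighted round-robin sweep over per-priority queues.
func dispatchSweep(
	priorities []int,                 // priorities in dispatch order
	weights map[int]int,              // priority -> weight (from dynamic config)
	queues map[int]chan PriorityTask, // priority -> buffered task channel
	submit func(PriorityTask) error,  // hands the task to the shared processor
) {
	for _, p := range priorities {
	drain:
		for i := 0; i < weights[p]; i++ {
			select {
			case task := <-queues[p]:
				if err := submit(task); err != nil {
					return
				}
			default:
				break drain // nothing waiting at this priority, move on
			}
		}
	}
}

As with the FIFO scheduler, Start() now launches options.DispatcherCount of these dispatch loops instead of exactly one.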
56 changes: 47 additions & 9 deletions common/task/weightedRoundRobinTaskScheduler_test.go
@@ -23,11 +23,12 @@ package task
import (
"errors"
"fmt"
"math/rand"
"sync"
"testing"
"time"

gomock "github.com/golang/mock/gomock"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/uber-go/tally"
@@ -81,10 +82,11 @@ func (s *weightedRoundRobinTaskSchedulerSuite) SetupTest() {
s.queueSize = 10
s.scheduler = s.newTestWeightedRoundRobinTaskScheduler(
&WeightedRoundRobinTaskSchedulerOptions{
Weights: testSchedulerWeights,
QueueSize: s.queueSize,
WorkerCount: 1,
RetryPolicy: backoff.NewExponentialRetryPolicy(time.Millisecond),
Weights: testSchedulerWeights,
QueueSize: s.queueSize,
WorkerCount: 1,
DispatcherCount: 3,
RetryPolicy: backoff.NewExponentialRetryPolicy(time.Millisecond),
},
)
}
@@ -112,10 +114,11 @@ func (s *weightedRoundRobinTaskSchedulerSuite) TestSubmit_Fail_SchedulerShutDown
// create a new scheduler here with queue size 0, otherwise test is non-deterministic
scheduler := s.newTestWeightedRoundRobinTaskScheduler(
&WeightedRoundRobinTaskSchedulerOptions{
Weights: testSchedulerWeights,
QueueSize: 0,
WorkerCount: 1,
RetryPolicy: backoff.NewExponentialRetryPolicy(time.Millisecond),
Weights: testSchedulerWeights,
QueueSize: 0,
WorkerCount: 1,
DispatcherCount: 3,
RetryPolicy: backoff.NewExponentialRetryPolicy(time.Millisecond),
},
)

@@ -244,6 +247,41 @@ func (s *weightedRoundRobinTaskSchedulerSuite) TestDispatcher_FailToSubmit() {
<-doneCh
}

func (s *weightedRoundRobinTaskSchedulerSuite) TestWRR() {
numTasks := 1000
var taskWG sync.WaitGroup

s.mockProcessor.EXPECT().Start()
s.mockProcessor.EXPECT().Stop()

tasks := []PriorityTask{}
mockFn := func(_ Task) error {
taskWG.Done()
return nil
}
for i := 0; i != numTasks; i++ {
mockTask := NewMockPriorityTask(s.controller)
mockTask.EXPECT().Priority().Return(rand.Intn(len(testSchedulerWeights()))).Times(1)
tasks = append(tasks, mockTask)
taskWG.Add(1)
s.mockProcessor.EXPECT().Submit(newMockPriorityTaskMatcher(mockTask)).DoAndReturn(mockFn)
}

s.scheduler.processor = s.mockProcessor
s.scheduler.Start()
for _, task := range tasks {
if rand.Intn(2) == 0 {
s.NoError(s.scheduler.Submit(task))
} else {
submitted, err := s.scheduler.TrySubmit(task)
s.NoError(err)
s.True(submitted)
}
}
taskWG.Wait()
s.scheduler.Stop()
}

func (s *weightedRoundRobinTaskSchedulerSuite) newTestWeightedRoundRobinTaskScheduler(
options *WeightedRoundRobinTaskSchedulerOptions,
) *weightedRoundRobinTaskSchedulerImpl {
10 changes: 8 additions & 2 deletions service/history/config/config.go
@@ -81,7 +81,10 @@ type Config struct {
EnablePriorityTaskProcessor dynamicconfig.BoolPropertyFn
TaskSchedulerType dynamicconfig.IntPropertyFn
TaskSchedulerWorkerCount dynamicconfig.IntPropertyFn
TaskSchedulerShardWorkerCount dynamicconfig.IntPropertyFn
TaskSchedulerQueueSize dynamicconfig.IntPropertyFn
TaskSchedulerShardQueueSize dynamicconfig.IntPropertyFn
TaskSchedulerDispatcherCount dynamicconfig.IntPropertyFn
TaskSchedulerRoundRobinWeights dynamicconfig.MapPropertyFn

// QueueProcessor split policy settings
@@ -311,8 +314,11 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, storeType string, isA
TaskProcessRPS: dc.GetIntPropertyFilteredByDomain(dynamicconfig.TaskProcessRPS, 1000),
EnablePriorityTaskProcessor: dc.GetBoolProperty(dynamicconfig.EnablePriorityTaskProcessor, false),
TaskSchedulerType: dc.GetIntProperty(dynamicconfig.TaskSchedulerType, int(task.SchedulerTypeWRR)),
TaskSchedulerWorkerCount: dc.GetIntProperty(dynamicconfig.TaskSchedulerWorkerCount, 20),
TaskSchedulerQueueSize: dc.GetIntProperty(dynamicconfig.TaskSchedulerQueueSize, 2000),
TaskSchedulerWorkerCount: dc.GetIntProperty(dynamicconfig.TaskSchedulerWorkerCount, 400),
TaskSchedulerShardWorkerCount: dc.GetIntProperty(dynamicconfig.TaskSchedulerShardWorkerCount, 2),
TaskSchedulerQueueSize: dc.GetIntProperty(dynamicconfig.TaskSchedulerQueueSize, 10000),
TaskSchedulerShardQueueSize: dc.GetIntProperty(dynamicconfig.TaskSchedulerShardQueueSize, 200),
TaskSchedulerDispatcherCount: dc.GetIntProperty(dynamicconfig.TaskSchedulerDispatcherCount, 10),
TaskSchedulerRoundRobinWeights: dc.GetMapProperty(dynamicconfig.TaskSchedulerRoundRobinWeights, common.ConvertIntMapToDynamicConfigMapProperty(DefaultTaskPriorityWeight)),

QueueProcessorEnableSplit: dc.GetBoolProperty(dynamicconfig.QueueProcessorEnableSplit, false),
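Note how the new defaults shift work from per-shard worker pools to a shared host pool: the old default was 20 workers per shard, while the new defaults are a host-level pool of 400 workers (queue size 10000, 10 dispatchers) plus, where the shard-level scheduler is still used, 2 workers per shard (queue size 200). A back-of-the-envelope comparison; the shard count is illustrative and the helper is hypothetical:

// Back-of-the-envelope sizing under the new defaults; shard count is illustrative.
func workerCountComparison(numShards int) (oldWorkers, newWorkers int) {
	oldWorkers = 20 * numShards    // previous default: 20 workers per shard (512 shards -> 10240 goroutines)
	newWorkers = 400 + 2*numShards // host pool of 400 + 2 per shard        (512 shards -> 1424 goroutines)
	return oldWorkers, newWorkers
}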
24 changes: 1 addition & 23 deletions service/history/handler.go
@@ -44,7 +44,6 @@ import (
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/quotas"
t "github.com/uber/cadence/common/task"
"github.com/uber/cadence/service/history/config"
"github.com/uber/cadence/service/history/engine"
"github.com/uber/cadence/service/history/events"
@@ -146,30 +145,9 @@ func (h *Handler) Start() {
h.config,
)

schedulerType := t.SchedulerType(h.config.TaskSchedulerType())
processorOptions := &task.ProcessorOptions{
SchedulerType: schedulerType,
}
switch schedulerType {
case t.SchedulerTypeFIFO:
processorOptions.FifoSchedulerOptions = &t.FIFOTaskSchedulerOptions{
QueueSize: h.config.TaskSchedulerQueueSize(),
WorkerCount: h.config.TaskSchedulerWorkerCount(),
RetryPolicy: common.CreateTaskProcessingRetryPolicy(),
}
case t.SchedulerTypeWRR:
processorOptions.WRRSchedulerOptions = &t.WeightedRoundRobinTaskSchedulerOptions{
Weights: h.config.TaskSchedulerRoundRobinWeights,
QueueSize: h.config.TaskSchedulerQueueSize(),
WorkerCount: h.config.TaskSchedulerWorkerCount(),
RetryPolicy: common.CreateTaskProcessingRetryPolicy(),
}
default:
h.GetLogger().Fatal("Unknown task scheduler type", tag.Value(schedulerType))
}
h.queueTaskProcessor, err = task.NewProcessor(
taskPriorityAssigner,
processorOptions,
h.config,
h.GetLogger(),
h.GetMetricsClient(),
)
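The FIFO/WRR option construction removed here does not disappear: task.NewProcessor now receives the history config.Config directly and presumably builds the host-level scheduler options itself. A hedged sketch of what that constructor likely does with the config; the option fields and config accessors are the ones from this commit, but the surrounding function is hypothetical and the actual implementation (in service/history/task) is not part of this excerpt:

// Sketch only: option construction as it presumably looks inside task.NewProcessor.
func newHostSchedulerOptions(cfg *config.Config) (*t.WeightedRoundRobinTaskSchedulerOptions, error) {
	switch t.SchedulerType(cfg.TaskSchedulerType()) {
	case t.SchedulerTypeWRR:
		return &t.WeightedRoundRobinTaskSchedulerOptions{
			Weights:         cfg.TaskSchedulerRoundRobinWeights,
			QueueSize:       cfg.TaskSchedulerQueueSize(),
			WorkerCount:     cfg.TaskSchedulerWorkerCount(),
			DispatcherCount: cfg.TaskSchedulerDispatcherCount(),
			RetryPolicy:     common.CreateTaskProcessingRetryPolicy(),
		}, nil
	// the FIFO case would be analogous, using t.FIFOTaskSchedulerOptions
	default:
		return nil, fmt.Errorf("unknown task scheduler type: %v", cfg.TaskSchedulerType())
	}
}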
18 changes: 11 additions & 7 deletions service/history/queue/timer_queue_processor_base.go
@@ -336,13 +336,9 @@ func (t *timerQueueProcessorBase) processBatch() {
task := t.taskInitializer(taskInfo)
tasks[newTimerTaskKey(taskInfo.GetVisibilityTimestamp(), taskInfo.GetTaskID())] = task
if submitted := t.submitTask(task); !submitted {
// not submitted since processor has been shutdown
return
}
select {
case <-t.shutdownCh:
return
default:
}
}

var newReadLevel task.Key
@@ -578,9 +574,17 @@ func (t *timerQueueProcessorBase) submitTask(
) bool {
submitted, err := t.taskProcessor.TrySubmit(task)
if err != nil {
return false
select {
case <-t.shutdownCh:
// if error is due to shard shutdown
return false
default:
// otherwise it might be an error from domain cache etc, add
// the task to the redispatch queue so that it can be retried
t.logger.Error("Failed to submit task", tag.Error(err))
}
}
if !submitted {
if err != nil || !submitted {
t.redispatchQueue.Add(task)
}

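The same pattern is applied to the transfer queue processor and the generic queueProcessor below: a submit error aborts the batch only when the shard is shutting down, while any other failure (for example a transient domain-cache error) parks the task on the redispatch queue for a later retry. That is also why the extra shutdownCh select after submitTask in processBatch could be dropped: a false return now implies shutdown. The shared control flow, extracted as a sketch (this helper does not exist in the code):

// Generalized sketch of the new submit path used by all three queue processors.
func submitOrRedispatch(
	trySubmit func() (bool, error),
	shutdownCh <-chan struct{},
	redispatch func(),
) bool {
	submitted, err := trySubmit()
	if err != nil {
		select {
		case <-shutdownCh:
			return false // shard is shutting down: stop processing the batch
		default:
			// transient error (e.g. domain cache): fall through and redispatch
		}
	}
	if err != nil || !submitted {
		redispatch()
	}
	return true
}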
17 changes: 10 additions & 7 deletions service/history/queue/transfer_queue_processor_base.go
@@ -311,11 +311,6 @@ func (t *transferQueueProcessorBase) processBatch() {
// not submitted since processor has been shutdown
return
}
select {
case <-t.shutdownCh:
return
default:
}
}

var newReadLevel task.Key
@@ -457,9 +452,17 @@ func (t *transferQueueProcessorBase) submitTask(
) bool {
submitted, err := t.taskProcessor.TrySubmit(task)
if err != nil {
return false
select {
case <-t.shutdownCh:
// if error is due to shard shutdown
return false
default:
// otherwise it might be an error from domain cache etc, add
// the task to the redispatch queue so that it can be retried
t.logger.Error("Failed to submit task", tag.Error(err))
}
}
if !submitted {
if err != nil || !submitted {
t.redispatchQueue.Add(task)
}

17 changes: 10 additions & 7 deletions service/history/queueProcessor.go
@@ -279,11 +279,6 @@ func (p *queueProcessorBase) processBatch() {
// not submitted since processor has been shutdown
return
}
select {
case <-p.shutdownCh:
return
default:
}
}

if more {
@@ -311,9 +306,17 @@ func (p *queueProcessorBase) submitTask(
queueTask := p.queueTaskInitializer(taskInfo)
submitted, err := p.queueTaskProcessor.TrySubmit(queueTask)
if err != nil {
return false
select {
case <-p.shutdownCh:
// if error is due to shard shutdown
return false
default:
// otherwise it might be an error from domain cache etc, add
// the task to the redispatch queue so that it can be retried
p.logger.Error("Failed to submit task", tag.Error(err))
}
}
if !submitted {
if err != nil || !submitted {
p.redispatchQueue.Add(queueTask)
}
