Skip to content

Commit

Permalink
Host Level Queue Task Processor (cadence-workflow#3084)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored Mar 3, 2020
1 parent ccafe5b commit 0381863
Show file tree
Hide file tree
Showing 11 changed files with 626 additions and 1 deletion.
10 changes: 10 additions & 0 deletions common/task/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ type (
TrySubmit(task PriorityTask) (bool, error)
}

// SchedulerType respresents the type of the task scheduler implementation
SchedulerType int

// State represents the current state of a task
State int

Expand Down Expand Up @@ -90,6 +93,13 @@ type (
}
)

const (
// SchedulerTypeFIFO is the scheduler type for FIFO scheduler implementation
SchedulerTypeFIFO SchedulerType = iota + 1
// SchedulerTypeWRR is the scheduler type for weighted round robin scheduler implementation
SchedulerTypeWRR
)

const (
// TaskStatePending is the state for a task when it's waiting to be processed or currently being processed
TaskStatePending State = iota + 1
Expand Down
15 changes: 15 additions & 0 deletions common/task/interface_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions common/task/weightedRoundRobinTaskScheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/uber-go/tally"

"github.com/uber/cadence/common/backoff"
"github.com/uber/cadence/common/log/loggerimpl"
"github.com/uber/cadence/common/metrics"
Expand Down
9 changes: 8 additions & 1 deletion service/history/historyEngineInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,20 @@ type (
task.PriorityTask
queueTaskInfo
GetQueueType() queueType
// TODO: add a method for getting task shardID
GetShardID() int
}

queueTaskExecutor interface {
execute(taskInfo queueTaskInfo, shouldProcessTask bool) error
}

queueTaskProcessor interface {
common.Daemon
StopShardProcessor(int)
Submit(queueTask) error
TrySubmit(queueTask) (bool, error)
}

// TODO: deprecate this interface in favor of the task interface
// defined in common/task package
taskExecutor interface {
Expand Down
14 changes: 14 additions & 0 deletions service/history/historyEngineInterfaces_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions service/history/queueTask.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type (
sync.Mutex
queueTaskInfo

shardID int
state task.State
priority int
attempt int
Expand Down Expand Up @@ -72,6 +73,7 @@ type (
)

func newTimerQueueTask(
shardID int,
taskInfo queueTaskInfo,
scope metrics.Scope,
logger log.Logger,
Expand All @@ -83,6 +85,7 @@ func newTimerQueueTask(
) queueTask {
return &timerQueueTask{
queueTaskBase: newQueueTaskBase(
shardID,
taskInfo,
scope,
logger,
Expand All @@ -96,6 +99,7 @@ func newTimerQueueTask(
}

func newTransferQueueTask(
shardID int,
taskInfo queueTaskInfo,
scope metrics.Scope,
logger log.Logger,
Expand All @@ -107,6 +111,7 @@ func newTransferQueueTask(
) queueTask {
return &transferQueueTask{
queueTaskBase: newQueueTaskBase(
shardID,
taskInfo,
scope,
logger,
Expand All @@ -120,6 +125,7 @@ func newTransferQueueTask(
}

func newQueueTaskBase(
shardID int,
queueTaskInfo queueTaskInfo,
scope metrics.Scope,
logger log.Logger,
Expand All @@ -130,6 +136,7 @@ func newQueueTaskBase(
) *queueTaskBase {
return &queueTaskBase{
queueTaskInfo: queueTaskInfo,
shardID: shardID,
state: task.TaskStatePending,
scope: scope,
logger: logger,
Expand Down Expand Up @@ -295,3 +302,7 @@ func (t *queueTaskBase) SetPriority(

t.priority = priority
}

func (t *queueTaskBase) GetShardID() int {
return t.shardID
}
Loading

0 comments on commit 0381863

Please sign in to comment.