Skip to content

Commit

Permalink
Target cluster cross cluster task processor (cadence-workflow#4292)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored Jul 6, 2021
1 parent a24af63 commit 4384e4c
Show file tree
Hide file tree
Showing 7 changed files with 944 additions and 51 deletions.
59 changes: 30 additions & 29 deletions common/log/tag/values.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,35 +97,36 @@ var (

// Pre-defined values for TagSysComponent
var (
ComponentTaskList = component("tasklist")
ComponentHistoryEngine = component("history-engine")
ComponentHistoryCache = component("history-cache")
ComponentDecisionHandler = component("decision-handler")
ComponentEventsCache = component("events-cache")
ComponentTransferQueue = component("transfer-queue-processor")
ComponentTimerQueue = component("timer-queue-processor")
ComponentTimerBuilder = component("timer-builder")
ComponentReplicatorQueue = component("replicator-queue-processor")
ComponentShardController = component("shard-controller")
ComponentShard = component("shard")
ComponentShardItem = component("shard-item")
ComponentShardEngine = component("shard-engine")
ComponentMatchingEngine = component("matching-engine")
ComponentReplicator = component("replicator")
ComponentReplicationTaskProcessor = component("replication-task-processor")
ComponentReplicationAckManager = component("replication-ack-manager")
ComponentHistoryReplicator = component("history-replicator")
ComponentHistoryResender = component("history-resender")
ComponentIndexer = component("indexer")
ComponentIndexerProcessor = component("indexer-processor")
ComponentIndexerESProcessor = component("indexer-es-processor")
ComponentESVisibilityManager = component("es-visibility-manager")
ComponentArchiver = component("archiver")
ComponentBatcher = component("batcher")
ComponentWorker = component("worker")
ComponentServiceResolver = component("service-resolver")
ComponentFailoverCoordinator = component("failover-coordinator")
ComponentFailoverMarkerNotifier = component("failover-marker-notifier")
ComponentTaskList = component("tasklist")
ComponentHistoryEngine = component("history-engine")
ComponentHistoryCache = component("history-cache")
ComponentDecisionHandler = component("decision-handler")
ComponentEventsCache = component("events-cache")
ComponentTransferQueue = component("transfer-queue-processor")
ComponentTimerQueue = component("timer-queue-processor")
ComponentTimerBuilder = component("timer-builder")
ComponentReplicatorQueue = component("replicator-queue-processor")
ComponentShardController = component("shard-controller")
ComponentShard = component("shard")
ComponentShardItem = component("shard-item")
ComponentShardEngine = component("shard-engine")
ComponentMatchingEngine = component("matching-engine")
ComponentReplicator = component("replicator")
ComponentReplicationTaskProcessor = component("replication-task-processor")
ComponentReplicationAckManager = component("replication-ack-manager")
ComponentHistoryReplicator = component("history-replicator")
ComponentHistoryResender = component("history-resender")
ComponentIndexer = component("indexer")
ComponentIndexerProcessor = component("indexer-processor")
ComponentIndexerESProcessor = component("indexer-es-processor")
ComponentESVisibilityManager = component("es-visibility-manager")
ComponentArchiver = component("archiver")
ComponentBatcher = component("batcher")
ComponentWorker = component("worker")
ComponentServiceResolver = component("service-resolver")
ComponentFailoverCoordinator = component("failover-coordinator")
ComponentFailoverMarkerNotifier = component("failover-marker-notifier")
ComponentCrossClusterTaskProcessor = component("cross-cluster-task-processor")
)

// Pre-defined values for TagSysLifecycle
Expand Down
5 changes: 4 additions & 1 deletion common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -990,8 +990,10 @@ const (
TimerStandbyTaskDeleteHistoryEventScope
// TimerStandbyTaskWorkflowBackoffTimerScope is the scope used by metric emitted by timer queue processor for processing retry task.
TimerStandbyTaskWorkflowBackoffTimerScope
// CrossClusterQueueProcessorScope is the scope used by all metric emitted by cross cluster queue processor
// CrossClusterQueueProcessorScope is the scope used by all metric emitted by cross cluster queue processor in the source cluster
CrossClusterQueueProcessorScope
// CrossClusterTaskProcessorScope is the scope used by all metric emitted by cross cluster task processor in the target cluster
CrossClusterTaskProcessorScope
// CrossClusterTaskStartChildExecutionScope is the scope used by metric emitted by cross cluster queue processor for processing start child workflow task.
CrossClusterTaskStartChildExecutionScope
// CrossClusterTaskCancelExecutionScope is the scope used by metric emitted by cross cluster queue processor for processing cancel workflow task.
Expand Down Expand Up @@ -1566,6 +1568,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
TimerStandbyTaskWorkflowBackoffTimerScope: {operation: "TimerStandbyTaskWorkflowBackoffTimer"},
TimerStandbyTaskDeleteHistoryEventScope: {operation: "TimerStandbyTaskDeleteHistoryEvent"},
CrossClusterQueueProcessorScope: {operation: "CrossClusterQueueProcessor"},
CrossClusterTaskProcessorScope: {operation: "CrossClusterTaskProcessor"},
CrossClusterTaskStartChildExecutionScope: {operation: "CrossClusterTaskStartChildExecution"},
CrossClusterTaskCancelExecutionScope: {operation: "CrossClusterTaskCancelExecution"},
CrossClusterTaskTypeSignalExecutionScope: {operation: "CrossClusterTaskTypeSignalExecution"},
Expand Down
43 changes: 39 additions & 4 deletions service/history/task/cross_cluster_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@ import (

"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/future"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
ctask "github.com/uber/cadence/common/task"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/history/shard"
)

Expand Down Expand Up @@ -73,9 +76,35 @@ type (
taskProcessor Processor //TODO: wire up the dependency
redispatchFn func(task Task) //TODO: wire up the dependency
maxRetryCount dynamicconfig.IntPropertyFn
settable future.Settable
}
)

// NewCrossClusterTaskForTargetCluster creates a CrossClusterTask
// for the processing logic at target cluster
// the returned the Future will be unblocked when after the task
// is processed. The future value has type types.CrossClusterTaskResponse
// and there will be not error returned for this future. All errors will
// be recorded by the FailedCause field in the response.
func NewCrossClusterTaskForTargetCluster(
shard shard.Context,
taskRequest *types.CrossClusterTaskRequest,
logger log.Logger,
maxRetryCount dynamicconfig.IntPropertyFn,
) (CrossClusterTask, future.Future) {
// TODO: create CrossClusterTasks based on request
// for now create a dummy CrossClusterTask for testing
future, settable := future.NewFuture()
return &crossClusterSignalWorkflowTask{
crossClusterTaskBase: &crossClusterTaskBase{
Info: &persistence.CrossClusterTaskInfo{
TaskID: taskRequest.TaskInfo.TaskID,
},
settable: settable,
},
}, future
}

// NewCrossClusterSignalWorkflowTask initialize cross cluster signal workflow task and task future
func NewCrossClusterSignalWorkflowTask(
shard shard.Context,
Expand Down Expand Up @@ -169,13 +198,17 @@ func (c *crossClusterSignalWorkflowTask) Execute() error {
}

func (c *crossClusterSignalWorkflowTask) Ack() {
panic("Not implement")

// TODO: rewrite the implementation
// current impl is just for testing purpose
if c.settable != nil {
c.settable.Set(types.CrossClusterTaskResponse{
TaskID: c.Info.GetTaskID(),
}, nil)
}
}

func (c *crossClusterSignalWorkflowTask) Nack() {
panic("Not implement")

}

func (c *crossClusterSignalWorkflowTask) HandleErr(
Expand All @@ -202,7 +235,6 @@ func (c *crossClusterCancelWorkflowTask) Execute() error {

func (c *crossClusterCancelWorkflowTask) Ack() {
panic("Not implement")

}

func (c *crossClusterCancelWorkflowTask) Nack() {
Expand Down Expand Up @@ -301,6 +333,9 @@ func (c *crossClusterTaskBase) GetQueueType() QueueType {
}

func (c *crossClusterTaskBase) IsReadyForPoll() bool {
c.Lock()
defer c.Unlock()

return c.state == ctask.TaskStatePending &&
(c.processingState == processingStateInitialed || c.processingState == processingStateResponseRecorded)
}
Loading

0 comments on commit 4384e4c

Please sign in to comment.