Skip to content

Commit

Permalink
Add X-Cluster Child Workflow Completion Tasks (cadence-workflow#4336)
Browse files Browse the repository at this point in the history
  • Loading branch information
demirkayaender authored Aug 18, 2021
1 parent 4781a8d commit 38881a8
Show file tree
Hide file tree
Showing 20 changed files with 1,477 additions and 485 deletions.
165 changes: 86 additions & 79 deletions .gen/proto/admin/v1/service.pb.yarpc.go

Large diffs are not rendered by default.

165 changes: 86 additions & 79 deletions .gen/proto/history/v1/service.pb.yarpc.go

Large diffs are not rendered by default.

833 changes: 742 additions & 91 deletions .gen/proto/shared/v1/queue.pb.go

Large diffs are not rendered by default.

165 changes: 86 additions & 79 deletions .gen/proto/shared/v1/queue.pb.yarpc.go

Large diffs are not rendered by default.

229 changes: 116 additions & 113 deletions common/metrics/defs.go

Large diffs are not rendered by default.

55 changes: 55 additions & 0 deletions common/persistence/dataManagerInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ const (
CrossClusterTaskTypeStartChildExecution = iota + 1
CrossClusterTaskTypeCancelExecution
CrossClusterTaskTypeSignalExecution
CrossClusterTaskTypeRecordChildWorkflowExeuctionComplete
)

// Types of replication tasks
Expand Down Expand Up @@ -574,6 +575,13 @@ type (
Version int64
}

// RecordChildCompletionTask identifies a task completing a child execution
RecordWorkflowExecutionCompleteTask struct {
VisibilityTimestamp time.Time
TaskID int64
Version int64
}

// CrossClusterStartChildExecutionTask is the cross-cluster version of StartChildExecutionTask
CrossClusterStartChildExecutionTask struct {
StartChildExecutionTask
Expand All @@ -595,6 +603,13 @@ type (
TargetCluster string
}

// CrossClusterRecordChildWorkflowExecutionCompleteTask is the cross-cluster version of RecordChildCompletionTask
CrossClusterRecordChildWorkflowExecutionCompleteTask struct {
RecordWorkflowExecutionCompleteTask

TargetCluster string
}

// ActivityTimeoutTask identifies a timeout task.
ActivityTimeoutTask struct {
VisibilityTimestamp time.Time
Expand Down Expand Up @@ -2276,6 +2291,41 @@ func (u *SignalExecutionTask) SetVisibilityTimestamp(timestamp time.Time) {
u.VisibilityTimestamp = timestamp
}

// GetType returns the type of the signal transfer task
func (u *RecordWorkflowExecutionCompleteTask) GetType() int {
return TransferTaskTypeCloseExecution
}

// GetVersion returns the version of the signal transfer task
func (u *RecordWorkflowExecutionCompleteTask) GetVersion() int64 {
return u.Version
}

// SetVersion returns the version of the signal transfer task
func (u *RecordWorkflowExecutionCompleteTask) SetVersion(version int64) {
u.Version = version
}

// GetTaskID returns the sequence ID of the signal transfer task.
func (u *RecordWorkflowExecutionCompleteTask) GetTaskID() int64 {
return u.TaskID
}

// SetTaskID sets the sequence ID of the signal transfer task.
func (u *RecordWorkflowExecutionCompleteTask) SetTaskID(id int64) {
u.TaskID = id
}

// GetVisibilityTimestamp get the visibility timestamp
func (u *RecordWorkflowExecutionCompleteTask) GetVisibilityTimestamp() time.Time {
return u.VisibilityTimestamp
}

// SetVisibilityTimestamp set the visibility timestamp
func (u *RecordWorkflowExecutionCompleteTask) SetVisibilityTimestamp(timestamp time.Time) {
u.VisibilityTimestamp = timestamp
}

// GetType returns the type of the upsert search attributes transfer task
func (u *UpsertWorkflowSearchAttributesTask) GetType() int {
return TransferTaskTypeUpsertWorkflowSearchAttributes
Expand Down Expand Up @@ -2361,6 +2411,11 @@ func (c *CrossClusterSignalExecutionTask) GetType() int {
return CrossClusterTaskTypeSignalExecution
}

// GetType returns of type of the cross-cluster record child workflow completion task
func (c *CrossClusterRecordChildWorkflowExecutionCompleteTask) GetType() int {
return CrossClusterTaskTypeRecordChildWorkflowExeuctionComplete
}

// GetType returns the type of the history replication task
func (a *HistoryReplicationTask) GetType() int {
return ReplicationTaskTypeHistory
Expand Down
3 changes: 3 additions & 0 deletions common/persistence/nosql/nosqlExecutionStoreUtil.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,9 @@ func (d *nosqlExecutionStore) prepareCrossClusterTasksForWorkflowTxn(
targetChildWorkflowOnly = task.(*p.CrossClusterSignalExecutionTask).TargetChildWorkflowOnly
scheduleID = task.(*p.CrossClusterSignalExecutionTask).InitiatedID

case p.CrossClusterTaskTypeRecordChildWorkflowExeuctionComplete:
targetCluster = task.(*p.CrossClusterRecordChildWorkflowExecutionCompleteTask).TargetCluster

default:
return nil, &types.InternalServiceError{
Message: fmt.Sprintf("Unknown cross-cluster task type: %v", task.GetType()),
Expand Down
3 changes: 3 additions & 0 deletions common/persistence/sql/sqlExecutionStoreUtil.go
Original file line number Diff line number Diff line change
Expand Up @@ -805,6 +805,9 @@ func createCrossClusterTasks(
info.TargetChildWorkflowOnly = task.(*p.CrossClusterSignalExecutionTask).TargetChildWorkflowOnly
info.ScheduleID = task.(*p.CrossClusterSignalExecutionTask).InitiatedID

case p.CrossClusterTaskTypeRecordChildWorkflowExeuctionComplete:
crossClusterTasksRows[i].TargetCluster = task.(*p.CrossClusterRecordChildWorkflowExecutionCompleteTask).TargetCluster

default:
return &types.InternalServiceError{
Message: fmt.Sprintf("Unknown cross-cluster task type: %v", task.GetType()),
Expand Down
43 changes: 43 additions & 0 deletions common/types/mapper/proto/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -1360,6 +1360,49 @@ func ToCrossClusterSignalExecutionResponseAttributes(t *sharedv1.CrossClusterSig
return &types.CrossClusterSignalExecutionResponseAttributes{}
}

// FromCrossClusterRecordChildWorkflowExecutionCompleteRequestAttributes converts internal CrossClusterRecordChildWorkflowExecutionCompleteRequestAttributes type to proto
func FromCrossClusterRecordChildWorkflowExecutionCompleteRequestAttributes(t *types.CrossClusterRecordChildWorkflowExecutionCompleteRequestAttributes) *sharedv1.CrossClusterRecordChildWorkflowExecutionCompleteRequestAttributes {
if t == nil {
return nil
}
return &sharedv1.CrossClusterRecordChildWorkflowExecutionCompleteRequestAttributes{
TargetDomainId: t.TargetDomainID,
TargetWorkflowExecution: FromWorkflowRunPair(t.TargetWorkflowID, t.TargetRunID),
InitiatedEventId: t.InitiatedEventID,
CompletionEvent: FromHistoryEvent(t.CompletionEvent),
}
}

// ToCrossClusterRecordChildWorkflowExecutionCompleteRequestAttributes converts proto CrossClusterRecordChildWorkflowExecutionCompleteRequestAttributes type to internal
func ToCrossClusterRecordChildWorkflowExecutionCompleteRequestAttributes(t *sharedv1.CrossClusterRecordChildWorkflowExecutionCompleteRequestAttributes) *types.CrossClusterRecordChildWorkflowExecutionCompleteRequestAttributes {
if t == nil {
return nil
}
return &types.CrossClusterRecordChildWorkflowExecutionCompleteRequestAttributes{
TargetDomainID: t.TargetDomainId,
TargetWorkflowID: ToWorkflowID(t.TargetWorkflowExecution),
TargetRunID: ToRunID(t.TargetWorkflowExecution),
InitiatedEventID: t.InitiatedEventId,
CompletionEvent: ToHistoryEvent(t.CompletionEvent),
}
}

// FromCrossClusterRecordChildWorkflowExecutionCompleteResponseAttributes converts internal CrossClusterRecordChildWorkflowExecutionCompleteResponseAttributes type to proto
func FromCrossClusterRecordChildWorkflowExecutionCompleteResponseAttributes(t *types.CrossClusterRecordChildWorkflowExecutionCompleteResponseAttributes) *sharedv1.CrossClusterRecordChildWorkflowExecutionCompleteResponseAttributes {
if t == nil {
return nil
}
return &sharedv1.CrossClusterRecordChildWorkflowExecutionCompleteResponseAttributes{}
}

// ToCrossClusterRecordChildWorkflowExecutionCompleteResponseAttributes converts proto CrossClusterRecordChildWorkflowExecutionCompleteResponseAttributes type to internal
func ToCrossClusterRecordChildWorkflowExecutionCompleteResponseAttributes(t *sharedv1.CrossClusterRecordChildWorkflowExecutionCompleteResponseAttributes) *types.CrossClusterRecordChildWorkflowExecutionCompleteResponseAttributes {
if t == nil {
return nil
}
return &types.CrossClusterRecordChildWorkflowExecutionCompleteResponseAttributes{}
}

// FromCrossClusterTaskRequest converts internal CrossClusterTaskRequest type to proto
func FromCrossClusterTaskRequest(t *types.CrossClusterTaskRequest) *sharedv1.CrossClusterTaskRequest {
if t == nil {
Expand Down
82 changes: 72 additions & 10 deletions common/types/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -10652,12 +10652,57 @@ func (v *CrossClusterSignalExecutionRequestAttributes) GetControl() (o []byte) {
type CrossClusterSignalExecutionResponseAttributes struct {
}

type CrossClusterRecordChildWorkflowExecutionCompleteRequestAttributes struct {
TargetDomainID string `json:"targetDomainID,omitempty"`
TargetWorkflowID string `json:"targetWorkflowID,omitempty"`
TargetRunID string `json:"targetRunID,omitempty"`
InitiatedEventID int64 `json:"initiatedEventID,omitempty"`
CompletionEvent *HistoryEvent `json:"completionEvent,omitempty"`
}

// GetTargetDomainID is an internal getter (TBD...)
func (v *CrossClusterRecordChildWorkflowExecutionCompleteRequestAttributes) GetTargetDomainID() (o string) {
if v != nil {
return v.TargetDomainID
}
return
}

// GetTargetWorkflowID is an internal getter (TBD...)
func (v *CrossClusterRecordChildWorkflowExecutionCompleteRequestAttributes) GetTargetWorkflowID() (o string) {
if v != nil {
return v.TargetWorkflowID
}
return
}

// GetTargetRunID is an internal getter (TBD...)
func (v *CrossClusterRecordChildWorkflowExecutionCompleteRequestAttributes) GetTargetRunID() (o string) {
if v != nil {
return v.TargetRunID
}
return
}

// GetInitiatedEventID is an internal getter (TBD...)
func (v *CrossClusterRecordChildWorkflowExecutionCompleteRequestAttributes) GetInitiatedEventID() (o int64) {
if v != nil {
return v.InitiatedEventID
}
return
}

// CrossClusterRecordChildWorkflowExecutionCompleteResponseAttributes is an internal type (TBD...)
type CrossClusterRecordChildWorkflowExecutionCompleteResponseAttributes struct {
}

// CrossClusterTaskRequest is an internal type (TBD...)
type CrossClusterTaskRequest struct {
TaskInfo *CrossClusterTaskInfo `json:"taskInfo,omitempty"`
StartChildExecutionAttributes *CrossClusterStartChildExecutionRequestAttributes `json:"startChildExecutionAttributes,omitempty"`
CancelExecutionAttributes *CrossClusterCancelExecutionRequestAttributes `json:"cancelExecutionAttributes,omitempty"`
SignalExecutionAttributes *CrossClusterSignalExecutionRequestAttributes `json:"signalExecutionAttributes,omitempty"`
TaskInfo *CrossClusterTaskInfo `json:"taskInfo,omitempty"`
StartChildExecutionAttributes *CrossClusterStartChildExecutionRequestAttributes `json:"startChildExecutionAttributes,omitempty"`
CancelExecutionAttributes *CrossClusterCancelExecutionRequestAttributes `json:"cancelExecutionAttributes,omitempty"`
SignalExecutionAttributes *CrossClusterSignalExecutionRequestAttributes `json:"signalExecutionAttributes,omitempty"`
RecordChildWorkflowExecutionCompleteAttributes *CrossClusterRecordChildWorkflowExecutionCompleteRequestAttributes `json:"RecordChildWorkflowExecutionCompleteAttributes,omitempty"`
}

// GetTaskInfo is an internal getter (TBD...)
Expand Down Expand Up @@ -10692,14 +10737,23 @@ func (v *CrossClusterTaskRequest) GetSignalExecutionAttributes() (o *CrossCluste
return
}

// GetSignalExecutionAttributes is an internal getter (TBD...)
func (v *CrossClusterTaskRequest) GetRecordChildWorkflowExecutionCompleteAttributes() (o *CrossClusterRecordChildWorkflowExecutionCompleteRequestAttributes) {
if v != nil && v.RecordChildWorkflowExecutionCompleteAttributes != nil {
return v.RecordChildWorkflowExecutionCompleteAttributes
}
return
}

// CrossClusterTaskResponse is an internal type (TBD...)
type CrossClusterTaskResponse struct {
TaskID int64 `json:"taskID,omitempty"`
TaskType *CrossClusterTaskType `json:"taskType,omitempty"`
FailedCause *CrossClusterTaskFailedCause `json:"failedCause,omitempty"`
StartChildExecutionAttributes *CrossClusterStartChildExecutionResponseAttributes `json:"startChildExecutionAttributes,omitempty"`
CancelExecutionAttributes *CrossClusterCancelExecutionResponseAttributes `json:"cancelExecutionAttributes,omitempty"`
SignalExecutionAttributes *CrossClusterSignalExecutionResponseAttributes `json:"signalExecutionAttributes,omitempty"`
TaskID int64 `json:"taskID,omitempty"`
TaskType *CrossClusterTaskType `json:"taskType,omitempty"`
FailedCause *CrossClusterTaskFailedCause `json:"failedCause,omitempty"`
StartChildExecutionAttributes *CrossClusterStartChildExecutionResponseAttributes `json:"startChildExecutionAttributes,omitempty"`
CancelExecutionAttributes *CrossClusterCancelExecutionResponseAttributes `json:"cancelExecutionAttributes,omitempty"`
SignalExecutionAttributes *CrossClusterSignalExecutionResponseAttributes `json:"signalExecutionAttributes,omitempty"`
RecordChildWorkflowExecutionCompleteAttributes *CrossClusterRecordChildWorkflowExecutionCompleteResponseAttributes `json:"RecordChildWorkflowExecutionCompleteAttributes,omitempty"`
}

// GetTaskID is an internal getter (TBD...)
Expand Down Expand Up @@ -10750,6 +10804,14 @@ func (v *CrossClusterTaskResponse) GetSignalExecutionAttributes() (o *CrossClust
return
}

// GetSignalExecutionAttributes is an internal getter (TBD...)
func (v *CrossClusterTaskResponse) GetRecordChildWorkflowExecutionCompleteAttributes() (o *CrossClusterRecordChildWorkflowExecutionCompleteResponseAttributes) {
if v != nil && v.RecordChildWorkflowExecutionCompleteAttributes != nil {
return v.RecordChildWorkflowExecutionCompleteAttributes
}
return
}

// GetCrossClusterTasksRequest is an internal type (TBD...)
type GetCrossClusterTasksRequest struct {
ShardIDs []int32 `json:"shardIDs,omitempty"`
Expand Down
14 changes: 13 additions & 1 deletion proto/internal/uber/cadence/shared/v1/queue.proto
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ message CrossClusterStartChildExecutionRequestAttributes {
string request_id = 2;
int64 initiated_event_id = 3;
api.v1.StartChildWorkflowExecutionInitiatedEventAttributes initiated_event_attributes = 4;
// targetRunID is for scheduling first decision task
// targetRunID is for scheduling first decision task
// targetWorkflowID is available in initiatedEventAttributes
string target_run_id = 5;
}
Expand Down Expand Up @@ -109,12 +109,23 @@ message CrossClusterSignalExecutionRequestAttributes {
message CrossClusterSignalExecutionResponseAttributes {
}

message CrossClusterRecordChildWorkflowExecutionCompleteRequestAttributes {
string target_domain_id = 1;
api.v1.WorkflowExecution target_workflow_execution = 2;
int64 initiated_event_id = 3;
api.v1.HistoryEvent completion_event = 4;
}

message CrossClusterRecordChildWorkflowExecutionCompleteResponseAttributes {
}

message CrossClusterTaskRequest {
CrossClusterTaskInfo task_info = 1;
oneof attributes {
CrossClusterStartChildExecutionRequestAttributes startChildExecutionAttributes = 2;
CrossClusterCancelExecutionRequestAttributes cancelExecutionAttributes = 3;
CrossClusterSignalExecutionRequestAttributes signalExecutionAttributes = 4;
CrossClusterRecordChildWorkflowExecutionCompleteRequestAttributes recordChildWorkflowExecutionCompleteRequestAttributes = 5;
}
}

Expand All @@ -126,6 +137,7 @@ message CrossClusterTaskResponse {
CrossClusterStartChildExecutionResponseAttributes startChildExecutionAttributes = 4;
CrossClusterCancelExecutionResponseAttributes cancelExecutionAttributes = 5;
CrossClusterSignalExecutionResponseAttributes signalExecutionAttributes = 6;
CrossClusterRecordChildWorkflowExecutionCompleteResponseAttributes recordChildWorkflowExecutionCompleteRequestAttributes = 7;
}
}

Expand Down
1 change: 1 addition & 0 deletions service/history/execution/mutable_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ type (
IsSignalRequested(requestID string) bool
IsStickyTaskListEnabled() bool
IsWorkflowExecutionRunning() bool
IsWorkflowCompleted() bool
IsResourceDuplicated(resourceDedupKey definition.DeduplicationID) bool
UpdateDuplicatedResource(resourceDedupKey definition.DeduplicationID)
Load(*persistence.WorkflowMutableState)
Expand Down
4 changes: 4 additions & 0 deletions service/history/execution/mutable_state_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1589,6 +1589,10 @@ func (e *mutableStateBuilder) IsWorkflowExecutionRunning() bool {
}
}

func (e *mutableStateBuilder) IsWorkflowCompleted() bool {
return e.executionInfo.State == persistence.WorkflowStateCompleted
}

func (e *mutableStateBuilder) IsCancelRequested() (bool, string) {
if e.executionInfo.CancelRequested {
return e.executionInfo.CancelRequested, e.executionInfo.CancelRequestID
Expand Down
7 changes: 7 additions & 0 deletions service/history/execution/mutable_state_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion service/history/execution/mutable_state_task_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,14 @@ func (r *mutableStateTaskGeneratorImpl) GenerateCrossClusterTaskFromTransferTask
Version: task.Version,
},
}
// TODO: add the case for TransferTaskTypeCloseExecution
case persistence.TransferTaskTypeCloseExecution:
crossClusterTask = &persistence.CrossClusterRecordChildWorkflowExecutionCompleteTask{
TargetCluster: targetCluster,
RecordWorkflowExecutionCompleteTask: persistence.RecordWorkflowExecutionCompleteTask{
// TaskID is set by shard context
Version: task.Version,
},
}
default:
return fmt.Errorf("unable to convert transfer task of type %v to cross-cluster task", task.TaskType)
}
Expand Down
Loading

0 comments on commit 38881a8

Please sign in to comment.