Skip to content

Commit

Permalink
Support ApplyParentClosePolicy Cross Cluster Tasks (cadence-workflow#…
Browse files Browse the repository at this point in the history
  • Loading branch information
demirkayaender authored Aug 31, 2021
1 parent 723149b commit c8f3c1c
Show file tree
Hide file tree
Showing 19 changed files with 2,350 additions and 445 deletions.
715 changes: 693 additions & 22 deletions .gen/go/shared/shared.go

Large diffs are not rendered by default.

184 changes: 98 additions & 86 deletions .gen/proto/admin/v1/service.pb.yarpc.go

Large diffs are not rendered by default.

184 changes: 98 additions & 86 deletions .gen/proto/history/v1/service.pb.yarpc.go

Large diffs are not rendered by default.

1,031 changes: 932 additions & 99 deletions .gen/proto/shared/v1/queue.pb.go

Large diffs are not rendered by default.

184 changes: 98 additions & 86 deletions .gen/proto/shared/v1/queue.pb.yarpc.go

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1022,6 +1022,8 @@ const (
CrossClusterTaskTypeSignalExecutionScope
// CrossClusterTaskTypeRecordChildWorkflowExeuctionCompleteScope is the scope used by metric emitted by cross cluster queue processor for processing signal workflow task.
CrossClusterTaskTypeRecordChildWorkflowExeuctionCompleteScope
// CrossClusterTaskTypeApplyParentClosePolicyScope is the scope used by metric emitted by cross cluster queue processor for processing applying parent close policy
CrossClusterTaskTypeApplyParentClosePolicyScope
// HistoryEventNotificationScope is the scope used by shard history event notification
HistoryEventNotificationScope
// ReplicatorQueueProcessorScope is the scope used by all metric emitted by replicator queue processor
Expand Down Expand Up @@ -1605,6 +1607,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
CrossClusterTaskCancelExecutionScope: {operation: "CrossClusterTaskCancelExecution"},
CrossClusterTaskTypeSignalExecutionScope: {operation: "CrossClusterTaskTypeSignalExecution"},
CrossClusterTaskTypeRecordChildWorkflowExeuctionCompleteScope: {operation: "CrossClusterTaskTypeRecordChildWorkflowExeuctionComplete"},
CrossClusterTaskTypeApplyParentClosePolicyScope: {operation: "CrossClusterTaskTypeApplyParentClosePolicy"},
HistoryEventNotificationScope: {operation: "HistoryEventNotification"},
ReplicatorQueueProcessorScope: {operation: "ReplicatorQueueProcessor"},
ReplicatorTaskHistoryScope: {operation: "ReplicatorTaskHistory"},
Expand Down
50 changes: 50 additions & 0 deletions common/persistence/dataManagerInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ const (
CrossClusterTaskTypeCancelExecution
CrossClusterTaskTypeSignalExecution
CrossClusterTaskTypeRecordChildWorkflowExeuctionComplete
CrossClusterTaskTypeApplyParentPolicy
)

// Types of replication tasks
Expand Down Expand Up @@ -582,6 +583,13 @@ type (
Version int64
}

// ApplyParentClosePolicyTask identifies a task for applying parent close policy
ApplyParentClosePolicyTask struct {
VisibilityTimestamp time.Time
TaskID int64
Version int64
}

// CrossClusterStartChildExecutionTask is the cross-cluster version of StartChildExecutionTask
CrossClusterStartChildExecutionTask struct {
StartChildExecutionTask
Expand Down Expand Up @@ -610,6 +618,13 @@ type (
TargetCluster string
}

// CrossClusterApplyParentClosePolicyTask is the cross-cluster version of ApplyParentClosePolicyTask
CrossClusterApplyParentClosePolicyTask struct {
ApplyParentClosePolicyTask

TargetCluster string
}

// ActivityTimeoutTask identifies a timeout task.
ActivityTimeoutTask struct {
VisibilityTimestamp time.Time
Expand Down Expand Up @@ -2326,6 +2341,36 @@ func (u *RecordWorkflowExecutionCompleteTask) SetVisibilityTimestamp(timestamp t
u.VisibilityTimestamp = timestamp
}

// GetVersion returns the version of the cancel transfer task
func (u *ApplyParentClosePolicyTask) GetVersion() int64 {
return u.Version
}

// SetVersion returns the version of the cancel transfer task
func (u *ApplyParentClosePolicyTask) SetVersion(version int64) {
u.Version = version
}

// GetTaskID returns the sequence ID of the cancel transfer task.
func (u *ApplyParentClosePolicyTask) GetTaskID() int64 {
return u.TaskID
}

// SetTaskID sets the sequence ID of the cancel transfer task.
func (u *ApplyParentClosePolicyTask) SetTaskID(id int64) {
u.TaskID = id
}

// GetVisibilityTimestamp get the visibility timestamp
func (u *ApplyParentClosePolicyTask) GetVisibilityTimestamp() time.Time {
return u.VisibilityTimestamp
}

// SetVisibilityTimestamp set the visibility timestamp
func (u *ApplyParentClosePolicyTask) SetVisibilityTimestamp(timestamp time.Time) {
u.VisibilityTimestamp = timestamp
}

// GetType returns the type of the upsert search attributes transfer task
func (u *UpsertWorkflowSearchAttributesTask) GetType() int {
return TransferTaskTypeUpsertWorkflowSearchAttributes
Expand Down Expand Up @@ -2416,6 +2461,11 @@ func (c *CrossClusterRecordChildWorkflowExecutionCompleteTask) GetType() int {
return CrossClusterTaskTypeRecordChildWorkflowExeuctionComplete
}

// GetType returns of type of the cross-cluster cancel task
func (c *CrossClusterApplyParentClosePolicyTask) GetType() int {
return CrossClusterTaskTypeApplyParentPolicy
}

// GetType returns the type of the history replication task
func (a *HistoryReplicationTask) GetType() int {
return ReplicationTaskTypeHistory
Expand Down
3 changes: 3 additions & 0 deletions common/persistence/nosql/nosqlExecutionStoreUtil.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,9 @@ func (d *nosqlExecutionStore) prepareCrossClusterTasksForWorkflowTxn(
case p.CrossClusterTaskTypeRecordChildWorkflowExeuctionComplete:
targetCluster = task.(*p.CrossClusterRecordChildWorkflowExecutionCompleteTask).TargetCluster

case p.CrossClusterTaskTypeApplyParentPolicy:
targetCluster = task.(*p.CrossClusterApplyParentClosePolicyTask).TargetCluster

default:
return nil, &types.InternalServiceError{
Message: fmt.Sprintf("Unknown cross-cluster task type: %v", task.GetType()),
Expand Down
3 changes: 3 additions & 0 deletions common/persistence/sql/sqlExecutionStoreUtil.go
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,9 @@ func createCrossClusterTasks(
case p.CrossClusterTaskTypeRecordChildWorkflowExeuctionComplete:
crossClusterTasksRows[i].TargetCluster = task.(*p.CrossClusterRecordChildWorkflowExecutionCompleteTask).TargetCluster

case p.CrossClusterTaskTypeApplyParentPolicy:
crossClusterTasksRows[i].TargetCluster = task.(*p.CrossClusterApplyParentClosePolicyTask).TargetCluster

default:
return &types.InternalServiceError{
Message: fmt.Sprintf("Unknown cross-cluster task type: %v", task.GetType()),
Expand Down
56 changes: 56 additions & 0 deletions common/types/mapper/proto/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -1387,6 +1387,62 @@ func ToCrossClusterRecordChildWorkflowExecutionCompleteRequestAttributes(t *shar
}
}

// FromAppyParentClosePolicyAttributes converts internal AppyParentClosePolicyAttributes type to proto
func FromAppyParentClosePolicyAttributes(t *types.AppyParentClosePolicyAttributes) *sharedv1.AppyParentClosePolicyAttributes {
if t == nil {
return nil
}
return &sharedv1.AppyParentClosePolicyAttributes{
ChildDomainId: t.ChildDomainID,
ChildWorkflowId: t.ChildWorkflowID,
ChildRunId: t.ChildRunID,
ParentClosePolicy: FromParentClosePolicy(t.ParentClosePolicy),
}
}

// ToAppyParentClosePolicyAttributes converts proto AppyParentClosePolicyAttributes type to internal
func ToAppyParentClosePolicyAttributes(t *sharedv1.AppyParentClosePolicyAttributes) *types.AppyParentClosePolicyAttributes {
if t == nil {
return nil
}
return &types.AppyParentClosePolicyAttributes{
ChildDomainID: t.ChildDomainId,
ChildWorkflowID: t.ChildWorkflowId,
ChildRunID: t.ChildRunId,
ParentClosePolicy: ToParentClosePolicy(t.ParentClosePolicy),
}
}

// FromCrossClusterApplyParentClosePolicyRequestAttributes converts internal CrossClusterApplyParentClosePolicyRequestAttributes type to proto
func FromCrossClusterApplyParentClosePolicyRequestAttributes(t *types.CrossClusterApplyParentClosePolicyRequestAttributes) *sharedv1.CrossClusterApplyParentClosePolicyRequestAttributes {
if t == nil {
return nil
}
requestAttributes := &sharedv1.CrossClusterApplyParentClosePolicyRequestAttributes{}
for _, execution := range t.AppyParentClosePolicyAttributes {
requestAttributes.AppyParentClosePolicyAttributes = append(
requestAttributes.AppyParentClosePolicyAttributes,
FromAppyParentClosePolicyAttributes(execution),
)
}
return requestAttributes
}

// ToCrossClusterApplyParentClosePolicyRequestAttributes converts proto CrossClusterApplyParentClosePolicyRequestAttributes type to internal
func ToCrossClusterApplyParentClosePolicyRequestAttributes(t *sharedv1.CrossClusterApplyParentClosePolicyRequestAttributes) *types.CrossClusterApplyParentClosePolicyRequestAttributes {
if t == nil {
return nil
}
requestAttributes := &types.CrossClusterApplyParentClosePolicyRequestAttributes{}
for _, execution := range t.AppyParentClosePolicyAttributes {
requestAttributes.AppyParentClosePolicyAttributes = append(
requestAttributes.AppyParentClosePolicyAttributes,
ToAppyParentClosePolicyAttributes(execution),
)
}
return requestAttributes
}

// FromCrossClusterRecordChildWorkflowExecutionCompleteResponseAttributes converts internal CrossClusterRecordChildWorkflowExecutionCompleteResponseAttributes type to proto
func FromCrossClusterRecordChildWorkflowExecutionCompleteResponseAttributes(t *types.CrossClusterRecordChildWorkflowExecutionCompleteResponseAttributes) *sharedv1.CrossClusterRecordChildWorkflowExecutionCompleteResponseAttributes {
if t == nil {
Expand Down
73 changes: 73 additions & 0 deletions common/types/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -10696,13 +10696,69 @@ func (v *CrossClusterRecordChildWorkflowExecutionCompleteRequestAttributes) GetI
type CrossClusterRecordChildWorkflowExecutionCompleteResponseAttributes struct {
}

type AppyParentClosePolicyAttributes struct {
ChildDomainID string `json:"ChildDomainID,omitempty"`
ChildWorkflowID string `json:"ChildWorkflowID,omitempty"`
ChildRunID string `json:"ChildRunID,omitempty"`
ParentClosePolicy *ParentClosePolicy `json:"parentClosePolicy,omitempty"`
}

// GetDomainID is an internal getter (TBD...)
func (v *AppyParentClosePolicyAttributes) GetDomainID() (o string) {
if v != nil {
return v.ChildDomainID
}
return
}

// GetWorkflowID is an internal getter (TBD...)
func (v *AppyParentClosePolicyAttributes) GetWorkflowID() (o string) {
if v != nil {
return v.ChildWorkflowID
}
return
}

// GetRunID is an internal getter (TBD...)
func (v *AppyParentClosePolicyAttributes) GetRunID() (o string) {
if v != nil {
return v.ChildRunID
}
return
}

// GetParentClosePolicy is an internal getter (TBD...)
func (v *AppyParentClosePolicyAttributes) GetParentClosePolicy() (o *ParentClosePolicy) {
if v != nil {
return v.ParentClosePolicy
}
return
}

type CrossClusterApplyParentClosePolicyRequestAttributes struct {
AppyParentClosePolicyAttributes []*AppyParentClosePolicyAttributes `json:"appyParentClosePolicyAttributes,omitempty"`
}

// GetInitiatedEventID is an internal getter (TBD...)
func (v *CrossClusterApplyParentClosePolicyRequestAttributes) GetAppyParentClosePolicyAttributes() (o []*AppyParentClosePolicyAttributes) {
if v != nil {
return v.AppyParentClosePolicyAttributes
}
return
}

// CrossClusterApplyParentClosePolicyResponseAttributes is an internal type (TBD...)
type CrossClusterApplyParentClosePolicyResponseAttributes struct {
}

// CrossClusterTaskRequest is an internal type (TBD...)
type CrossClusterTaskRequest struct {
TaskInfo *CrossClusterTaskInfo `json:"taskInfo,omitempty"`
StartChildExecutionAttributes *CrossClusterStartChildExecutionRequestAttributes `json:"startChildExecutionAttributes,omitempty"`
CancelExecutionAttributes *CrossClusterCancelExecutionRequestAttributes `json:"cancelExecutionAttributes,omitempty"`
SignalExecutionAttributes *CrossClusterSignalExecutionRequestAttributes `json:"signalExecutionAttributes,omitempty"`
RecordChildWorkflowExecutionCompleteAttributes *CrossClusterRecordChildWorkflowExecutionCompleteRequestAttributes `json:"RecordChildWorkflowExecutionCompleteAttributes,omitempty"`
ApplyParentClosePolicyAttributes *CrossClusterApplyParentClosePolicyRequestAttributes `json:"ApplyParentClosePolicyAttributes,omitempty"`
}

// GetTaskInfo is an internal getter (TBD...)
Expand Down Expand Up @@ -10745,6 +10801,14 @@ func (v *CrossClusterTaskRequest) GetRecordChildWorkflowExecutionCompleteAttribu
return
}

// GetSignalExecutionAttributes is an internal getter (TBD...)
func (v *CrossClusterTaskRequest) GetApplyParentClosePolicyAttributes() (o *CrossClusterApplyParentClosePolicyRequestAttributes) {
if v != nil && v.RecordChildWorkflowExecutionCompleteAttributes != nil {
return v.ApplyParentClosePolicyAttributes
}
return
}

// CrossClusterTaskResponse is an internal type (TBD...)
type CrossClusterTaskResponse struct {
TaskID int64 `json:"taskID,omitempty"`
Expand All @@ -10754,6 +10818,7 @@ type CrossClusterTaskResponse struct {
CancelExecutionAttributes *CrossClusterCancelExecutionResponseAttributes `json:"cancelExecutionAttributes,omitempty"`
SignalExecutionAttributes *CrossClusterSignalExecutionResponseAttributes `json:"signalExecutionAttributes,omitempty"`
RecordChildWorkflowExecutionCompleteAttributes *CrossClusterRecordChildWorkflowExecutionCompleteResponseAttributes `json:"RecordChildWorkflowExecutionCompleteAttributes,omitempty"`
ApplyParentClosePolicyAttributes *CrossClusterApplyParentClosePolicyResponseAttributes `json:"ApplyParentClosePolicyAttributes,omitempty"`
}

// GetTaskID is an internal getter (TBD...)
Expand Down Expand Up @@ -10812,6 +10877,14 @@ func (v *CrossClusterTaskResponse) GetRecordChildWorkflowExecutionCompleteAttrib
return
}

// GetSignalExecutionAttributes is an internal getter (TBD...)
func (v *CrossClusterTaskResponse) GetApplyParenctClosePolicyAttributes() (o *CrossClusterApplyParentClosePolicyResponseAttributes) {
if v != nil && v.ApplyParentClosePolicyAttributes != nil {
return v.ApplyParentClosePolicyAttributes
}
return
}

// GetCrossClusterTasksRequest is an internal type (TBD...)
type GetCrossClusterTasksRequest struct {
ShardIDs []int32 `json:"shardIDs,omitempty"`
Expand Down
2 changes: 1 addition & 1 deletion idls
Submodule idls updated from 48a149 to 6f660a
17 changes: 17 additions & 0 deletions proto/internal/uber/cadence/shared/v1/queue.proto
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ option go_package = "github.com/uber/cadence/.gen/proto/shared/v1;sharedv1";
import "google/protobuf/timestamp.proto";
import "uber/cadence/api/v1/common.proto";
import "uber/cadence/api/v1/history.proto";
import "uber/cadence/api/v1/workflow.proto";

enum TaskType {
TASK_TYPE_INVALID = 0;
Expand Down Expand Up @@ -119,13 +120,28 @@ message CrossClusterRecordChildWorkflowExecutionCompleteRequestAttributes {
message CrossClusterRecordChildWorkflowExecutionCompleteResponseAttributes {
}

message AppyParentClosePolicyAttributes {
string child_domain_id = 1;
string child_workflow_id = 2;
string child_run_id = 3;
api.v1.ParentClosePolicy parent_close_policy = 4;
}

message CrossClusterApplyParentClosePolicyRequestAttributes {
repeated AppyParentClosePolicyAttributes appy_parent_close_policy_attributes = 1;
}

message CrossClusterApplyParentClosePolicyResponseAttributes {
}

message CrossClusterTaskRequest {
CrossClusterTaskInfo task_info = 1;
oneof attributes {
CrossClusterStartChildExecutionRequestAttributes startChildExecutionAttributes = 2;
CrossClusterCancelExecutionRequestAttributes cancelExecutionAttributes = 3;
CrossClusterSignalExecutionRequestAttributes signalExecutionAttributes = 4;
CrossClusterRecordChildWorkflowExecutionCompleteRequestAttributes recordChildWorkflowExecutionCompleteRequestAttributes = 5;
CrossClusterApplyParentClosePolicyRequestAttributes applyParentClosePolicyRequestAttributes = 6;
}
}

Expand All @@ -138,6 +154,7 @@ message CrossClusterTaskResponse {
CrossClusterCancelExecutionResponseAttributes cancelExecutionAttributes = 5;
CrossClusterSignalExecutionResponseAttributes signalExecutionAttributes = 6;
CrossClusterRecordChildWorkflowExecutionCompleteResponseAttributes recordChildWorkflowExecutionCompleteRequestAttributes = 7;
CrossClusterApplyParentClosePolicyResponseAttributes applyParentClosePolicyResponseAttributes = 8;
}
}

Expand Down
32 changes: 32 additions & 0 deletions service/history/execution/mutable_state_task_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,14 @@ type (
transferTask *persistence.TransferTaskInfo,
targetCluster string,
) error
// TODO: Consider merging below with GenerateCrossClusterTaskFromTransferTask.
// Close event generates both recordChildCompletion and ApplyParentPolicy
// tasks. That's why we currently have a separate function for applying
// parent policy
GenerateCrossClusterApplyParentClosePolicyTask(
transferTask *persistence.TransferTaskInfo,
targetCluster string,
) error

// these 2 APIs should only be called when mutable state transaction is being closed
GenerateActivityTimerTasks(
Expand Down Expand Up @@ -552,6 +560,30 @@ func (r *mutableStateTaskGeneratorImpl) GenerateWorkflowResetTasks() error {
return nil
}

func (r *mutableStateTaskGeneratorImpl) GenerateCrossClusterApplyParentClosePolicyTask(
task *persistence.TransferTaskInfo,
targetCluster string,
) error {
if targetCluster == r.clusterMetadata.GetCurrentClusterName() {
// this should not happen
return errors.New("unable to create cross-cluster task for current cluster")
}

crossClusterTask := &persistence.CrossClusterApplyParentClosePolicyTask{
TargetCluster: targetCluster,
ApplyParentClosePolicyTask: persistence.ApplyParentClosePolicyTask{
// TaskID is set by shard context
// Domain, workflow and run ids will be collected from mutableState
// when processing the apply parent policy tasks.
Version: task.Version,
VisibilityTimestamp: task.VisibilityTimestamp,
},
}
r.mutableState.AddCrossClusterTasks(crossClusterTask)

return nil
}

func (r *mutableStateTaskGeneratorImpl) GenerateCrossClusterTaskFromTransferTask(
task *persistence.TransferTaskInfo,
targetCluster string,
Expand Down
Loading

0 comments on commit c8f3c1c

Please sign in to comment.