Skip to content

Commit

Permalink
Add ParentClosePolicy: IDL and Persistence API (cadence-workflow#2450)
Browse files Browse the repository at this point in the history
  • Loading branch information
longquanzheng authored Aug 26, 2019
1 parent 0e0833a commit 50db090
Show file tree
Hide file tree
Showing 23 changed files with 463 additions and 31 deletions.
356 changes: 344 additions & 12 deletions .gen/go/shared/shared.go

Large diffs are not rendered by default.

52 changes: 48 additions & 4 deletions .gen/go/sqlblobs/sqlblobs.go

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions common/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ func DecisionTypePtr(t s.DecisionType) *s.DecisionType {
return &t
}

// ParentClosePolicyPtr makes a copy and returns the pointer to a DecisionType.
func ParentClosePolicyPtr(t s.ParentClosePolicy) *s.ParentClosePolicy {
return &t
}

// EventTypePtr makes a copy and returns the pointer to a EventType.
func EventTypePtr(t s.EventType) *s.EventType {
return &t
Expand Down
3 changes: 2 additions & 1 deletion common/persistence/cassandra/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,8 @@ const (
`create_request_id: ?, ` +
`event_data_encoding: ?, ` +
`domain_name: ?, ` +
`workflow_type_name: ?` +
`workflow_type_name: ?, ` +
`parent_close_policy: ?` +
`}`

templateRequestCancelInfoType = `{` +
Expand Down
4 changes: 4 additions & 0 deletions common/persistence/cassandra/cassandraPersistenceUtil.go
Original file line number Diff line number Diff line change
Expand Up @@ -1326,6 +1326,7 @@ func updateChildExecutionInfos(
initiatedEncoding,
c.DomainName,
c.WorkflowTypeName,
int32(c.ParentClosePolicy),
shardID,
rowTypeExecution,
domainID,
Expand Down Expand Up @@ -2045,6 +2046,8 @@ func createChildExecutionInfo(
info.DomainName = v.(string)
case "workflow_type_name":
info.WorkflowTypeName = v.(string)
case "parent_close_policy":
info.ParentClosePolicy = workflow.ParentClosePolicy(v.(int))
}
}
info.InitiatedEvent = p.NewDataBlob(initiatedData, encoding)
Expand Down Expand Up @@ -2198,6 +2201,7 @@ func resetChildExecutionInfoMap(
cInfo["started_run_id"] = startedRunID
cInfo["domain_name"] = c.DomainName
cInfo["workflow_type_name"] = c.WorkflowTypeName
cInfo["parent_close_policy"] = int32(c.ParentClosePolicy)

cMap[c.InitiatedID] = cInfo
}
Expand Down
1 change: 1 addition & 0 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,7 @@ type (
CreateRequestID string
DomainName string
WorkflowTypeName string
ParentClosePolicy workflow.ParentClosePolicy
}

// RequestCancelInfo has details for pending external workflow cancellations
Expand Down
2 changes: 2 additions & 0 deletions common/persistence/executionStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ func (m *executionManagerImpl) DeserializeChildExecutionInfos(
CreateRequestID: v.CreateRequestID,
DomainName: v.DomainName,
WorkflowTypeName: v.WorkflowTypeName,
ParentClosePolicy: v.ParentClosePolicy,
}

// Needed for backward compatibility reason.
Expand Down Expand Up @@ -347,6 +348,7 @@ func (m *executionManagerImpl) SerializeUpsertChildExecutionInfos(
StartedRunID: v.StartedRunID,
DomainName: v.DomainName,
WorkflowTypeName: v.WorkflowTypeName,
ParentClosePolicy: v.ParentClosePolicy,
}
newInfos = append(newInfos, i)
}
Expand Down
14 changes: 8 additions & 6 deletions common/persistence/persistence-tests/executionManagerTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -2292,12 +2292,13 @@ func (s *ExecutionManagerSuite) TestWorkflowMutableStateChildExecutions() {
updatedInfo.LastProcessedEvent = int64(2)
createRequestID := uuid.New()
childExecutionInfos := []*p.ChildExecutionInfo{{
Version: 1234,
InitiatedID: 1,
InitiatedEvent: &gen.HistoryEvent{EventId: int64Ptr(1)},
StartedID: 2,
StartedEvent: &gen.HistoryEvent{EventId: int64Ptr(2)},
CreateRequestID: createRequestID,
Version: 1234,
InitiatedID: 1,
InitiatedEvent: &gen.HistoryEvent{EventId: int64Ptr(1)},
StartedID: 2,
StartedEvent: &gen.HistoryEvent{EventId: int64Ptr(2)},
CreateRequestID: createRequestID,
ParentClosePolicy: gen.ParentClosePolicyTerminate,
}}
err2 := s.UpsertChildExecutionsState(updatedInfo, updatedStats, int64(3), childExecutionInfos)
s.NoError(err2)
Expand All @@ -2311,6 +2312,7 @@ func (s *ExecutionManagerSuite) TestWorkflowMutableStateChildExecutions() {
s.NotNil(ci)
s.Equal(int64(1234), ci.Version)
s.Equal(int64(1), ci.InitiatedID)
s.Equal(gen.ParentClosePolicyTerminate, ci.ParentClosePolicy)
s.Equal(int64(1), *ci.InitiatedEvent.EventId)
s.Equal(int64(2), ci.StartedID)
s.Equal(int64(2), *ci.StartedEvent.EventId)
Expand Down
1 change: 1 addition & 0 deletions common/persistence/persistenceInterface.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ type (
CreateRequestID string
DomainName string
WorkflowTypeName string
ParentClosePolicy workflow.ParentClosePolicy
}

// InternalUpdateWorkflowExecutionRequest is used to update a workflow execution for Persistence Interface
Expand Down
2 changes: 2 additions & 0 deletions common/persistence/sql/workflowStateMaps.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,7 @@ func updateChildExecutionInfos(
CreateRequestID: &v.CreateRequestID,
DomainName: &v.DomainName,
WorkflowTypeName: &v.WorkflowTypeName,
ParentClosePolicy: common.Int32Ptr(int32(v.ParentClosePolicy)),
}
blob, err := childExecutionInfoToBlob(info)
if err != nil {
Expand Down Expand Up @@ -453,6 +454,7 @@ func getChildExecutionInfoMap(
CreateRequestID: rowInfo.GetCreateRequestID(),
DomainName: rowInfo.GetDomainName(),
WorkflowTypeName: rowInfo.GetWorkflowTypeName(),
ParentClosePolicy: workflow.ParentClosePolicy(rowInfo.GetParentClosePolicy()),
}
if rowInfo.InitiatedEvent != nil {
info.InitiatedEvent = persistence.NewDataBlob(rowInfo.InitiatedEvent, common.EncodingType(rowInfo.GetInitiatedEventEncoding()))
Expand Down
4 changes: 4 additions & 0 deletions common/service/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ var keys = map[Key]string{
EnableAdminProtection: "history.enableAdminProtection",
AdminOperationToken: "history.adminOperationToken",
EnableEventsV2: "history.enableEventsV2",
UseTerminateAsDefaultParentClosePolicy: "history.useTerminateAsDefaultParentClosePolicy",
NumArchiveSystemWorkflows: "history.numArchiveSystemWorkflows",
ArchiveRequestRPS: "history.archiveRequestRPS",
EmitShardDiffLog: "history.emitShardDiffLog",
Expand Down Expand Up @@ -483,6 +484,9 @@ const (

// EnableEventsV2 is whether to use eventsV2
EnableEventsV2
// UseTerminateAsDefaultParentClosePolicy whether to use Terminate as default ParentClosePolicy, otherwise use Abandon for backward compatibility
UseTerminateAsDefaultParentClosePolicy

// HistoryThrottledLogRPS is the rate limit on number of log messages emitted per second for throttled logger
HistoryThrottledLogRPS
// StickyTTL is to expire a sticky tasklist if no update more than this duration
Expand Down
10 changes: 10 additions & 0 deletions idl/github.com/uber/cadence/shared.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,13 @@ enum TimeoutType {
HEARTBEAT,
}

enum ParentClosePolicy {
ABANDON,
REQUEST_CANCEL,
TERMINATE,
}


// whenever this list of decision is changed
// do change the mutableStateBuilder.go
// function shouldBufferEvent
Expand Down Expand Up @@ -445,6 +452,7 @@ struct StartChildWorkflowExecutionDecisionAttributes {
60: optional i32 executionStartToCloseTimeoutSeconds
70: optional i32 taskStartToCloseTimeoutSeconds
// 80: optional ChildPolicy childPolicy -- Removed but reserve the IDL order number
81: optional ParentClosePolicy parentClosePolicy
90: optional binary control
100: optional WorkflowIdReusePolicy workflowIdReusePolicy
110: optional RetryPolicy retryPolicy
Expand Down Expand Up @@ -773,6 +781,7 @@ struct StartChildWorkflowExecutionInitiatedEventAttributes {
60: optional i32 executionStartToCloseTimeoutSeconds
70: optional i32 taskStartToCloseTimeoutSeconds
// 80: optional ChildPolicy childPolicy -- Removed but reserve the IDL order number
81: optional ParentClosePolicy parentClosePolicy
90: optional binary control
100: optional i64 (js.type = "Long") decisionTaskCompletedEventId
110: optional WorkflowIdReusePolicy workflowIdReusePolicy
Expand Down Expand Up @@ -1387,6 +1396,7 @@ struct PendingChildExecutionInfo {
20: optional string runID
30: optional string workflowTypName
40: optional i64 (js.type = "Long") initiatedID
50: optional ParentClosePolicy parentClosePolicy
}

struct DescribeWorkflowExecutionResponse {
Expand Down
1 change: 1 addition & 0 deletions idl/github.com/uber/cadence/sqlblobs.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ struct ChildExecutionInfo {
28: optional string createRequestID
30: optional string domainName
32: optional string workflowTypeName
35: optional i32 parentClosePolicy
}

struct SignalInfo {
Expand Down
1 change: 1 addition & 0 deletions schema/cassandra/cadence/schema.cql
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ CREATE TYPE child_execution_info (
event_data_encoding text, -- Protocol used for history serialization
domain_name text,
workflow_type_name text,
parent_close_policy int,
);

-- External workflow cancellation in progress mutable state
Expand Down
3 changes: 2 additions & 1 deletion schema/cassandra/cadence/versioned/v0.22/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"Description": "Add per cluster replication level (last replicated task_id) to shard info",
"SchemaUpdateCqlFiles": [
"request_cancel_signal_batch_event_id.cql",
"cluster_replication_level.cql"
"cluster_replication_level.cql",
"parent_close_policy.cql"
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TYPE child_execution_info ADD parent_close_policy int;
1 change: 1 addition & 0 deletions service/history/decisionHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,7 @@ Update_History_Loop:
timerBuilderProvider,
handler.domainCache,
handler.metricsClient,
handler.config,
)

if err := decisionTaskHandler.handleDecisions(
Expand Down
12 changes: 12 additions & 0 deletions service/history/decisionTaskHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type (
timerBuilderProvider timerBuilderProvider
domainCache cache.DomainCache
metricsClient metrics.Client
config *Config
}
)

Expand All @@ -79,6 +80,7 @@ func newDecisionTaskHandler(
timerBuilderProvider timerBuilderProvider,
domainCache cache.DomainCache,
metricsClient metrics.Client,
config *Config,
) *decisionTaskHandlerImpl {

return &decisionTaskHandlerImpl{
Expand Down Expand Up @@ -106,6 +108,7 @@ func newDecisionTaskHandler(
timerBuilderProvider: timerBuilderProvider,
domainCache: domainCache,
metricsClient: metricsClient,
config: config,
}
}

Expand Down Expand Up @@ -760,6 +763,15 @@ func (handler *decisionTaskHandlerImpl) handleDecisionStartChildWorkflow(
return err
}

if attr.ParentClosePolicy == nil {
useTerminate := handler.config.UseTerminateAsDefaultParentClosePolicy(handler.domainEntry.GetInfo().Name)
if useTerminate {
attr.ParentClosePolicy = common.ParentClosePolicyPtr(workflow.ParentClosePolicyTerminate)
} else {
attr.ParentClosePolicy = common.ParentClosePolicyPtr(workflow.ParentClosePolicyAbandon)
}
}

requestID := uuid.New()
_, _, err = handler.mutableState.AddStartChildWorkflowExecutionInitiatedEvent(
handler.decisionTaskCompletedID, requestID, attr,
Expand Down
1 change: 1 addition & 0 deletions service/history/historyBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -897,6 +897,7 @@ func (b *historyBuilder) newStartChildWorkflowExecutionInitiatedEvent(decisionTa
attributes.CronSchedule = startAttributes.CronSchedule
attributes.Memo = startAttributes.Memo
attributes.SearchAttributes = startAttributes.SearchAttributes
attributes.ParentClosePolicy = common.ParentClosePolicyPtr(startAttributes.GetParentClosePolicy())
historyEvent.StartChildWorkflowExecutionInitiatedEventAttributes = attributes

return historyEvent
Expand Down
9 changes: 5 additions & 4 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -831,10 +831,11 @@ func (e *historyEngineImpl) DescribeWorkflowExecution(
if len(msBuilder.GetPendingChildExecutionInfos()) > 0 {
for _, ch := range msBuilder.GetPendingChildExecutionInfos() {
p := &workflow.PendingChildExecutionInfo{
WorkflowID: common.StringPtr(ch.StartedWorkflowID),
RunID: common.StringPtr(ch.StartedRunID),
WorkflowTypName: common.StringPtr(ch.WorkflowTypeName),
InitiatedID: common.Int64Ptr(ch.InitiatedID),
WorkflowID: common.StringPtr(ch.StartedWorkflowID),
RunID: common.StringPtr(ch.StartedRunID),
WorkflowTypName: common.StringPtr(ch.WorkflowTypeName),
InitiatedID: common.Int64Ptr(ch.InitiatedID),
ParentClosePolicy: common.ParentClosePolicyPtr(ch.ParentClosePolicy),
}
result.PendingChildren = append(result.PendingChildren, p)
}
Expand Down
1 change: 1 addition & 0 deletions service/history/historyEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5555,6 +5555,7 @@ func copyChildInfo(sourceInfo *persistence.ChildExecutionInfo) *persistence.Chil
CreateRequestID: sourceInfo.CreateRequestID,
DomainName: sourceInfo.DomainName,
WorkflowTypeName: sourceInfo.WorkflowTypeName,
ParentClosePolicy: sourceInfo.ParentClosePolicy,
}

if sourceInfo.InitiatedEvent != nil {
Expand Down
1 change: 1 addition & 0 deletions service/history/mutableStateBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3449,6 +3449,7 @@ func (e *mutableStateBuilder) ReplicateStartChildWorkflowExecutionInitiatedEvent
CreateRequestID: createRequestID,
DomainName: attributes.GetDomain(),
WorkflowTypeName: attributes.GetWorkflowType().GetName(),
ParentClosePolicy: attributes.GetParentClosePolicy(),
}

e.pendingChildExecutionInfoIDs[initiatedEventID] = ci
Expand Down
9 changes: 6 additions & 3 deletions service/history/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ type Config struct {
EventEncodingType dynamicconfig.StringPropertyFnWithDomainFilter
// whether or not using eventsV2
EnableEventsV2 dynamicconfig.BoolPropertyFnWithDomainFilter
// whether or not using Terminate as default ParentClosePolicy, otherwise use Abandon for backward compatibility
UseTerminateAsDefaultParentClosePolicy dynamicconfig.BoolPropertyFnWithDomainFilter

// Archival settings
NumArchiveSystemWorkflows dynamicconfig.IntPropertyFn
Expand Down Expand Up @@ -236,9 +238,10 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int, enableVisibilit
ShardSyncMinInterval: dc.GetDurationProperty(dynamicconfig.ShardSyncMinInterval, 5*time.Minute),

// history client: client/history/client.go set the client timeout 30s
LongPollExpirationInterval: dc.GetDurationPropertyFilteredByDomain(dynamicconfig.HistoryLongPollExpirationInterval, time.Second*20),
EventEncodingType: dc.GetStringPropertyFnWithDomainFilter(dynamicconfig.DefaultEventEncoding, string(common.EncodingTypeThriftRW)),
EnableEventsV2: dc.GetBoolPropertyFnWithDomainFilter(dynamicconfig.EnableEventsV2, true),
LongPollExpirationInterval: dc.GetDurationPropertyFilteredByDomain(dynamicconfig.HistoryLongPollExpirationInterval, time.Second*20),
EventEncodingType: dc.GetStringPropertyFnWithDomainFilter(dynamicconfig.DefaultEventEncoding, string(common.EncodingTypeThriftRW)),
EnableEventsV2: dc.GetBoolPropertyFnWithDomainFilter(dynamicconfig.EnableEventsV2, true),
UseTerminateAsDefaultParentClosePolicy: dc.GetBoolPropertyFnWithDomainFilter(dynamicconfig.UseTerminateAsDefaultParentClosePolicy, false),

NumArchiveSystemWorkflows: dc.GetIntProperty(dynamicconfig.NumArchiveSystemWorkflows, 1000),
ArchiveRequestRPS: dc.GetIntProperty(dynamicconfig.ArchiveRequestRPS, 300), // should be much smaller than frontend RPS
Expand Down

0 comments on commit 50db090

Please sign in to comment.