Skip to content

Commit

Permalink
Change parentClosePolicy config flag (cadence-workflow#2567)
Browse files Browse the repository at this point in the history
  • Loading branch information
longquanzheng authored Sep 23, 2019
1 parent feb7c23 commit 42c0a37
Show file tree
Hide file tree
Showing 4 changed files with 176 additions and 14 deletions.
6 changes: 3 additions & 3 deletions common/service/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ var keys = map[Key]string{
EnableAdminProtection: "history.enableAdminProtection",
AdminOperationToken: "history.adminOperationToken",
EnableEventsV2: "history.enableEventsV2",
UseTerminateAsDefaultParentClosePolicy: "history.useTerminateAsDefaultParentClosePolicy",
EnableParentClosePolicy: "history.enableParentClosePolicy",
NumArchiveSystemWorkflows: "history.numArchiveSystemWorkflows",
ArchiveRequestRPS: "history.archiveRequestRPS",
EmitShardDiffLog: "history.emitShardDiffLog",
Expand Down Expand Up @@ -490,8 +490,8 @@ const (

// EnableEventsV2 is whether to use eventsV2
EnableEventsV2
// UseTerminateAsDefaultParentClosePolicy whether to use Terminate as default ParentClosePolicy, otherwise use Abandon for backward compatibility
UseTerminateAsDefaultParentClosePolicy
// EnableParentClosePolicy whether to ParentClosePolicy
EnableParentClosePolicy
// ParentClosePolicyThreshold decides that parent close policy will be processed by sys workers(if enabled) if
// the number of children greater than or equal to this threshold
ParentClosePolicyThreshold
Expand Down
10 changes: 8 additions & 2 deletions service/history/decisionTaskHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,13 +763,19 @@ func (handler *decisionTaskHandlerImpl) handleDecisionStartChildWorkflow(
return err
}

enabled := handler.config.EnableParentClosePolicy(handler.domainEntry.GetInfo().Name)
if attr.ParentClosePolicy == nil {
useTerminate := handler.config.UseTerminateAsDefaultParentClosePolicy(handler.domainEntry.GetInfo().Name)
if useTerminate {
// for old clients, this field is empty. If they enable the feature, make default as terminate
if enabled {
attr.ParentClosePolicy = common.ParentClosePolicyPtr(workflow.ParentClosePolicyTerminate)
} else {
attr.ParentClosePolicy = common.ParentClosePolicyPtr(workflow.ParentClosePolicyAbandon)
}
} else {
// for domains that haven't enabled the feature yet, need to use Abandon for backward-compatibility
if !enabled {
attr.ParentClosePolicy = common.ParentClosePolicyPtr(workflow.ParentClosePolicyAbandon)
}
}

requestID := uuid.New()
Expand Down
156 changes: 156 additions & 0 deletions service/history/historyEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1966,6 +1966,162 @@ func (s *engineSuite) TestRespondDecisionTaskCompletedSignalExternalWorkflowSucc
s.Equal(executionContext, executionBuilder.GetExecutionInfo().ExecutionContext)
}

func (s *engineSuite) TestRespondDecisionTaskCompletedStartChildWorkflowWithAbandonPolicy() {
domainID := validDomainID
we := workflow.WorkflowExecution{
WorkflowId: common.StringPtr("wId"),
RunId: common.StringPtr(validRunID),
}
tl := "testTaskList"
taskToken, _ := json.Marshal(&common.TaskToken{
WorkflowID: *we.WorkflowId,
RunID: *we.RunId,
ScheduleID: 2,
})
identity := "testIdentity"
executionContext := []byte("context")

msBuilder := newMutableStateBuilderWithEventV2(s.mockHistoryEngine.shard, s.eventsCache,
loggerimpl.NewDevelopmentForTest(s.Suite), we.GetRunId())
addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 200, identity)
di := addDecisionTaskScheduledEvent(msBuilder)
addDecisionTaskStartedEvent(msBuilder, di.ScheduleID, tl, identity)

abandon := workflow.ParentClosePolicyAbandon
decisions := []*workflow.Decision{{
DecisionType: common.DecisionTypePtr(workflow.DecisionTypeStartChildWorkflowExecution),
StartChildWorkflowExecutionDecisionAttributes: &workflow.StartChildWorkflowExecutionDecisionAttributes{
Domain: common.StringPtr(domainID),
WorkflowId: common.StringPtr("child-workflow-id"),
WorkflowType: &workflow.WorkflowType{
Name: common.StringPtr("child-workflow-type"),
},
ParentClosePolicy: &abandon,
},
}}

ms := createMutableState(msBuilder)
gwmsResponse := &persistence.GetWorkflowExecutionResponse{State: ms}

s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(gwmsResponse, nil).Once()
s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything).Return(&p.AppendHistoryNodesResponse{Size: 0}, nil).Once()
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(&p.UpdateWorkflowExecutionResponse{MutableStateUpdateSessionStats: &p.MutableStateUpdateSessionStats{}}, nil).Once()
s.mockMetadataMgr.On("GetDomain", mock.Anything).Return(
&persistence.GetDomainResponse{
Info: &persistence.DomainInfo{ID: domainID},
Config: &persistence.DomainConfig{Retention: 1},
ReplicationConfig: &persistence.DomainReplicationConfig{
ActiveClusterName: cluster.TestCurrentClusterName,
Clusters: []*persistence.ClusterReplicationConfig{
{ClusterName: cluster.TestCurrentClusterName},
},
},
TableVersion: persistence.DomainTableVersionV1,
},
nil,
)
_, err := s.mockHistoryEngine.RespondDecisionTaskCompleted(context.Background(), &history.RespondDecisionTaskCompletedRequest{
DomainUUID: common.StringPtr(domainID),
CompleteRequest: &workflow.RespondDecisionTaskCompletedRequest{
TaskToken: taskToken,
Decisions: decisions,
ExecutionContext: executionContext,
Identity: &identity,
},
})
s.Nil(err, s.printHistory(msBuilder))
executionBuilder := s.getBuilder(domainID, we)
s.Equal(int64(6), executionBuilder.GetExecutionInfo().NextEventID)
s.Equal(int64(3), executionBuilder.GetExecutionInfo().LastProcessedEvent)
s.Equal(executionContext, executionBuilder.GetExecutionInfo().ExecutionContext)
s.Equal(int(1), len(executionBuilder.GetPendingChildExecutionInfos()))
var childID int64
for c := range executionBuilder.GetPendingChildExecutionInfos() {
childID = c
break
}
s.Equal("child-workflow-id", executionBuilder.GetPendingChildExecutionInfos()[childID].StartedWorkflowID)
s.Equal(workflow.ParentClosePolicyAbandon, executionBuilder.GetPendingChildExecutionInfos()[childID].ParentClosePolicy)
}

func (s *engineSuite) TestRespondDecisionTaskCompletedStartChildWorkflowWithTerminatePolicy() {
domainID := validDomainID
we := workflow.WorkflowExecution{
WorkflowId: common.StringPtr("wId"),
RunId: common.StringPtr(validRunID),
}
tl := "testTaskList"
taskToken, _ := json.Marshal(&common.TaskToken{
WorkflowID: *we.WorkflowId,
RunID: *we.RunId,
ScheduleID: 2,
})
identity := "testIdentity"
executionContext := []byte("context")

msBuilder := newMutableStateBuilderWithEventV2(s.mockHistoryEngine.shard, s.eventsCache,
loggerimpl.NewDevelopmentForTest(s.Suite), we.GetRunId())
addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 200, identity)
di := addDecisionTaskScheduledEvent(msBuilder)
addDecisionTaskStartedEvent(msBuilder, di.ScheduleID, tl, identity)

terminate := workflow.ParentClosePolicyTerminate
decisions := []*workflow.Decision{{
DecisionType: common.DecisionTypePtr(workflow.DecisionTypeStartChildWorkflowExecution),
StartChildWorkflowExecutionDecisionAttributes: &workflow.StartChildWorkflowExecutionDecisionAttributes{
Domain: common.StringPtr(domainID),
WorkflowId: common.StringPtr("child-workflow-id"),
WorkflowType: &workflow.WorkflowType{
Name: common.StringPtr("child-workflow-type"),
},
ParentClosePolicy: &terminate,
},
}}

ms := createMutableState(msBuilder)
gwmsResponse := &persistence.GetWorkflowExecutionResponse{State: ms}

s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(gwmsResponse, nil).Once()
s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything).Return(&p.AppendHistoryNodesResponse{Size: 0}, nil).Once()
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(&p.UpdateWorkflowExecutionResponse{MutableStateUpdateSessionStats: &p.MutableStateUpdateSessionStats{}}, nil).Once()
s.mockMetadataMgr.On("GetDomain", mock.Anything).Return(
&persistence.GetDomainResponse{
Info: &persistence.DomainInfo{ID: domainID},
Config: &persistence.DomainConfig{Retention: 1},
ReplicationConfig: &persistence.DomainReplicationConfig{
ActiveClusterName: cluster.TestCurrentClusterName,
Clusters: []*persistence.ClusterReplicationConfig{
{ClusterName: cluster.TestCurrentClusterName},
},
},
TableVersion: persistence.DomainTableVersionV1,
},
nil,
)
_, err := s.mockHistoryEngine.RespondDecisionTaskCompleted(context.Background(), &history.RespondDecisionTaskCompletedRequest{
DomainUUID: common.StringPtr(domainID),
CompleteRequest: &workflow.RespondDecisionTaskCompletedRequest{
TaskToken: taskToken,
Decisions: decisions,
ExecutionContext: executionContext,
Identity: &identity,
},
})
s.Nil(err, s.printHistory(msBuilder))
executionBuilder := s.getBuilder(domainID, we)
s.Equal(int64(6), executionBuilder.GetExecutionInfo().NextEventID)
s.Equal(int64(3), executionBuilder.GetExecutionInfo().LastProcessedEvent)
s.Equal(executionContext, executionBuilder.GetExecutionInfo().ExecutionContext)
s.Equal(int(1), len(executionBuilder.GetPendingChildExecutionInfos()))
var childID int64
for c := range executionBuilder.GetPendingChildExecutionInfos() {
childID = c
break
}
s.Equal("child-workflow-id", executionBuilder.GetPendingChildExecutionInfos()[childID].StartedWorkflowID)
s.Equal(workflow.ParentClosePolicyTerminate, executionBuilder.GetPendingChildExecutionInfos()[childID].ParentClosePolicy)
}

func (s *engineSuite) TestRespondDecisionTaskCompletedSignalExternalWorkflowFailed() {
domainID := validDomainID
we := workflow.WorkflowExecution{
Expand Down
18 changes: 9 additions & 9 deletions service/history/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,8 @@ type Config struct {
EventEncodingType dynamicconfig.StringPropertyFnWithDomainFilter
// whether or not using eventsV2
EnableEventsV2 dynamicconfig.BoolPropertyFnWithDomainFilter
// whether or not using Terminate as default ParentClosePolicy, otherwise use Abandon for backward compatibility
UseTerminateAsDefaultParentClosePolicy dynamicconfig.BoolPropertyFnWithDomainFilter
// whether or not using ParentClosePolicy
EnableParentClosePolicy dynamicconfig.BoolPropertyFnWithDomainFilter
// whether or not enable system workers for processing parent close policy task
EnableParentClosePolicyWorker dynamicconfig.BoolPropertyFn
// parent close policy will be processed by sys workers(if enabled) if
Expand Down Expand Up @@ -247,13 +247,13 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int, storeType strin
ShardSyncMinInterval: dc.GetDurationProperty(dynamicconfig.ShardSyncMinInterval, 5*time.Minute),

// history client: client/history/client.go set the client timeout 30s
LongPollExpirationInterval: dc.GetDurationPropertyFilteredByDomain(dynamicconfig.HistoryLongPollExpirationInterval, time.Second*20),
EventEncodingType: dc.GetStringPropertyFnWithDomainFilter(dynamicconfig.DefaultEventEncoding, string(common.EncodingTypeThriftRW)),
EnableEventsV2: dc.GetBoolPropertyFnWithDomainFilter(dynamicconfig.EnableEventsV2, true),
UseTerminateAsDefaultParentClosePolicy: dc.GetBoolPropertyFnWithDomainFilter(dynamicconfig.UseTerminateAsDefaultParentClosePolicy, false),
NumParentClosePolicySystemWorkflows: dc.GetIntProperty(dynamicconfig.NumParentClosePolicySystemWorkflows, 10),
EnableParentClosePolicyWorker: dc.GetBoolProperty(dynamicconfig.EnableParentClosePolicyWorker, true),
ParentClosePolicyThreshold: dc.GetIntPropertyFilteredByDomain(dynamicconfig.ParentClosePolicyThreshold, 10),
LongPollExpirationInterval: dc.GetDurationPropertyFilteredByDomain(dynamicconfig.HistoryLongPollExpirationInterval, time.Second*20),
EventEncodingType: dc.GetStringPropertyFnWithDomainFilter(dynamicconfig.DefaultEventEncoding, string(common.EncodingTypeThriftRW)),
EnableEventsV2: dc.GetBoolPropertyFnWithDomainFilter(dynamicconfig.EnableEventsV2, true),
EnableParentClosePolicy: dc.GetBoolPropertyFnWithDomainFilter(dynamicconfig.EnableParentClosePolicy, true),
NumParentClosePolicySystemWorkflows: dc.GetIntProperty(dynamicconfig.NumParentClosePolicySystemWorkflows, 10),
EnableParentClosePolicyWorker: dc.GetBoolProperty(dynamicconfig.EnableParentClosePolicyWorker, true),
ParentClosePolicyThreshold: dc.GetIntPropertyFilteredByDomain(dynamicconfig.ParentClosePolicyThreshold, 10),

NumArchiveSystemWorkflows: dc.GetIntProperty(dynamicconfig.NumArchiveSystemWorkflows, 1000),
ArchiveRequestRPS: dc.GetIntProperty(dynamicconfig.ArchiveRequestRPS, 300), // should be much smaller than frontend RPS
Expand Down

0 comments on commit 42c0a37

Please sign in to comment.