Skip to content

Commit

Permalink
Deprecate old task processing logic (cadence-workflow#3938)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored Feb 5, 2021
1 parent 3ea200e commit 528c714
Show file tree
Hide file tree
Showing 36 changed files with 153 additions and 5,814 deletions.
15 changes: 0 additions & 15 deletions common/service/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ var keys = map[Key]string{
EnableParentClosePolicyWorker: "system.enableParentClosePolicyWorker",
EnableFailoverManager: "system.enableFailoverManager",
EnableStickyQuery: "system.enableStickyQuery",
EnablePriorityTaskProcessor: "system.enablePriorityTaskProcessor",
EnableDebugMode: "system.enableDebugMode",

// size limit
Expand Down Expand Up @@ -214,8 +213,6 @@ var keys = map[Key]string{
TimerProcessorSplitQueueInterval: "history.timerProcessorSplitQueueInterval",
TimerProcessorSplitQueueIntervalJitterCoefficient: "history.timerProcessorSplitQueueIntervalJitterCoefficient",
TimerProcessorMaxRedispatchQueueSize: "history.timerProcessorMaxRedispatchQueueSize",
TimerProcessorEnablePriorityTaskProcessor: "history.timerProcessorEnablePriorityTaskProcessor",
TimerProcessorEnableMultiCurosrProcessor: "history.timerProcessorEnableMultiCursorProcessor",
TimerProcessorMaxTimeShift: "history.timerProcessorMaxTimeShift",
TimerProcessorHistoryArchivalSizeLimit: "history.timerProcessorHistoryArchivalSizeLimit",
TimerProcessorArchivalTimeLimit: "history.timerProcessorArchivalTimeLimit",
Expand All @@ -233,8 +230,6 @@ var keys = map[Key]string{
TransferProcessorUpdateAckIntervalJitterCoefficient: "history.transferProcessorUpdateAckIntervalJitterCoefficient",
TransferProcessorCompleteTransferInterval: "history.transferProcessorCompleteTransferInterval",
TransferProcessorMaxRedispatchQueueSize: "history.transferProcessorMaxRedispatchQueueSize",
TransferProcessorEnablePriorityTaskProcessor: "history.transferProcessorEnablePriorityTaskProcessor",
TransferProcessorEnableMultiCurosrProcessor: "history.transferProcessorEnableMultiCursorProcessor",
TransferProcessorEnableValidator: "history.transferProcessorEnableValidator",
TransferProcessorValidationInterval: "history.transferProcessorValidationInterval",
TransferProcessorVisibilityArchivalTimeLimit: "history.transferProcessorVisibilityArchivalTimeLimit",
Expand Down Expand Up @@ -417,8 +412,6 @@ const (
MaxDecisionStartToCloseSeconds
// DisallowQuery is the key to disallow query for a domain
DisallowQuery
// EnablePriorityTaskProcessor is the key for enabling priority task processor
EnablePriorityTaskProcessor
// EnableDebugMode is the key for enabling debugging components, logs and metrics
EnableDebugMode

Expand Down Expand Up @@ -682,10 +675,6 @@ const (
TimerProcessorSplitQueueIntervalJitterCoefficient
// TimerProcessorMaxRedispatchQueueSize is the threshold of the number of tasks in the redispatch queue for timer processor
TimerProcessorMaxRedispatchQueueSize
// TimerProcessorEnablePriorityTaskProcessor indicates whether priority task processor should be used for timer processor
TimerProcessorEnablePriorityTaskProcessor
// TimerProcessorEnableMultiCurosrProcessor indicates whether multi-cursor queue processor should be used for timer processor
TimerProcessorEnableMultiCurosrProcessor
// TimerProcessorMaxTimeShift is the max shift timer processor can have
TimerProcessorMaxTimeShift
// TimerProcessorHistoryArchivalSizeLimit is the max history size for inline archival
Expand Down Expand Up @@ -720,10 +709,6 @@ const (
TransferProcessorCompleteTransferInterval
// TransferProcessorMaxRedispatchQueueSize is the threshold of the number of tasks in the redispatch queue for transferQueueProcessor
TransferProcessorMaxRedispatchQueueSize
// TransferProcessorEnablePriorityTaskProcessor indicates whether priority task processor should be used for transferQueueProcessor
TransferProcessorEnablePriorityTaskProcessor
// TransferProcessorEnableMultiCurosrProcessor indicates whether multi-cursor queue processor should be used for transferQueueProcessor
TransferProcessorEnableMultiCurosrProcessor
// TransferProcessorEnableValidator indicates whether validator should be enabled for transferQueueProcessor
TransferProcessorEnableValidator
// TransferProcessorValidationInterval is the interval for performing transfer queue validation
Expand Down
14 changes: 7 additions & 7 deletions host/decision_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (s *integrationSuite) TestDecisionHeartbeatingWithEmptyResult() {
Name: tl,
Kind: types.TaskListKindNormal.Ptr(),
}
stikyTaskList := &types.TaskList{
stickyTaskList := &types.TaskList{
Name: "test-sticky-tasklist",
Kind: types.TaskListKindSticky.Ptr(),
}
Expand Down Expand Up @@ -89,7 +89,7 @@ func (s *integrationSuite) TestDecisionHeartbeatingWithEmptyResult() {
TaskToken: taskToken,
Decisions: []*types.Decision{},
StickyAttributes: &types.StickyExecutionAttributes{
WorkerTaskList: stikyTaskList,
WorkerTaskList: stickyTaskList,
ScheduleToStartTimeoutSeconds: common.Int32Ptr(5),
},
ReturnNewDecisionTask: true,
Expand Down Expand Up @@ -118,15 +118,15 @@ func (s *integrationSuite) TestDecisionHeartbeatingWithEmptyResult() {
resp5, err5 := s.engine.RespondDecisionTaskCompleted(createContext(), &types.RespondDecisionTaskCompletedRequest{
TaskToken: taskToken,
Decisions: []*types.Decision{
&types.Decision{
{
DecisionType: types.DecisionTypeCompleteWorkflowExecution.Ptr(),
CompleteWorkflowExecutionDecisionAttributes: &types.CompleteWorkflowExecutionDecisionAttributes{
Result: []byte("efg"),
},
},
},
StickyAttributes: &types.StickyExecutionAttributes{
WorkerTaskList: stikyTaskList,
WorkerTaskList: stickyTaskList,
ScheduleToStartTimeoutSeconds: common.Int32Ptr(5),
},
ReturnNewDecisionTask: true,
Expand Down Expand Up @@ -204,7 +204,7 @@ func (s *integrationSuite) TestDecisionHeartbeatingWithLocalActivitiesResult() {
resp3, err3 := s.engine.RespondDecisionTaskCompleted(createContext(), &types.RespondDecisionTaskCompletedRequest{
TaskToken: resp2.DecisionTask.GetTaskToken(),
Decisions: []*types.Decision{
&types.Decision{
{
DecisionType: types.DecisionTypeRecordMarker.Ptr(),
RecordMarkerDecisionAttributes: &types.RecordMarkerDecisionAttributes{
MarkerName: common.StringPtr("localActivity1"),
Expand All @@ -224,7 +224,7 @@ func (s *integrationSuite) TestDecisionHeartbeatingWithLocalActivitiesResult() {
resp4, err4 := s.engine.RespondDecisionTaskCompleted(createContext(), &types.RespondDecisionTaskCompletedRequest{
TaskToken: resp3.DecisionTask.GetTaskToken(),
Decisions: []*types.Decision{
&types.Decision{
{
DecisionType: types.DecisionTypeRecordMarker.Ptr(),
RecordMarkerDecisionAttributes: &types.RecordMarkerDecisionAttributes{
MarkerName: common.StringPtr("localActivity2"),
Expand All @@ -244,7 +244,7 @@ func (s *integrationSuite) TestDecisionHeartbeatingWithLocalActivitiesResult() {
resp5, err5 := s.engine.RespondDecisionTaskCompleted(createContext(), &types.RespondDecisionTaskCompletedRequest{
TaskToken: resp4.DecisionTask.GetTaskToken(),
Decisions: []*types.Decision{
&types.Decision{
{
DecisionType: types.DecisionTypeCompleteWorkflowExecution.Ptr(),
CompleteWorkflowExecutionDecisionAttributes: &types.CompleteWorkflowExecutionDecisionAttributes{
Result: []byte("efg"),
Expand Down
13 changes: 8 additions & 5 deletions host/signalworkflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,7 @@ CheckHistoryLoopForSignalSent:
s.True(signalEvent != nil)
s.Equal(signalName, *signalEvent.WorkflowExecutionSignaledEventAttributes.SignalName)
s.Equal(signalInput, signalEvent.WorkflowExecutionSignaledEventAttributes.Input)
s.Equal("history-service", signalEvent.WorkflowExecutionSignaledEventAttributes.Identity)
s.Equal(execution.IdentityHistoryService, signalEvent.WorkflowExecutionSignaledEventAttributes.Identity)
}

func (s *integrationSuite) TestSignalWorkflow_Cron_NoDecisionTaskCreated() {
Expand Down Expand Up @@ -883,7 +883,7 @@ CheckHistoryLoopForSignalSent:
s.True(signalEvent != nil)
s.Equal(signalName, *signalEvent.WorkflowExecutionSignaledEventAttributes.SignalName)
s.Equal(signalInput, signalEvent.WorkflowExecutionSignaledEventAttributes.Input)
s.Equal("history-service", signalEvent.WorkflowExecutionSignaledEventAttributes.Identity)
s.Equal(execution.IdentityHistoryService, signalEvent.WorkflowExecutionSignaledEventAttributes.Identity)
}

func (s *integrationSuite) TestSignalExternalWorkflowDecision_UnKnownTarget() {
Expand Down Expand Up @@ -1516,26 +1516,29 @@ func (s *integrationSuite) TestSignalWithStartWorkflow_IDReusePolicy() {
Identity: identity,
WorkflowIDReusePolicy: &wfIDReusePolicy,
}
ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
resp, err := s.engine.SignalWithStartWorkflowExecution(ctx, sRequest)
cancel()
s.Nil(resp)
s.Error(err)
errMsg := err.(*types.WorkflowExecutionAlreadyStartedError).GetMessage()
s.True(strings.Contains(errMsg, "reject duplicate workflow ID"))

// test policy WorkflowIDReusePolicyAllowDuplicateFailedOnly
wfIDReusePolicy = types.WorkflowIDReusePolicyAllowDuplicateFailedOnly
ctx, _ = context.WithTimeout(context.Background(), 5*time.Second)
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
resp, err = s.engine.SignalWithStartWorkflowExecution(ctx, sRequest)
cancel()
s.Nil(resp)
s.Error(err)
errMsg = err.(*types.WorkflowExecutionAlreadyStartedError).GetMessage()
s.True(strings.Contains(errMsg, "allow duplicate workflow ID if last run failed"))

// test policy WorkflowIDReusePolicyAllowDuplicate
wfIDReusePolicy = types.WorkflowIDReusePolicyAllowDuplicate
ctx, _ = context.WithTimeout(context.Background(), 5*time.Second)
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
resp, err = s.engine.SignalWithStartWorkflowExecution(ctx, sRequest)
cancel()
s.Nil(err)
s.NotEmpty(resp.GetRunID())

Expand Down
130 changes: 0 additions & 130 deletions service/history/MockProcessor.go

This file was deleted.

Loading

0 comments on commit 528c714

Please sign in to comment.