Bugfix: decision task processing in transfer active task processor (cadence-workflow#1369)

* Bugfix: when the transfer active task processor encounters a non-sticky decision task while the mutable state builder still has a sticky decision set, it must not convert the decision task into a sticky decision (see the sketch below)

* Add metrics for history re-replication
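
For context, here is a minimal sketch of the corrected routing rule from the first bullet, using hypothetical stand-in types (executionInfo and resolveDecisionTaskList are illustrative names, not the actual Cadence internals in the transfer queue active processor):

package main

import "fmt"

type executionInfo struct {
	TaskList                     string
	StickyTaskList               string
	StickyScheduleToStartTimeout int32
}

// resolveDecisionTaskList picks the task list a decision task is pushed to.
// The fix: a decision scheduled on the original (non-sticky) task list stays
// there, even when mutable state still records a sticky task list from an
// earlier decision.
func resolveDecisionTaskList(info executionInfo, scheduledTaskList string) string {
	if info.StickyTaskList != "" && scheduledTaskList == info.StickyTaskList {
		return info.StickyTaskList // sticky decision keeps sticky routing
	}
	return scheduledTaskList // non-sticky decision is never rewritten to sticky
}

func main() {
	info := executionInfo{TaskList: "tl", StickyTaskList: "sticky-tl"}
	fmt.Println(resolveDecisionTaskList(info, "tl"))        // "tl" (pre-fix code wrongly used "sticky-tl")
	fmt.Println(resolveDecisionTaskList(info, "sticky-tl")) // "sticky-tl"
}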
wxing1292 authored Jan 7, 2019
1 parent 90a79cd commit 0ca85f1
Showing 11 changed files with 325 additions and 21 deletions.
15 changes: 14 additions & 1 deletion common/metrics/defs.go
@@ -346,6 +346,15 @@ const (

// DomainCacheScope tracks domain cache callbacks
DomainCacheScope
// HistoryRereplicationByTransferTaskScope tracks history replication calls made by transfer task
HistoryRereplicationByTransferTaskScope
// HistoryRereplicationByTimerTaskScope tracks history replication calls made by timer task
HistoryRereplicationByTimerTaskScope
// HistoryRereplicationByHistoryReplicationScope tracks history replication calls made by history replication
HistoryRereplicationByHistoryReplicationScope
// HistoryRereplicationByActivityReplicationScope tracks history replication calls made by activity replication
HistoryRereplicationByActivityReplicationScope

// PersistenceAppendHistoryNodesScope tracks AppendHistoryNodes calls made by service to persistence layer
PersistenceAppendHistoryNodesScope
// PersistenceReadHistoryBranchScope tracks ReadHistoryBranch calls made by service to persistence layer
@@ -759,7 +768,11 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
MessagingClientPublishScope: {operation: "MessagingClientPublish"},
MessagingClientPublishBatchScope: {operation: "MessagingClientPublishBatch"},

-DomainCacheScope: {operation: "DomainCache"},
+DomainCacheScope:                               {operation: "DomainCache"},
+HistoryRereplicationByTransferTaskScope:        {operation: "HistoryRereplicationByTransferTask"},
+HistoryRereplicationByTimerTaskScope:           {operation: "HistoryRereplicationByTimerTask"},
+HistoryRereplicationByHistoryReplicationScope:  {operation: "HistoryRereplicationByHistoryReplication"},
+HistoryRereplicationByActivityReplicationScope: {operation: "HistoryRereplicationByActivityReplication"},
},
// Frontend Scope Names
Frontend: {
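Each scope's operation string becomes a tag on every metric emitted under that scope, which is what lets the new scopes separate re-replication traffic by caller. A minimal sketch with a stand-in emitter (the scopeDefs indices and incCounter here are illustrative, not the real common/metrics client):

package main

import "fmt"

type scopeDefinition struct{ operation string }

// a stand-in slice of the ScopeDefs table above
var scopeDefs = map[int]scopeDefinition{
	0: {operation: "HistoryRereplicationByTransferTask"},
	1: {operation: "HistoryRereplicationByTimerTask"},
}

// incCounter emits a counter tagged with the owning scope's operation name.
func incCounter(scopeIdx int, counter string) {
	fmt.Printf("counter=%s operation=%s value=1\n", counter, scopeDefs[scopeIdx].operation)
}

func main() {
	incCounter(0, "history.rereplication.requests")
}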
174 changes: 174 additions & 0 deletions hostxdc/Integration_domain_failover_test.go
@@ -528,6 +528,180 @@ func (s *integrationClustersTestSuite) TestSimpleWorkflowFailover() {
s.Nil(err)
}

func (s *integrationClustersTestSuite) TestStickyDecisionFailover() {
domainName := "test-sticky-decision-workflow-failover-" + common.GenerateRandomString(5)
client1 := s.cluster1.host.GetFrontendClient() // active
regReq := &workflow.RegisterDomainRequest{
Name: common.StringPtr(domainName),
Clusters: clusterReplicationConfig,
ActiveClusterName: common.StringPtr(clusterName[0]),
WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(1),
}
err := client1.RegisterDomain(createContext(), regReq)
s.NoError(err)

descReq := &workflow.DescribeDomainRequest{
Name: common.StringPtr(domainName),
}
resp, err := client1.DescribeDomain(createContext(), descReq)
s.NoError(err)
s.NotNil(resp)
// Wait for the domain cache to pick up the change
time.Sleep(cacheRefreshInterval)

client2 := s.cluster2.host.GetFrontendClient() // standby

// Start a workflow
id := "integration-sticky-decision-workflow-failover-test"
wt := "integration-sticky-decision-workflow-failover-test-type"
tl := "integration-sticky-decision-workflow-failover-test-tasklist"
stl1 := "integration-sticky-decision-workflow-failover-test-tasklist-sticky1"
stl2 := "integration-sticky-decision-workflow-failover-test-tasklist-sticky2"
identity1 := "worker1"
identity2 := "worker2"

workflowType := &workflow.WorkflowType{Name: common.StringPtr(wt)}
taskList := &workflow.TaskList{Name: common.StringPtr(tl)}
stickyTaskList1 := &workflow.TaskList{Name: common.StringPtr(stl1)}
stickyTaskList2 := &workflow.TaskList{Name: common.StringPtr(stl2)}
stickyTaskTimeout := common.Int32Ptr(100)
startReq := &workflow.StartWorkflowExecutionRequest{
RequestId: common.StringPtr(uuid.New()),
Domain: common.StringPtr(domainName),
WorkflowId: common.StringPtr(id),
WorkflowType: workflowType,
TaskList: taskList,
Input: nil,
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(2592000),
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(60),
Identity: common.StringPtr(identity1),
}
we, err := client1.StartWorkflowExecution(createContext(), startReq)
s.NoError(err)
s.NotNil(we.GetRunId())

s.logger.Infof("StartWorkflowExecution: response: %v \n", we.GetRunId())

firstDecisionMade := false
secondDecisionMade := false
workflowCompleted := false
dtHandler := func(execution *workflow.WorkflowExecution, wt *workflow.WorkflowType,
previousStartedEventID, startedEventID int64, history *workflow.History) ([]byte, []*workflow.Decision, error) {
if !firstDecisionMade {
firstDecisionMade = true
return nil, []*workflow.Decision{}, nil
}

if !secondDecisionMade {
secondDecisionMade = true
return nil, []*workflow.Decision{}, nil
}

workflowCompleted = true
return nil, []*workflow.Decision{{
DecisionType: common.DecisionTypePtr(workflow.DecisionTypeCompleteWorkflowExecution),
CompleteWorkflowExecutionDecisionAttributes: &workflow.CompleteWorkflowExecutionDecisionAttributes{
Result: []byte("Done."),
},
}}, nil
}

poller1 := &host.TaskPoller{
Engine: client1,
Domain: domainName,
TaskList: taskList,
StickyTaskList: stickyTaskList1,
StickyScheduleToStartTimeoutSeconds: stickyTaskTimeout,
Identity: identity1,
DecisionHandler: dtHandler,
Logger: s.logger,
T: s.T(),
}

poller2 := &host.TaskPoller{
Engine: client2,
Domain: domainName,
TaskList: taskList,
StickyTaskList: stickyTaskList2,
StickyScheduleToStartTimeoutSeconds: stickyTaskTimeout,
Identity: identity2,
DecisionHandler: dtHandler,
Logger: s.logger,
T: s.T(),
}

_, err = poller1.PollAndProcessDecisionTaskWithAttemptAndRetry(false, false, false, true, 0, 5)
s.logger.Infof("PollAndProcessDecisionTask: %v", err)
s.Nil(err)
s.True(firstDecisionMade)

// Send a signal in the active cluster
signalName := "my signal"
signalInput := []byte("my signal input.")
err = client1.SignalWorkflowExecution(createContext(), &workflow.SignalWorkflowExecutionRequest{
Domain: common.StringPtr(domainName),
WorkflowExecution: &workflow.WorkflowExecution{
WorkflowId: common.StringPtr(id),
RunId: common.StringPtr(we.GetRunId()),
},
SignalName: common.StringPtr(signalName),
Input: signalInput,
Identity: common.StringPtr(identity1),
})
s.Nil(err)

// Update domain to fail over
updateReq := &workflow.UpdateDomainRequest{
Name: common.StringPtr(domainName),
ReplicationConfiguration: &workflow.DomainReplicationConfiguration{
ActiveClusterName: common.StringPtr(clusterName[1]),
},
}
updateResp, err := client1.UpdateDomain(createContext(), updateReq)
s.NoError(err)
s.NotNil(updateResp)
s.Equal(clusterName[1], updateResp.ReplicationConfiguration.GetActiveClusterName())
s.Equal(int64(1), updateResp.GetFailoverVersion())

// Wait for the domain cache to pick up the change
time.Sleep(cacheRefreshInterval)

_, err = poller2.PollAndProcessDecisionTaskWithAttemptAndRetry(false, false, false, true, 0, 5)
s.logger.Infof("PollAndProcessDecisionTask: %v", err)
s.Nil(err)
s.True(secondDecisionMade)

err = client2.SignalWorkflowExecution(createContext(), &workflow.SignalWorkflowExecutionRequest{
Domain: common.StringPtr(domainName),
WorkflowExecution: &workflow.WorkflowExecution{
WorkflowId: common.StringPtr(id),
RunId: common.StringPtr(we.GetRunId()),
},
SignalName: common.StringPtr(signalName),
Input: signalInput,
Identity: common.StringPtr(identity2),
})
s.Nil(err)

// Update domain to fail back
updateReq = &workflow.UpdateDomainRequest{
Name: common.StringPtr(domainName),
ReplicationConfiguration: &workflow.DomainReplicationConfiguration{
ActiveClusterName: common.StringPtr(clusterName[0]),
},
}
updateResp, err = client2.UpdateDomain(createContext(), updateReq)
s.NoError(err)
s.NotNil(updateResp)
s.Equal(clusterName[0], updateResp.ReplicationConfiguration.GetActiveClusterName())
s.Equal(int64(10), updateResp.GetFailoverVersion())

_, err = poller1.PollAndProcessDecisionTask(true, false)
s.logger.Infof("PollAndProcessDecisionTask: %v", err)
s.Nil(err)
s.True(workflowCompleted)
}

func (s *integrationClustersTestSuite) TestStartWorkflowExecution_Failover_WorkflowIDReusePolicy() {
domainName := "test-start-workflow-failover-ID-reuse-policy" + common.GenerateRandomString(5)
client1 := s.cluster1.host.GetFrontendClient() // active
6 changes: 3 additions & 3 deletions service/history/historyReplicator.go
@@ -71,8 +71,8 @@ var (
// ErrRetryEntityNotExists is returned to indicate workflow execution is not created yet and replicator should
// try this task again after a small delay.
ErrRetryEntityNotExists = &shared.RetryTaskError{Message: "entity not exists"}
-// ErrRetrySyncActivity is returned when sync activity replication tasks are arriving out of order, should retry
-ErrRetrySyncActivity = &shared.RetryTaskError{Message: "retry on applying sync activity"}
+// ErrRetrySyncActivityMsg is returned when sync activity replication tasks are arriving out of order, should retry
+ErrRetrySyncActivityMsg = "retry on applying sync activity"
// ErrRetryBufferEventsMsg is returned when events are arriving out of order, should retry, or specify force apply
ErrRetryBufferEventsMsg = "retry on applying buffer events"
// ErrRetryEmptyEventsMsg is returned when events size is 0
@@ -187,7 +187,7 @@ func (r *historyReplicator) SyncActivity(ctx context.Context, request *h.SyncAct

// version >= last write version
// this can happen if out-of-order delivery happens
-return ErrRetrySyncActivity
+return newRetryTaskErrorWithHint(ErrRetrySyncActivityMsg, domainID, execution.GetWorkflowId(), execution.GetRunId(), msBuilder.GetNextEventID())
}

ai, isRunning := msBuilder.GetActivityInfo(scheduleID)
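Replacing the shared sentinel ErrRetrySyncActivity with a per-call constructor lets the error carry workflow-specific hints, so the caller knows exactly which history range to re-replicate before retrying. A plausible shape of the helper, with local stand-ins for the generated shared types (the field set here is an assumption, not the verified Cadence source):

package main

import "fmt"

// stand-in for the generated shared.RetryTaskError
type RetryTaskError struct {
	Message     string
	DomainID    string
	WorkflowID  string
	RunID       string
	NextEventID int64
}

func (e *RetryTaskError) Error() string { return e.Message }

// newRetryTaskErrorWithHint attaches enough identity to the retryable error
// that the requesting cluster can re-replicate the missing history first.
func newRetryTaskErrorWithHint(msg, domainID, workflowID, runID string, nextEventID int64) *RetryTaskError {
	return &RetryTaskError{Message: msg, DomainID: domainID, WorkflowID: workflowID, RunID: runID, NextEventID: nextEventID}
}

func main() {
	err := newRetryTaskErrorWithHint("retry on applying sync activity", "domain-1", "wf-1", "run-1", 42)
	fmt.Println(err.Error(), err.NextEventID)
}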
3 changes: 2 additions & 1 deletion service/history/historyReplicator_test.go
@@ -320,7 +320,7 @@ func (s *historyReplicatorSuite) TestSyncActivity_IncomingScheduleIDLarger_Incom
).Once()

err = s.historyReplicator.SyncActivity(ctx.Background(), request)
-s.Equal(ErrRetrySyncActivity, err)
+s.Equal(newRetryTaskErrorWithHint(ErrRetrySyncActivityMsg, domainID, workflowID, runID, nextEventID), err)
}

func (s *historyReplicatorSuite) TestSyncActivity_ActivityCompleted() {
@@ -2904,6 +2904,7 @@ func (s *historyReplicatorSuite) TestConflictResolutionTerminateCurrentRunningIf
contextCurrent.On("replicateWorkflowExecution", terminateRequest, mock.Anything, mock.Anything, currentNextEventID, mock.Anything, mock.Anything).Return(nil).Once()
s.mockTxProcessor.On("NotifyNewTask", incomingCluster, mock.Anything)
s.mockTimerProcessor.On("NotifyNewTimers", incomingCluster, mock.Anything, mock.Anything)
msBuilderCurrent.On("ClearStickyness").Once()

prevRunID, err := s.historyReplicator.conflictResolutionTerminateCurrentRunningIfNotSelf(ctx.Background(), msBuilderTarget, incomingVersion, incomingTimestamp, s.logger)
s.Nil(err)
3 changes: 3 additions & 0 deletions service/history/stateBuilder.go
@@ -97,6 +97,9 @@ func (b *stateBuilderImpl) applyEvents(domainID, requestID string, execution sha
var lastEvent *shared.HistoryEvent
var lastDecision *decisionInfo
var newRunStateBuilder mutableState

// need to clear stickiness since the workflow turned passive
b.msBuilder.ClearStickyness()
for _, event := range history {
lastEvent = event
// NOTE: stateBuilder is also being used in the active side
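Clearing stickiness on the passive side matters because the recorded sticky task list points at a specific worker in the previously active cluster; leaving it set would route the next decision to a worker that no longer owns the workflow. A minimal sketch of what the call amounts to (field names are illustrative, not the exact mutable state layout):

package main

import "fmt"

type workflowExecutionInfo struct {
	StickyTaskList               string
	StickyScheduleToStartTimeout int32
}

type mutableStateBuilder struct{ executionInfo *workflowExecutionInfo }

// ClearStickyness drops stale sticky routing so the next decision task goes
// back to the original task list.
func (e *mutableStateBuilder) ClearStickyness() {
	e.executionInfo.StickyTaskList = ""
	e.executionInfo.StickyScheduleToStartTimeout = 0
}

func main() {
	b := &mutableStateBuilder{executionInfo: &workflowExecutionInfo{StickyTaskList: "sticky-tl", StickyScheduleToStartTimeout: 100}}
	b.ClearStickyness()
	fmt.Printf("%+v\n", *b.executionInfo) // sticky routing cleared
}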