Bugfix: timer creation / processing race condition during failover (cadence-workflow#1415)

The existing logic uses time buckets to prevent creating a timer within the time range currently being processed.
This ensures that all timer tasks are eventually processed.

This commit fixes the race condition that occurs during failover:

* Tasks created before the failover must be visible to either the failover processing logic or the active processing logic.
* Tasks created after the failover must be visible to the active processing logic.
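
To make the bucketing idea concrete, here is a minimal, hypothetical Go sketch (the helper name and bucket size are assumptions, not the actual Cadence implementation): a timer created while the current bucket is being read only becomes visible at the next bucket boundary, so the processor reading that bucket cannot miss it.

package main

import (
	"fmt"
	"time"
)

// nextBucketBoundary rounds t up to the next multiple of bucketSize,
// so a newly created timer task never lands inside the bucket that a
// timer processor may currently be reading.
func nextBucketBoundary(t time.Time, bucketSize time.Duration) time.Time {
	return t.Truncate(bucketSize).Add(bucketSize)
}

func main() {
	bucket := 5 * time.Second
	fireAt := time.Now().Add(2 * time.Second)
	// The timer becomes visible only at the next boundary, strictly after
	// the time range currently being processed.
	fmt.Println("timer visible at:", nextBucketBoundary(fireAt, bucket))
}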
wxing1292 authored Mar 12, 2019
1 parent ccb5165 commit bda7708
Showing 10 changed files with 237 additions and 37 deletions.
4 changes: 3 additions & 1 deletion common/cache/domainCache.go
@@ -354,13 +354,15 @@ UpdateLoop:
return err
}
c.updateNameToIDCache(newCacheNameToID, nextEntry.info.Name, nextEntry.info.ID)
if prevEntry != nil {

if prevEntry != nil {
prevEntries = append(prevEntries, prevEntry)
nextEntries = append(nextEntries, nextEntry)
}
}

// NOTE: READ REF BEFORE MODIFICATION
// ref: historyEngine.go registerDomainFailoverCallback function
c.callbackLock.Lock()
defer c.callbackLock.Unlock()
c.triggerDomainChangePrepareCallbackLocked()
8 changes: 4 additions & 4 deletions common/persistence/cassandra/cassandraPersistence.go
@@ -359,10 +359,10 @@ const (
`IF range_id = ?`

templateUpdateCurrentWorkflowExecutionQuery = `UPDATE executions USING TTL 0 ` +
`SET current_run_id = ?,
execution = {run_id: ?, create_request_id: ?, state: ?, close_status: ?},
replication_state = {start_version: ?, last_write_version: ?},
workflow_last_write_version = ?,
`SET current_run_id = ?,
execution = {run_id: ?, create_request_id: ?, state: ?, close_status: ?},
replication_state = {start_version: ?, last_write_version: ?},
workflow_last_write_version = ?,
workflow_state = ? ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
4 changes: 3 additions & 1 deletion host/integration_test.go
@@ -4901,7 +4901,7 @@ func (s *integrationSuite) TestGetWorkflowExecutionHistory_All() {

// this function polls events from the history service
getHistory := func(domain string, workflowID string, token []byte, isLongPoll bool) ([]*workflow.HistoryEvent, []byte) {
responseInner, _ := s.engine.GetWorkflowExecutionHistory(createContext(), &workflow.GetWorkflowExecutionHistoryRequest{
responseInner, err := s.engine.GetWorkflowExecutionHistory(createContext(), &workflow.GetWorkflowExecutionHistoryRequest{
Domain: common.StringPtr(domain),
Execution: &workflow.WorkflowExecution{
WorkflowId: common.StringPtr(workflowID),
@@ -4912,6 +4912,8 @@ func (s *integrationSuite) TestGetWorkflowExecutionHistory_All() {
WaitForNewEvent: common.BoolPtr(isLongPoll),
NextPageToken: token,
})
s.Nil(err)

return responseInner.History.Events, responseInner.NextPageToken
}

163 changes: 163 additions & 0 deletions hostxdc/Integration_domain_failover_test.go
@@ -1455,6 +1455,169 @@ func (s *integrationClustersTestSuite) TestSignalFailover() {
s.True(eventsReplicated)
}

func (s *integrationClustersTestSuite) TestUserTimerFailover() {
domainName := "test-user-timer-workflow-failover-" + common.GenerateRandomString(5)
client1 := s.cluster1.host.GetFrontendClient() // active
regReq := &workflow.RegisterDomainRequest{
Name: common.StringPtr(domainName),
Clusters: clusterReplicationConfig,
ActiveClusterName: common.StringPtr(clusterName[0]),
WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(1),
}
err := client1.RegisterDomain(createContext(), regReq)
s.NoError(err)

descReq := &workflow.DescribeDomainRequest{
Name: common.StringPtr(domainName),
}
resp, err := client1.DescribeDomain(createContext(), descReq)
s.NoError(err)
s.NotNil(resp)
// Wait for the domain cache to pick up the change
time.Sleep(cacheRefreshInterval)

client2 := s.cluster2.host.GetFrontendClient() // standby

// Start a workflow
id := "integration-user-timer-workflow-failover-test"
wt := "integration-user-timer-workflow-failover-test-type"
tl := "integration-user-timer-workflow-failover-test-tasklist"
identity := "worker1"
workflowType := &workflow.WorkflowType{Name: common.StringPtr(wt)}
taskList := &workflow.TaskList{Name: common.StringPtr(tl)}
startReq := &workflow.StartWorkflowExecutionRequest{
RequestId: common.StringPtr(uuid.New()),
Domain: common.StringPtr(domainName),
WorkflowId: common.StringPtr(id),
WorkflowType: workflowType,
TaskList: taskList,
Input: nil,
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(300),
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(10),
Identity: common.StringPtr(identity),
}
var we *workflow.StartWorkflowExecutionResponse
for i := 0; i < 10; i++ {
we, err = client1.StartWorkflowExecution(createContext(), startReq)
if err == nil {
break
}
time.Sleep(1 * time.Second)
}
s.NoError(err)
s.NotNil(we.GetRunId())

s.logger.Infof("StartWorkflowExecution: response: %v \n", we.GetRunId())

timerCreated := false
timerFired := false
workflowCompleted := false
dtHandler := func(execution *workflow.WorkflowExecution, wt *workflow.WorkflowType,
previousStartedEventID, startedEventID int64, history *workflow.History) ([]byte, []*workflow.Decision, error) {
if !timerCreated {
timerCreated = true

// Send a signal to the workflow in the active cluster
signalName := "my signal"
signalInput := []byte("my signal input.")
err = client1.SignalWorkflowExecution(createContext(), &workflow.SignalWorkflowExecutionRequest{
Domain: common.StringPtr(domainName),
WorkflowExecution: &workflow.WorkflowExecution{
WorkflowId: common.StringPtr(id),
RunId: common.StringPtr(we.GetRunId()),
},
SignalName: common.StringPtr(signalName),
Input: signalInput,
Identity: common.StringPtr(""),
})
s.Nil(err)

return nil, []*workflow.Decision{{
DecisionType: common.DecisionTypePtr(workflow.DecisionTypeStartTimer),
StartTimerDecisionAttributes: &workflow.StartTimerDecisionAttributes{
TimerId: common.StringPtr("timer-id"),
StartToFireTimeoutSeconds: common.Int64Ptr(2),
},
}}, nil
}

if !timerFired {
resp, err := client2.GetWorkflowExecutionHistory(createContext(), &workflow.GetWorkflowExecutionHistoryRequest{
Domain: common.StringPtr(domainName),
Execution: &workflow.WorkflowExecution{
WorkflowId: common.StringPtr(id),
RunId: common.StringPtr(we.GetRunId()),
},
})
s.Nil(err)
for _, event := range resp.History.Events {
if event.GetEventType() == workflow.EventTypeTimerFired {
timerFired = true
}
}
if !timerFired {
return nil, []*workflow.Decision{}, nil
}
}

workflowCompleted = true
return nil, []*workflow.Decision{{
DecisionType: common.DecisionTypePtr(workflow.DecisionTypeCompleteWorkflowExecution),
CompleteWorkflowExecutionDecisionAttributes: &workflow.CompleteWorkflowExecutionDecisionAttributes{
Result: []byte("Done."),
},
}}, nil
}

poller1 := &host.TaskPoller{
Engine: client1,
Domain: domainName,
TaskList: taskList,
Identity: identity,
DecisionHandler: dtHandler,
Logger: s.logger,
T: s.T(),
}

poller2 := &host.TaskPoller{
Engine: client2,
Domain: domainName,
TaskList: taskList,
Identity: identity,
DecisionHandler: dtHandler,
Logger: s.logger,
T: s.T(),
}

_, err = poller1.PollAndProcessDecisionTask(false, false)
s.Nil(err)
s.True(timerCreated)

// Update domain to fail over
updateReq := &workflow.UpdateDomainRequest{
Name: common.StringPtr(domainName),
ReplicationConfiguration: &workflow.DomainReplicationConfiguration{
ActiveClusterName: common.StringPtr(clusterName[1]),
},
}
updateResp, err := client1.UpdateDomain(createContext(), updateReq)
s.NoError(err)
s.NotNil(updateResp)
s.Equal(clusterName[1], updateResp.ReplicationConfiguration.GetActiveClusterName())
s.Equal(int64(1), updateResp.GetFailoverVersion())

// Wait for the domain cache to pick up the change
time.Sleep(cacheRefreshInterval)

for i := 1; i < 20; i++ {
if !workflowCompleted {
_, err = poller2.PollAndProcessDecisionTask(true, false)
s.Nil(err)
time.Sleep(time.Second)
}
}
}

func (s *integrationClustersTestSuite) TestActivityHeartbeatFailover() {
domainName := "test-activity-hearbeat-workflow-failover-" + common.GenerateRandomString(5)
client1 := s.cluster1.host.GetFrontendClient() // active
25 changes: 22 additions & 3 deletions service/history/historyEngine.go
@@ -27,8 +27,6 @@ import (
"fmt"
"time"

"go.uber.org/cadence/.gen/go/shared"

"github.com/pborman/uuid"
"github.com/uber-common/bark"
h "github.com/uber/cadence/.gen/go/history"
@@ -46,6 +44,7 @@ import (
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/service/worker/sysworkflow"
"go.uber.org/cadence/.gen/go/shared"
"go.uber.org/yarpc"
)

@@ -225,6 +224,25 @@

func (e *historyEngineImpl) registerDomainFailoverCallback() {

// NOTE: READ BEFORE MODIFICATION
//
// Tasks, e.g. transfer tasks and timer tasks, are created while holding the shard lock,
// meaning: task creation -> release of shard lock
//
// Domain change notification follows these steps, and the order matters:
// 1. lock all task processing.
// 2. domain changes become visible to everyone (Note: the lock on task processing prevents the task processing logic from seeing the domain changes).
// 3. failover min and max task levels are calculated, then updated to the shard.
// 4. failover start & task processing unlock & shard domain version notification update. (order does not matter for this discussion)
//
// The above guarantees that any task created during the failover will be processed.
// If the task is created after the domain change:
//  then the active processor will handle it. (simple case)
// If the task is created before the domain change:
//  task creation -> release of shard lock
//  failover min / max task levels calculated & updated to the shard (using the shard lock) -> failover start
//  the above two orderings guarantee that the failover starts only after the task is persisted.

failoverPredicate := func(shardNotificationVersion int64, nextDomain *cache.DomainCacheEntry, action func()) {
domainFailoverNotificationVersion := nextDomain.GetFailoverNotificationVersion()
domainActiveCluster := nextDomain.GetReplicationConfig().ActiveClusterName
@@ -266,7 +284,7 @@
if len(failoverDomainIDs) > 0 {
e.logger.WithFields(bark.Fields{
logging.TagDomainIDs: failoverDomainIDs,
}).Infof("Domain Failover Start.")
}).Info("Domain Failover Start.")

e.txProcessor.FailoverDomain(failoverDomainIDs)
e.timerProcessor.FailoverDomain(failoverDomainIDs)
@@ -279,6 +297,7 @@
e.txProcessor.NotifyNewTask(e.currentClusterName, fakeDecisionTask)
e.timerProcessor.NotifyNewTimers(e.currentClusterName, now, fakeDecisionTimeoutTask)
}

e.shard.UpdateDomainNotificationVersion(nextDomains[len(nextDomains)-1].GetNotificationVersion() + 1)
},
)
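
The NOTE inside registerDomainFailoverCallback above describes the ordering that fixes the race; the following is a minimal, hypothetical Go sketch of that ordering (the type and method names are illustrative assumptions, not the real Cadence APIs): the prepare callback locks task processing before the new domain entries become visible, and the change callback records the failover task levels before unlocking, so any task persisted before the domain change falls inside the failover range.

package main

import (
	"fmt"
	"sync"
)

// taskProcessor is an illustrative stand-in for the transfer/timer task
// processing logic guarded by the domain-change callbacks.
type taskProcessor struct {
	mu               sync.Mutex
	failoverMinLevel int64
	failoverMaxLevel int64
}

// prepareCallback runs before the new domain entries become visible:
// it blocks task processing so no processor observes a half-applied change.
func (p *taskProcessor) prepareCallback() { p.mu.Lock() }

// changeCallback runs after the new domain entries are visible: it records
// the failover min/max task levels, then unlocks task processing. Any task
// persisted before the domain change was persisted before this point, so it
// falls inside the recorded failover range; tasks created afterwards are
// handled by the now-active processing logic.
func (p *taskProcessor) changeCallback(currentReadLevel, currentMaxLevel int64) {
	p.failoverMinLevel = currentReadLevel
	p.failoverMaxLevel = currentMaxLevel
	p.mu.Unlock()
}

func main() {
	p := &taskProcessor{}
	p.prepareCallback()        // step 1: lock all task processing
	// step 2: the domain cache swaps in the new entries here
	p.changeCallback(100, 200) // steps 3-4: record failover levels, then unlock
	fmt.Println("failover range:", p.failoverMinLevel, p.failoverMaxLevel)
}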