Skip to content

Commit

Permalink
Refactor workflow reset path (cadence-workflow#2142)
Browse files Browse the repository at this point in the history
* Refactor workflow reset path
* Refactor workflow reset path in Cassandra / MySQL persistence layer
* Remove redudent check on Cassandra: update /delete activity / timer / child workflow / cancel workflow / signal workflow / buffered events
* Keeping condition check on update execution, thus batch operation will still check update condition
* Refactor shard context to use allocateTransferIDsLocked and allocateTimerIDsLocked helper functions
  • Loading branch information
wxing1292 authored Jul 1, 2019
1 parent 458086f commit b3c718b
Show file tree
Hide file tree
Showing 13 changed files with 422 additions and 753 deletions.
294 changes: 64 additions & 230 deletions common/persistence/cassandra/cassandraPersistence.go

Large diffs are not rendered by default.

125 changes: 28 additions & 97 deletions common/persistence/cassandra/cassandraPersistenceUtil.go

Large diffs are not rendered by default.

17 changes: 5 additions & 12 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -739,25 +739,18 @@ type (

// ResetWorkflowExecutionRequest is used to reset workflow execution state for current run and create new run
ResetWorkflowExecutionRequest struct {
RangeID int64

// for base run (we need to make sure the baseRun hasn't been deleted after forking)
BaseRunID string
BaseRunNextEventID int64

// for current workflow record
PrevRunVersion int64
PrevRunState int

// for shard record
RangeID int64
CurrentRunID string
CurrentRunNextEventID int64

// for current mutable state
Condition int64
UpdateCurr bool
CurrExecutionInfo *WorkflowExecutionInfo
CurrReplicationState *ReplicationState
CurrReplicationTasks []Task
CurrTransferTasks []Task
CurrTimerTasks []Task
CurrentWorkflowMutation *WorkflowMutation

// For new mutable state
NewWorkflowSnapshot WorkflowSnapshot
Expand Down
27 changes: 12 additions & 15 deletions common/persistence/executionStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,31 +510,28 @@ func (m *executionManagerImpl) ResetWorkflowExecution(
request *ResetWorkflowExecutionRequest,
) error {

currExecution, err := m.SerializeExecutionInfo(request.CurrExecutionInfo, request.Encoding)
if err != nil {
return err
}
serializedNewWorkflowSnapshot, err := m.SerializeWorkflowSnapshot(&request.NewWorkflowSnapshot, request.Encoding)
if err != nil {
return err
}
var serializedUpdateWorkflowSnapshot *InternalWorkflowMutation
if request.CurrentWorkflowMutation != nil {
serializedUpdateWorkflowSnapshot, err = m.SerializeWorkflowMutation(request.CurrentWorkflowMutation, request.Encoding)
if err != nil {
return err
}
}

newRequest := &InternalResetWorkflowExecutionRequest{
PrevRunVersion: request.PrevRunVersion,
PrevRunState: request.PrevRunState,

Condition: request.Condition,
RangeID: request.RangeID,
RangeID: request.RangeID,

BaseRunID: request.BaseRunID,
BaseRunNextEventID: request.BaseRunNextEventID,

UpdateCurr: request.UpdateCurr,
CurrExecutionInfo: currExecution,
CurrReplicationState: request.CurrReplicationState,
CurrReplicationTasks: request.CurrReplicationTasks,
CurrTimerTasks: request.CurrTimerTasks,
CurrTransferTasks: request.CurrTransferTasks,
CurrentRunID: request.CurrentRunID,
CurrentRunNextEventID: request.CurrentRunNextEventID,

CurrentWorkflowMutation: serializedUpdateWorkflowSnapshot,

NewWorkflowSnapshot: *serializedNewWorkflowSnapshot,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -797,7 +797,7 @@ func (s *ExecutionManagerSuiteForEventsV2) TestWorkflowResetWithCurrWithReplicat

err = s.ResetWorkflowExecution(3,
insertInfo, insertReplicationState, insertActivityInfos, insertTimerInfos, insertChildExecutionInfos, insertRequestCancelInfos, insertSignalInfos, insertSignalRequests, insertTransTasks, insertTimerTasks, insertReplicationTasks,
true, updatedInfo, updatedReplicationState, currTransTasks, currTimerTasks, info0.RunID, -1000, replicationState0.LastWriteVersion)
true, updatedInfo, updatedReplicationState, currTransTasks, currTimerTasks, info0.RunID, -1000)
s.Nil(err)

//////////////////////////////
Expand Down Expand Up @@ -1219,7 +1219,7 @@ func (s *ExecutionManagerSuiteForEventsV2) TestWorkflowResetNoCurrWithReplicate(

err = s.ResetWorkflowExecution(3,
insertInfo, insertReplicationState, insertActivityInfos, insertTimerInfos, insertChildExecutionInfos, insertRequestCancelInfos, insertSignalInfos, insertSignalRequests, insertTransTasks, insertTimerTasks, insertReplicationTasks,
false, updatedInfo, updatedReplicationState, nil, nil, info0.RunID, -1000, replicationState0.LastWriteVersion)
false, updatedInfo, updatedReplicationState, nil, nil, info0.RunID, -1000)
s.Nil(err)

//////////////////////////////
Expand Down Expand Up @@ -1490,7 +1490,7 @@ func (s *ExecutionManagerSuiteForEventsV2) TestWorkflowResetNoCurrNoReplicate()

err = s.ResetWorkflowExecution(3,
insertInfo, nil, insertActivityInfos, insertTimerInfos, nil, insertRequestCancelInfos, nil, nil, insertTransTasks, insertTimerTasks, nil,
false, updatedInfo, nil, nil, nil, info0.RunID, -1000, -1000)
false, updatedInfo, nil, nil, nil, info0.RunID, -1000)
s.Nil(err)

//////////////////////////////
Expand Down
38 changes: 21 additions & 17 deletions common/persistence/persistence-tests/persistenceTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -919,28 +919,18 @@ func (s *TestBase) ResetWorkflowExecution(condition int64, info *p.WorkflowExecu
activityInfos []*p.ActivityInfo, timerInfos []*p.TimerInfo, childExecutionInfos []*p.ChildExecutionInfo,
requestCancelInfos []*p.RequestCancelInfo, signalInfos []*p.SignalInfo, ids []string, trasTasks, timerTasks, replTasks []p.Task,
updateCurr bool, currInfo *p.WorkflowExecutionInfo, currReplicationState *p.ReplicationState,
currTrasTasks, currTimerTasks []p.Task, forkRunID string, forkRunNextEventID int64, prevRunVersion int64) error {
currTrasTasks, currTimerTasks []p.Task, forkRunID string, forkRunNextEventID int64) error {

prevRunState := p.WorkflowStateCompleted
if updateCurr {
prevRunState = p.WorkflowStateRunning
}
req := &p.ResetWorkflowExecutionRequest{
RangeID: s.ShardInfo.RangeID,

return s.ExecutionManager.ResetWorkflowExecution(&p.ResetWorkflowExecutionRequest{
BaseRunID: forkRunID,
BaseRunNextEventID: forkRunNextEventID,

PrevRunVersion: prevRunVersion,
PrevRunState: prevRunState,

Condition: condition,
RangeID: s.ShardInfo.RangeID,
CurrentRunID: currInfo.RunID,
CurrentRunNextEventID: condition,

UpdateCurr: updateCurr,
CurrExecutionInfo: currInfo,
CurrReplicationState: currReplicationState,
CurrTransferTasks: currTrasTasks,
CurrTimerTasks: currTimerTasks,
CurrentWorkflowMutation: nil,

NewWorkflowSnapshot: p.WorkflowSnapshot{
ExecutionInfo: info,
Expand All @@ -958,7 +948,21 @@ func (s *TestBase) ResetWorkflowExecution(condition int64, info *p.WorkflowExecu
TimerTasks: timerTasks,
},
Encoding: pickRandomEncoding(),
})
}

if updateCurr {
req.CurrentWorkflowMutation = &p.WorkflowMutation{
ExecutionInfo: currInfo,
ReplicationState: currReplicationState,

TransferTasks: currTrasTasks,
TimerTasks: currTimerTasks,

Condition: condition,
}
}

return s.ExecutionManager.ResetWorkflowExecution(req)
}

// DeleteWorkflowExecution is a utility method to delete a workflow execution
Expand Down
17 changes: 6 additions & 11 deletions common/persistence/persistenceInterface.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,23 +314,18 @@ type (

// InternalResetWorkflowExecutionRequest is used to reset workflow execution state for Persistence Interface
InternalResetWorkflowExecutionRequest struct {
PrevRunVersion int64
PrevRunState int

Condition int64
RangeID int64
RangeID int64

// for base run (we need to make sure the baseRun hasn't been deleted after forking)
BaseRunID string
BaseRunNextEventID int64

// for current workflow record
CurrentRunID string
CurrentRunNextEventID int64

// for current mutable state
UpdateCurr bool
CurrExecutionInfo *InternalWorkflowExecutionInfo
CurrReplicationState *ReplicationState
CurrReplicationTasks []Task
CurrTransferTasks []Task
CurrTimerTasks []Task
CurrentWorkflowMutation *InternalWorkflowMutation

// For new mutable state
NewWorkflowSnapshot InternalWorkflowSnapshot
Expand Down
Loading

0 comments on commit b3c718b

Please sign in to comment.