Skip to content

Commit

Permalink
Workflow Reset XDC implementation (cadence-workflow#1377)
Browse files Browse the repository at this point in the history
* Implement reset xdc

Add checking for forkRun not deleted

fix comment

fix code change back

implement reset without termination

fix

remove unused code

fix

check bugs for checking existence

fix checking

fix comment

stash

fix persistence unit test

fix

fix

add forkEventVersion

add forkEventVersion

simplfy replication for reset

fix unit test

Add unit test for replicator

fix rep task order

fix unit test

two unit tests for xdc

one more unit test for reset with repl

remove unused schema

revert previous unused change for reset

fix

finish unit test for reset xdc

* fix fmt

* fix lint

* address comments

* address comments: passing in prevRunState

* change forkRun to baseRun as better names
  • Loading branch information
longquanzheng authored Jan 15, 2019
1 parent b1008ed commit 108981c
Show file tree
Hide file tree
Showing 24 changed files with 3,352 additions and 222 deletions.
4 changes: 2 additions & 2 deletions .gen/go/shared/idl.go

Large diffs are not rendered by default.

69 changes: 54 additions & 15 deletions .gen/go/shared/types.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

94 changes: 62 additions & 32 deletions common/persistence/cassandra/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,6 @@ const (
`event_store_version: ?, ` +
`branch_token: ?, ` +
`reset_workflow: ?, ` +
`new_run_first_event_id: ?,` +
`new_run_next_event_id: ?,` +
`new_run_event_store_version: ?, ` +
`new_run_branch_token: ? ` +
`}`
Expand Down Expand Up @@ -1872,25 +1870,67 @@ func (d *cassandraPersistence) ResetWorkflowExecution(request *p.InternalResetWo
startVersion = insertReplicationState.StartVersion
lastWriteVersion = insertReplicationState.LastWriteVersion
}
batch.Query(templateUpdateCurrentWorkflowExecutionQuery,
insertExecutionInfo.RunID,
insertExecutionInfo.RunID,
insertExecutionInfo.CreateRequestID,
insertExecutionInfo.State,
insertExecutionInfo.CloseStatus,
startVersion,
lastWriteVersion,
lastWriteVersion,
insertExecutionInfo.State,
d.shardID,
rowTypeExecution,
insertExecutionInfo.DomainID,
insertExecutionInfo.WorkflowID,
permanentRunID,
defaultVisibilityTimestamp,
rowTypeExecutionTaskID,
request.PrevRunID,
)

if currReplicationState == nil {
batch.Query(templateUpdateCurrentWorkflowExecutionQuery,
insertExecutionInfo.RunID,
insertExecutionInfo.RunID,
insertExecutionInfo.CreateRequestID,
insertExecutionInfo.State,
insertExecutionInfo.CloseStatus,
startVersion,
lastWriteVersion,
lastWriteVersion,
insertExecutionInfo.State,
d.shardID,
rowTypeExecution,
insertExecutionInfo.DomainID,
insertExecutionInfo.WorkflowID,
permanentRunID,
defaultVisibilityTimestamp,
rowTypeExecutionTaskID,
currExecutionInfo.RunID,
)
} else {
batch.Query(templateUpdateCurrentWorkflowExecutionForNewQuery,
insertExecutionInfo.RunID,
insertExecutionInfo.RunID,
insertExecutionInfo.CreateRequestID,
insertExecutionInfo.State,
insertExecutionInfo.CloseStatus,
startVersion,
lastWriteVersion,
lastWriteVersion,
insertExecutionInfo.State,
d.shardID,
rowTypeExecution,
insertExecutionInfo.DomainID,
insertExecutionInfo.WorkflowID,
permanentRunID,
defaultVisibilityTimestamp,
rowTypeExecutionTaskID,
currExecutionInfo.RunID,
request.PrevRunVersion,
request.PrevRunState,
)
}

// for forkRun, check condition without updating anything to make sure the forkRun hasn't been deleted.
// Without this check, it will run into race condition with deleteHistoryEvent timer task
// we only do it when forkRun != currentRun
if request.BaseRunID != currExecutionInfo.RunID {
batch.Query(templateCheckWorkflowExecutionQuery,
request.BaseRunNextEventID,
d.shardID,
rowTypeExecution,
currExecutionInfo.DomainID,
currExecutionInfo.WorkflowID,
request.BaseRunID,
defaultVisibilityTimestamp,
rowTypeExecutionTaskID,
request.BaseRunNextEventID,
)
}

if request.UpdateCurr {
d.updateMutableState(batch, currExecutionInfo, currReplicationState, cqlNowTimestamp, true, request.Condition)
Expand Down Expand Up @@ -1993,7 +2033,7 @@ func (d *cassandraPersistence) ResetWorkflowExecution(request *p.InternalResetWo
}

if !applied {
return d.getExecutionConditionalUpdateFailure(previous, iter, currExecutionInfo.RunID, request.Condition, request.RangeID, request.PrevRunID)
return d.getExecutionConditionalUpdateFailure(previous, iter, currExecutionInfo.RunID, request.Condition, request.RangeID, currExecutionInfo.RunID)
}

return nil
Expand Down Expand Up @@ -2908,8 +2948,6 @@ func (d *cassandraPersistence) createReplicationTasks(batch *gocql.Batch, replic
var eventStoreVersion, newRunEventStoreVersion int32
var branchToken, newRunBranchToken []byte
resetWorkflow := false
newRunFirstEventID := common.EmptyEventID
newRunNextEventID := common.EmptyEventID

switch task.GetType() {
case p.ReplicationTaskTypeHistory:
Expand All @@ -2925,8 +2963,6 @@ func (d *cassandraPersistence) createReplicationTasks(batch *gocql.Batch, replic
for k, v := range histTask.LastReplicationInfo {
lastReplicationInfo[k] = createReplicationInfoMap(v)
}
newRunFirstEventID = histTask.NewRunFirstEventID
newRunNextEventID = histTask.NewRunNextEventID
resetWorkflow = histTask.ResetWorkflow
case p.ReplicationTaskTypeSyncActivity:
version = task.GetVersion()
Expand Down Expand Up @@ -2956,8 +2992,6 @@ func (d *cassandraPersistence) createReplicationTasks(batch *gocql.Batch, replic
eventStoreVersion,
branchToken,
resetWorkflow,
newRunFirstEventID,
newRunNextEventID,
newRunEventStoreVersion,
newRunBranchToken,
defaultVisibilityTimestamp,
Expand Down Expand Up @@ -3745,10 +3779,6 @@ func createReplicationTaskInfo(result map[string]interface{}) *p.ReplicationTask
info.BranchToken = v.([]byte)
case "reset_workflow":
info.ResetWorkflow = v.(bool)
case "new_run_first_event_id":
info.NewRunFirstEventID = v.(int64)
case "new_run_next_event_id":
info.NewRunNextEventID = v.(int64)
case "new_run_event_store_version":
info.NewRunEventStoreVersion = int32(v.(int))
case "new_run_branch_token":
Expand Down
18 changes: 11 additions & 7 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,8 +313,6 @@ type (
NewRunEventStoreVersion int32
NewRunBranchToken []byte
ResetWorkflow bool
NewRunFirstEventID int64
NewRunNextEventID int64
}

// TimerTaskInfo describes a timer task.
Expand Down Expand Up @@ -497,8 +495,6 @@ type (
ResetWorkflow bool
NewRunEventStoreVersion int32
NewRunBranchToken []byte
NewRunFirstEventID int64
NewRunNextEventID int64
}

// SyncActivityTask is the replication task created for shipping activity info to other clusters
Expand Down Expand Up @@ -764,11 +760,19 @@ type (

// ResetWorkflowExecutionRequest is used to reset workflow execution state for current run and create new run
ResetWorkflowExecutionRequest struct {
PrevRunID string
Condition int64
RangeID int64
// for base run (we need to make sure the baseRun hasn't been deleted after forking)
BaseRunID string
BaseRunNextEventID int64

// for current workflow record
PrevRunVersion int64
PrevRunState int

// for shard record
RangeID int64

// for current mutable state
Condition int64
UpdateCurr bool
CurrExecutionInfo *WorkflowExecutionInfo
CurrReplicationState *ReplicationState
Expand Down
7 changes: 6 additions & 1 deletion common/persistence/executionStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,10 +555,15 @@ func (m *executionManagerImpl) ResetWorkflowExecution(request *ResetWorkflowExec
}

newRequest := &InternalResetWorkflowExecutionRequest{
PrevRunID: request.PrevRunID,
PrevRunVersion: request.PrevRunVersion,
PrevRunState: request.PrevRunState,

Condition: request.Condition,
RangeID: request.RangeID,

BaseRunID: request.BaseRunID,
BaseRunNextEventID: request.BaseRunNextEventID,

UpdateCurr: request.UpdateCurr,
CurrExecutionInfo: currExecution,
CurrReplicationState: request.CurrReplicationState,
Expand Down
Loading

0 comments on commit 108981c

Please sign in to comment.