Skip to content

Commit

Permalink
Clear buffers on resetting of mutable state (cadence-workflow#978)
Browse files Browse the repository at this point in the history
* Clear buffers on resetting of mutable state

fixes cadence-workflow#841
  • Loading branch information
samarabbas authored and wxing1292 committed Jul 19, 2018
1 parent 0a3ff55 commit ddd0307
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 0 deletions.
36 changes: 36 additions & 0 deletions common/persistence/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,17 @@ const (
`and task_id = ? ` +
`IF next_event_id = ?`

templateClearBufferedReplicationTaskQuery = `UPDATE executions ` +
`SET buffered_replication_tasks_map = {} ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
`and domain_id = ? ` +
`and workflow_id = ? ` +
`and run_id = ? ` +
`and visibility_ts = ? ` +
`and task_id = ? ` +
`IF next_event_id = ?`

templateDeleteWorkflowExecutionMutableStateQuery = `DELETE FROM executions ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
Expand Down Expand Up @@ -1732,6 +1743,8 @@ func (d *cassandraPersistence) ResetMutableState(request *ResetMutableStateReque
d.resetSignalRequested(batch, request.InsertSignalRequestedIDs, executionInfo.DomainID, executionInfo.WorkflowID,
executionInfo.RunID, request.Condition)

d.resetBufferedEvents(batch, executionInfo.DomainID, executionInfo.WorkflowID, executionInfo.RunID, request.Condition)

// Verifies that the RangeID has not changed
batch.Query(templateUpdateLeaseQuery,
request.RangeID,
Expand Down Expand Up @@ -2648,6 +2661,29 @@ func (d *cassandraPersistence) updateActivityInfos(batch *gocql.Batch, activityI
}
}

func (d *cassandraPersistence) resetBufferedEvents(batch *gocql.Batch, domainID, workflowID, runID string,
condition int64) {
batch.Query(templateDeleteBufferedEventsQuery,
d.shardID,
rowTypeExecution,
domainID,
workflowID,
runID,
defaultVisibilityTimestamp,
rowTypeExecutionTaskID,
condition)

batch.Query(templateClearBufferedReplicationTaskQuery,
d.shardID,
rowTypeExecution,
domainID,
workflowID,
runID,
defaultVisibilityTimestamp,
rowTypeExecutionTaskID,
condition)
}

func (d *cassandraPersistence) resetActivityInfos(batch *gocql.Batch, activityInfos []*ActivityInfo, domainID,
workflowID, runID string, condition int64) {
batch.Query(templateResetActivityInfoQuery,
Expand Down
70 changes: 70 additions & 0 deletions common/persistence/cassandraPersistence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2002,6 +2002,52 @@ func (s *cassandraPersistenceSuite) TestResetMutableState() {
updatedInfo.LastProcessedEvent = int64(2)
currentTime := time.Now().UTC()
expiryTime := currentTime.Add(10 * time.Second)
eventsBatch1 := []*gen.HistoryEvent{
&gen.HistoryEvent{
EventId: common.Int64Ptr(5),
EventType: gen.EventTypeDecisionTaskCompleted.Ptr(),
Version: common.Int64Ptr(11),
DecisionTaskCompletedEventAttributes: &gen.DecisionTaskCompletedEventAttributes{
ScheduledEventId: common.Int64Ptr(2),
StartedEventId: common.Int64Ptr(3),
Identity: common.StringPtr("test_worker"),
},
},
&gen.HistoryEvent{
EventId: common.Int64Ptr(6),
EventType: gen.EventTypeTimerStarted.Ptr(),
Version: common.Int64Ptr(11),
TimerStartedEventAttributes: &gen.TimerStartedEventAttributes{
TimerId: common.StringPtr("ID1"),
StartToFireTimeoutSeconds: common.Int64Ptr(101),
DecisionTaskCompletedEventId: common.Int64Ptr(5),
},
},
}
bufferedTask1 := &BufferedReplicationTask{
FirstEventID: int64(5),
NextEventID: int64(7),
Version: int64(11),
History: s.serializeHistoryEvents(eventsBatch1),
}

eventsBatch2 := []*gen.HistoryEvent{
&gen.HistoryEvent{
EventId: common.Int64Ptr(21),
EventType: gen.EventTypeTimerFired.Ptr(),
Version: common.Int64Ptr(12),
TimerFiredEventAttributes: &gen.TimerFiredEventAttributes{
TimerId: common.StringPtr("2"),
StartedEventId: common.Int64Ptr(3),
},
},
}
bufferedTask2 := &BufferedReplicationTask{
FirstEventID: int64(21),
NextEventID: int64(22),
Version: int64(12),
History: s.serializeHistoryEvents(eventsBatch2),
}
updatedState := &WorkflowMutableState{
ExecutionInfo: updatedInfo,
ActivitInfos: map[int64]*ActivityInfo{
Expand Down Expand Up @@ -2100,6 +2146,24 @@ func (s *cassandraPersistenceSuite) TestResetMutableState() {
err2 := s.UpdateAllMutableState(updatedState, int64(3))
s.Nil(err2, "No error expected.")

partialState, err2 := s.GetWorkflowExecutionInfo(domainID, workflowExecution)
s.Nil(err2, "No error expected.")
s.NotNil(partialState, "expected valid state.")
partialInfo := partialState.ExecutionInfo
s.NotNil(partialInfo, "Valid Workflow info expected.")

bufferUpdateInfo := copyWorkflowExecutionInfo(partialInfo)
err2 = s.UpdateWorklowStateAndReplication(bufferUpdateInfo, nil, bufferedTask1, nil, bufferUpdateInfo.NextEventID, nil)
s.Nil(err2, "No error expected.")
err2 = s.UpdateWorklowStateAndReplication(bufferUpdateInfo, nil, bufferedTask2, nil, bufferUpdateInfo.NextEventID, nil)
s.Nil(err2, "No error expected.")
err2 = s.UpdateWorkflowExecutionForBufferEvents(bufferUpdateInfo, nil, bufferUpdateInfo.NextEventID,
s.serializeHistoryEvents(eventsBatch1))
s.Nil(err2, "No error expected.")
err2 = s.UpdateWorkflowExecutionForBufferEvents(bufferUpdateInfo, nil, bufferUpdateInfo.NextEventID,
s.serializeHistoryEvents(eventsBatch2))
s.Nil(err2, "No error expected.")

state1, err1 := s.GetWorkflowExecutionInfo(domainID, workflowExecution)
s.Nil(err1, "No error expected.")
s.NotNil(state1, "expected valid state.")
Expand Down Expand Up @@ -2195,6 +2259,9 @@ func (s *cassandraPersistenceSuite) TestResetMutableState() {
_, contains = state1.SignalRequestedIDs["00000000-0000-0000-0000-000000000003"]
s.True(contains)

s.Equal(2, len(state1.BufferedReplicationTasks))
s.Equal(2, len(state1.BufferedEvents))

updatedInfo1 := copyWorkflowExecutionInfo(info1)
updatedInfo1.NextEventID = int64(3)
resetActivityInfos := []*ActivityInfo{
Expand Down Expand Up @@ -2354,6 +2421,9 @@ func (s *cassandraPersistenceSuite) TestResetMutableState() {
s.Equal([]byte("signal_control_c"), si.Control)

s.Equal(0, len(state4.SignalRequestedIDs))

s.Equal(0, len(state4.BufferedReplicationTasks))
s.Equal(0, len(state4.BufferedEvents))
}

func copyWorkflowExecutionInfo(sourceInfo *WorkflowExecutionInfo) *WorkflowExecutionInfo {
Expand Down
13 changes: 13 additions & 0 deletions common/persistence/persistenceTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,19 @@ func (s *TestBase) UpdateWorkflowExecutionForSignal(
})
}

// UpdateWorkflowExecutionForSignal is a utility method to update workflow execution
func (s *TestBase) UpdateWorkflowExecutionForBufferEvents(
updatedInfo *WorkflowExecutionInfo, rState *ReplicationState, condition int64,
bufferEvents *SerializedHistoryEventBatch) error {
return s.WorkflowMgr.UpdateWorkflowExecution(&UpdateWorkflowExecutionRequest{
ExecutionInfo: updatedInfo,
ReplicationState: rState,
NewBufferedEvents: bufferEvents,
Condition: condition,
RangeID: s.ShardInfo.RangeID,
})
}

// UpdateAllMutableState is a utility method to update workflow execution
func (s *TestBase) UpdateAllMutableState(updatedMutableState *WorkflowMutableState, condition int64) error {
var aInfos []*ActivityInfo
Expand Down

0 comments on commit ddd0307

Please sign in to comment.