Skip to content

Commit

Permalink
Fix bufferedReplicationTask for continueAsNew with eventsV2 (cadence-…
Browse files Browse the repository at this point in the history
…workflow#1320)

* add unit test

* add schema

* persistence layer

* read/write and apply bufferedReplicationTask

* add unit test
  • Loading branch information
longquanzheng authored Dec 7, 2018
1 parent fec345a commit a71807c
Show file tree
Hide file tree
Showing 12 changed files with 555 additions and 28 deletions.
10 changes: 10 additions & 0 deletions common/persistence/cassandra/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,8 @@ const (
`first_event_id: ?, ` +
`next_event_id: ?, ` +
`version: ?, ` +
`event_store_version: ?, ` +
`new_run_event_store_version: ?, ` +
`history: ` + templateSerializedEventBatch + `, ` +
`new_run_history: ` + templateSerializedEventBatch + ` ` +
`}`
Expand All @@ -293,6 +295,7 @@ const (
`first_event_id: ?, ` +
`next_event_id: ?, ` +
`version: ?, ` +
`event_store_version: ?, ` +
`history: ` + templateSerializedEventBatch + ` ` +
`}`

Expand Down Expand Up @@ -3326,6 +3329,8 @@ func (d *cassandraPersistence) updateBufferedReplicationTasks(batch *gocql.Batch
newBufferedReplicationTask.FirstEventID,
newBufferedReplicationTask.NextEventID,
newBufferedReplicationTask.Version,
newBufferedReplicationTask.EventStoreVersion,
newBufferedReplicationTask.NewRunEventStoreVersion,
newBufferedReplicationTask.History.Encoding,
int64(0),
newBufferedReplicationTask.History.Data,
Expand All @@ -3346,6 +3351,7 @@ func (d *cassandraPersistence) updateBufferedReplicationTasks(batch *gocql.Batch
newBufferedReplicationTask.FirstEventID,
newBufferedReplicationTask.NextEventID,
newBufferedReplicationTask.Version,
newBufferedReplicationTask.EventStoreVersion,
newBufferedReplicationTask.History.Encoding,
int64(0),
newBufferedReplicationTask.History.Data,
Expand Down Expand Up @@ -3802,6 +3808,10 @@ func createBufferedReplicationTaskInfo(result map[string]interface{}) *p.Interna
info.NextEventID = v.(int64)
case "version":
info.Version = v.(int64)
case "event_store_version":
info.EventStoreVersion = int32(v.(int))
case "new_run_event_store_version":
info.NewRunEventStoreVersion = int32(v.(int))
case "history":
h := v.(map[string]interface{})
info.History = createHistoryEventBatchBlob(h)
Expand Down
12 changes: 7 additions & 5 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,11 +586,13 @@ type (

// BufferedReplicationTask has details to handle out of order receive of history events
BufferedReplicationTask struct {
FirstEventID int64
NextEventID int64
Version int64
History []*workflow.HistoryEvent
NewRunHistory []*workflow.HistoryEvent
FirstEventID int64
NextEventID int64
Version int64
History []*workflow.HistoryEvent
NewRunHistory []*workflow.HistoryEvent
EventStoreVersion int32
NewRunEventStoreVersion int32
}

// CreateShardRequest is used to create a shard in executions table
Expand Down
16 changes: 10 additions & 6 deletions common/persistence/executionStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,11 @@ func (m *executionManagerImpl) DeserializeBufferedReplicationTasks(tasks map[int
return nil, err
}
b := &BufferedReplicationTask{
FirstEventID: v.FirstEventID,
NextEventID: v.NextEventID,
Version: v.Version,
FirstEventID: v.FirstEventID,
NextEventID: v.NextEventID,
Version: v.Version,
EventStoreVersion: v.EventStoreVersion,
NewRunEventStoreVersion: v.NewRunEventStoreVersion,

History: history,
NewRunHistory: newHistory,
Expand Down Expand Up @@ -348,9 +350,11 @@ func (m *executionManagerImpl) SerializeNewBufferedReplicationTask(task *Buffere
}

return &InternalBufferedReplicationTask{
FirstEventID: task.FirstEventID,
NextEventID: task.NextEventID,
Version: task.Version,
FirstEventID: task.FirstEventID,
NextEventID: task.NextEventID,
Version: task.Version,
EventStoreVersion: task.EventStoreVersion,
NewRunEventStoreVersion: task.NewRunEventStoreVersion,

History: history,
NewRunHistory: newHistory,
Expand Down
4 changes: 4 additions & 0 deletions common/persistence/persistence-tests/executionManagerTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -1864,6 +1864,8 @@ func (s *ExecutionManagerSuite) TestWorkflowMutableStateBufferedReplicationTasks
s.Equal(int64(5), bufferedTask.FirstEventID)
s.Equal(int64(7), bufferedTask.NextEventID)
s.Equal(int64(11), bufferedTask.Version)
s.Equal(int32(0), bufferedTask.EventStoreVersion)
s.Equal(int32(0), bufferedTask.NewRunEventStoreVersion)

bufferedEvents := bufferedTask.History
s.Equal(2, len(bufferedEvents))
Expand Down Expand Up @@ -1946,6 +1948,8 @@ func (s *ExecutionManagerSuite) TestWorkflowMutableStateBufferedReplicationTasks
s.Equal(int64(10), bufferedTask.FirstEventID)
s.Equal(int64(12), bufferedTask.NextEventID)
s.Equal(int64(12), bufferedTask.Version)
s.Equal(int32(0), bufferedTask.EventStoreVersion)
s.Equal(int32(0), bufferedTask.NewRunEventStoreVersion)

bufferedEvents = bufferedTask.History
s.Equal(2, len(bufferedEvents))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -507,3 +507,200 @@ func (s *ExecutionManagerSuiteForEventsV2) createWorkflowExecutionWithReplicatio

return response, err
}

// TestWorkflowMutableStateBufferedReplicationTasks test
func (s *ExecutionManagerSuiteForEventsV2) TestWorkflowMutableStateBufferedReplicationTasks() {
domainID := uuid.New()
workflowExecution := gen.WorkflowExecution{
WorkflowId: common.StringPtr("test-workflow-mutable-buffered-replication-tasks-test"),
RunId: common.StringPtr("aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"),
}

task0, err0 := s.CreateWorkflowExecution(domainID, workflowExecution, "taskList", "wType", 20, 13, nil, 3, 0, 2, nil)
s.NoError(err0)
s.NotNil(task0, "Expected non empty task identifier.")

state0, err1 := s.GetWorkflowExecutionInfo(domainID, workflowExecution)
s.NoError(err1)
info0 := state0.ExecutionInfo
s.NotNil(info0, "Valid Workflow info expected.")
s.Equal(0, len(state0.BufferedReplicationTasks))

updatedInfo := copyWorkflowExecutionInfo(info0)
events := []*gen.HistoryEvent{
{
EventId: common.Int64Ptr(5),
EventType: gen.EventTypeDecisionTaskCompleted.Ptr(),
DecisionTaskCompletedEventAttributes: &gen.DecisionTaskCompletedEventAttributes{
ScheduledEventId: common.Int64Ptr(2),
StartedEventId: common.Int64Ptr(3),
Identity: common.StringPtr("test_worker"),
},
},
{
EventId: common.Int64Ptr(6),
EventType: gen.EventTypeTimerStarted.Ptr(),
TimerStartedEventAttributes: &gen.TimerStartedEventAttributes{
TimerId: common.StringPtr("ID1"),
StartToFireTimeoutSeconds: common.Int64Ptr(101),
DecisionTaskCompletedEventId: common.Int64Ptr(5),
},
},
}

bufferedTask := &p.BufferedReplicationTask{
FirstEventID: int64(5),
NextEventID: int64(7),
Version: int64(11),
History: events,
EventStoreVersion: p.EventStoreVersionV2,
}
err2 := s.UpdateWorklowStateAndReplication(updatedInfo, nil, bufferedTask, nil, int64(3), nil)
s.NoError(err2)

state1, err1 := s.GetWorkflowExecutionInfo(domainID, workflowExecution)
s.NoError(err1)
s.NotNil(state1, "expected valid state.")
s.Equal(1, len(state1.BufferedReplicationTasks))

bufferedTask, ok := state1.BufferedReplicationTasks[5]
s.True(ok)
s.NotNil(bufferedTask)
s.Equal(int64(5), bufferedTask.FirstEventID)
s.Equal(int64(7), bufferedTask.NextEventID)
s.Equal(int64(11), bufferedTask.Version)
s.Equal(int32(p.EventStoreVersionV2), bufferedTask.EventStoreVersion)
s.Equal(int32(0), bufferedTask.NewRunEventStoreVersion)

bufferedEvents := bufferedTask.History
s.Equal(2, len(bufferedEvents))
s.Equal(int64(5), bufferedEvents[0].GetEventId())
s.Equal(gen.EventTypeDecisionTaskCompleted, bufferedEvents[0].GetEventType())
s.Equal(int64(2), bufferedEvents[0].DecisionTaskCompletedEventAttributes.GetScheduledEventId())
s.Equal(int64(3), bufferedEvents[0].DecisionTaskCompletedEventAttributes.GetStartedEventId())
s.Equal("test_worker", bufferedEvents[0].DecisionTaskCompletedEventAttributes.GetIdentity())
s.Equal(int64(6), bufferedEvents[1].GetEventId())
s.Equal(gen.EventTypeTimerStarted, bufferedEvents[1].GetEventType())
s.Equal("ID1", bufferedEvents[1].TimerStartedEventAttributes.GetTimerId())
s.Equal(int64(101), bufferedEvents[1].TimerStartedEventAttributes.GetStartToFireTimeoutSeconds())
s.Equal(int64(5), bufferedEvents[1].TimerStartedEventAttributes.GetDecisionTaskCompletedEventId())

newExecutionRunID := "d83db48f-a63c-413d-a05a-bbf5a1ac1098"
info1 := state1.ExecutionInfo
updatedInfo = copyWorkflowExecutionInfo(info1)
completionEvents := []*gen.HistoryEvent{
{
EventId: common.Int64Ptr(10),
EventType: gen.EventTypeDecisionTaskCompleted.Ptr(),
DecisionTaskCompletedEventAttributes: &gen.DecisionTaskCompletedEventAttributes{
ScheduledEventId: common.Int64Ptr(8),
StartedEventId: common.Int64Ptr(9),
Identity: common.StringPtr("test_worker"),
},
},
{
EventId: common.Int64Ptr(11),
EventType: gen.EventTypeWorkflowExecutionContinuedAsNew.Ptr(),
WorkflowExecutionContinuedAsNewEventAttributes: &gen.WorkflowExecutionContinuedAsNewEventAttributes{
NewExecutionRunId: common.StringPtr(newExecutionRunID),
WorkflowType: &gen.WorkflowType{Name: common.StringPtr("wType")},
TaskList: &gen.TaskList{Name: common.StringPtr("taskList")},
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(212),
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(312),
DecisionTaskCompletedEventId: common.Int64Ptr(10),
},
},
}

newRunEvents := []*gen.HistoryEvent{
{
EventId: common.Int64Ptr(1),
EventType: gen.EventTypeWorkflowExecutionStarted.Ptr(),
WorkflowExecutionStartedEventAttributes: &gen.WorkflowExecutionStartedEventAttributes{
WorkflowType: &gen.WorkflowType{Name: common.StringPtr("wType")},
TaskList: &gen.TaskList{Name: common.StringPtr("taskList")},
},
},
{
EventId: common.Int64Ptr(2),
EventType: gen.EventTypeDecisionTaskScheduled.Ptr(),
DecisionTaskScheduledEventAttributes: &gen.DecisionTaskScheduledEventAttributes{
TaskList: &gen.TaskList{Name: common.StringPtr("taskList")},
StartToCloseTimeoutSeconds: common.Int32Ptr(201),
Attempt: common.Int64Ptr(1),
},
},
}

bufferedTask = &p.BufferedReplicationTask{
FirstEventID: int64(10),
NextEventID: int64(12),
Version: int64(12),
History: completionEvents,
NewRunHistory: newRunEvents,
EventStoreVersion: p.EventStoreVersionV2,
NewRunEventStoreVersion: p.EventStoreVersionV2,
}
err3 := s.UpdateWorklowStateAndReplication(updatedInfo, nil, bufferedTask, nil, int64(3), nil)
s.NoError(err3)

state2, err4 := s.GetWorkflowExecutionInfo(domainID, workflowExecution)
s.NoError(err4)
s.NotNil(state2, "expected valid state.")
s.Equal(2, len(state2.BufferedReplicationTasks))

bufferedTask, ok = state2.BufferedReplicationTasks[10]
s.True(ok)
s.NotNil(bufferedTask)
s.Equal(int64(10), bufferedTask.FirstEventID)
s.Equal(int64(12), bufferedTask.NextEventID)
s.Equal(int64(12), bufferedTask.Version)
s.Equal(int32(p.EventStoreVersionV2), bufferedTask.EventStoreVersion)
s.Equal(int32(p.EventStoreVersionV2), bufferedTask.NewRunEventStoreVersion)

bufferedEvents = bufferedTask.History
s.Equal(2, len(bufferedEvents))
s.Equal(int64(10), bufferedEvents[0].GetEventId())
s.Equal(gen.EventTypeDecisionTaskCompleted, bufferedEvents[0].GetEventType())
s.Equal(int64(8), bufferedEvents[0].DecisionTaskCompletedEventAttributes.GetScheduledEventId())
s.Equal(int64(9), bufferedEvents[0].DecisionTaskCompletedEventAttributes.GetStartedEventId())
s.Equal("test_worker", bufferedEvents[0].DecisionTaskCompletedEventAttributes.GetIdentity())
s.Equal(int64(11), bufferedEvents[1].GetEventId())
s.Equal(gen.EventTypeWorkflowExecutionContinuedAsNew, bufferedEvents[1].GetEventType())
s.Equal(newExecutionRunID, bufferedEvents[1].WorkflowExecutionContinuedAsNewEventAttributes.GetNewExecutionRunId())
s.Equal("wType", bufferedEvents[1].WorkflowExecutionContinuedAsNewEventAttributes.WorkflowType.GetName())
s.Equal("taskList", bufferedEvents[1].WorkflowExecutionContinuedAsNewEventAttributes.TaskList.GetName())
s.Equal(int32(212), bufferedEvents[1].WorkflowExecutionContinuedAsNewEventAttributes.GetTaskStartToCloseTimeoutSeconds())
s.Equal(int32(312), bufferedEvents[1].WorkflowExecutionContinuedAsNewEventAttributes.GetExecutionStartToCloseTimeoutSeconds())
s.Equal(int64(10), bufferedEvents[1].WorkflowExecutionContinuedAsNewEventAttributes.GetDecisionTaskCompletedEventId())

bufferedNewRunEvents := bufferedTask.NewRunHistory
s.Equal(2, len(bufferedNewRunEvents))
s.Equal(int64(1), bufferedNewRunEvents[0].GetEventId())
s.Equal(gen.EventTypeWorkflowExecutionStarted, bufferedNewRunEvents[0].GetEventType())
s.Equal("wType", bufferedNewRunEvents[0].WorkflowExecutionStartedEventAttributes.WorkflowType.GetName())
s.Equal("taskList", bufferedNewRunEvents[0].WorkflowExecutionStartedEventAttributes.TaskList.GetName())
s.Equal(int64(2), bufferedNewRunEvents[1].GetEventId())
s.Equal(gen.EventTypeDecisionTaskScheduled, bufferedNewRunEvents[1].GetEventType())
s.Equal("taskList", bufferedNewRunEvents[1].DecisionTaskScheduledEventAttributes.TaskList.GetName())
s.Equal(int32(201), bufferedNewRunEvents[1].DecisionTaskScheduledEventAttributes.GetStartToCloseTimeoutSeconds())
s.Equal(int64(1), bufferedNewRunEvents[1].DecisionTaskScheduledEventAttributes.GetAttempt())

deleteBufferedReplicationTask := int64(5)
err5 := s.UpdateWorklowStateAndReplication(updatedInfo, nil, nil, &deleteBufferedReplicationTask, int64(3), nil)
s.NoError(err5)

state3, err6 := s.GetWorkflowExecutionInfo(domainID, workflowExecution)
s.NoError(err6)
s.NotNil(state3, "expected valid state.")
s.Equal(1, len(state3.BufferedReplicationTasks))

deleteBufferedReplicationTask2 := int64(10)
err7 := s.UpdateWorklowStateAndReplication(updatedInfo, nil, nil, &deleteBufferedReplicationTask2, int64(3), nil)
s.NoError(err7)

state4, err8 := s.GetWorkflowExecutionInfo(domainID, workflowExecution)
s.NoError(err8)
s.NotNil(state4, "expected valid state.")
s.Equal(0, len(state4.BufferedReplicationTasks))
}
12 changes: 7 additions & 5 deletions common/persistence/persistenceInterface.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,11 +233,13 @@ type (

// InternalBufferedReplicationTask has details to handle out of order receive of history events for Persistence Interface
InternalBufferedReplicationTask struct {
FirstEventID int64
NextEventID int64
Version int64
History *DataBlob
NewRunHistory *DataBlob
FirstEventID int64
NextEventID int64
Version int64
History *DataBlob
NewRunHistory *DataBlob
EventStoreVersion int32
NewRunEventStoreVersion int32
}

// InternalUpdateWorkflowExecutionRequest is used to update a workflow execution for Persistence Interface
Expand Down
2 changes: 2 additions & 0 deletions schema/cassandra/cadence/schema.cql
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,8 @@ CREATE TYPE buffered_replication_task_info (
version bigint,
history frozen<serialized_event_batch>,
new_run_history frozen<serialized_event_batch>,
event_store_version int, -- indiciates which version of event store to query
new_run_event_store_version int, -- indiciates which version of event store to query for new run(continueAsNew)
);

-- for history v2 events
Expand Down
4 changes: 3 additions & 1 deletion schema/cassandra/cadence/versioned/v0.12/events_v2.cql
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,6 @@ ALTER TYPE workflow_execution ADD branch_token blob;
ALTER TYPE replication_task ADD event_store_version int;
ALTER TYPE replication_task ADD branch_token blob;
ALTER TYPE replication_task ADD new_run_event_store_version int;
ALTER TYPE replication_task ADD new_run_branch_token blob;
ALTER TYPE replication_task ADD new_run_branch_token blob;
ALTER TYPE buffered_replication_task_info ADD event_store_version int;
ALTER TYPE buffered_replication_task_info ADD new_run_event_store_version int;
18 changes: 10 additions & 8 deletions service/history/historyReplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,14 +657,16 @@ func (r *historyReplicator) FlushBuffer(ctx context.Context, context *workflowEx

sourceCluster := r.clusterMetadata.ClusterNameForFailoverVersion(bt.Version)
req := &h.ReplicateEventsRequest{
SourceCluster: common.StringPtr(sourceCluster),
DomainUUID: common.StringPtr(domainID),
WorkflowExecution: &execution,
FirstEventId: common.Int64Ptr(bt.FirstEventID),
NextEventId: common.Int64Ptr(bt.NextEventID),
Version: common.Int64Ptr(bt.Version),
History: &workflow.History{Events: bt.History},
NewRunHistory: &workflow.History{Events: bt.NewRunHistory},
SourceCluster: common.StringPtr(sourceCluster),
DomainUUID: common.StringPtr(domainID),
WorkflowExecution: &execution,
FirstEventId: common.Int64Ptr(bt.FirstEventID),
NextEventId: common.Int64Ptr(bt.NextEventID),
Version: common.Int64Ptr(bt.Version),
History: &workflow.History{Events: bt.History},
NewRunHistory: &workflow.History{Events: bt.NewRunHistory},
EventStoreVersion: &bt.EventStoreVersion,
NewRunEventStoreVersion: &bt.NewRunEventStoreVersion,
}

// Apply replication task to workflow execution
Expand Down
Loading

0 comments on commit a71807c

Please sign in to comment.