Skip to content

Commit

Permalink
Refactor workflow creation logic for code reusability & maintainabili…
Browse files Browse the repository at this point in the history
…ty (cadence-workflow#1780)

* Refactor active workflow creation logic
* Refactor passive workflow creation logic
  • Loading branch information
wxing1292 authored May 4, 2019
1 parent 49d329d commit 571a2bc
Show file tree
Hide file tree
Showing 16 changed files with 583 additions and 500 deletions.
12 changes: 9 additions & 3 deletions common/persistence/cassandra/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -1253,10 +1253,10 @@ func (d *cassandraPersistence) CreateWorkflowExecutionWithinBatch(request *p.Int
initiatedID := emptyInitiatedID
state := p.WorkflowStateRunning
closeStatus := p.WorkflowCloseStatusNone
if request.ParentExecution != nil {
if request.ParentDomainID != "" {
parentDomainID = request.ParentDomainID
parentWorkflowID = *request.ParentExecution.WorkflowId
parentRunID = *request.ParentExecution.RunId
parentWorkflowID = request.ParentExecution.GetWorkflowId()
parentRunID = request.ParentExecution.GetRunId()
initiatedID = request.InitiatedID
state = p.WorkflowStateCreated
}
Expand Down Expand Up @@ -3740,10 +3740,16 @@ func createWorkflowExecutionInfo(result map[string]interface{}) *p.InternalWorkf
info.RunID = v.(gocql.UUID).String()
case "parent_domain_id":
info.ParentDomainID = v.(gocql.UUID).String()
if info.ParentDomainID == emptyDomainID {
info.ParentDomainID = ""
}
case "parent_workflow_id":
info.ParentWorkflowID = v.(string)
case "parent_run_id":
info.ParentRunID = v.(gocql.UUID).String()
if info.ParentRunID == emptyRunID {
info.ParentRunID = ""
}
case "initiated_id":
info.InitiatedID = v.(int64)
case "completion_event_batch_id":
Expand Down
2 changes: 1 addition & 1 deletion common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,7 @@ type (
DomainID string
Execution workflow.WorkflowExecution
ParentDomainID string
ParentExecution *workflow.WorkflowExecution
ParentExecution workflow.WorkflowExecution
InitiatedID int64
TaskList string
WorkflowTypeName string
Expand Down
12 changes: 6 additions & 6 deletions common/persistence/persistence-tests/executionManagerTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ func (s *ExecutionManagerSuite) TestGetWorkflow() {
RunId: common.StringPtr(uuid.New()),
},
ParentDomainID: uuid.New(),
ParentExecution: &gen.WorkflowExecution{
ParentExecution: gen.WorkflowExecution{
WorkflowId: common.StringPtr("get-workflow-test-parent"),
RunId: common.StringPtr(uuid.New()),
},
Expand Down Expand Up @@ -575,11 +575,11 @@ func (s *ExecutionManagerSuite) TestGetWorkflow() {
s.NotNil(info, "Valid Workflow response expected.")
s.Equal(createReq.RequestID, info.CreateRequestID)
s.Equal(createReq.DomainID, info.DomainID)
s.Equal(*createReq.Execution.WorkflowId, info.WorkflowID)
s.Equal(*createReq.Execution.RunId, info.RunID)
s.Equal(createReq.Execution.GetWorkflowId(), info.WorkflowID)
s.Equal(createReq.Execution.GetRunId(), info.RunID)
s.Equal(createReq.ParentDomainID, info.ParentDomainID)
s.Equal(*createReq.ParentExecution.WorkflowId, info.ParentWorkflowID)
s.Equal(*createReq.ParentExecution.RunId, info.ParentRunID)
s.Equal(createReq.ParentExecution.GetWorkflowId(), info.ParentWorkflowID)
s.Equal(createReq.ParentExecution.GetRunId(), info.ParentRunID)
s.Equal(createReq.InitiatedID, info.InitiatedID)
s.Equal(createReq.TaskList, info.TaskList)
s.Equal(createReq.WorkflowTypeName, info.WorkflowTypeName)
Expand Down Expand Up @@ -1872,7 +1872,7 @@ func (s *ExecutionManagerSuite) TestWorkflowMutableStateChildExecutions() {
}

parentDomainID := "6036ded3-e541-42c9-8f69-3d9354dad081"
parentExecution := &gen.WorkflowExecution{
parentExecution := gen.WorkflowExecution{
WorkflowId: common.StringPtr("test-workflow-mutable-child-executions-child-test"),
RunId: common.StringPtr("73e89362-25ec-4305-bcb8-d9448b90856c"),
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ func (s *TestBase) CreateWorkflowExecutionManyTasks(domainID string, workflowExe

// CreateChildWorkflowExecution is a utility method to create child workflow executions
func (s *TestBase) CreateChildWorkflowExecution(domainID string, workflowExecution workflow.WorkflowExecution,
parentDomainID string, parentExecution *workflow.WorkflowExecution, initiatedID int64, taskList, wType string,
parentDomainID string, parentExecution workflow.WorkflowExecution, initiatedID int64, taskList, wType string,
wTimeout int32, decisionTimeout int32, executionContext []byte, nextEventID int64, lastProcessedEventID int64,
decisionScheduleID int64, timerTasks []p.Task) (*p.CreateWorkflowExecutionResponse, error) {
response, err := s.ExecutionManager.CreateWorkflowExecution(&p.CreateWorkflowExecutionRequest{
Expand Down
2 changes: 1 addition & 1 deletion common/persistence/persistenceInterface.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ type (
DomainID string
Execution workflow.WorkflowExecution
ParentDomainID string
ParentExecution *workflow.WorkflowExecution
ParentExecution workflow.WorkflowExecution
InitiatedID int64
TaskList string
WorkflowTypeName string
Expand Down
4 changes: 2 additions & 2 deletions common/persistence/sql/sqlExecutionManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1432,7 +1432,7 @@ func createExecutionFromRequest(
info.LastReplicationInfo[k] = &sqlblobs.ReplicationInfo{Version: &v.Version, LastEventID: &v.LastEventID}
}
}
if request.ParentExecution != nil {
if request.ParentDomainID != "" {
info.InitiatedID = &request.InitiatedID
info.ParentDomainID = sqldb.MustParseUUID(request.ParentDomainID)
info.ParentWorkflowID = request.ParentExecution.WorkflowId
Expand Down Expand Up @@ -1484,7 +1484,7 @@ func createOrUpdateCurrentExecution(
row.StartVersion = replicationState.StartVersion
row.LastWriteVersion = replicationState.LastWriteVersion
}
if request.ParentExecution != nil {
if request.ParentDomainID != "" {
row.State = p.WorkflowStateCreated
}

Expand Down
10 changes: 5 additions & 5 deletions service/history/MockMutableState.go
Original file line number Diff line number Diff line change
Expand Up @@ -934,15 +934,15 @@ func (_m *mockMutableState) CreateNewHistoryEventWithTimestamp(eventType shared.
}

// CreateReplicationTask provides a mock function with given fields:
func (_m *mockMutableState) CreateReplicationTask(_a0 int32, _a1 []byte) *persistence.HistoryReplicationTask {
func (_m *mockMutableState) CreateReplicationTask(_a0 []persistence.Task, _a1 int32, _a2 []byte) []persistence.Task {
ret := _m.Called(_a0, _a1)

var r0 *persistence.HistoryReplicationTask
if rf, ok := ret.Get(0).(func(_a0 int32, _a1 []byte) *persistence.HistoryReplicationTask); ok {
r0 = rf(_a0, _a1)
var r0 []persistence.Task
if rf, ok := ret.Get(0).(func(_a0 []persistence.Task, _a1 int32, _a2 []byte) []persistence.Task); ok {
r0 = rf(_a0, _a1, _a2)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*persistence.HistoryReplicationTask)
r0 = ret.Get(0).([]persistence.Task)
}
}

Expand Down
46 changes: 40 additions & 6 deletions service/history/MockWorkflowExecutionContext.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,39 @@ type mockWorkflowExecutionContext struct {

var _ workflowExecutionContext = (*mockWorkflowExecutionContext)(nil)

func (_m *mockWorkflowExecutionContext) appendHistoryEvents(_a0 []*workflow.HistoryEvent, _a1 int64, _a2 bool) (int, error) {
ret := _m.Called(_a0, _a1, _a2)
func (_m *mockWorkflowExecutionContext) appendFirstBatchEventsForActive(_a0 mutableState) (int, error) {
ret := _m.Called(_a0)

var r0 int
if rf, ok := ret.Get(0).(func([]*workflow.HistoryEvent, int64, bool) int); ok {
r0 = rf(_a0, _a1, _a2)
if rf, ok := ret.Get(0).(func(mutableState) int); ok {
r0 = rf(_a0)
} else {
r0 = ret.Get(0).(int)
}

var r1 error
if rf, ok := ret.Get(1).(func([]*workflow.HistoryEvent, int64, bool) error); ok {
r1 = rf(_a0, _a1, _a2)
if rf, ok := ret.Get(1).(func(mutableState) error); ok {
r1 = rf(_a0)
} else {
r1 = ret.Error(1)
}

return r0, r1
}

func (_m *mockWorkflowExecutionContext) appendFirstBatchEventsForStandby(_a0 mutableState, _a1 []*workflow.HistoryEvent) (int, error) {
ret := _m.Called(_a0, _a1)

var r0 int
if rf, ok := ret.Get(0).(func(mutableState, []*workflow.HistoryEvent) int); ok {
r0 = rf(_a0, _a1)
} else {
r0 = ret.Get(0).(int)
}

var r1 error
if rf, ok := ret.Get(1).(func(mutableState, []*workflow.HistoryEvent) error); ok {
r1 = rf(_a0, _a1)
} else {
r1 = ret.Error(1)
}
Expand All @@ -76,6 +96,20 @@ func (_m *mockWorkflowExecutionContext) continueAsNewWorkflowExecution(_a0 []byt
return r0
}

func (_m *mockWorkflowExecutionContext) createWorkflowExecution(_a0 mutableState, _a1 string, _a2 bool, _a3 time.Time, _a4 []persistence.Task, _a5 []persistence.Task, _a6 int, _a7 string, _a8 int64) error {

ret := _m.Called(_a0, _a1, _a2, _a3, _a4, _a5, _a6, _a7, _a8)

var r0 error
if rf, ok := ret.Get(0).(func(mutableState, string, bool, time.Time, []persistence.Task, []persistence.Task, int, string, int64) error); ok {
r0 = rf(_a0, _a1, _a2, _a3, _a4, _a5, _a6, _a7, _a8)
} else {
r0 = ret.Error(0)
}

return r0
}

func (_m *mockWorkflowExecutionContext) getDomainID() string {
ret := _m.Called()

Expand Down
4 changes: 4 additions & 0 deletions service/history/conflictResolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,10 @@ func (s *conflictResolverSuite) TestReset() {
DomainID: domainID,
WorkflowID: execution.GetWorkflowId(),
RunID: execution.GetRunId(),
ParentDomainID: "",
ParentWorkflowID: "",
ParentRunID: "",
InitiatedID: common.EmptyEventID,
TaskList: event1.WorkflowExecutionStartedEventAttributes.TaskList.GetName(),
WorkflowTypeName: event1.WorkflowExecutionStartedEventAttributes.WorkflowType.GetName(),
WorkflowTimeout: *event1.WorkflowExecutionStartedEventAttributes.ExecutionStartToCloseTimeoutSeconds,
Expand Down
Loading

0 comments on commit 571a2bc

Please sign in to comment.