Skip to content

Commit

Permalink
Decompose history service logic into separate packages (part 3) (cade…
Browse files Browse the repository at this point in the history
…nce-workflow#3190)

* Move query, query registry to query package
* Move historyCache, historyBuilder, mutableState, workflowExecutionContext, timerSequence to execution package
* Rename historyCache to executionCache
  • Loading branch information
yycptt authored Apr 14, 2020
1 parent 5945b66 commit 830974d
Show file tree
Hide file tree
Showing 108 changed files with 6,141 additions and 5,689 deletions.
1 change: 1 addition & 0 deletions common/cache/domainCache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/uber-go/tally"

"github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/cluster"
Expand Down
5 changes: 5 additions & 0 deletions common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ const (
EndMessageID int64 = 1<<63 - 1
)

const (
// EmptyUUID is the placeholder for UUID when it's empty
EmptyUUID = "emptyUuid"
)

const (
// FrontendServiceName is the name of the frontend service
FrontendServiceName = "cadence-frontend"
Expand Down
39 changes: 20 additions & 19 deletions service/history/conflictResolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/service/history/execution"
"github.com/uber/cadence/service/history/shard"
)

Expand All @@ -42,19 +43,19 @@ type (
replayEventID int64,
info *persistence.WorkflowExecutionInfo,
updateCondition int64,
) (mutableState, error)
) (execution.MutableState, error)
}

conflictResolverImpl struct {
shard shard.Context
clusterMetadata cluster.Metadata
context workflowExecutionContext
context execution.Context
historyV2Mgr persistence.HistoryManager
logger log.Logger
}
)

func newConflictResolver(shard shard.Context, context workflowExecutionContext, historyV2Mgr persistence.HistoryManager,
func newConflictResolver(shard shard.Context, context execution.Context, historyV2Mgr persistence.HistoryManager,
logger log.Logger) *conflictResolverImpl {

return &conflictResolverImpl{
Expand All @@ -74,10 +75,10 @@ func (r *conflictResolverImpl) reset(
replayEventID int64,
info *persistence.WorkflowExecutionInfo,
updateCondition int64,
) (mutableState, error) {
) (execution.MutableState, error) {

domainID := r.context.getDomainID()
execution := *r.context.getExecution()
domainID := r.context.GetDomainID()
workflowExecution := *r.context.GetExecution()
startTime := info.StartTimestamp
branchToken := info.BranchToken // in 2DC world branch token is stored in execution info
replayNextEventID := replayEventID + 1
Expand All @@ -88,15 +89,15 @@ func (r *conflictResolverImpl) reset(
}

var nextPageToken []byte
var resetMutableStateBuilder *mutableStateBuilder
var resetMutableStateBuilder execution.MutableState
var sBuilder stateBuilder
var history []*shared.HistoryEvent
var totalSize int64

eventsToApply := replayNextEventID - common.FirstEventID
for hasMore := true; hasMore; hasMore = len(nextPageToken) > 0 {
var size int
history, size, _, nextPageToken, err = r.getHistory(domainID, execution, common.FirstEventID, replayNextEventID, nextPageToken, branchToken)
history, size, _, nextPageToken, err = r.getHistory(domainID, workflowExecution, common.FirstEventID, replayNextEventID, nextPageToken, branchToken)
if err != nil {
r.logError("Conflict resolution err getting history.", err)
return nil, err
Expand All @@ -117,7 +118,7 @@ func (r *conflictResolverImpl) reset(

firstEvent := history[0]
if firstEvent.GetEventId() == common.FirstEventID {
resetMutableStateBuilder = newMutableStateBuilderWithReplicationState(
resetMutableStateBuilder = execution.NewMutableStateBuilderWithReplicationState(
r.shard,
r.shard.GetEventsCache(),
r.logger,
Expand All @@ -128,13 +129,13 @@ func (r *conflictResolverImpl) reset(
r.shard,
r.logger,
resetMutableStateBuilder,
func(mutableState mutableState) mutableStateTaskGenerator {
return newMutableStateTaskGenerator(r.shard.GetDomainCache(), r.logger, mutableState)
func(mutableState execution.MutableState) execution.MutableStateTaskGenerator {
return execution.NewMutableStateTaskGenerator(r.shard.GetDomainCache(), r.logger, mutableState)
},
)
}

_, err = sBuilder.applyEvents(domainID, requestID, execution, history, nil, false)
_, err = sBuilder.applyEvents(domainID, requestID, workflowExecution, history, nil, false)
if err != nil {
r.logError("Conflict resolution err applying events.", err)
return nil, err
Expand All @@ -149,17 +150,17 @@ func (r *conflictResolverImpl) reset(
}

// reset branchToken to the original one(it has been set to a wrong branchToken in applyEvents for startEvent)
resetMutableStateBuilder.executionInfo.BranchToken = branchToken // in 2DC world branch token is stored in execution info
resetMutableStateBuilder.GetExecutionInfo().BranchToken = branchToken // in 2DC world branch token is stored in execution info

resetMutableStateBuilder.executionInfo.StartTimestamp = startTime
resetMutableStateBuilder.GetExecutionInfo().StartTimestamp = startTime
// the last updated time is not important here, since this should be updated with event time afterwards
resetMutableStateBuilder.executionInfo.LastUpdatedTimestamp = startTime
resetMutableStateBuilder.GetExecutionInfo().LastUpdatedTimestamp = startTime

// close the rebuild transaction on reset mutable state, since we do not want oo write the
// events used in the replay to be persisted again
_, _, err = resetMutableStateBuilder.CloseTransactionAsSnapshot(
startTime,
transactionPolicyPassive,
execution.TransactionPolicyPassive,
)
if err != nil {
return nil, err
Expand All @@ -172,8 +173,8 @@ func (r *conflictResolverImpl) reset(
}

r.logger.Info("All events applied for execution.", tag.WorkflowResetNextEventID(resetMutableStateBuilder.GetNextEventID()))
r.context.setHistorySize(totalSize)
if err := r.context.conflictResolveWorkflowExecution(
r.context.SetHistorySize(totalSize)
if err := r.context.ConflictResolveWorkflowExecution(
startTime,
persistence.ConflictResolveWorkflowModeUpdateCurrent,
resetMutableStateBuilder,
Expand All @@ -190,7 +191,7 @@ func (r *conflictResolverImpl) reset(
); err != nil {
r.logError("Conflict resolution err reset workflow.", err)
}
return r.context.loadWorkflowExecution()
return r.context.LoadWorkflowExecution()
}

func (r *conflictResolverImpl) getHistory(domainID string, execution shared.WorkflowExecution, firstEventID,
Expand Down
5 changes: 3 additions & 2 deletions service/history/conflictResolver_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 10 additions & 11 deletions service/history/conflictResolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/uber/cadence/common/service/dynamicconfig"
"github.com/uber/cadence/service/history/config"
"github.com/uber/cadence/service/history/events"
"github.com/uber/cadence/service/history/execution"
"github.com/uber/cadence/service/history/shard"
)

Expand All @@ -63,7 +64,7 @@ type (
logger log.Logger
mockExecutionMgr *mocks.ExecutionManager
mockHistoryV2Mgr *mocks.HistoryV2Manager
mockContext *workflowExecutionContextImpl
mockContext execution.Context

conflictResolver *conflictResolverImpl
}
Expand Down Expand Up @@ -122,7 +123,7 @@ func (s *conflictResolverSuite) SetupTest() {
}
s.mockShard.SetEngine(h)

s.mockContext = newWorkflowExecutionContext(testDomainID, shared.WorkflowExecution{
s.mockContext = execution.NewContext(testDomainID, shared.WorkflowExecution{
WorkflowId: common.StringPtr("some random workflow ID"),
RunId: common.StringPtr(testRunID),
}, s.mockShard, s.mockExecutionMgr, s.logger)
Expand All @@ -146,8 +147,8 @@ func (s *conflictResolverSuite) TestReset() {
startTime := time.Now()
version := int64(12)

domainID := s.mockContext.domainID
execution := s.mockContext.workflowExecution
domainID := s.mockContext.GetDomainID()
workflowExecution := *s.mockContext.GetExecution()
nextEventID := int64(2)
branchToken := []byte("some random branch token")

Expand Down Expand Up @@ -183,13 +184,12 @@ func (s *conflictResolverSuite) TestReset() {
Size: int(historySize),
}, nil)

s.mockContext.updateCondition = int64(59)
createRequestID := uuid.New()

executionInfo := &persistence.WorkflowExecutionInfo{
DomainID: domainID,
WorkflowID: execution.GetWorkflowId(),
RunID: execution.GetRunId(),
WorkflowID: workflowExecution.GetWorkflowId(),
RunID: workflowExecution.GetRunId(),
ParentDomainID: "",
ParentWorkflowID: "",
ParentRunID: "",
Expand All @@ -208,7 +208,7 @@ func (s *conflictResolverSuite) TestReset() {
DecisionVersion: common.EmptyVersion,
DecisionScheduleID: common.EmptyEventID,
DecisionStartedID: common.EmptyEventID,
DecisionRequestID: emptyUUID,
DecisionRequestID: common.EmptyUUID,
DecisionTimeout: 0,
DecisionAttempt: 0,
DecisionStartedTimestamp: 0,
Expand Down Expand Up @@ -259,15 +259,14 @@ func (s *conflictResolverSuite) TestReset() {
TransferTasks: nil,
ReplicationTasks: nil,
TimerTasks: nil,
Condition: s.mockContext.updateCondition,
},
Encoding: common.EncodingType(s.mockShard.GetConfig().EventEncodingType(domainID)),
}, input)
return true
})).Return(nil).Once()
s.mockExecutionMgr.On("GetWorkflowExecution", &persistence.GetWorkflowExecutionRequest{
DomainID: domainID,
Execution: execution,
Execution: workflowExecution,
}).Return(&persistence.GetWorkflowExecutionResponse{
State: &persistence.WorkflowMutableState{
ExecutionInfo: &persistence.WorkflowExecutionInfo{},
Expand All @@ -280,6 +279,6 @@ func (s *conflictResolverSuite) TestReset() {
&persistence.DomainInfo{ID: domainID}, &persistence.DomainConfig{}, "", nil,
), nil).AnyTimes()

_, err := s.conflictResolver.reset(prevRunID, prevLastWriteVersion, prevState, createRequestID, nextEventID-1, executionInfo, s.mockContext.updateCondition)
_, err := s.conflictResolver.reset(prevRunID, prevLastWriteVersion, prevState, createRequestID, nextEventID-1, executionInfo, 0)
s.Nil(err)
}
5 changes: 3 additions & 2 deletions service/history/decisionChecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/service/history/config"
"github.com/uber/cadence/service/history/execution"
)

type (
Expand All @@ -56,7 +57,7 @@ type (
historyCountLimitError int

completedID int64
mutableState mutableState
mutableState execution.MutableState
executionStats *persistence.ExecutionStats
metricsScope metrics.Scope
logger log.Logger
Expand Down Expand Up @@ -93,7 +94,7 @@ func newWorkflowSizeChecker(
historyCountLimitWarn int,
historyCountLimitError int,
completedID int64,
mutableState mutableState,
mutableState execution.MutableState,
executionStats *persistence.ExecutionStats,
metricsScope metrics.Scope,
logger log.Logger,
Expand Down
Loading

0 comments on commit 830974d

Please sign in to comment.