Skip to content

Commit

Permalink
Function signature and call cleanups for readability (cadence-workflo…
Browse files Browse the repository at this point in the history
  • Loading branch information
shreyassrivatsan authored Jun 7, 2019
1 parent ee5f5e8 commit 59a12f0
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 50 deletions.
7 changes: 5 additions & 2 deletions common/persistence/cassandra/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -3007,8 +3007,11 @@ func (d *cassandraPersistence) GetTimerIndexTasks(request *p.GetTimerIndexTasksR
return response, nil
}

func (d *cassandraPersistence) createTransferTasks(batch *gocql.Batch, transferTasks []p.Task, domainID, workflowID,
runID string) {
func (d *cassandraPersistence) createTransferTasks(
batch *gocql.Batch,
transferTasks []p.Task,
domainID, workflowID, runID string,
) {
targetDomainID := domainID
for _, task := range transferTasks {
var taskList string
Expand Down
4 changes: 2 additions & 2 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1658,13 +1658,13 @@ func (e *historyEngineImpl) ResetWorkflowExecution(ctx ctx.Context,
request := resetRequest.ResetRequest
if request == nil || request.WorkflowExecution == nil || len(request.WorkflowExecution.GetRunId()) == 0 || len(request.WorkflowExecution.GetWorkflowId()) == 0 {
retError = &workflow.BadRequestError{
Message: fmt.Sprintf("Require workflowId and runId."),
Message: "Require workflowId and runId.",
}
return
}
if request.GetDecisionFinishEventId() <= common.FirstEventID {
retError = &workflow.BadRequestError{
Message: fmt.Sprintf("Decision finish ID must be > 1."),
Message: "Decision finish ID must be > 1.",
}
return
}
Expand Down
2 changes: 1 addition & 1 deletion service/history/queueAckMgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (a *queueAckMgrImpl) updateQueueAckLevel() {

pendingTasks := len(taskIDs)
if pendingTasks > warnPendingTasks {
a.logger.Warn("Too many pendind tasks.")
a.logger.Warn("Too many pending tasks.")
}
switch a.options.MetricScope {
case metrics.ReplicatorQueueProcessorScope:
Expand Down
12 changes: 9 additions & 3 deletions service/history/workflowExecutionContext.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,9 +281,15 @@ func (c *workflowExecutionContextImpl) resetMutableState(prevRunID string, reset
// 1. append history to new run
// 2. append history to current run if current run is not closed
// 3. update mutableState(terminate current run if not closed) and create new run
func (c *workflowExecutionContextImpl) resetWorkflowExecution(currMutableState mutableState, updateCurr bool, closeTask, cleanupTask persistence.Task,
newMutableState mutableState, newTransferTasks, newTimerTasks, currReplicationTasks, insertReplicationTasks []persistence.Task, baseRunID string,
baseRunNextEventID, prevRunVersion int64) (retError error) {
func (c *workflowExecutionContextImpl) resetWorkflowExecution(
currMutableState mutableState,
updateCurr bool,
closeTask, cleanupTask persistence.Task,
newMutableState mutableState,
newTransferTasks, newTimerTasks, currReplicationTasks, insertReplicationTasks []persistence.Task,
baseRunID string,
baseRunNextEventID, prevRunVersion int64,
) (retError error) {

now := time.Now()
currTransferTasks := []persistence.Task{}
Expand Down
150 changes: 108 additions & 42 deletions service/history/workflowResetor.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,20 @@ import (

type (
workflowResetor interface {
ResetWorkflowExecution(ctx context.Context, resetRequest *workflow.ResetWorkflowExecutionRequest,
baseContext workflowExecutionContext, baseMutableState mutableState,
currContext workflowExecutionContext, currMutableState mutableState) (response *workflow.ResetWorkflowExecutionResponse, retError error)
ApplyResetEvent(ctx context.Context, request *h.ReplicateEventsRequest, domainID, workflowID, currentRunID string) (retError error)
ResetWorkflowExecution(
ctx context.Context,
resetRequest *workflow.ResetWorkflowExecutionRequest,
baseContext workflowExecutionContext,
baseMutableState mutableState,
currContext workflowExecutionContext,
currMutableState mutableState,
) (response *workflow.ResetWorkflowExecutionResponse, retError error)

ApplyResetEvent(
ctx context.Context,
request *h.ReplicateEventsRequest,
domainID, workflowID, currentRunID string,
) (retError error)
}

workflowResetorImpl struct {
Expand All @@ -56,17 +66,22 @@ func newWorkflowResetor(historyEngine *historyEngineImpl) *workflowResetorImpl {

// ResetWorkflowExecution only allows resetting to decisionTaskCompleted, but exclude that batch of decisionTaskCompleted/decisionTaskFailed/decisionTaskTimeout.
// It will then fail the decision with cause of "reset_workflow"
func (w *workflowResetorImpl) ResetWorkflowExecution(ctx context.Context, request *workflow.ResetWorkflowExecutionRequest,
baseContext workflowExecutionContext, baseMutableState mutableState,
currContext workflowExecutionContext, currMutableState mutableState) (response *workflow.ResetWorkflowExecutionResponse, retError error) {
func (w *workflowResetorImpl) ResetWorkflowExecution(
ctx context.Context,
request *workflow.ResetWorkflowExecutionRequest,
baseContext workflowExecutionContext,
baseMutableState mutableState,
currContext workflowExecutionContext,
currMutableState mutableState,
) (*workflow.ResetWorkflowExecutionResponse, error) {

domainEntry, retError := w.eng.shard.GetDomainCache().GetDomain(request.GetDomain())
if retError != nil {
return
return nil, retError
}

resetNewRunID := uuid.New()
response = &workflow.ResetWorkflowExecutionResponse{
response := &workflow.ResetWorkflowExecutionResponse{
RunId: common.StringPtr(resetNewRunID),
}

Expand All @@ -75,12 +90,12 @@ func (w *workflowResetorImpl) ResetWorkflowExecution(ctx context.Context, reques
// terminate the current run if it is running
currTerminated, currCloseTask, currCleanupTask, retError := w.terminateIfCurrIsRunning(currMutableState, request.GetReason())
if retError != nil {
return
return response, retError
}

retError = w.validateResetWorkflowBeforeReplay(baseMutableState, currMutableState)
if retError != nil {
return
return response, retError
}

newMutableState, newTransferTasks, newTimerTasks, retError := w.buildNewMutableStateForReset(
Expand All @@ -98,12 +113,12 @@ func (w *workflowResetorImpl) ResetWorkflowExecution(ctx context.Context, reques
}
}()
if retError != nil {
return
return response, retError
}

retError = w.checkDomainStatus(newMutableState, currPrevRunVersion, domainEntry.GetInfo().Name)
if retError != nil {
return
return response, retError
}

// update replication and generate replication task
Expand All @@ -124,10 +139,10 @@ func (w *workflowResetorImpl) ResetWorkflowExecution(ctx context.Context, reques
w.eng.timerProcessor.NotifyNewTimers(w.eng.currentClusterName, w.eng.shard.GetCurrentTime(w.eng.currentClusterName), newTimerTasks)
}

return
return response, retError
}

func (w *workflowResetorImpl) checkDomainStatus(newMutableState mutableState, prevRunVersion int64, domain string) (retError error) {
func (w *workflowResetorImpl) checkDomainStatus(newMutableState mutableState, prevRunVersion int64, domain string) error {
if newMutableState.GetReplicationState() != nil {
clusterMetadata := w.eng.shard.GetService().GetClusterMetadata()
currentVersion := newMutableState.GetCurrentVersion()
Expand All @@ -147,7 +162,7 @@ func (w *workflowResetorImpl) checkDomainStatus(newMutableState mutableState, pr
return nil
}

func (w *workflowResetorImpl) validateResetWorkflowBeforeReplay(baseMutableState, currMutableState mutableState) (retError error) {
func (w *workflowResetorImpl) validateResetWorkflowBeforeReplay(baseMutableState, currMutableState mutableState) error {
if baseMutableState.GetEventStoreVersion() != persistence.EventStoreVersionV2 {
return &workflow.BadRequestError{
Message: fmt.Sprintf("reset API is not supported for V1 history events"),
Expand All @@ -163,13 +178,12 @@ func (w *workflowResetorImpl) validateResetWorkflowBeforeReplay(baseMutableState
Message: fmt.Sprintf("current workflow should already been terminated"),
}
}
return
return nil
}

func (w *workflowResetorImpl) validateResetWorkflowAfterReplay(newMutableState mutableState) (retError error) {
retError = newMutableState.CheckResettable()
if retError != nil {
return
func (w *workflowResetorImpl) validateResetWorkflowAfterReplay(newMutableState mutableState) error {
if retError := newMutableState.CheckResettable(); retError != nil {
return retError
}
if !newMutableState.HasInFlightDecisionTask() {
return &workflow.InternalServiceError{
Expand All @@ -186,7 +200,7 @@ func (w *workflowResetorImpl) validateResetWorkflowAfterReplay(newMutableState m
Message: fmt.Sprintf("replay history shouldn't have stikyness"),
}
}
return
return nil
}

// Fail the started activities
Expand Down Expand Up @@ -225,9 +239,14 @@ func (w *workflowResetorImpl) scheduleUnstartedActivities(msBuilder mutableState
return tasks, nil
}

// TODO: @shreyassrivatsan reduce number of return parameters
func (w *workflowResetorImpl) buildNewMutableStateForReset(
ctx context.Context, domainEntry *cache.DomainCacheEntry, baseMutableState, currMutableState mutableState,
resetReason string, resetDecisionCompletedEventID int64, requestedID, newRunID string,
ctx context.Context,
domainEntry *cache.DomainCacheEntry,
baseMutableState, currMutableState mutableState,
resetReason string,
resetDecisionCompletedEventID int64,
requestedID, newRunID string,
) (newMutableState mutableState, newTransferTasks, newTimerTasks []persistence.Task, retError error) {

domainID := baseMutableState.GetExecutionInfo().DomainID
Expand Down Expand Up @@ -321,8 +340,10 @@ func (w *workflowResetorImpl) buildNewMutableStateForReset(
return
}

func (w *workflowResetorImpl) terminateIfCurrIsRunning(currMutableState mutableState,
reason string) (terminateCurr bool, closeTask, cleanupTask persistence.Task, retError error) {
func (w *workflowResetorImpl) terminateIfCurrIsRunning(
currMutableState mutableState,
reason string,
) (terminateCurr bool, closeTask, cleanupTask persistence.Task, retError error) {

if currMutableState.IsWorkflowExecutionRunning() {
terminateCurr = true
Expand Down Expand Up @@ -359,7 +380,11 @@ func (w *workflowResetorImpl) setEventIDsWithHistory(msBuilder mutableState) int
return firstEvent.GetEventId()
}

func (w *workflowResetorImpl) generateReplicationTasksForReset(terminateCurr bool, currMutableState, newMutableState mutableState, domainEntry *cache.DomainCacheEntry) ([]persistence.Task, []persistence.Task) {
func (w *workflowResetorImpl) generateReplicationTasksForReset(
terminateCurr bool,
currMutableState, newMutableState mutableState,
domainEntry *cache.DomainCacheEntry,
) ([]persistence.Task, []persistence.Task) {
var currRepTasks, insertRepTasks []persistence.Task
if newMutableState.GetReplicationState() != nil {
if terminateCurr {
Expand Down Expand Up @@ -395,7 +420,12 @@ func (w *workflowResetorImpl) generateReplicationTasksForReset(terminateCurr boo
}

// replay signals in the base run, and also signals in all the runs along the chain of contineAsNew
func (w *workflowResetorImpl) replayReceivedSignals(ctx context.Context, receivedSignals []*workflow.HistoryEvent, continueRunID string, newMutableState, currMutableState mutableState) error {
func (w *workflowResetorImpl) replayReceivedSignals(
ctx context.Context,
receivedSignals []*workflow.HistoryEvent,
continueRunID string,
newMutableState, currMutableState mutableState,
) error {
for _, se := range receivedSignals {
sigReq := &workflow.SignalWorkflowExecutionRequest{
SignalName: se.GetWorkflowExecutionSignaledEventAttributes().SignalName,
Expand Down Expand Up @@ -469,7 +499,11 @@ func (w *workflowResetorImpl) replayReceivedSignals(ctx context.Context, receive
return nil
}

func (w *workflowResetorImpl) generateTimerTasksForReset(msBuilder mutableState, wfTimeoutSecs int64, needActivityTimer bool) ([]persistence.Task, error) {
func (w *workflowResetorImpl) generateTimerTasksForReset(
msBuilder mutableState,
wfTimeoutSecs int64,
needActivityTimer bool,
) ([]persistence.Task, error) {
timerTasks := []persistence.Task{}

// WF timeout task
Expand Down Expand Up @@ -509,7 +543,13 @@ func getRespondActivityTaskFailedRequestFromActivity(ai *persistence.ActivityInf
}
}

func (w *workflowResetorImpl) replayHistoryEvents(decisionFinishEventID int64, requestID string, prevMutableState mutableState, newRunID string) (forkEventVersion, wfTimeoutSecs int64, receivedSignalsAfterReset []*workflow.HistoryEvent, continueRunID string, sBuilder stateBuilder, retError error) {
// TODO: @shreyassrivatsan reduce the number of return parameters from this method or return a struct
func (w *workflowResetorImpl) replayHistoryEvents(
decisionFinishEventID int64,
requestID string,
prevMutableState mutableState,
newRunID string,
) (forkEventVersion, wfTimeoutSecs int64, receivedSignalsAfterReset []*workflow.HistoryEvent, continueRunID string, sBuilder stateBuilder, retError error) {
clusterMetadata := w.eng.shard.GetService().GetClusterMetadata()

prevExecution := workflow.WorkflowExecution{
Expand Down Expand Up @@ -652,7 +692,11 @@ func validateResetReplicationTask(request *h.ReplicateEventsRequest) (*workflow.
return attr, nil
}

func (w *workflowResetorImpl) ApplyResetEvent(ctx context.Context, request *h.ReplicateEventsRequest, domainID, workflowID, currentRunID string) (retError error) {
func (w *workflowResetorImpl) ApplyResetEvent(
ctx context.Context,
request *h.ReplicateEventsRequest,
domainID, workflowID, currentRunID string,
) error {
var currContext workflowExecutionContext
var baseMutableState, currMutableState, newMsBuilder mutableState
var newRunTransferTasks, newRunTimerTasks []persistence.Task
Expand All @@ -662,7 +706,7 @@ func (w *workflowResetorImpl) ApplyResetEvent(ctx context.Context, request *h.Re
lastEvent := historyAfterReset[len(historyAfterReset)-1]
decisionFinishEventID := historyAfterReset[0].GetEventId()
if retError != nil {
return
return retError
}
baseExecution := workflow.WorkflowExecution{
WorkflowId: common.StringPtr(workflowID),
Expand All @@ -676,7 +720,7 @@ func (w *workflowResetorImpl) ApplyResetEvent(ctx context.Context, request *h.Re
defer func() { baseRelease(retError) }()
baseMutableState, retError = baseContext.loadWorkflowExecution()
if retError != nil {
return
return retError
}
if baseMutableState.GetNextEventID() < decisionFinishEventID {
// re-replicate the whole new run
Expand All @@ -700,14 +744,14 @@ func (w *workflowResetorImpl) ApplyResetEvent(ctx context.Context, request *h.Re
defer func() { currRelease(retError) }()
currMutableState, retError = currContext.loadWorkflowExecution()
if retError != nil {
return
return retError
}
}
// before changing mutable state
prevRunVersion := currMutableState.GetLastWriteVersion()
newMsBuilder, newRunTransferTasks, newRunTimerTasks, retError = w.replicateResetEvent(baseMutableState, &baseExecution, historyAfterReset, resetAttr.GetForkEventVersion())
if retError != nil {
return
return retError
}

// fork a new history branch
Expand All @@ -719,7 +763,7 @@ func (w *workflowResetorImpl) ApplyResetEvent(ctx context.Context, request *h.Re
ShardID: shardID,
})
if retError != nil {
return
return retError
}
defer func() {
w.eng.historyV2Mgr.CompleteForkBranch(&persistence.CompleteForkBranchRequest{
Expand All @@ -735,16 +779,35 @@ func (w *workflowResetorImpl) ApplyResetEvent(ctx context.Context, request *h.Re
hBuilder.history = historyAfterReset
newMsBuilder.SetHistoryBuilder(hBuilder)

retError = currContext.resetWorkflowExecution(currMutableState, false, nil, nil, newMsBuilder, newRunTransferTasks, newRunTimerTasks, nil, nil, baseExecution.GetRunId(), baseMutableState.GetNextEventID(), prevRunVersion)
retError = currContext.resetWorkflowExecution(
currMutableState,
false,
nil,
nil,
newMsBuilder,
newRunTransferTasks,
newRunTimerTasks,
nil,
nil,
baseExecution.GetRunId(),
baseMutableState.GetNextEventID(),
prevRunVersion,
)
if retError != nil {
return
return retError
}
now := time.Unix(0, lastEvent.GetTimestamp())
notify(w.eng.shard, w.eng, request.GetSourceCluster(), now, newRunTransferTasks, newRunTimerTasks)
return nil
}

func (w *workflowResetorImpl) replicateResetEvent(baseMutableState mutableState, baseExecution *workflow.WorkflowExecution, newRunHistory []*workflow.HistoryEvent, forkEventVersion int64) (newMsBuilder mutableState, transferTasks, timerTasks []persistence.Task, retError error) {
// TODO: @shreyassrivatsan reduce number of return parameters from this method
func (w *workflowResetorImpl) replicateResetEvent(
baseMutableState mutableState,
baseExecution *workflow.WorkflowExecution,
newRunHistory []*workflow.HistoryEvent,
forkEventVersion int64,
) (newMsBuilder mutableState, transferTasks, timerTasks []persistence.Task, retError error) {
domainID := baseMutableState.GetExecutionInfo().DomainID
workflowID := baseMutableState.GetExecutionInfo().WorkflowID
firstEvent := newRunHistory[0]
Expand Down Expand Up @@ -852,9 +915,12 @@ func (w *workflowResetorImpl) replicateResetEvent(baseMutableState mutableState,
}

// FindAutoResetPoint returns the auto reset point
func FindAutoResetPoint(badBinaries *workflow.BadBinaries, autoResetPoints *workflow.ResetPoints) (reason string, pt *workflow.ResetPointInfo) {
func FindAutoResetPoint(
badBinaries *workflow.BadBinaries,
autoResetPoints *workflow.ResetPoints,
) (string, *workflow.ResetPointInfo) {
if badBinaries == nil || badBinaries.Binaries == nil || autoResetPoints == nil || autoResetPoints.Points == nil {
return
return "", nil
}
nowNano := time.Now().UnixNano()
for _, p := range autoResetPoints.Points {
Expand All @@ -867,5 +933,5 @@ func FindAutoResetPoint(badBinaries *workflow.BadBinaries, autoResetPoints *work
return bin.GetReason(), p
}
}
return
return "", nil
}

0 comments on commit 59a12f0

Please sign in to comment.