Skip to content

Commit

Permalink
Move event constants from historyBuilder to constants (cadence-workfl…
Browse files Browse the repository at this point in the history
…ow#715)

* moves emptyVersion from historyBuilder to constants

* moves transientEventID from historyBuilder to constants

* moves bufferedEventID from historyBuilder to constants

* moves emptyEventID from historyBuilder to constants

* moves firstEventID from historyBuilder to constants

* fixes formatting error in service/history/retry.go
  • Loading branch information
mudit3774 authored and samarabbas committed May 13, 2018
1 parent 057c6bf commit 3b0ca48
Show file tree
Hide file tree
Showing 15 changed files with 160 additions and 164 deletions.
4 changes: 4 additions & 0 deletions common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ const (
EmptyVersion int64 = -24
// EndEventID is the id of the end event, here we use the int64 max
EndEventID int64 = 1<<63 - 1
// BufferedEventID is the id of the buffered event
BufferedEventID int64 = -123
// TransientEventID is the id of the transient event
TransientEventID int64 = -124
)

const (
Expand Down
9 changes: 0 additions & 9 deletions service/history/historyBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,6 @@ import (
"github.com/uber/cadence/common/persistence"
)

const (
emptyVersion int64 = -2234

firstEventID int64 = 1
emptyEventID int64 = -23
bufferedEventID int64 = -123
transientEventID int64 = -124
)

type (
historyBuilder struct {
serializer persistence.HistorySerializer
Expand Down
90 changes: 45 additions & 45 deletions service/history/historyBuilder_test.go

Large diffs are not rendered by default.

50 changes: 25 additions & 25 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func (e *historyEngineImpl) StartWorkflowExecution(startRequest *h.StartWorkflow
}

var parentExecution *workflow.WorkflowExecution
initiatedID := emptyEventID
initiatedID := common.EmptyEventID
parentDomainID := ""
parentInfo := startRequest.ParentExecutionInfo
if parentInfo != nil {
Expand All @@ -251,9 +251,9 @@ func (e *historyEngineImpl) StartWorkflowExecution(startRequest *h.StartWorkflow
}

var transferTasks []persistence.Task
decisionVersion := emptyVersion
decisionScheduleID := emptyEventID
decisionStartID := emptyEventID
decisionVersion := common.EmptyVersion
decisionScheduleID := common.EmptyEventID
decisionStartID := common.EmptyEventID
decisionTimeout := int32(0)
if parentInfo == nil {
// DecisionTask is only created when it is not a Child Workflow Execution
Expand Down Expand Up @@ -311,7 +311,7 @@ func (e *historyEngineImpl) StartWorkflowExecution(startRequest *h.StartWorkflow
}

replicationTask := &persistence.HistoryReplicationTask{
FirstEventID: firstEventID,
FirstEventID: common.FirstEventID,
NextEventID: msBuilder.GetNextEventID(),
Version: failoverVersion,
LastReplicationInfo: nil,
Expand All @@ -334,7 +334,7 @@ func (e *historyEngineImpl) StartWorkflowExecution(startRequest *h.StartWorkflow
DecisionTimeoutValue: *request.TaskStartToCloseTimeoutSeconds,
ExecutionContext: nil,
NextEventID: msBuilder.GetNextEventID(),
LastProcessedEvent: emptyEventID,
LastProcessedEvent: common.EmptyEventID,
TransferTasks: transferTasks,
ReplicationTasks: replicationTasks,
DecisionVersion: decisionVersion,
Expand Down Expand Up @@ -607,7 +607,7 @@ func (e *historyEngineImpl) DescribeWorkflowExecution(
state := workflow.PendingActivityStateScheduled
if pi.CancelRequested {
state = workflow.PendingActivityStateCancelRequested
} else if pi.StartedID != emptyEventID {
} else if pi.StartedID != common.EmptyEventID {
state = workflow.PendingActivityStateStarted
}
ai.State = &state
Expand Down Expand Up @@ -669,12 +669,12 @@ Update_History_Loop:
// Looks like DecisionTask already completed as a result of another call.
// It is OK to drop the task at this point.
logging.LogDuplicateTaskEvent(context.logger, persistence.TransferTaskTypeDecisionTask, common.Int64Default(request.TaskId), requestID,
scheduleID, emptyEventID, isRunning)
scheduleID, common.EmptyEventID, isRunning)

return nil, &workflow.EntityNotExistsError{Message: "Decision task not found."}
}

if di.StartedID != emptyEventID {
if di.StartedID != common.EmptyEventID {
// If decision is started as part of the current request scope then return a positive response
if di.RequestID == requestID {
return e.createRecordDecisionTaskStartedResponse(domainID, msBuilder, di, request.PollRequest.GetIdentity()), nil
Expand Down Expand Up @@ -760,7 +760,7 @@ func (e *historyEngineImpl) RecordActivityTaskStarted(
// Looks like ActivityTask already completed as a result of another call.
// It is OK to drop the task at this point.
logging.LogDuplicateTaskEvent(e.logger, persistence.TransferTaskTypeActivityTask,
common.Int64Default(request.TaskId), requestID, scheduleID, emptyEventID, isRunning)
common.Int64Default(request.TaskId), requestID, scheduleID, common.EmptyEventID, isRunning)

return nil, ErrActivityTaskNotFound
}
Expand All @@ -772,7 +772,7 @@ func (e *historyEngineImpl) RecordActivityTaskStarted(
response.ScheduledEvent = scheduledEvent
response.ScheduledTimestampOfThisAttempt = common.Int64Ptr(ai.ScheduledTime.UnixNano())

if ai.StartedID != emptyEventID {
if ai.StartedID != common.EmptyEventID {
// If activity is started as part of the current request scope then return a positive response
if ai.RequestID == requestID {
response.StartedTimestamp = common.Int64Ptr(ai.StartedTime.UnixNano())
Expand Down Expand Up @@ -861,7 +861,7 @@ Update_History_Loop:
}

if !msBuilder.isWorkflowExecutionRunning() || !isRunning || di.Attempt != token.ScheduleAttempt ||
di.StartedID == emptyEventID {
di.StartedID == common.EmptyEventID {
return &workflow.EntityNotExistsError{Message: "Decision task not found."}
}

Expand Down Expand Up @@ -1042,7 +1042,7 @@ Update_History_Loop:
continue Process_Decision_Loop
}

if ai.StartedID == emptyEventID {
if ai.StartedID == common.EmptyEventID {
// We haven't started the activity yet, we can cancel the activity right away.
msBuilder.AddActivityTaskCanceledEvent(ai.ScheduleID, ai.StartedID, *actCancelReqEvent.EventId,
[]byte(activityCancelationMsgActivityNotStarted), common.StringDefault(request.Identity))
Expand Down Expand Up @@ -1343,7 +1343,7 @@ func (e *historyEngineImpl) RespondDecisionTaskFailed(req *h.RespondDecisionTask

scheduleID := token.ScheduleID
di, isRunning := msBuilder.GetPendingDecision(scheduleID)
if !isRunning || di.Attempt != token.ScheduleAttempt || di.StartedID == emptyEventID {
if !isRunning || di.Attempt != token.ScheduleAttempt || di.StartedID == common.EmptyEventID {
return nil, &workflow.EntityNotExistsError{Message: "Decision task not found."}
}

Expand Down Expand Up @@ -1396,7 +1396,7 @@ func (e *historyEngineImpl) RespondActivityTaskCompleted(req *h.RespondActivityT
return nil, ErrStaleState
}

if !isRunning || ai.StartedID == emptyEventID ||
if !isRunning || ai.StartedID == common.EmptyEventID ||
(token.ScheduleAttempt != 0 && int64(ai.Attempt) != token.ScheduleAttempt) {
return nil, ErrActivityTaskNotFound
}
Expand Down Expand Up @@ -1452,8 +1452,8 @@ func (e *historyEngineImpl) RespondActivityTaskFailed(req *h.RespondActivityTask
return nil, ErrStaleState
}

if !isRunning || ai.StartedID == emptyEventID ||
(token.ScheduleID != emptyEventID && token.ScheduleAttempt != int64(ai.Attempt)) {
if !isRunning || ai.StartedID == common.EmptyEventID ||
(token.ScheduleID != common.EmptyEventID && token.ScheduleAttempt != int64(ai.Attempt)) {
return nil, ErrActivityTaskNotFound
}

Expand Down Expand Up @@ -1516,8 +1516,8 @@ func (e *historyEngineImpl) RespondActivityTaskCanceled(req *h.RespondActivityTa
return nil, ErrStaleState
}

if !isRunning || ai.StartedID == emptyEventID ||
(token.ScheduleID != emptyEventID && token.ScheduleAttempt != int64(ai.Attempt)) {
if !isRunning || ai.StartedID == common.EmptyEventID ||
(token.ScheduleID != common.EmptyEventID && token.ScheduleAttempt != int64(ai.Attempt)) {
return nil, ErrActivityTaskNotFound
}

Expand Down Expand Up @@ -1580,8 +1580,8 @@ func (e *historyEngineImpl) RecordActivityTaskHeartbeat(
return nil, ErrStaleState
}

if !isRunning || ai.StartedID == emptyEventID ||
(token.ScheduleID != emptyEventID && token.ScheduleAttempt != int64(ai.Attempt)) {
if !isRunning || ai.StartedID == common.EmptyEventID ||
(token.ScheduleID != common.EmptyEventID && token.ScheduleAttempt != int64(ai.Attempt)) {
e.logger.Debugf("Activity HeartBeat: scheduleEventID: %v, ActivityInfo: %+v, Exist: %v", scheduleID, ai,
isRunning)
return nil, ErrActivityTaskNotFound
Expand Down Expand Up @@ -1873,14 +1873,14 @@ func (e *historyEngineImpl) SignalWithStartWorkflowExecution(signalWithStartRequ
RequestID: common.StringDefault(request.RequestId),
DomainID: domainID,
Execution: execution,
InitiatedID: emptyEventID,
InitiatedID: common.EmptyEventID,
TaskList: *request.TaskList.Name,
WorkflowTypeName: *request.WorkflowType.Name,
WorkflowTimeout: *request.ExecutionStartToCloseTimeoutSeconds,
DecisionTimeoutValue: *request.TaskStartToCloseTimeoutSeconds,
ExecutionContext: nil,
NextEventID: msBuilder.GetNextEventID(),
LastProcessedEvent: emptyEventID,
LastProcessedEvent: common.EmptyEventID,
TransferTasks: transferTasks,
DecisionVersion: decisionVersion,
DecisionScheduleID: decisionScheduleID,
Expand Down Expand Up @@ -2024,7 +2024,7 @@ func (e *historyEngineImpl) RecordChildExecutionCompleted(completionRequest *h.R

// Check mutable state to make sure child execution is in pending child executions
ci, isRunning := msBuilder.GetChildExecutionInfo(initiatedID)
if !isRunning || ci.StartedID == emptyEventID {
if !isRunning || ci.StartedID == common.EmptyEventID {
return nil, &workflow.EntityNotExistsError{Message: "Pending child execution not found."}
}

Expand Down Expand Up @@ -2184,7 +2184,7 @@ func (e *historyEngineImpl) createRecordDecisionTaskStartedResponse(domainID str
di *decisionInfo, identity string) *h.RecordDecisionTaskStartedResponse {
response := &h.RecordDecisionTaskStartedResponse{}
response.WorkflowType = msBuilder.getWorkflowType()
if msBuilder.previousDecisionStartedEvent() != emptyEventID {
if msBuilder.previousDecisionStartedEvent() != common.EmptyEventID {
response.PreviousStartedEventId = common.Int64Ptr(msBuilder.previousDecisionStartedEvent())
}

Expand Down
16 changes: 8 additions & 8 deletions service/history/historyEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1931,7 +1931,7 @@ func (s *engineSuite) TestRespondActivityTaskCompletedConflictOnUpdate() {
s.True(ok)
s.Equal(int32(200), di.DecisionTimeout)
s.Equal(int64(10), di.ScheduleID)
s.Equal(emptyEventID, di.StartedID)
s.Equal(common.EmptyEventID, di.StartedID)
}

func (s *engineSuite) TestRespondActivityTaskCompletedMaxAttemptsExceeded() {
Expand Down Expand Up @@ -2062,7 +2062,7 @@ func (s *engineSuite) TestRespondActivityTaskCompletedSuccess() {
s.True(ok)
s.Equal(int32(200), di.DecisionTimeout)
s.Equal(int64(8), di.ScheduleID)
s.Equal(emptyEventID, di.StartedID)
s.Equal(common.EmptyEventID, di.StartedID)
}

func (s *engineSuite) TestRespondActivityTaskCompletedByIdSuccess() {
Expand Down Expand Up @@ -2135,7 +2135,7 @@ func (s *engineSuite) TestRespondActivityTaskCompletedByIdSuccess() {
s.True(ok)
s.Equal(int32(200), di.DecisionTimeout)
s.Equal(int64(8), di.ScheduleID)
s.Equal(emptyEventID, di.StartedID)
s.Equal(common.EmptyEventID, di.StartedID)
}

func (s *engineSuite) TestRespondActivityTaskFailedInvalidToken() {
Expand Down Expand Up @@ -2616,7 +2616,7 @@ func (s *engineSuite) TestRespondActivityTaskFailedConflictOnUpdate() {
s.True(ok)
s.Equal(int32(200), di.DecisionTimeout)
s.Equal(int64(10), di.ScheduleID)
s.Equal(emptyEventID, di.StartedID)
s.Equal(common.EmptyEventID, di.StartedID)
}

func (s *engineSuite) TestRespondActivityTaskFailedMaxAttemptsExceeded() {
Expand Down Expand Up @@ -2747,7 +2747,7 @@ func (s *engineSuite) TestRespondActivityTaskFailedSuccess() {
s.True(ok)
s.Equal(int32(200), di.DecisionTimeout)
s.Equal(int64(8), di.ScheduleID)
s.Equal(emptyEventID, di.StartedID)
s.Equal(common.EmptyEventID, di.StartedID)
}

func (s *engineSuite) TestRespondActivityTaskFailedByIDSuccess() {
Expand Down Expand Up @@ -2822,7 +2822,7 @@ func (s *engineSuite) TestRespondActivityTaskFailedByIDSuccess() {
s.True(ok)
s.Equal(int32(200), di.DecisionTimeout)
s.Equal(int64(8), di.ScheduleID)
s.Equal(emptyEventID, di.StartedID)
s.Equal(common.EmptyEventID, di.StartedID)
}

func (s *engineSuite) TestRecordActivityTaskHeartBeatSuccess_NoTimer() {
Expand Down Expand Up @@ -3132,7 +3132,7 @@ func (s *engineSuite) TestRespondActivityTaskCanceled_Started() {
s.True(ok)
s.Equal(int32(200), di.DecisionTimeout)
s.Equal(int64(9), di.ScheduleID)
s.Equal(emptyEventID, di.StartedID)
s.Equal(common.EmptyEventID, di.StartedID)
}

func (s *engineSuite) TestRespondActivityTaskCanceledByID_Started() {
Expand Down Expand Up @@ -3204,7 +3204,7 @@ func (s *engineSuite) TestRespondActivityTaskCanceledByID_Started() {
s.True(ok)
s.Equal(int32(200), di.DecisionTimeout)
s.Equal(int64(9), di.ScheduleID)
s.Equal(emptyEventID, di.StartedID)
s.Equal(common.EmptyEventID, di.StartedID)
}

func (s *engineSuite) TestRespondActivityTaskCanceledIfNoRunID() {
Expand Down
10 changes: 5 additions & 5 deletions service/history/historyReplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func (r *historyReplicator) ApplyReplicationTask(context *workflowExecutionConte
case shared.EventTypeWorkflowExecutionStarted:
// TODO: Support for child execution
var parentExecution *shared.WorkflowExecution
initiatedID := emptyEventID
initiatedID := common.EmptyEventID
parentDomainID := ""

// Serialize the history
Expand Down Expand Up @@ -249,9 +249,9 @@ func (r *historyReplicator) ApplyReplicationTask(context *workflowExecutionConte
}

// Set decision attributes after replication of history events
decisionVersionID := emptyVersion
decisionScheduleID := emptyEventID
decisionStartID := emptyEventID
decisionVersionID := common.EmptyVersion
decisionScheduleID := common.EmptyEventID
decisionStartID := common.EmptyEventID
decisionTimeout := int32(0)
if di != nil {
decisionVersionID = di.Version
Expand All @@ -275,7 +275,7 @@ func (r *historyReplicator) ApplyReplicationTask(context *workflowExecutionConte
DecisionTimeoutValue: msBuilder.executionInfo.DecisionTimeoutValue,
ExecutionContext: nil,
NextEventID: msBuilder.GetNextEventID(),
LastProcessedEvent: emptyEventID,
LastProcessedEvent: common.EmptyEventID,
TransferTasks: sBuilder.transferTasks,
DecisionVersion: decisionVersionID,
DecisionScheduleID: decisionScheduleID,
Expand Down
Loading

0 comments on commit 3b0ca48

Please sign in to comment.