Skip to content

Commit

Permalink
Change visibility closed_time to use last event timestamp (cadence-wo…
Browse files Browse the repository at this point in the history
…rkflow#1501)

* Change visibility closed_time to use last event timestamp

* Move writeCompletionEventToCache to replicate APIs

* Remove GetLastUpdatedTimestamp API
  • Loading branch information
vancexu authored and wxing1292 committed Mar 7, 2019
1 parent 68d81c6 commit 04837dd
Show file tree
Hide file tree
Showing 12 changed files with 59 additions and 69 deletions.
2 changes: 1 addition & 1 deletion schema/cassandra/cadence/schema.cql
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ CREATE TABLE executions (
'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy'
};

-- events table is deprecated in favor of v2 talbes: history_node/history_tree
-- events table is deprecated in favor of v2 tables: history_node/history_tree
CREATE TABLE events (
domain_id uuid,
workflow_id text,
Expand Down
14 changes: 0 additions & 14 deletions service/history/MockMutableState.go
Original file line number Diff line number Diff line change
Expand Up @@ -1348,20 +1348,6 @@ func (_m *mockMutableState) GetLastFirstEventID() int64 {
return r0
}

// GetLastUpdatedTimestamp provides a mock function with given fields:
func (_m *mockMutableState) GetLastUpdatedTimestamp() int64 {
ret := _m.Called()

var r0 int64
if rf, ok := ret.Get(0).(func() int64); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(int64)
}

return r0
}

// GetLastWriteVersion provides a mock function with given fields:
func (_m *mockMutableState) GetLastWriteVersion() int64 {
ret := _m.Called()
Expand Down
14 changes: 9 additions & 5 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -819,7 +819,11 @@ func (e *historyEngineImpl) DescribeWorkflowExecution(ctx context.Context,
// for closed workflow
closeStatus := getWorkflowExecutionCloseStatus(executionInfo.CloseStatus)
result.WorkflowExecutionInfo.CloseStatus = &closeStatus
result.WorkflowExecutionInfo.CloseTime = common.Int64Ptr(msBuilder.GetLastUpdatedTimestamp())
completionEvent, ok := msBuilder.GetCompletionEvent()
if !ok {
return nil, &workflow.InternalServiceError{Message: "Unable to get workflow completion event."}
}
result.WorkflowExecutionInfo.CloseTime = common.Int64Ptr(completionEvent.GetTimestamp())
}

if len(msBuilder.GetPendingActivityInfos()) > 0 {
Expand Down Expand Up @@ -1283,7 +1287,7 @@ Update_History_Loop:
}

startAttributes := startEvent.WorkflowExecutionStartedEventAttributes
continueAsnewAttributes := &workflow.ContinueAsNewWorkflowExecutionDecisionAttributes{
continueAsNewAttributes := &workflow.ContinueAsNewWorkflowExecutionDecisionAttributes{
WorkflowType: startAttributes.WorkflowType,
TaskList: startAttributes.TaskList,
RetryPolicy: startAttributes.RetryPolicy,
Expand All @@ -1301,7 +1305,7 @@ Update_History_Loop:
return nil, err
}
if _, continueAsNewBuilder, err = msBuilder.AddContinueAsNewEvent(completedID, domainEntry,
startAttributes.GetParentWorkflowDomain(), continueAsnewAttributes, eventStoreVersion,
startAttributes.GetParentWorkflowDomain(), continueAsNewAttributes, eventStoreVersion,
createTaskID); err != nil {
return nil, err
}
Expand Down Expand Up @@ -1361,7 +1365,7 @@ Update_History_Loop:
}

startAttributes := startEvent.WorkflowExecutionStartedEventAttributes
continueAsnewAttributes := &workflow.ContinueAsNewWorkflowExecutionDecisionAttributes{
continueAsNewAttributes := &workflow.ContinueAsNewWorkflowExecutionDecisionAttributes{
WorkflowType: startAttributes.WorkflowType,
TaskList: startAttributes.TaskList,
RetryPolicy: startAttributes.RetryPolicy,
Expand All @@ -1382,7 +1386,7 @@ Update_History_Loop:
}
if _, continueAsNewBuilder, err = msBuilder.AddContinueAsNewEvent(completedID, domainEntry,
startAttributes.GetParentWorkflowDomain(),
continueAsnewAttributes, eventStoreVersion, createTaskID); err != nil {
continueAsNewAttributes, eventStoreVersion, createTaskID); err != nil {
return nil, err
}
}
Expand Down
2 changes: 1 addition & 1 deletion service/history/historyReplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,7 @@ func (r *historyReplicator) ApplyReplicationTask(ctx context.Context, context wo
if err != nil {
return err
}
// contineueAsNew
// continueAsNew
err = context.appendFirstBatchHistoryForContinueAsNew(newRunStateBuilder, transactionID)
if err != nil {
return err
Expand Down
1 change: 0 additions & 1 deletion service/history/mutableState.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@ type (
GetHistorySize() int64
GetInFlightDecisionTask() (*decisionInfo, bool)
GetLastFirstEventID() int64
GetLastUpdatedTimestamp() int64
GetLastWriteVersion() int64
GetNextEventID() int64
GetPreviousStartedEventID() int64
Expand Down
49 changes: 14 additions & 35 deletions service/history/mutableStateBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -772,17 +772,6 @@ func (e *mutableStateBuilder) GetWorkflowType() *workflow.WorkflowType {
return wType
}

func (e *mutableStateBuilder) GetLastUpdatedTimestamp() int64 {
lastUpdated := e.executionInfo.LastUpdatedTimestamp.UnixNano()
if e.executionInfo.StartTimestamp.UnixNano() >= lastUpdated {
// This could happen due to clock skews
// ensure that the lastUpdatedTimestamp is always greater than the StartTimestamp
lastUpdated = e.executionInfo.StartTimestamp.UnixNano() + 1
}

return lastUpdated
}

func (e *mutableStateBuilder) GetActivityScheduledEvent(scheduleEventID int64) (*workflow.HistoryEvent, bool) {
ai, ok := e.pendingActivityInfoIDs[scheduleEventID]
if !ok {
Expand Down Expand Up @@ -933,16 +922,11 @@ func (e *mutableStateBuilder) DeletePendingSignal(initiatedEventID int64) {
e.deleteSignalInfo = common.Int64Ptr(initiatedEventID)
}

func (e *mutableStateBuilder) writeCompletionEventToCache(completionEvent *workflow.HistoryEvent) error {
// First check to see if this is a Child Workflow
if e.HasParentExecution() {
// Store the completion result within events cache so we can communicate the result to parent execution
// during the processing of DeleteTransferTask without loading this event from database
e.eventsCache.putEvent(e.executionInfo.DomainID, e.executionInfo.WorkflowID, e.executionInfo.RunID,
completionEvent.GetEventId(), completionEvent)
}

return nil
func (e *mutableStateBuilder) writeCompletionEventToCache(completionEvent *workflow.HistoryEvent) {
// Store the completion result within events cache so we can communicate the result to parent execution
// during the processing of DeleteTransferTask without loading this event from database
e.eventsCache.putEvent(e.executionInfo.DomainID, e.executionInfo.WorkflowID, e.executionInfo.RunID,
completionEvent.GetEventId(), completionEvent)
}

func (e *mutableStateBuilder) hasPendingTasks() bool {
Expand Down Expand Up @@ -1917,17 +1901,16 @@ func (e *mutableStateBuilder) AddCompletedWorkflowEvent(decisionCompletedEventID
}

event := e.hBuilder.AddCompletedWorkflowEvent(decisionCompletedEventID, attributes)
// Write the event to cache only on active cluster
e.writeCompletionEventToCache(event)

e.ReplicateWorkflowExecutionCompletedEvent(decisionCompletedEventID, event)

return event
}

func (e *mutableStateBuilder) ReplicateWorkflowExecutionCompletedEvent(firstEventID int64, event *workflow.HistoryEvent) {
e.executionInfo.State = persistence.WorkflowStateCompleted
e.executionInfo.CloseStatus = persistence.WorkflowCloseStatusCompleted
e.executionInfo.CompletionEventBatchID = firstEventID // Used when completion event needs to be loaded from database
e.writeCompletionEventToCache(event)
}

func (e *mutableStateBuilder) AddFailWorkflowEvent(decisionCompletedEventID int64,
Expand All @@ -1939,9 +1922,6 @@ func (e *mutableStateBuilder) AddFailWorkflowEvent(decisionCompletedEventID int6
}

event := e.hBuilder.AddFailWorkflowEvent(decisionCompletedEventID, attributes)
// Write the event to cache only on active cluster
e.writeCompletionEventToCache(event)

e.ReplicateWorkflowExecutionFailedEvent(decisionCompletedEventID, event)

return event
Expand All @@ -1951,6 +1931,7 @@ func (e *mutableStateBuilder) ReplicateWorkflowExecutionFailedEvent(firstEventID
e.executionInfo.State = persistence.WorkflowStateCompleted
e.executionInfo.CloseStatus = persistence.WorkflowCloseStatusFailed
e.executionInfo.CompletionEventBatchID = firstEventID // Used when completion event needs to be loaded from database
e.writeCompletionEventToCache(event)
}

func (e *mutableStateBuilder) AddTimeoutWorkflowEvent() *workflow.HistoryEvent {
Expand All @@ -1961,9 +1942,6 @@ func (e *mutableStateBuilder) AddTimeoutWorkflowEvent() *workflow.HistoryEvent {
}

event := e.hBuilder.AddTimeoutWorkflowEvent()
// Write the event to cache only on active cluster
e.writeCompletionEventToCache(event)

e.ReplicateWorkflowExecutionTimedoutEvent(event.GetEventId(), event)

return event
Expand All @@ -1973,6 +1951,7 @@ func (e *mutableStateBuilder) ReplicateWorkflowExecutionTimedoutEvent(firstEvent
e.executionInfo.State = persistence.WorkflowStateCompleted
e.executionInfo.CloseStatus = persistence.WorkflowCloseStatusTimedOut
e.executionInfo.CompletionEventBatchID = firstEventID // Used when completion event needs to be loaded from database
e.writeCompletionEventToCache(event)
}

func (e *mutableStateBuilder) AddWorkflowExecutionCancelRequestedEvent(cause string,
Expand Down Expand Up @@ -2005,8 +1984,6 @@ func (e *mutableStateBuilder) AddWorkflowExecutionCanceledEvent(decisionTaskComp
}

event := e.hBuilder.AddWorkflowExecutionCanceledEvent(decisionTaskCompletedEventID, attributes)
// Write the event to cache only on active cluster
e.writeCompletionEventToCache(event)
e.ReplicateWorkflowExecutionCanceledEvent(decisionTaskCompletedEventID, event)

return event
Expand All @@ -2016,6 +1993,7 @@ func (e *mutableStateBuilder) ReplicateWorkflowExecutionCanceledEvent(firstEvent
e.executionInfo.State = persistence.WorkflowStateCompleted
e.executionInfo.CloseStatus = persistence.WorkflowCloseStatusCanceled
e.executionInfo.CompletionEventBatchID = firstEventID // Used when completion event needs to be loaded from database
e.writeCompletionEventToCache(event)
}

func (e *mutableStateBuilder) AddRequestCancelExternalWorkflowExecutionInitiatedEvent(
Expand Down Expand Up @@ -2272,9 +2250,6 @@ func (e *mutableStateBuilder) AddWorkflowExecutionTerminatedEvent(
}

event := e.hBuilder.AddWorkflowExecutionTerminatedEvent(request)
// Write the event to cache only on active cluster
e.writeCompletionEventToCache(event)

e.ReplicateWorkflowExecutionTerminatedEvent(event.GetEventId(), event)

return event
Expand All @@ -2284,6 +2259,7 @@ func (e *mutableStateBuilder) ReplicateWorkflowExecutionTerminatedEvent(firstEve
e.executionInfo.State = persistence.WorkflowStateCompleted
e.executionInfo.CloseStatus = persistence.WorkflowCloseStatusTerminated
e.executionInfo.CompletionEventBatchID = firstEventID // Used when completion event needs to be loaded from database
e.writeCompletionEventToCache(event)
}

func (e *mutableStateBuilder) AddWorkflowExecutionSignaled(
Expand Down Expand Up @@ -2364,6 +2340,9 @@ func (e *mutableStateBuilder) AddContinueAsNewEvent(decisionCompletedEventID int
func (e *mutableStateBuilder) ReplicateWorkflowExecutionContinuedAsNewEvent(sourceClusterName string, domainID string,
continueAsNewEvent *workflow.HistoryEvent, startedEvent *workflow.HistoryEvent, di *decisionInfo,
newStateBuilder mutableState, eventStoreVersion int32, createTaskID int64) error {

e.writeCompletionEventToCache(continueAsNewEvent)

continueAsNewAttributes := continueAsNewEvent.WorkflowExecutionContinuedAsNewEventAttributes
startedAttributes := startedEvent.WorkflowExecutionStartedEventAttributes
newRunID := continueAsNewAttributes.GetNewExecutionRunId()
Expand Down
8 changes: 4 additions & 4 deletions service/history/stateBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (b *stateBuilderImpl) applyEvents(domainID, requestID string, execution sha
firstEvent = history[0]
}

// need to clear the stickness since workflow turned to passive
// need to clear the stickiness since workflow turned to passive
b.msBuilder.ClearStickyness()
for _, event := range history {
lastEvent = event
Expand Down Expand Up @@ -146,7 +146,7 @@ func (b *stateBuilderImpl) applyEvents(domainID, requestID string, execution sha

b.transferTasks = append(b.transferTasks, b.scheduleDecisionTransferTask(domainID, b.getTaskList(b.msBuilder),
di.ScheduleID))
// since we do not use stickyness on the standby side, there shall be no decision schedule to start timeout
// since we do not use stickiness on the standby side, there shall be no decision schedule to start timeout

lastDecision = di

Expand All @@ -173,7 +173,7 @@ func (b *stateBuilderImpl) applyEvents(domainID, requestID string, execution sha
if di := b.msBuilder.ReplicateTransientDecisionTaskScheduled(); di != nil {
b.transferTasks = append(b.transferTasks, b.scheduleDecisionTransferTask(domainID, b.getTaskList(b.msBuilder),
di.ScheduleID))
// since we do not use stickyness on the standby side, there shall be no decision schedule to start timeout
// since we do not use stickiness on the standby side, there shall be no decision schedule to start timeout
lastDecision = di
}

Expand All @@ -184,7 +184,7 @@ func (b *stateBuilderImpl) applyEvents(domainID, requestID string, execution sha
if di := b.msBuilder.ReplicateTransientDecisionTaskScheduled(); di != nil {
b.transferTasks = append(b.transferTasks, b.scheduleDecisionTransferTask(domainID, b.getTaskList(b.msBuilder),
di.ScheduleID))
// since we do not use stickyness on the standby side, there shall be no decision schedule to start timeout
// since we do not use stickiness on the standby side, there shall be no decision schedule to start timeout
lastDecision = di
}

Expand Down
6 changes: 6 additions & 0 deletions service/history/stateBuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type (
mockShard *shardContextImpl
mockMutableState *mockMutableState
mockClientBean *client.MockClientBean
mockEventsCache *MockEventsCache

stateBuilder *stateBuilderImpl
}
Expand Down Expand Up @@ -94,6 +95,7 @@ func (s *stateBuilderSuite) SetupTest() {
metricsClient := metrics.NewClient(tally.NoopScope, metrics.History)
s.mockClientBean = &client.MockClientBean{}
s.mockService = service.NewTestService(s.mockClusterMetadata, s.mockMessagingClient, metricsClient, s.mockClientBean, s.logger)
s.mockEventsCache = &MockEventsCache{}

s.mockShard = &shardContextImpl{
service: s.mockService,
Expand All @@ -107,6 +109,7 @@ func (s *stateBuilderSuite) SetupTest() {
config: NewDynamicConfigForTest(),
logger: s.logger,
domainCache: cache.NewDomainCache(s.mockMetadataMgr, s.mockClusterMetadata, metricsClient, s.logger),
eventsCache: s.mockEventsCache,
metricsClient: metrics.NewClient(tally.NoopScope, metrics.History),
}
s.mockMutableState = &mockMutableState{}
Expand All @@ -124,6 +127,7 @@ func (s *stateBuilderSuite) TearDownTest() {
s.mockProducer.AssertExpectations(s.T())
s.mockMetadataMgr.AssertExpectations(s.T())
s.mockClientBean.AssertExpectations(s.T())
s.mockEventsCache.AssertExpectations(s.T())
}

func (s *stateBuilderSuite) mockUpdateVersion(events ...*shared.HistoryEvent) {
Expand Down Expand Up @@ -832,6 +836,8 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionContinuedA

newRunHistory := &shared.History{Events: []*shared.HistoryEvent{newRunStartedEvent, newRunSignalEvent, newRunDecisionEvent}}
s.mockMutableState.On("ClearStickyness").Once()
s.mockEventsCache.On("putEvent", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().Once()

_, _, newRunStateBuilder, err := stateBuilder.applyEvents(domainID, requestID, execution, s.toHistory(continueAsNewEvent), newRunHistory.Events,
0, persistence.EventStoreVersionV2, createTaskID, newRunCreateTaskID)
s.Nil(err)
Expand Down
2 changes: 2 additions & 0 deletions service/history/timerQueueProcessor2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ func (s *timerQueueProcessor2Suite) TearDownTest() {
s.mockVisibilityMgr.AssertExpectations(s.T())
s.mockProducer.AssertExpectations(s.T())
s.mockClientBean.AssertExpectations(s.T())
s.mockEventsCache.AssertExpectations(s.T())
}

func (s *timerQueueProcessor2Suite) TestTimerUpdateTimesOut() {
Expand Down Expand Up @@ -286,6 +287,7 @@ func (s *timerQueueProcessor2Suite) TestWorkflowTimeout() {
// Done.
waitCh <- struct{}{}
}).Once()
s.mockEventsCache.On("putEvent", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().Once()

// Start timer Processor.
emptyResponse := &persistence.GetTimerIndexTasksResponse{Timers: []*persistence.TimerTaskInfo{}}
Expand Down
11 changes: 4 additions & 7 deletions service/history/transferQueueActiveProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,12 +389,9 @@ func (t *transferQueueActiveProcessorImpl) processCloseExecution(task *persisten

executionInfo := msBuilder.GetExecutionInfo()
replyToParentWorkflow := msBuilder.HasParentExecution() && executionInfo.CloseStatus != persistence.WorkflowCloseStatusContinuedAsNew
var completionEvent *workflow.HistoryEvent
if replyToParentWorkflow {
completionEvent, ok = msBuilder.GetCompletionEvent()
if !ok {
return &workflow.InternalServiceError{Message: "Unable to get workflow completion event."}
}
completionEvent, ok := msBuilder.GetCompletionEvent()
if !ok {
return &workflow.InternalServiceError{Message: "Unable to get workflow completion event."}
}
parentDomainID := executionInfo.ParentDomainID
parentWorkflowID := executionInfo.ParentWorkflowID
Expand All @@ -403,7 +400,7 @@ func (t *transferQueueActiveProcessorImpl) processCloseExecution(task *persisten

workflowTypeName := executionInfo.WorkflowTypeName
workflowStartTimestamp := executionInfo.StartTimestamp.UnixNano()
workflowCloseTimestamp := msBuilder.GetLastUpdatedTimestamp()
workflowCloseTimestamp := completionEvent.GetTimestamp()
workflowCloseStatus := getWorkflowExecutionCloseStatus(executionInfo.CloseStatus)
workflowHistoryLength := msBuilder.GetNextEventID() - 1

Expand Down
7 changes: 6 additions & 1 deletion service/history/transferQueueStandbyProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,10 +291,15 @@ func (t *transferQueueStandbyProcessorImpl) processCloseExecution(transferTask *
return nil
}

completionEvent, ok := msBuilder.GetCompletionEvent()
if !ok {
return &workflow.InternalServiceError{Message: "Unable to get workflow completion event."}
}

executionInfo := msBuilder.GetExecutionInfo()
workflowTypeName := executionInfo.WorkflowTypeName
workflowStartTimestamp := executionInfo.StartTimestamp.UnixNano()
workflowCloseTimestamp := msBuilder.GetLastUpdatedTimestamp()
workflowCloseTimestamp := completionEvent.GetTimestamp()
workflowCloseStatus := getWorkflowExecutionCloseStatus(executionInfo.CloseStatus)
workflowHistoryLength := msBuilder.GetNextEventID() - 1

Expand Down
Loading

0 comments on commit 04837dd

Please sign in to comment.