Skip to content

Commit

Permalink
Bugfix: there shall be no events in history after workflow finish eve…
Browse files Browse the repository at this point in the history
…nt. (cadence-workflow#1509)

* Bugfix: there shall be no events in history after workflow finish event.
  • Loading branch information
wxing1292 authored Mar 7, 2019
1 parent 09d90a2 commit 68d81c6
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 11 deletions.
6 changes: 3 additions & 3 deletions service/history/MockWorkflowExecutionContext.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,18 @@ type mockWorkflowExecutionContext struct {

var _ workflowExecutionContext = (*mockWorkflowExecutionContext)(nil)

func (_m *mockWorkflowExecutionContext) appendHistoryEvents(_a0 *historyBuilder, _a1 []*workflow.HistoryEvent, _a2 int64) (int, error) {
func (_m *mockWorkflowExecutionContext) appendHistoryEvents(_a0 []*workflow.HistoryEvent, _a1 int64, _a2 bool) (int, error) {
ret := _m.Called(_a0, _a1, _a2)

var r0 int
if rf, ok := ret.Get(0).(func(*historyBuilder, []*workflow.HistoryEvent, int64) int); ok {
if rf, ok := ret.Get(0).(func([]*workflow.HistoryEvent, int64, bool) int); ok {
r0 = rf(_a0, _a1, _a2)
} else {
r0 = ret.Get(0).(int)
}

var r1 error
if rf, ok := ret.Get(1).(func(*historyBuilder, []*workflow.HistoryEvent, int64) error); ok {
if rf, ok := ret.Get(1).(func([]*workflow.HistoryEvent, int64, bool) error); ok {
r1 = rf(_a0, _a1, _a2)
} else {
r1 = ret.Error(1)
Expand Down
4 changes: 4 additions & 0 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"encoding/json"
"errors"
"fmt"
"go.uber.org/cadence/.gen/go/shared"
"time"

"github.com/pborman/uuid"
Expand Down Expand Up @@ -118,6 +119,9 @@ var (
ErrBufferedEventsLimitExceeded = &workflow.LimitExceededError{Message: "Exceeded workflow execution limit for buffered events"}
// ErrSignalsLimitExceeded is the error indicating limit reached for maximum number of signal events
ErrSignalsLimitExceeded = &workflow.LimitExceededError{Message: "Exceeded workflow execution limit for signal events"}
// ErrEventsAterWorkflowFinish is the error indicating server error trying to write events after workflow finish event
ErrEventsAterWorkflowFinish = &shared.InternalServiceError{Message: "error validating last event being workflow finish event."}

// FailedWorkflowCloseState is a set of failed workflow close states, used for start workflow policy
// for start workflow execution API
FailedWorkflowCloseState = map[int]bool{
Expand Down
27 changes: 27 additions & 0 deletions service/history/mutableStateBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ func (e *mutableStateBuilder) FlushBufferedEvents() error {
e.updateBufferedEvents = nil
}

newCommittedEvents = e.trimEventsAfterWorkflowClose(newCommittedEvents)
e.hBuilder.history = newCommittedEvents
// make sure all new committed events have correct EventID
e.assignEventIDToBufferedEvents()
Expand Down Expand Up @@ -580,6 +581,32 @@ func convertSignalRequestedIDs(inputs map[string]struct{}) []string {
return outputs
}

func (e *mutableStateBuilder) trimEventsAfterWorkflowClose(input []*workflow.HistoryEvent) []*workflow.HistoryEvent {
if len(input) == 0 {
return input
}

nextIndex := 0

loop:
for _, event := range input {
nextIndex++

switch event.GetEventType() {
case workflow.EventTypeWorkflowExecutionCompleted,
workflow.EventTypeWorkflowExecutionFailed,
workflow.EventTypeWorkflowExecutionTimedOut,
workflow.EventTypeWorkflowExecutionTerminated,
workflow.EventTypeWorkflowExecutionContinuedAsNew,
workflow.EventTypeWorkflowExecutionCanceled:

break loop
}
}

return input[0:nextIndex]
}

func (e *mutableStateBuilder) assignEventIDToBufferedEvents() {
newCommittedEvents := e.hBuilder.history

Expand Down
47 changes: 47 additions & 0 deletions service/history/mutableStateBuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,3 +242,50 @@ func (s *mutableStateSuite) TestReorderEvents() {
s.Equal(int64(5), s.msBuilder.hBuilder.history[1].ActivityTaskCompletedEventAttributes.GetScheduledEventId())

}

func (s *mutableStateSuite) TestTrimEvents() {
var input []*workflow.HistoryEvent
output := s.msBuilder.trimEventsAfterWorkflowClose(input)
s.Equal(input, output)

input = []*workflow.HistoryEvent{}
output = s.msBuilder.trimEventsAfterWorkflowClose(input)
s.Equal(input, output)

input = []*workflow.HistoryEvent{
&workflow.HistoryEvent{
EventType: workflow.EventTypeActivityTaskCanceled.Ptr(),
},
&workflow.HistoryEvent{
EventType: workflow.EventTypeWorkflowExecutionSignaled.Ptr(),
},
}
output = s.msBuilder.trimEventsAfterWorkflowClose(input)
s.Equal(input, output)

input = []*workflow.HistoryEvent{
&workflow.HistoryEvent{
EventType: workflow.EventTypeActivityTaskCanceled.Ptr(),
},
&workflow.HistoryEvent{
EventType: workflow.EventTypeWorkflowExecutionCompleted.Ptr(),
},
}
output = s.msBuilder.trimEventsAfterWorkflowClose(input)
s.Equal(input, output)

input = []*workflow.HistoryEvent{
&workflow.HistoryEvent{
EventType: workflow.EventTypeWorkflowExecutionCompleted.Ptr(),
},
&workflow.HistoryEvent{
EventType: workflow.EventTypeActivityTaskCanceled.Ptr(),
},
}
output = s.msBuilder.trimEventsAfterWorkflowClose(input)
s.Equal([]*workflow.HistoryEvent{
&workflow.HistoryEvent{
EventType: workflow.EventTypeWorkflowExecutionCompleted.Ptr(),
},
}, output)
}
8 changes: 8 additions & 0 deletions service/history/stateBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package history

import (
"github.com/uber/cadence/common/errors"
"time"

"github.com/pborman/uuid"
Expand Down Expand Up @@ -59,6 +60,10 @@ type (
}
)

const (
ErrMessageNewRunHistorySizeZero = "encounter new run history size being zero"
)

var _ stateBuilder = (*stateBuilderImpl)(nil)

func newStateBuilder(shard ShardContext, msBuilder mutableState, logger bark.Logger) *stateBuilderImpl {
Expand Down Expand Up @@ -387,6 +392,9 @@ func (b *stateBuilderImpl) applyEvents(domainID, requestID string, execution sha
}

case shared.EventTypeWorkflowExecutionContinuedAsNew:
if len(newRunHistory) == 0 {
return nil, nil, nil, errors.NewInternalFailureError(ErrMessageNewRunHistorySizeZero)
}
newRunStartedEvent := newRunHistory[0]
// Create mutable state updates for the new run
newRunMutableStateBuilder = newMutableStateBuilderWithReplicationState(
Expand Down
63 changes: 55 additions & 8 deletions service/history/workflowExecutionContext.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ const (

type (
workflowExecutionContext interface {
appendHistoryEvents(builder *historyBuilder, history []*workflow.HistoryEvent, transactionID int64) (int, error)
appendHistoryEvents(history []*workflow.HistoryEvent, transactionID int64, doLastEventValidation bool) (int, error)
clear()
continueAsNewWorkflowExecution(context []byte, newStateBuilder mutableState, transferTasks []persistence.Task, timerTasks []persistence.Task, transactionID int64) error
getDomainID() string
Expand Down Expand Up @@ -224,7 +224,7 @@ func (c *workflowExecutionContextImpl) resetWorkflowExecution(currMutableState m
if updateCurr {
hBuilder := currMutableState.GetHistoryBuilder()
var size int
size, retError = c.appendHistoryEvents(hBuilder, hBuilder.GetHistory().GetEvents(), transactionID)
size, retError = c.appendHistoryEvents(hBuilder.GetHistory().GetEvents(), transactionID, true)
if retError != nil {
return
}
Expand Down Expand Up @@ -460,7 +460,7 @@ func (c *workflowExecutionContextImpl) update(transferTasks []persistence.Task,
if hasNewStandbyHistoryEvents {
firstEvent := standbyHistoryBuilder.GetFirstEvent()
// Note: standby events has no transient decision events
newHistorySize, err = c.appendHistoryEvents(standbyHistoryBuilder, standbyHistoryBuilder.history, transactionID)
newHistorySize, err = c.appendHistoryEvents(standbyHistoryBuilder.history, transactionID, true)
if err != nil {
return err
}
Expand All @@ -473,14 +473,15 @@ func (c *workflowExecutionContextImpl) update(transferTasks []persistence.Task,
firstEvent := activeHistoryBuilder.GetFirstEvent()
// Transient decision events need to be written as a separate batch
if activeHistoryBuilder.HasTransientEvents() {
newHistorySize, err = c.appendHistoryEvents(activeHistoryBuilder, activeHistoryBuilder.transientHistory, transactionID)
// transient decision events batch should not perform last event check
newHistorySize, err = c.appendHistoryEvents(activeHistoryBuilder.transientHistory, transactionID, false)
if err != nil {
return err
}
}

var size int
size, err = c.appendHistoryEvents(activeHistoryBuilder, activeHistoryBuilder.history, transactionID)
size, err = c.appendHistoryEvents(activeHistoryBuilder.history, transactionID, true)
if err != nil {
return err
}
Expand Down Expand Up @@ -543,7 +544,7 @@ func (c *workflowExecutionContextImpl) update(transferTasks []persistence.Task,
if err1 != nil {
return err1
}
newHistorySize, err = c.appendHistoryEvents(activeHistoryBuilder, activeHistoryBuilder.history, terminateTransactionID)
newHistorySize, err = c.appendHistoryEvents(activeHistoryBuilder.history, terminateTransactionID, true)
if err != nil {
return err
}
Expand Down Expand Up @@ -653,8 +654,14 @@ func (c *workflowExecutionContextImpl) update(transferTasks []persistence.Task,
return nil
}

func (c *workflowExecutionContextImpl) appendHistoryEvents(builder *historyBuilder, history []*workflow.HistoryEvent,
transactionID int64) (int, error) {
func (c *workflowExecutionContextImpl) appendHistoryEvents(history []*workflow.HistoryEvent,
transactionID int64, doLastEventValidation bool) (int, error) {

if doLastEventValidation {
if err := c.validateNoEventsAfterWorkflowFinish(history); err != nil {
return 0, err
}
}

firstEvent := history[0]
var historySize int
Expand Down Expand Up @@ -927,3 +934,43 @@ func (c *workflowExecutionContextImpl) emitSessionUpdateStats(stats *persistence
c.metricsClient.RecordTimer(metrics.SessionCountStatsScope, metrics.DeleteRequestCancelInfoCount,
time.Duration(stats.DeleteRequestCancelInfoCount))
}

// validateNoEventsAfterWorkflowFinish perform check on history event batch
// NOTE: do not apply this check on every batch, since transient
// decision && workflow finish will be broken (the first batch)
func (c *workflowExecutionContextImpl) validateNoEventsAfterWorkflowFinish(input []*workflow.HistoryEvent) error {
if len(input) == 0 {
return nil
}

// if workflow is still running, no check is necessary
if c.msBuilder.IsWorkflowExecutionRunning() {
return nil
}

// workflow close
// this will perform check on the last event of last batch
// NOTE: do not apply this check on every batch, since transient
// decision && workflow finish will be broken (the first batch)
lastEvent := input[len(input)-1]
switch lastEvent.GetEventType() {
case workflow.EventTypeWorkflowExecutionCompleted,
workflow.EventTypeWorkflowExecutionFailed,
workflow.EventTypeWorkflowExecutionTimedOut,
workflow.EventTypeWorkflowExecutionTerminated,
workflow.EventTypeWorkflowExecutionContinuedAsNew,
workflow.EventTypeWorkflowExecutionCanceled:

return nil

default:
c.logger.WithFields(bark.Fields{
logging.TagDomainID: c.domainID,
logging.TagWorkflowExecutionID: c.workflowExecution.GetWorkflowId(),
logging.TagWorkflowRunID: c.workflowExecution.GetRunId(),
}).Error("encounter case where events appears after workflow finish.")

return ErrEventsAterWorkflowFinish
}

}

0 comments on commit 68d81c6

Please sign in to comment.