Skip to content

Commit

Permalink
Better handling Cassandra data resurrection issue (cadence-workflow#2949
Browse files Browse the repository at this point in the history
)
  • Loading branch information
wxing1292 authored Jan 3, 2020
1 parent 533e36b commit 7cd1724
Showing 1 changed file with 46 additions and 40 deletions.
86 changes: 46 additions & 40 deletions service/history/mutableStateBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1140,15 +1140,17 @@ func (e *mutableStateBuilder) DeletePendingChildExecution(
initiatedEventID int64,
) error {

if _, ok := e.pendingChildExecutionInfoIDs[initiatedEventID]; !ok {
if _, ok := e.pendingChildExecutionInfoIDs[initiatedEventID]; ok {
delete(e.pendingChildExecutionInfoIDs, initiatedEventID)
} else {
e.logError(
fmt.Sprintf("unable to find child workflow: %v in mutable state", initiatedEventID),
fmt.Sprintf("unable to find child workflow event ID: %v in mutable state", initiatedEventID),
tag.ErrorTypeInvalidMutableStateAction,
)
return ErrMissingChildWorkflowInfo
// log data inconsistency instead of returning an error
e.logDataInconsistency()
}

delete(e.pendingChildExecutionInfoIDs, initiatedEventID)
e.deleteChildExecutionInfo = common.Int64Ptr(initiatedEventID)
return nil
}
Expand All @@ -1158,15 +1160,17 @@ func (e *mutableStateBuilder) DeletePendingRequestCancel(
initiatedEventID int64,
) error {

if _, ok := e.pendingRequestCancelInfoIDs[initiatedEventID]; !ok {
if _, ok := e.pendingRequestCancelInfoIDs[initiatedEventID]; ok {
delete(e.pendingRequestCancelInfoIDs, initiatedEventID)
} else {
e.logError(
fmt.Sprintf("unable to find request cancel info: %v in mutable state", initiatedEventID),
fmt.Sprintf("unable to find request cancel external workflow event ID: %v in mutable state", initiatedEventID),
tag.ErrorTypeInvalidMutableStateAction,
)
return ErrMissingRequestCancelInfo
// log data inconsistency instead of returning an error
e.logDataInconsistency()
}

delete(e.pendingRequestCancelInfoIDs, initiatedEventID)
e.deleteRequestCancelInfo = common.Int64Ptr(initiatedEventID)
return nil
}
Expand All @@ -1176,15 +1180,17 @@ func (e *mutableStateBuilder) DeletePendingSignal(
initiatedEventID int64,
) error {

if _, ok := e.pendingSignalInfoIDs[initiatedEventID]; !ok {
if _, ok := e.pendingSignalInfoIDs[initiatedEventID]; ok {
delete(e.pendingSignalInfoIDs, initiatedEventID)
} else {
e.logError(
fmt.Sprintf("unable to find signal info: %v in mutable state", initiatedEventID),
fmt.Sprintf("unable to find signal external workflow event ID: %v in mutable state", initiatedEventID),
tag.ErrorTypeInvalidMutableStateAction,
)
return ErrMissingSignalInfo
// log data inconsistency instead of returning an error
e.logDataInconsistency()
}

delete(e.pendingSignalInfoIDs, initiatedEventID)
e.deleteSignalInfo = common.Int64Ptr(initiatedEventID)
return nil
}
Expand Down Expand Up @@ -1285,28 +1291,28 @@ func (e *mutableStateBuilder) DeleteActivity(
scheduleEventID int64,
) error {

activityInfo, ok := e.pendingActivityInfoIDs[scheduleEventID]
if !ok {
e.logError(
fmt.Sprintf("unable to find activity event id: %v in mutable state", scheduleEventID),
tag.ErrorTypeInvalidMutableStateAction,
)
// log data inconsistency instead of returning an error
e.logDataInconsistency()
}
if activityInfo, ok := e.pendingActivityInfoIDs[scheduleEventID]; ok {
delete(e.pendingActivityInfoIDs, scheduleEventID)

_, ok = e.pendingActivityIDToEventID[activityInfo.ActivityID]
if !ok {
if _, ok = e.pendingActivityIDToEventID[activityInfo.ActivityID]; ok {
delete(e.pendingActivityIDToEventID, activityInfo.ActivityID)
} else {
e.logError(
fmt.Sprintf("unable to find activity ID: %v in mutable state", activityInfo.ActivityID),
tag.ErrorTypeInvalidMutableStateAction,
)
// log data inconsistency instead of returning an error
e.logDataInconsistency()
}
} else {
e.logError(
fmt.Sprintf("unable to find activity ID: %v in mutable state", activityInfo.ActivityID),
fmt.Sprintf("unable to find activity event id: %v in mutable state", scheduleEventID),
tag.ErrorTypeInvalidMutableStateAction,
)
// log data inconsistency instead of returning an error
e.logDataInconsistency()
}

delete(e.pendingActivityInfoIDs, scheduleEventID)
delete(e.pendingActivityIDToEventID, activityInfo.ActivityID)
e.deleteActivityInfos[scheduleEventID] = struct{}{}
return nil
}
Expand Down Expand Up @@ -1364,28 +1370,28 @@ func (e *mutableStateBuilder) DeleteUserTimer(
timerID string,
) error {

timerInfo, ok := e.pendingTimerInfoIDs[timerID]
if !ok {
e.logError(
fmt.Sprintf("unable to find timer ID: %v in mutable state", timerID),
tag.ErrorTypeInvalidMutableStateAction,
)
// log data inconsistency instead of returning an error
e.logDataInconsistency()
}
if timerInfo, ok := e.pendingTimerInfoIDs[timerID]; ok {
delete(e.pendingTimerInfoIDs, timerID)

_, ok = e.pendingTimerEventIDToID[timerInfo.StartedID]
if !ok {
if _, ok = e.pendingTimerEventIDToID[timerInfo.StartedID]; ok {
delete(e.pendingTimerEventIDToID, timerInfo.StartedID)
} else {
e.logError(
fmt.Sprintf("unable to find timer event ID: %v in mutable state", timerID),
tag.ErrorTypeInvalidMutableStateAction,
)
// log data inconsistency instead of returning an error
e.logDataInconsistency()
}
} else {
e.logError(
fmt.Sprintf("unable to find timer event ID: %v in mutable state", timerID),
fmt.Sprintf("unable to find timer ID: %v in mutable state", timerID),
tag.ErrorTypeInvalidMutableStateAction,
)
// log data inconsistency instead of returning an error
e.logDataInconsistency()
}

delete(e.pendingTimerInfoIDs, timerID)
delete(e.pendingTimerEventIDToID, timerInfo.StartedID)
e.deleteTimerInfos[timerID] = struct{}{}
return nil
}
Expand Down

0 comments on commit 7cd1724

Please sign in to comment.