Skip to content

Commit

Permalink
logging: attempt to fix high memory on history (cadence-workflow#2859)
Browse files Browse the repository at this point in the history
  • Loading branch information
venkat1109 authored Nov 25, 2019
1 parent a5b2ed7 commit 5e0a62b
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 51 deletions.
1 change: 0 additions & 1 deletion common/log/tag/values.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ var (
// Pre-defined values for TagSysComponent
var (
ComponentTaskList = component("tasklist")
ComponentHistoryBuilder = component("history-builder")
ComponentHistoryEngine = component("history-engine")
ComponentHistoryCache = component("history-cache")
ComponentEventsCache = component("events-cache")
Expand Down
4 changes: 0 additions & 4 deletions service/history/historyBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
workflow "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/persistence"
)

Expand All @@ -34,7 +33,6 @@ type (
transientHistory []*workflow.HistoryEvent
history []*workflow.HistoryEvent
msBuilder mutableState
logger log.Logger
}
)

Expand All @@ -43,14 +41,12 @@ func newHistoryBuilder(msBuilder mutableState, logger log.Logger) *historyBuilde
transientHistory: []*workflow.HistoryEvent{},
history: []*workflow.HistoryEvent{},
msBuilder: msBuilder,
logger: logger.WithTags(tag.ComponentHistoryBuilder),
}
}

func newHistoryBuilderFromEvents(history []*workflow.HistoryEvent, logger log.Logger) *historyBuilder {
return &historyBuilder{
history: history,
logger: logger.WithTags(tag.ComponentHistoryBuilder),
}
}

Expand Down
88 changes: 54 additions & 34 deletions service/history/mutableStateBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ func (e *mutableStateBuilder) UpdateCurrentVersion(
err := &workflow.InternalServiceError{
Message: "cannot update current version of local domain workflow to version other than empty version",
}
e.logger.Error(err.Error())
e.logError(err.Error())
return err
}
e.currentVersion = common.EmptyVersion
Expand Down Expand Up @@ -1048,7 +1048,7 @@ func (e *mutableStateBuilder) GetCronBackoffDuration() (time.Duration, error) {
// This only call when doing ContinueAsNew. At this point, the workflow should have a start event
workflowStartEvent, err := e.GetStartEvent()
if err != nil {
e.logger.Error("unable to find workflow start event", tag.ErrorTypeInvalidHistoryAction)
e.logError("unable to find workflow start event", tag.ErrorTypeInvalidHistoryAction)
return backoff.NoBackoff, err
}
firstDecisionTaskBackoff :=
Expand Down Expand Up @@ -1139,7 +1139,7 @@ func (e *mutableStateBuilder) DeletePendingChildExecution(
) error {

if _, ok := e.pendingChildExecutionInfoIDs[initiatedEventID]; !ok {
e.logger.Error(
e.logError(
fmt.Sprintf("unable to find child workflow: %v in mutable state", initiatedEventID),
tag.ErrorTypeInvalidMutableStateAction,
)
Expand All @@ -1157,7 +1157,7 @@ func (e *mutableStateBuilder) DeletePendingRequestCancel(
) error {

if _, ok := e.pendingRequestCancelInfoIDs[initiatedEventID]; !ok {
e.logger.Error(
e.logError(
fmt.Sprintf("unable to find request cancel info: %v in mutable state", initiatedEventID),
tag.ErrorTypeInvalidMutableStateAction,
)
Expand All @@ -1175,7 +1175,7 @@ func (e *mutableStateBuilder) DeletePendingSignal(
) error {

if _, ok := e.pendingSignalInfoIDs[initiatedEventID]; !ok {
e.logger.Error(
e.logError(
fmt.Sprintf("unable to find signal info: %v in mutable state", initiatedEventID),
tag.ErrorTypeInvalidMutableStateAction,
)
Expand Down Expand Up @@ -1230,7 +1230,7 @@ func (e *mutableStateBuilder) ReplicateActivityInfo(
) error {
ai, ok := e.pendingActivityInfoIDs[request.GetScheduledId()]
if !ok {
e.logger.Error(
e.logError(
fmt.Sprintf("unable to find activity event ID: %v in mutable state", request.GetScheduledId()),
tag.ErrorTypeInvalidMutableStateAction,
)
Expand Down Expand Up @@ -1266,7 +1266,7 @@ func (e *mutableStateBuilder) UpdateActivity(
) error {

if _, ok := e.pendingActivityInfoIDs[ai.ScheduleID]; !ok {
e.logger.Error(
e.logError(
fmt.Sprintf("unable to find activity ID: %v in mutable state", ai.ActivityID),
tag.ErrorTypeInvalidMutableStateAction,
)
Expand All @@ -1285,7 +1285,7 @@ func (e *mutableStateBuilder) DeleteActivity(

activityInfo, ok := e.pendingActivityInfoIDs[scheduleEventID]
if !ok {
e.logger.Error(
e.logError(
fmt.Sprintf("unable to find activity event id: %v in mutable state", scheduleEventID),
tag.ErrorTypeInvalidMutableStateAction,
)
Expand All @@ -1294,7 +1294,7 @@ func (e *mutableStateBuilder) DeleteActivity(

_, ok = e.pendingActivityIDToEventID[activityInfo.ActivityID]
if !ok {
e.logger.Error(
e.logError(
fmt.Sprintf("unable to find activity ID: %v in mutable state", activityInfo.ActivityID),
tag.ErrorTypeInvalidMutableStateAction,
)
Expand Down Expand Up @@ -1335,15 +1335,15 @@ func (e *mutableStateBuilder) UpdateUserTimer(

timerID, ok := e.pendingTimerEventIDToID[ti.StartedID]
if !ok {
e.logger.Error(
e.logError(
fmt.Sprintf("unable to find timer event ID: %v in mutable state", ti.StartedID),
tag.ErrorTypeInvalidMutableStateAction,
)
return ErrMissingTimerInfo
}

if _, ok := e.pendingTimerInfoIDs[timerID]; !ok {
e.logger.Error(
e.logError(
fmt.Sprintf("unable to find timer ID: %v in mutable state", timerID),
tag.ErrorTypeInvalidMutableStateAction,
)
Expand All @@ -1362,7 +1362,7 @@ func (e *mutableStateBuilder) DeleteUserTimer(

timerInfo, ok := e.pendingTimerInfoIDs[timerID]
if !ok {
e.logger.Error(
e.logError(
fmt.Sprintf("unable to find timer ID: %v in mutable state", timerID),
tag.ErrorTypeInvalidMutableStateAction,
)
Expand All @@ -1371,7 +1371,7 @@ func (e *mutableStateBuilder) DeleteUserTimer(

_, ok = e.pendingTimerEventIDToID[timerInfo.StartedID]
if !ok {
e.logger.Error(
e.logError(
fmt.Sprintf("unable to find timer event ID: %v in mutable state", timerID),
tag.ErrorTypeInvalidMutableStateAction,
)
Expand Down Expand Up @@ -2361,7 +2361,7 @@ func (e *mutableStateBuilder) AddActivityTaskCancelRequestedEvent(

ai, ok := e.GetActivityByActivityID(activityID)
if !ok || ai.CancelRequested {
e.logger.Warn(mutableStateInvalidHistoryActionMsg, opTag,
e.logWarn(mutableStateInvalidHistoryActionMsg, opTag,
tag.WorkflowEventID(e.GetNextEventID()),
tag.ErrorTypeInvalidHistoryAction,
tag.Bool(ok),
Expand Down Expand Up @@ -2431,7 +2431,7 @@ func (e *mutableStateBuilder) AddActivityTaskCanceledEvent(

ai, ok := e.GetActivityInfo(scheduleEventID)
if !ok || ai.StartedID != startedEventID {
e.logger.Warn(mutableStateInvalidHistoryActionMsg, opTag,
e.logWarn(mutableStateInvalidHistoryActionMsg, opTag,
tag.WorkflowEventID(e.GetNextEventID()),
tag.ErrorTypeInvalidHistoryAction,
tag.WorkflowScheduleID(scheduleEventID))
Expand All @@ -2440,7 +2440,7 @@ func (e *mutableStateBuilder) AddActivityTaskCanceledEvent(

// Verify cancel request as well.
if !ai.CancelRequested {
e.logger.Warn(mutableStateInvalidHistoryActionMsg, opTag,
e.logWarn(mutableStateInvalidHistoryActionMsg, opTag,
tag.WorkflowEventID(e.GetNextEventID()),
tag.ErrorTypeInvalidHistoryAction,
tag.WorkflowScheduleID(scheduleEventID),
Expand Down Expand Up @@ -2601,7 +2601,7 @@ func (e *mutableStateBuilder) AddWorkflowExecutionCancelRequestedEvent(
}

if e.executionInfo.CancelRequested {
e.logger.Warn(mutableStateInvalidHistoryActionMsg, opTag,
e.logWarn(mutableStateInvalidHistoryActionMsg, opTag,
tag.WorkflowEventID(e.GetNextEventID()),
tag.ErrorTypeInvalidHistoryAction,
tag.WorkflowState(e.executionInfo.State),
Expand Down Expand Up @@ -2729,7 +2729,7 @@ func (e *mutableStateBuilder) AddExternalWorkflowExecutionCancelRequested(

_, ok := e.GetRequestCancelInfo(initiatedID)
if !ok {
e.logger.Warn(mutableStateInvalidHistoryActionMsg, opTag,
e.logWarn(mutableStateInvalidHistoryActionMsg, opTag,
tag.WorkflowEventID(e.GetNextEventID()),
tag.ErrorTypeInvalidHistoryAction,
tag.WorkflowInitiatedID(initiatedID))
Expand Down Expand Up @@ -2768,7 +2768,7 @@ func (e *mutableStateBuilder) AddRequestCancelExternalWorkflowExecutionFailedEve

_, ok := e.GetRequestCancelInfo(initiatedID)
if !ok {
e.logger.Warn(mutableStateInvalidHistoryActionMsg, opTag,
e.logWarn(mutableStateInvalidHistoryActionMsg, opTag,
tag.WorkflowEventID(e.GetNextEventID()),
tag.ErrorTypeInvalidHistoryAction,
tag.WorkflowInitiatedID(initiatedID))
Expand Down Expand Up @@ -2902,7 +2902,7 @@ func (e *mutableStateBuilder) AddExternalWorkflowExecutionSignaled(

_, ok := e.GetSignalInfo(initiatedID)
if !ok {
e.logger.Warn(mutableStateInvalidHistoryActionMsg, opTag,
e.logWarn(mutableStateInvalidHistoryActionMsg, opTag,
tag.WorkflowEventID(e.GetNextEventID()),
tag.ErrorTypeInvalidHistoryAction,
tag.WorkflowInitiatedID(initiatedID))
Expand Down Expand Up @@ -2942,7 +2942,7 @@ func (e *mutableStateBuilder) AddSignalExternalWorkflowExecutionFailedEvent(

_, ok := e.GetSignalInfo(initiatedID)
if !ok {
e.logger.Warn(mutableStateInvalidHistoryActionMsg, opTag,
e.logWarn(mutableStateInvalidHistoryActionMsg, opTag,
tag.WorkflowEventID(e.GetNextEventID()),
tag.ErrorTypeInvalidHistoryAction,
tag.WorkflowInitiatedID(initiatedID))
Expand Down Expand Up @@ -2979,7 +2979,7 @@ func (e *mutableStateBuilder) AddTimerStartedEvent(
timerID := request.GetTimerId()
ti, ok := e.GetUserTimerInfo(timerID)
if ok {
e.logger.Warn(mutableStateInvalidHistoryActionMsg, opTag,
e.logWarn(mutableStateInvalidHistoryActionMsg, opTag,
tag.WorkflowEventID(e.GetNextEventID()),
tag.ErrorTypeInvalidHistoryAction,
tag.WorkflowTimerID(timerID))
Expand Down Expand Up @@ -3031,7 +3031,7 @@ func (e *mutableStateBuilder) AddTimerFiredEvent(

timerInfo, ok := e.GetUserTimerInfo(timerID)
if !ok {
e.logger.Warn(mutableStateInvalidHistoryActionMsg, opTag,
e.logWarn(mutableStateInvalidHistoryActionMsg, opTag,
tag.WorkflowEventID(e.GetNextEventID()),
tag.ErrorTypeInvalidHistoryAction,
tag.WorkflowTimerID(timerID))
Expand Down Expand Up @@ -3076,7 +3076,7 @@ func (e *mutableStateBuilder) AddTimerCanceledEvent(
// bufferedEvents and the history builder
timerFiredEvent := e.checkAndClearTimerFiredEvent(timerID)
if timerFiredEvent == nil {
e.logger.Warn(mutableStateInvalidHistoryActionMsg, opTag,
e.logWarn(mutableStateInvalidHistoryActionMsg, opTag,
tag.WorkflowEventID(e.GetNextEventID()),
tag.ErrorTypeInvalidHistoryAction,
tag.WorkflowTimerID(timerID))
Expand Down Expand Up @@ -3412,7 +3412,7 @@ func (e *mutableStateBuilder) AddChildWorkflowExecutionStartedEvent(

ci, ok := e.GetChildExecutionInfo(initiatedID)
if !ok || ci.StartedID != common.EmptyEventID {
e.logger.Warn(mutableStateInvalidHistoryActionMsg, opTag,
e.logWarn(mutableStateInvalidHistoryActionMsg, opTag,
tag.WorkflowEventID(e.GetNextEventID()),
tag.ErrorTypeInvalidHistoryAction,
tag.Bool(ok),
Expand Down Expand Up @@ -3455,7 +3455,7 @@ func (e *mutableStateBuilder) AddStartChildWorkflowExecutionFailedEvent(

ci, ok := e.GetChildExecutionInfo(initiatedID)
if !ok || ci.StartedID != common.EmptyEventID {
e.logger.Warn(mutableStateInvalidHistoryActionMsg, opTag,
e.logWarn(mutableStateInvalidHistoryActionMsg, opTag,
tag.WorkflowEventID(e.GetNextEventID()),
tag.ErrorTypeInvalidHistoryAction,
tag.Bool(ok),
Expand Down Expand Up @@ -3493,7 +3493,7 @@ func (e *mutableStateBuilder) AddChildWorkflowExecutionCompletedEvent(

ci, ok := e.GetChildExecutionInfo(initiatedID)
if !ok || ci.StartedID == common.EmptyEventID {
e.logger.Warn(mutableStateInvalidHistoryActionMsg, opTag,
e.logWarn(mutableStateInvalidHistoryActionMsg, opTag,
tag.WorkflowEventID(e.GetNextEventID()),
tag.ErrorTypeInvalidHistoryAction,
tag.Bool(ok),
Expand Down Expand Up @@ -3540,7 +3540,7 @@ func (e *mutableStateBuilder) AddChildWorkflowExecutionFailedEvent(

ci, ok := e.GetChildExecutionInfo(initiatedID)
if !ok || ci.StartedID == common.EmptyEventID {
e.logger.Warn(mutableStateInvalidHistoryActionMsg,
e.logWarn(mutableStateInvalidHistoryActionMsg,
tag.WorkflowEventID(e.GetNextEventID()),
tag.ErrorTypeInvalidHistoryAction,
tag.Bool(!ok),
Expand Down Expand Up @@ -3587,7 +3587,7 @@ func (e *mutableStateBuilder) AddChildWorkflowExecutionCanceledEvent(

ci, ok := e.GetChildExecutionInfo(initiatedID)
if !ok || ci.StartedID == common.EmptyEventID {
e.logger.Warn(mutableStateInvalidHistoryActionMsg, opTag,
e.logWarn(mutableStateInvalidHistoryActionMsg, opTag,
tag.WorkflowEventID(e.GetNextEventID()),
tag.ErrorTypeInvalidHistoryAction,
tag.Bool(ok),
Expand Down Expand Up @@ -3634,7 +3634,7 @@ func (e *mutableStateBuilder) AddChildWorkflowExecutionTerminatedEvent(

ci, ok := e.GetChildExecutionInfo(initiatedID)
if !ok || ci.StartedID == common.EmptyEventID {
e.logger.Warn(mutableStateInvalidHistoryActionMsg, opTag,
e.logWarn(mutableStateInvalidHistoryActionMsg, opTag,
tag.WorkflowEventID(e.GetNextEventID()),
tag.ErrorTypeInvalidHistoryAction,
tag.Bool(ok),
Expand Down Expand Up @@ -3681,7 +3681,7 @@ func (e *mutableStateBuilder) AddChildWorkflowExecutionTimedOutEvent(

ci, ok := e.GetChildExecutionInfo(initiatedID)
if !ok || ci.StartedID == common.EmptyEventID {
e.logger.Warn(mutableStateInvalidHistoryActionMsg, opTag,
e.logWarn(mutableStateInvalidHistoryActionMsg, opTag,
tag.WorkflowEventID(e.GetNextEventID()),
tag.ErrorTypeInvalidHistoryAction,
tag.Bool(ok),
Expand Down Expand Up @@ -4262,7 +4262,7 @@ func (e *mutableStateBuilder) validateNoEventsAfterWorkflowFinish(

default:
executionInfo := e.GetExecutionInfo()
e.logger.Error(
e.logError(
"encounter case where events appears after workflow finish.",
tag.WorkflowDomainID(executionInfo.DomainID),
tag.WorkflowID(executionInfo.WorkflowID),
Expand Down Expand Up @@ -4449,7 +4449,7 @@ func (e *mutableStateBuilder) closeTransactionHandleWorkflowReset(
); err != nil {
return err
}
e.logger.Info("Auto-Reset task is scheduled",
e.logInfo("Auto-Reset task is scheduled",
tag.WorkflowDomainName(domainEntry.GetInfo().Name),
tag.WorkflowID(executionInfo.WorkflowID),
tag.WorkflowRunID(executionInfo.RunID),
Expand Down Expand Up @@ -4487,7 +4487,7 @@ func (e *mutableStateBuilder) checkMutability(
) error {

if !e.IsWorkflowExecutionRunning() {
e.logger.Warn(
e.logWarn(
mutableStateInvalidHistoryActionMsg,
tag.WorkflowEventID(e.GetNextEventID()),
tag.ErrorTypeInvalidHistoryAction,
Expand Down Expand Up @@ -4521,3 +4521,23 @@ func (e *mutableStateBuilder) unixNanoToTime(

return time.Unix(0, timestampNanos)
}

func (e *mutableStateBuilder) logInfo(msg string, tags ...tag.Tag) {
tags = append(tags, tag.WorkflowID(e.executionInfo.WorkflowID))
tags = append(tags, tag.WorkflowRunID(e.executionInfo.RunID))
tags = append(tags, tag.WorkflowDomainID(e.executionInfo.DomainID))
e.logger.Info(msg, tags...)
}

func (e *mutableStateBuilder) logWarn(msg string, tags ...tag.Tag) {
tags = append(tags, tag.WorkflowID(e.executionInfo.WorkflowID))
tags = append(tags, tag.WorkflowRunID(e.executionInfo.RunID))
tags = append(tags, tag.WorkflowDomainID(e.executionInfo.DomainID))
e.logger.Warn(msg, tags...)
}
func (e *mutableStateBuilder) logError(msg string, tags ...tag.Tag) {
tags = append(tags, tag.WorkflowID(e.executionInfo.WorkflowID))
tags = append(tags, tag.WorkflowRunID(e.executionInfo.RunID))
tags = append(tags, tag.WorkflowDomainID(e.executionInfo.DomainID))
e.logger.Error(msg, tags...)
}
Loading

0 comments on commit 5e0a62b

Please sign in to comment.