Skip to content

Commit

Permalink
bugfix: update mutable state to handle duplicated update activity / u…
Browse files Browse the repository at this point in the history
…ser timer for better Cassandra performance (cadence-workflow#652)
  • Loading branch information
wxing1292 authored Apr 24, 2018
1 parent c3e240e commit 021330b
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 73 deletions.
201 changes: 134 additions & 67 deletions service/history/mutableStateBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,26 +41,26 @@ const (

type (
mutableStateBuilder struct {
pendingActivityInfoIDs map[int64]*persistence.ActivityInfo // Schedule Event ID -> Activity Info.
pendingActivityInfoByActivityID map[string]int64 // Activity ID -> Schedule Event ID of the activity.
updateActivityInfos []*persistence.ActivityInfo // Modified activities from last update.
deleteActivityInfos []int64 // Deleted activities from last update.
pendingActivityInfoIDs map[int64]*persistence.ActivityInfo // Schedule Event ID -> Activity Info.
pendingActivityInfoByActivityID map[string]int64 // Activity ID -> Schedule Event ID of the activity.
updateActivityInfos map[*persistence.ActivityInfo]struct{} // Modified activities from last update.
deleteActivityInfos map[int64]struct{} // Deleted activities from last update.

pendingTimerInfoIDs map[string]*persistence.TimerInfo // User Timer ID -> Timer Info.
updateTimerInfos []*persistence.TimerInfo // Modified timers from last update.
deleteTimerInfos []string // Deleted timers from last update.
pendingTimerInfoIDs map[string]*persistence.TimerInfo // User Timer ID -> Timer Info.
updateTimerInfos map[*persistence.TimerInfo]struct{} // Modified timers from last update.
deleteTimerInfos map[string]struct{} // Deleted timers from last update.

pendingChildExecutionInfoIDs map[int64]*persistence.ChildExecutionInfo // Initiated Event ID -> Child Execution Info
updateChildExecutionInfos []*persistence.ChildExecutionInfo // Modified ChildExecution Infos since last update
deleteChildExecutionInfo *int64 // Deleted ChildExecution Info since last update
pendingChildExecutionInfoIDs map[int64]*persistence.ChildExecutionInfo // Initiated Event ID -> Child Execution Info
updateChildExecutionInfos map[*persistence.ChildExecutionInfo]struct{} // Modified ChildExecution Infos since last update
deleteChildExecutionInfo *int64 // Deleted ChildExecution Info since last update

pendingRequestCancelInfoIDs map[int64]*persistence.RequestCancelInfo // Initiated Event ID -> RequestCancelInfo
updateRequestCancelInfos []*persistence.RequestCancelInfo // Modified RequestCancel Infos since last update, for persistence update
deleteRequestCancelInfo *int64 // Deleted RequestCancel Info since last update, for persistence update
pendingRequestCancelInfoIDs map[int64]*persistence.RequestCancelInfo // Initiated Event ID -> RequestCancelInfo
updateRequestCancelInfos map[*persistence.RequestCancelInfo]struct{} // Modified RequestCancel Infos since last update, for persistence update
deleteRequestCancelInfo *int64 // Deleted RequestCancel Info since last update, for persistence update

pendingSignalInfoIDs map[int64]*persistence.SignalInfo // Initiated Event ID -> SignalInfo
updateSignalInfos []*persistence.SignalInfo // Modified SignalInfo since last update
deleteSignalInfo *int64 // Deleted SignalInfo since last update
pendingSignalInfoIDs map[int64]*persistence.SignalInfo // Initiated Event ID -> SignalInfo
updateSignalInfos map[*persistence.SignalInfo]struct{} // Modified SignalInfo since last update
deleteSignalInfo *int64 // Deleted SignalInfo since last update

pendingSignalRequestedIDs map[string]struct{} // Set of signaled requestIds
updateSignalRequestedIDs map[string]struct{} // Set of signaled requestIds since last update
Expand Down Expand Up @@ -112,24 +112,34 @@ type (

func newMutableStateBuilder(config *Config, logger bark.Logger) *mutableStateBuilder {
s := &mutableStateBuilder{
updateActivityInfos: []*persistence.ActivityInfo{},
updateActivityInfos: make(map[*persistence.ActivityInfo]struct{}),
pendingActivityInfoIDs: make(map[int64]*persistence.ActivityInfo),
pendingActivityInfoByActivityID: make(map[string]int64),
deleteActivityInfos: []int64{},
pendingTimerInfoIDs: make(map[string]*persistence.TimerInfo),
updateTimerInfos: []*persistence.TimerInfo{},
deleteTimerInfos: []string{},
updateChildExecutionInfos: []*persistence.ChildExecutionInfo{},
pendingChildExecutionInfoIDs: make(map[int64]*persistence.ChildExecutionInfo),
updateRequestCancelInfos: []*persistence.RequestCancelInfo{},
pendingRequestCancelInfoIDs: make(map[int64]*persistence.RequestCancelInfo),
updateSignalInfos: []*persistence.SignalInfo{},
pendingSignalInfoIDs: make(map[int64]*persistence.SignalInfo),
updateSignalRequestedIDs: make(map[string]struct{}),
pendingSignalRequestedIDs: make(map[string]struct{}),
eventSerializer: newJSONHistoryEventSerializer(),
config: config,
logger: logger,
deleteActivityInfos: make(map[int64]struct{}),

pendingTimerInfoIDs: make(map[string]*persistence.TimerInfo),
updateTimerInfos: make(map[*persistence.TimerInfo]struct{}),
deleteTimerInfos: make(map[string]struct{}),

updateChildExecutionInfos: make(map[*persistence.ChildExecutionInfo]struct{}),
pendingChildExecutionInfoIDs: make(map[int64]*persistence.ChildExecutionInfo),
deleteChildExecutionInfo: nil,

updateRequestCancelInfos: make(map[*persistence.RequestCancelInfo]struct{}),
pendingRequestCancelInfoIDs: make(map[int64]*persistence.RequestCancelInfo),
deleteRequestCancelInfo: nil,

updateSignalInfos: make(map[*persistence.SignalInfo]struct{}),
pendingSignalInfoIDs: make(map[int64]*persistence.SignalInfo),
deleteSignalInfo: nil,

updateSignalRequestedIDs: make(map[string]struct{}),
pendingSignalRequestedIDs: make(map[string]struct{}),
deleteSignalRequestedID: "",

eventSerializer: newJSONHistoryEventSerializer(),
config: config,
logger: logger,
}
s.executionInfo = &persistence.WorkflowExecutionInfo{
NextEventID: firstEventID,
Expand Down Expand Up @@ -244,17 +254,17 @@ func (e *mutableStateBuilder) CloseUpdateSession() (*mutableStateSessionUpdates,

updates := &mutableStateSessionUpdates{
newEventsBuilder: e.hBuilder,
updateActivityInfos: e.updateActivityInfos,
deleteActivityInfos: e.deleteActivityInfos,
updateTimerInfos: e.updateTimerInfos,
deleteTimerInfos: e.deleteTimerInfos,
updateChildExecutionInfos: e.updateChildExecutionInfos,
updateActivityInfos: convertUpdateActivityInfos(e.updateActivityInfos),
deleteActivityInfos: convertDeleteActivityInfos(e.deleteActivityInfos),
updateTimerInfos: convertUpdateTimerInfos(e.updateTimerInfos),
deleteTimerInfos: convertDeleteTimerInfos(e.deleteTimerInfos),
updateChildExecutionInfos: convertUpdateChildExecutionInfos(e.updateChildExecutionInfos),
deleteChildExecutionInfo: e.deleteChildExecutionInfo,
updateCancelExecutionInfos: e.updateRequestCancelInfos,
updateCancelExecutionInfos: convertUpdateRequestCancelInfos(e.updateRequestCancelInfos),
deleteCancelExecutionInfo: e.deleteRequestCancelInfo,
updateSignalInfos: e.updateSignalInfos,
updateSignalInfos: convertUpdateSignalInfos(e.updateSignalInfos),
deleteSignalInfo: e.deleteSignalInfo,
updateSignalRequestedIDs: getSignalRequestedIDs(e.updateSignalRequestedIDs),
updateSignalRequestedIDs: convertSignalRequestedIDs(e.updateSignalRequestedIDs),
deleteSignalRequestedID: e.deleteSignalRequestedID,
continueAsNew: e.continueAsNew,
newBufferedEvents: e.updateBufferedEvents,
Expand All @@ -263,15 +273,15 @@ func (e *mutableStateBuilder) CloseUpdateSession() (*mutableStateSessionUpdates,

// Clear all updates to prepare for the next session
e.hBuilder = newHistoryBuilder(e, e.logger)
e.updateActivityInfos = []*persistence.ActivityInfo{}
e.deleteActivityInfos = []int64{}
e.updateTimerInfos = []*persistence.TimerInfo{}
e.deleteTimerInfos = []string{}
e.updateChildExecutionInfos = []*persistence.ChildExecutionInfo{}
e.updateActivityInfos = make(map[*persistence.ActivityInfo]struct{})
e.deleteActivityInfos = make(map[int64]struct{})
e.updateTimerInfos = make(map[*persistence.TimerInfo]struct{})
e.deleteTimerInfos = make(map[string]struct{})
e.updateChildExecutionInfos = make(map[*persistence.ChildExecutionInfo]struct{})
e.deleteChildExecutionInfo = nil
e.updateRequestCancelInfos = []*persistence.RequestCancelInfo{}
e.updateRequestCancelInfos = make(map[*persistence.RequestCancelInfo]struct{})
e.deleteRequestCancelInfo = nil
e.updateSignalInfos = []*persistence.SignalInfo{}
e.updateSignalInfos = make(map[*persistence.SignalInfo]struct{})
e.deleteSignalInfo = nil
e.updateSignalRequestedIDs = make(map[string]struct{})
e.deleteSignalRequestedID = ""
Expand All @@ -294,12 +304,68 @@ func (e *mutableStateBuilder) createReplicationTask() *persistence.HistoryReplic
}
}

func getSignalRequestedIDs(signalReqIDs map[string]struct{}) []string {
var result []string
for k := range signalReqIDs {
result = append(result, k)
func convertUpdateActivityInfos(inputs map[*persistence.ActivityInfo]struct{}) []*persistence.ActivityInfo {
outputs := []*persistence.ActivityInfo{}
for item := range inputs {
outputs = append(outputs, item)
}
return outputs
}

func convertDeleteActivityInfos(inputs map[int64]struct{}) []int64 {
outputs := []int64{}
for item := range inputs {
outputs = append(outputs, item)
}
return outputs
}

func convertUpdateTimerInfos(inputs map[*persistence.TimerInfo]struct{}) []*persistence.TimerInfo {
outputs := []*persistence.TimerInfo{}
for item := range inputs {
outputs = append(outputs, item)
}
return outputs
}

func convertDeleteTimerInfos(inputs map[string]struct{}) []string {
outputs := []string{}
for item := range inputs {
outputs = append(outputs, item)
}
return outputs
}

func convertUpdateChildExecutionInfos(inputs map[*persistence.ChildExecutionInfo]struct{}) []*persistence.ChildExecutionInfo {
outputs := []*persistence.ChildExecutionInfo{}
for item := range inputs {
outputs = append(outputs, item)
}
return outputs
}

func convertUpdateRequestCancelInfos(inputs map[*persistence.RequestCancelInfo]struct{}) []*persistence.RequestCancelInfo {
outputs := []*persistence.RequestCancelInfo{}
for item := range inputs {
outputs = append(outputs, item)
}
return result
return outputs
}

func convertUpdateSignalInfos(inputs map[*persistence.SignalInfo]struct{}) []*persistence.SignalInfo {
outputs := []*persistence.SignalInfo{}
for item := range inputs {
outputs = append(outputs, item)
}
return outputs
}

func convertSignalRequestedIDs(inputs map[string]struct{}) []string {
outputs := []string{}
for item := range inputs {
outputs = append(outputs, item)
}
return outputs
}

func (e *mutableStateBuilder) assignEventIDToBufferedEvents() {
Expand All @@ -322,15 +388,15 @@ func (e *mutableStateBuilder) assignEventIDToBufferedEvents() {
scheduledIDToStartedID[scheduledID] = eventID
if ai, ok := e.GetActivityInfo(scheduledID); ok {
ai.StartedID = eventID
e.updateActivityInfos = append(e.updateActivityInfos, ai)
e.updateActivityInfos[ai] = struct{}{}
}
case workflow.EventTypeChildWorkflowExecutionStarted:
attributes := event.ChildWorkflowExecutionStartedEventAttributes
initiatedID := attributes.GetInitiatedEventId()
scheduledIDToStartedID[initiatedID] = eventID
if ci, ok := e.GetChildExecutionInfo(initiatedID); ok {
ci.StartedID = eventID
e.updateChildExecutionInfos = append(e.updateChildExecutionInfos, ci)
e.updateChildExecutionInfos[ci] = struct{}{}
}
case workflow.EventTypeActivityTaskCompleted:
attributes := event.ActivityTaskCompletedEventAttributes
Expand Down Expand Up @@ -618,7 +684,7 @@ func (e *mutableStateBuilder) updateActivityProgress(ai *persistence.ActivityInf
request *workflow.RecordActivityTaskHeartbeatRequest) {
ai.Details = request.Details
ai.LastHeartBeatUpdatedTime = time.Now()
e.updateActivityInfos = append(e.updateActivityInfos, ai)
e.updateActivityInfos[ai] = struct{}{}
}

// UpdateActivity updates an activity
Expand All @@ -627,7 +693,7 @@ func (e *mutableStateBuilder) UpdateActivity(ai *persistence.ActivityInfo) error
if !ok {
return fmt.Errorf("Unable to find activity with schedule event id: %v in mutable state", ai.ScheduleID)
}
e.updateActivityInfos = append(e.updateActivityInfos, ai)
e.updateActivityInfos[ai] = struct{}{}
return nil
}

Expand All @@ -649,7 +715,7 @@ func (e *mutableStateBuilder) DeleteActivity(scheduleEventID int64) error {
}
delete(e.pendingActivityInfoByActivityID, a.ActivityID)

e.deleteActivityInfos = append(e.deleteActivityInfos, scheduleEventID)
e.deleteActivityInfos[scheduleEventID] = struct{}{}
return nil
}

Expand All @@ -662,13 +728,13 @@ func (e *mutableStateBuilder) GetUserTimer(timerID string) (bool, *persistence.T
// UpdateUserTimer updates the user timer in progress.
func (e *mutableStateBuilder) UpdateUserTimer(timerID string, ti *persistence.TimerInfo) {
e.pendingTimerInfoIDs[timerID] = ti
e.updateTimerInfos = append(e.updateTimerInfos, ti)
e.updateTimerInfos[ti] = struct{}{}
}

// DeleteUserTimer deletes an user timer.
func (e *mutableStateBuilder) DeleteUserTimer(timerID string) {
delete(e.pendingTimerInfoIDs, timerID)
e.deleteTimerInfos = append(e.deleteTimerInfos, timerID)
e.deleteTimerInfos[timerID] = struct{}{}
}

// GetPendingDecision returns details about the in-progress decision task
Expand Down Expand Up @@ -1213,7 +1279,7 @@ func (e *mutableStateBuilder) ReplicateActivityTaskScheduledEvent(

e.pendingActivityInfoIDs[scheduleEventID] = ai
e.pendingActivityInfoByActivityID[ai.ActivityID] = scheduleEventID
e.updateActivityInfos = append(e.updateActivityInfos, ai)
e.updateActivityInfos[ai] = struct{}{}

return ai
}
Expand All @@ -1240,7 +1306,7 @@ func (e *mutableStateBuilder) ReplicateActivityTaskStartedEvent(event *workflow.
ai.StartedID = event.GetEventId()
ai.RequestID = attributes.GetRequestId()
ai.StartedTime = time.Unix(0, event.GetTimestamp())
e.updateActivityInfos = append(e.updateActivityInfos, ai)
e.updateActivityInfos[ai] = struct{}{}
}

func (e *mutableStateBuilder) AddActivityTaskCompletedEvent(scheduleEventID, startedEventID int64,
Expand Down Expand Up @@ -1340,8 +1406,9 @@ func (e *mutableStateBuilder) ReplicateActivityTaskCancelRequestedEvent(event *w
// - The activity might not be heartbeat'ing, but the activity can still call RecordActivityHeartBeat()
// to see cancellation while reporting progress of the activity.
ai.CancelRequested = true

ai.CancelRequestID = event.GetEventId()
e.updateActivityInfos = append(e.updateActivityInfos, ai)
e.updateActivityInfos[ai] = struct{}{}
}

func (e *mutableStateBuilder) AddRequestCancelActivityTaskFailedEvent(decisionCompletedEventID int64,
Expand Down Expand Up @@ -1503,7 +1570,7 @@ func (e *mutableStateBuilder) ReplicateRequestCancelExternalWorkflowExecutionIni
}

e.pendingRequestCancelInfoIDs[initiatedEventID] = ri
e.updateRequestCancelInfos = append(e.updateRequestCancelInfos, ri)
e.updateRequestCancelInfos[ri] = struct{}{}

return ri
}
Expand Down Expand Up @@ -1579,7 +1646,7 @@ func (e *mutableStateBuilder) ReplicateSignalExternalWorkflowExecutionInitiatedE
}

e.pendingSignalInfoIDs[initiatedEventID] = ri
e.updateSignalInfos = append(e.updateSignalInfos, ri)
e.updateSignalInfos[ri] = struct{}{}
}

func (e *mutableStateBuilder) AddExternalWorkflowExecutionSignaled(initiatedID int64,
Expand Down Expand Up @@ -1657,7 +1724,7 @@ func (e *mutableStateBuilder) ReplicateTimerStartedEvent(event *workflow.History
}

e.pendingTimerInfoIDs[timerID] = ti
e.updateTimerInfos = append(e.updateTimerInfos, ti)
e.updateTimerInfos[ti] = struct{}{}

return ti
}
Expand Down Expand Up @@ -1884,7 +1951,7 @@ func (e *mutableStateBuilder) ReplicateStartChildWorkflowExecutionInitiatedEvent
}

e.pendingChildExecutionInfoIDs[initiatedEventID] = ci
e.updateChildExecutionInfos = append(e.updateChildExecutionInfos, ci)
e.updateChildExecutionInfos[ci] = struct{}{}

return ci
}
Expand Down Expand Up @@ -1918,7 +1985,7 @@ func (e *mutableStateBuilder) ReplicateChildWorkflowExecutionStartedEvent(event

ci.StartedID = event.GetEventId()
ci.StartedEvent = startedEvent
e.updateChildExecutionInfos = append(e.updateChildExecutionInfos, ci)
e.updateChildExecutionInfos[ci] = struct{}{}

return nil
}
Expand Down
Loading

0 comments on commit 021330b

Please sign in to comment.