Skip to content

Commit

Permalink
Use history event as signal payload data (temporalio#2254)
Browse files Browse the repository at this point in the history
* Use history event as signal payload data reducing mutable state size
NOTE: this PR is part 1 due to forward / backward compatibility
  • Loading branch information
wxing1292 authored Dec 3, 2021
1 parent 822eed7 commit 7257306
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 21 deletions.
16 changes: 12 additions & 4 deletions service/history/transferQueueActiveTaskExecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,11 @@ func (t *transferQueueActiveTaskExecutor) processSignalExecution(
return err
}

initiatedEvent, err := mutableState.GetSignalExternalInitiatedEvent(task.InitiatedID)
if err != nil {
return err
}

targetNamespaceEntry, err := t.shard.GetNamespaceRegistry().GetNamespaceByID(namespace.ID(task.TargetNamespaceID))
if err != nil {
return err
Expand All @@ -500,10 +505,12 @@ func (t *transferQueueActiveTaskExecutor) processSignalExecution(
)
}

attributes := initiatedEvent.GetSignalExternalWorkflowExecutionInitiatedEventAttributes()
if err = t.signalExternalExecutionWithRetry(
task,
targetNamespace,
signalInfo,
attributes,
); err != nil {
t.logger.Debug("Failed to signal external workflow execution", tag.Error(err))

Expand Down Expand Up @@ -1106,6 +1113,7 @@ func (t *transferQueueActiveTaskExecutor) signalExternalExecutionWithRetry(
task *tasks.SignalExecutionTask,
targetNamespace namespace.Name,
signalInfo *persistencespb.SignalInfo,
attributes *historypb.SignalExternalWorkflowExecutionInitiatedEventAttributes,
) error {

request := &historyservice.SignalWorkflowExecutionRequest{
Expand All @@ -1117,12 +1125,12 @@ func (t *transferQueueActiveTaskExecutor) signalExternalExecutionWithRetry(
RunId: task.TargetRunID,
},
Identity: consts.IdentityHistoryService,
SignalName: signalInfo.Name,
Input: signalInfo.Input,
SignalName: attributes.SignalName,
Input: attributes.Input,
// Use same request ID to deduplicate SignalWorkflowExecution calls
RequestId: signalInfo.GetRequestId(),
Control: signalInfo.Control,
Header: signalInfo.Header,
Control: attributes.Control,
Header: attributes.Header,
},
ExternalWorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: task.WorkflowID,
Expand Down
1 change: 1 addition & 0 deletions service/history/workflow/mutable_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ type (
GetWorkflowTaskInfo(int64) (*WorkflowTaskInfo, bool)
GetNamespaceEntry() *namespace.Namespace
GetStartEvent() (*historypb.HistoryEvent, error)
GetSignalExternalInitiatedEvent(int64) (*historypb.HistoryEvent, error)
GetFirstRunID() (string, error)
GetCurrentBranchToken() ([]byte, error)
GetCurrentVersion() int64
Expand Down
65 changes: 48 additions & 17 deletions service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ var (
ErrMissingActivityScheduledEvent = serviceerror.NewInternal("unable to get activity scheduled event")
// ErrMissingChildWorkflowInitiatedEvent indicates missing child workflow initiated event
ErrMissingChildWorkflowInitiatedEvent = serviceerror.NewInternal("unable to get child workflow initiated event")
// ErrMissingSignalInitiatedEvent indicates missing workflow signal initiated event
ErrMissingSignalInitiatedEvent = serviceerror.NewInternal("unable to get signal initiated event")
)

type (
Expand Down Expand Up @@ -594,7 +596,7 @@ func (e *MutableStateImpl) GetActivityScheduledEvent(
if err != nil {
return nil, err
}
scheduledEvent, err := e.eventsCache.GetEvent(
event, err := e.eventsCache.GetEvent(
events.EventKey{
NamespaceID: namespace.ID(e.executionInfo.NamespaceId),
WorkflowID: e.executionInfo.WorkflowId,
Expand All @@ -611,7 +613,7 @@ func (e *MutableStateImpl) GetActivityScheduledEvent(
// which can cause task processing side to fail silently
return nil, ErrMissingActivityScheduledEvent
}
return scheduledEvent, nil
return event, nil
}

// GetActivityInfo gives details about an activity that is currently in progress.
Expand Down Expand Up @@ -669,7 +671,7 @@ func (e *MutableStateImpl) GetChildExecutionInitiatedEvent(
if err != nil {
return nil, err
}
initiatedEvent, err := e.eventsCache.GetEvent(
event, err := e.eventsCache.GetEvent(
events.EventKey{
NamespaceID: namespace.ID(e.executionInfo.NamespaceId),
WorkflowID: e.executionInfo.WorkflowId,
Expand All @@ -686,7 +688,7 @@ func (e *MutableStateImpl) GetChildExecutionInitiatedEvent(
// which can cause task processing side to fail silently
return nil, ErrMissingChildWorkflowInitiatedEvent
}
return initiatedEvent, nil
return event, nil
}

// GetRequestCancelInfo gives details about a request cancellation that is currently in progress.
Expand Down Expand Up @@ -728,7 +730,7 @@ func (e *MutableStateImpl) GetCronBackoffDuration() time.Duration {
return backoff.GetBackoffForNextSchedule(e.executionInfo.CronSchedule, executionTime, e.timeSource.Now())
}

// GetSignalInfo get details about a signal request that is currently in progress.
// GetSignalInfo get the details about a signal request that is currently in progress.
func (e *MutableStateImpl) GetSignalInfo(
initiatedEventID int64,
) (*persistencespb.SignalInfo, bool) {
Expand All @@ -737,6 +739,39 @@ func (e *MutableStateImpl) GetSignalInfo(
return ri, ok
}

// GetSignalExternalInitiatedEvent get the details about signal external workflow
func (e *MutableStateImpl) GetSignalExternalInitiatedEvent(
initiatedEventID int64,
) (*historypb.HistoryEvent, error) {
si, ok := e.pendingSignalInfoIDs[initiatedEventID]
if !ok {
return nil, ErrMissingSignalInfo
}

currentBranchToken, version, err := e.getCurrentBranchTokenAndEventVersion(si.InitiatedId)
if err != nil {
return nil, err
}
event, err := e.eventsCache.GetEvent(
events.EventKey{
NamespaceID: namespace.ID(e.executionInfo.NamespaceId),
WorkflowID: e.executionInfo.WorkflowId,
RunID: e.executionState.RunId,
EventID: si.InitiatedId,
Version: version,
},
si.InitiatedEventBatchId,
currentBranchToken,
)
if err != nil {
// do not return the original error
// since original error can be of type entity not exists
// which can cause task processing side to fail silently
return nil, ErrMissingSignalInitiatedEvent
}
return event, nil
}

// GetCompletionEvent retrieves the workflow completion event from mutable state
func (e *MutableStateImpl) GetCompletionEvent() (*historypb.HistoryEvent, error) {
if e.executionState.State != enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED {
Expand All @@ -752,7 +787,7 @@ func (e *MutableStateImpl) GetCompletionEvent() (*historypb.HistoryEvent, error)
return nil, err
}

completionEvent, err := e.eventsCache.GetEvent(
event, err := e.eventsCache.GetEvent(
events.EventKey{
NamespaceID: namespace.ID(e.executionInfo.NamespaceId),
WorkflowID: e.executionInfo.WorkflowId,
Expand All @@ -769,8 +804,7 @@ func (e *MutableStateImpl) GetCompletionEvent() (*historypb.HistoryEvent, error)
// which can cause task processing side to fail silently
return nil, ErrMissingWorkflowCompletionEvent
}

return completionEvent, nil
return event, nil
}

// GetStartEvent retrieves the workflow start event from mutable state
Expand All @@ -785,7 +819,7 @@ func (e *MutableStateImpl) GetStartEvent() (*historypb.HistoryEvent, error) {
return nil, err
}

startEvent, err := e.eventsCache.GetEvent(
event, err := e.eventsCache.GetEvent(
events.EventKey{
NamespaceID: namespace.ID(e.executionInfo.NamespaceId),
WorkflowID: e.executionInfo.WorkflowId,
Expand All @@ -802,7 +836,7 @@ func (e *MutableStateImpl) GetStartEvent() (*historypb.HistoryEvent, error) {
// which can cause task processing side to fail silently
return nil, ErrMissingWorkflowStartEvent
}
return startEvent, nil
return event, nil
}

func (e *MutableStateImpl) GetFirstRunID() (string, error) {
Expand Down Expand Up @@ -1785,10 +1819,6 @@ func (e *MutableStateImpl) AddActivityTaskScheduledEvent(
}

event := e.hBuilder.AddActivityTaskScheduledEvent(workflowTaskCompletedEventID, command)

// Write the event to cache only on active cluster for processing on activity started or retried
e.writeEventToCache(event)

ai, err := e.ReplicateActivityTaskScheduledEvent(workflowTaskCompletedEventID, event)
// TODO merge active & passive task generation
if err := e.taskGenerator.GenerateActivityTransferTasks(
Expand Down Expand Up @@ -1858,6 +1888,7 @@ func (e *MutableStateImpl) ReplicateActivityTaskScheduledEvent(
e.pendingActivityIDToEventID[ai.ActivityId] = ai.ScheduleId
e.updateActivityInfos[ai.ScheduleId] = ai

e.writeEventToCache(event)
return ai, nil
}

Expand Down Expand Up @@ -2621,6 +2652,8 @@ func (e *MutableStateImpl) ReplicateSignalExternalWorkflowExecutionInitiatedEven

e.pendingSignalInfoIDs[si.InitiatedId] = si
e.updateSignalInfos[si.InitiatedId] = si

e.writeEventToCache(event)
return si, nil
}

Expand Down Expand Up @@ -3125,9 +3158,6 @@ func (e *MutableStateImpl) AddStartChildWorkflowExecutionInitiatedEvent(
}

event := e.hBuilder.AddStartChildWorkflowExecutionInitiatedEvent(workflowTaskCompletedEventID, command)
// Write the event to cache only on active cluster
e.writeEventToCache(event)

ci, err := e.ReplicateStartChildWorkflowExecutionInitiatedEvent(workflowTaskCompletedEventID, event, createRequestID)
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -3165,6 +3195,7 @@ func (e *MutableStateImpl) ReplicateStartChildWorkflowExecutionInitiatedEvent(
e.pendingChildExecutionInfoIDs[ci.InitiatedId] = ci
e.updateChildExecutionInfos[ci.InitiatedId] = ci

e.writeEventToCache(event)
return ci, nil
}

Expand Down
15 changes: 15 additions & 0 deletions service/history/workflow/mutable_state_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 7257306

Please sign in to comment.