Skip to content

Commit

Permalink
Enforce persistence context timeout in application layer: Part 4 (cad…
Browse files Browse the repository at this point in the history
…ence-workflow#3643)

Enforce persistence context timeout for events cache
  • Loading branch information
yycptt authored Oct 14, 2020
1 parent e1126ac commit ba0eacb
Show file tree
Hide file tree
Showing 21 changed files with 210 additions and 131 deletions.
3 changes: 2 additions & 1 deletion service/history/decisionHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (handler *decisionHandlerImpl) handleDecisionTaskScheduled(
}, nil
}

startEvent, err := mutableState.GetStartEvent()
startEvent, err := mutableState.GetStartEvent(ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -424,6 +424,7 @@ Update_History_Loop:
)

if decisionResults, err = decisionTaskHandler.handleDecisions(
ctx,
request.ExecutionContext,
request.Decisions,
); err != nil {
Expand Down
68 changes: 47 additions & 21 deletions service/history/decisionTaskHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package history

import (
"context"
"fmt"

"github.com/pborman/uuid"
Expand Down Expand Up @@ -109,6 +110,7 @@ func newDecisionTaskHandler(
}

func (handler *decisionTaskHandlerImpl) handleDecisions(
ctx context.Context,
executionContext []byte,
decisions []*workflow.Decision,
) ([]*decisionResult, error) {
Expand All @@ -122,7 +124,7 @@ func (handler *decisionTaskHandlerImpl) handleDecisions(
var results []*decisionResult
for _, decision := range decisions {

result, err := handler.handleDecisionWithResult(decision)
result, err := handler.handleDecisionWithResult(ctx, decision)
if err != nil || handler.stopProcessing {
return nil, err
} else if result != nil {
Expand All @@ -134,60 +136,67 @@ func (handler *decisionTaskHandlerImpl) handleDecisions(
return results, nil
}

func (handler *decisionTaskHandlerImpl) handleDecisionWithResult(decision *workflow.Decision) (*decisionResult, error) {
func (handler *decisionTaskHandlerImpl) handleDecisionWithResult(
ctx context.Context,
decision *workflow.Decision,
) (*decisionResult, error) {
switch decision.GetDecisionType() {
case workflow.DecisionTypeScheduleActivityTask:
return handler.handleDecisionScheduleActivity(decision.ScheduleActivityTaskDecisionAttributes)
return handler.handleDecisionScheduleActivity(ctx, decision.ScheduleActivityTaskDecisionAttributes)
default:
return nil, handler.handleDecision(decision)
return nil, handler.handleDecision(ctx, decision)
}
}

func (handler *decisionTaskHandlerImpl) handleDecision(decision *workflow.Decision) error {
func (handler *decisionTaskHandlerImpl) handleDecision(
ctx context.Context,
decision *workflow.Decision,
) error {
switch decision.GetDecisionType() {

case workflow.DecisionTypeCompleteWorkflowExecution:
return handler.handleDecisionCompleteWorkflow(decision.CompleteWorkflowExecutionDecisionAttributes)
return handler.handleDecisionCompleteWorkflow(ctx, decision.CompleteWorkflowExecutionDecisionAttributes)

case workflow.DecisionTypeFailWorkflowExecution:
return handler.handleDecisionFailWorkflow(decision.FailWorkflowExecutionDecisionAttributes)
return handler.handleDecisionFailWorkflow(ctx, decision.FailWorkflowExecutionDecisionAttributes)

case workflow.DecisionTypeCancelWorkflowExecution:
return handler.handleDecisionCancelWorkflow(decision.CancelWorkflowExecutionDecisionAttributes)
return handler.handleDecisionCancelWorkflow(ctx, decision.CancelWorkflowExecutionDecisionAttributes)

case workflow.DecisionTypeStartTimer:
return handler.handleDecisionStartTimer(decision.StartTimerDecisionAttributes)
return handler.handleDecisionStartTimer(ctx, decision.StartTimerDecisionAttributes)

case workflow.DecisionTypeRequestCancelActivityTask:
return handler.handleDecisionRequestCancelActivity(decision.RequestCancelActivityTaskDecisionAttributes)
return handler.handleDecisionRequestCancelActivity(ctx, decision.RequestCancelActivityTaskDecisionAttributes)

case workflow.DecisionTypeCancelTimer:
return handler.handleDecisionCancelTimer(decision.CancelTimerDecisionAttributes)
return handler.handleDecisionCancelTimer(ctx, decision.CancelTimerDecisionAttributes)

case workflow.DecisionTypeRecordMarker:
return handler.handleDecisionRecordMarker(decision.RecordMarkerDecisionAttributes)
return handler.handleDecisionRecordMarker(ctx, decision.RecordMarkerDecisionAttributes)

case workflow.DecisionTypeRequestCancelExternalWorkflowExecution:
return handler.handleDecisionRequestCancelExternalWorkflow(decision.RequestCancelExternalWorkflowExecutionDecisionAttributes)
return handler.handleDecisionRequestCancelExternalWorkflow(ctx, decision.RequestCancelExternalWorkflowExecutionDecisionAttributes)

case workflow.DecisionTypeSignalExternalWorkflowExecution:
return handler.handleDecisionSignalExternalWorkflow(decision.SignalExternalWorkflowExecutionDecisionAttributes)
return handler.handleDecisionSignalExternalWorkflow(ctx, decision.SignalExternalWorkflowExecutionDecisionAttributes)

case workflow.DecisionTypeContinueAsNewWorkflowExecution:
return handler.handleDecisionContinueAsNewWorkflow(decision.ContinueAsNewWorkflowExecutionDecisionAttributes)
return handler.handleDecisionContinueAsNewWorkflow(ctx, decision.ContinueAsNewWorkflowExecutionDecisionAttributes)

case workflow.DecisionTypeStartChildWorkflowExecution:
return handler.handleDecisionStartChildWorkflow(decision.StartChildWorkflowExecutionDecisionAttributes)
return handler.handleDecisionStartChildWorkflow(ctx, decision.StartChildWorkflowExecutionDecisionAttributes)

case workflow.DecisionTypeUpsertWorkflowSearchAttributes:
return handler.handleDecisionUpsertWorkflowSearchAttributes(decision.UpsertWorkflowSearchAttributesDecisionAttributes)
return handler.handleDecisionUpsertWorkflowSearchAttributes(ctx, decision.UpsertWorkflowSearchAttributesDecisionAttributes)

default:
return &workflow.BadRequestError{Message: fmt.Sprintf("Unknown decision type: %v", decision.GetDecisionType())}
}
}

func (handler *decisionTaskHandlerImpl) handleDecisionScheduleActivity(
ctx context.Context,
attr *workflow.ScheduleActivityTaskDecisionAttributes,
) (*decisionResult, error) {

Expand Down Expand Up @@ -253,6 +262,7 @@ func (handler *decisionTaskHandlerImpl) handleDecisionScheduleActivity(
}

func (handler *decisionTaskHandlerImpl) handleDecisionRequestCancelActivity(
ctx context.Context,
attr *workflow.RequestCancelActivityTaskDecisionAttributes,
) error {

Expand Down Expand Up @@ -307,6 +317,7 @@ func (handler *decisionTaskHandlerImpl) handleDecisionRequestCancelActivity(
}

func (handler *decisionTaskHandlerImpl) handleDecisionStartTimer(
ctx context.Context,
attr *workflow.StartTimerDecisionAttributes,
) error {

Expand Down Expand Up @@ -338,6 +349,7 @@ func (handler *decisionTaskHandlerImpl) handleDecisionStartTimer(
}

func (handler *decisionTaskHandlerImpl) handleDecisionCompleteWorkflow(
ctx context.Context,
attr *workflow.CompleteWorkflowExecutionDecisionAttributes,
) error {

Expand Down Expand Up @@ -384,7 +396,7 @@ func (handler *decisionTaskHandlerImpl) handleDecisionCompleteWorkflow(
}

// check if this is a cron workflow
cronBackoff, err := handler.mutableState.GetCronBackoffDuration()
cronBackoff, err := handler.mutableState.GetCronBackoffDuration(ctx)
if err != nil {
handler.stopProcessing = true
return err
Expand All @@ -398,12 +410,13 @@ func (handler *decisionTaskHandlerImpl) handleDecisionCompleteWorkflow(
}

// this is a cron workflow
startEvent, err := handler.mutableState.GetStartEvent()
startEvent, err := handler.mutableState.GetStartEvent(ctx)
if err != nil {
return err
}
startAttributes := startEvent.WorkflowExecutionStartedEventAttributes
return handler.retryCronContinueAsNew(
ctx,
startAttributes,
int32(cronBackoff.Seconds()),
workflow.ContinueAsNewInitiatorCronSchedule.Ptr(),
Expand All @@ -414,6 +427,7 @@ func (handler *decisionTaskHandlerImpl) handleDecisionCompleteWorkflow(
}

func (handler *decisionTaskHandlerImpl) handleDecisionFailWorkflow(
ctx context.Context,
attr *workflow.FailWorkflowExecutionDecisionAttributes,
) error {

Expand Down Expand Up @@ -465,7 +479,7 @@ func (handler *decisionTaskHandlerImpl) handleDecisionFailWorkflow(
// first check the backoff retry
if backoffInterval == backoff.NoBackoff {
// if no backoff retry, set the backoffInterval using cron schedule
backoffInterval, err = handler.mutableState.GetCronBackoffDuration()
backoffInterval, err = handler.mutableState.GetCronBackoffDuration(ctx)
if err != nil {
handler.stopProcessing = true
return err
Expand All @@ -482,12 +496,13 @@ func (handler *decisionTaskHandlerImpl) handleDecisionFailWorkflow(
}

// this is a cron / backoff workflow
startEvent, err := handler.mutableState.GetStartEvent()
startEvent, err := handler.mutableState.GetStartEvent(ctx)
if err != nil {
return err
}
startAttributes := startEvent.WorkflowExecutionStartedEventAttributes
return handler.retryCronContinueAsNew(
ctx,
startAttributes,
int32(backoffInterval.Seconds()),
continueAsNewInitiator.Ptr(),
Expand All @@ -498,6 +513,7 @@ func (handler *decisionTaskHandlerImpl) handleDecisionFailWorkflow(
}

func (handler *decisionTaskHandlerImpl) handleDecisionCancelTimer(
ctx context.Context,
attr *workflow.CancelTimerDecisionAttributes,
) error {

Expand Down Expand Up @@ -540,6 +556,7 @@ func (handler *decisionTaskHandlerImpl) handleDecisionCancelTimer(
}

func (handler *decisionTaskHandlerImpl) handleDecisionCancelWorkflow(
ctx context.Context,
attr *workflow.CancelWorkflowExecutionDecisionAttributes,
) error {

Expand Down Expand Up @@ -578,6 +595,7 @@ func (handler *decisionTaskHandlerImpl) handleDecisionCancelWorkflow(
}

func (handler *decisionTaskHandlerImpl) handleDecisionRequestCancelExternalWorkflow(
ctx context.Context,
attr *workflow.RequestCancelExternalWorkflowExecutionDecisionAttributes,
) error {

Expand Down Expand Up @@ -620,6 +638,7 @@ func (handler *decisionTaskHandlerImpl) handleDecisionRequestCancelExternalWorkf
}

func (handler *decisionTaskHandlerImpl) handleDecisionRecordMarker(
ctx context.Context,
attr *workflow.RecordMarkerDecisionAttributes,
) error {

Expand Down Expand Up @@ -652,6 +671,7 @@ func (handler *decisionTaskHandlerImpl) handleDecisionRecordMarker(
}

func (handler *decisionTaskHandlerImpl) handleDecisionContinueAsNewWorkflow(
ctx context.Context,
attr *workflow.ContinueAsNewWorkflowExecutionDecisionAttributes,
) error {

Expand Down Expand Up @@ -714,6 +734,7 @@ func (handler *decisionTaskHandlerImpl) handleDecisionContinueAsNewWorkflow(
}

_, newStateBuilder, err := handler.mutableState.AddContinueAsNewEvent(
ctx,
handler.decisionTaskCompletedID,
handler.decisionTaskCompletedID,
parentDomainName,
Expand All @@ -728,6 +749,7 @@ func (handler *decisionTaskHandlerImpl) handleDecisionContinueAsNewWorkflow(
}

func (handler *decisionTaskHandlerImpl) handleDecisionStartChildWorkflow(
ctx context.Context,
attr *workflow.StartChildWorkflowExecutionDecisionAttributes,
) error {

Expand Down Expand Up @@ -796,6 +818,7 @@ func (handler *decisionTaskHandlerImpl) handleDecisionStartChildWorkflow(
}

func (handler *decisionTaskHandlerImpl) handleDecisionSignalExternalWorkflow(
ctx context.Context,
attr *workflow.SignalExternalWorkflowExecutionDecisionAttributes,
) error {

Expand Down Expand Up @@ -848,6 +871,7 @@ func (handler *decisionTaskHandlerImpl) handleDecisionSignalExternalWorkflow(
}

func (handler *decisionTaskHandlerImpl) handleDecisionUpsertWorkflowSearchAttributes(
ctx context.Context,
attr *workflow.UpsertWorkflowSearchAttributesDecisionAttributes,
) error {

Expand Down Expand Up @@ -908,6 +932,7 @@ func convertSearchAttributesToByteArray(fields map[string][]byte) []byte {
}

func (handler *decisionTaskHandlerImpl) retryCronContinueAsNew(
ctx context.Context,
attr *workflow.WorkflowExecutionStartedEventAttributes,
backoffInterval int32,
continueAsNewIter *workflow.ContinueAsNewInitiator,
Expand Down Expand Up @@ -935,6 +960,7 @@ func (handler *decisionTaskHandlerImpl) retryCronContinueAsNew(
}

_, newStateBuilder, err := handler.mutableState.AddContinueAsNewEvent(
ctx,
handler.decisionTaskCompletedID,
handler.decisionTaskCompletedID,
attr.GetParentWorkflowDomain(),
Expand Down
7 changes: 5 additions & 2 deletions service/history/events/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type (
// Cache caches workflow history event
Cache interface {
GetEvent(
ctx context.Context,
shardID int,
domainID string,
workflowID string,
Expand Down Expand Up @@ -172,6 +173,7 @@ func newEventKey(
}

func (e *cacheImpl) GetEvent(
ctx context.Context,
shardID int,
domainID string,
workflowID string,
Expand All @@ -194,7 +196,7 @@ func (e *cacheImpl) GetEvent(

e.metricsClient.IncCounter(metrics.EventsCacheGetEventScope, metrics.CacheMissCounter)

event, err := e.getHistoryEventFromStore(firstEventID, eventID, branchToken, shardID)
event, err := e.getHistoryEventFromStore(ctx, firstEventID, eventID, branchToken, shardID)
if err != nil {
e.metricsClient.IncCounter(metrics.EventsCacheGetEventScope, metrics.CacheFailures)
e.logger.Error("EventsCache unable to retrieve event from store",
Expand Down Expand Up @@ -225,6 +227,7 @@ func (e *cacheImpl) PutEvent(
}

func (e *cacheImpl) getHistoryEventFromStore(
ctx context.Context,
firstEventID,
eventID int64,
branchToken []byte,
Expand All @@ -236,7 +239,7 @@ func (e *cacheImpl) getHistoryEventFromStore(

var historyEvents []*shared.HistoryEvent

response, err := e.historyManager.ReadHistoryBranch(context.TODO(), &persistence.ReadHistoryBranchRequest{
response, err := e.historyManager.ReadHistoryBranch(ctx, &persistence.ReadHistoryBranchRequest{
BranchToken: branchToken,
MinEventID: firstEventID,
MaxEventID: eventID + 1,
Expand Down
9 changes: 5 additions & 4 deletions service/history/events/cache_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit ba0eacb

Please sign in to comment.