Skip to content

Commit

Permalink
Pick signal requestID dedup fix from temporal (cadence-workflow#4210)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored May 24, 2021
1 parent 61399e5 commit 2fb5c02
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 32 deletions.
32 changes: 20 additions & 12 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2144,19 +2144,28 @@ func (e *historyEngineImpl) SignalWorkflowExecution(
workflowExecution,
e.timeSource.Now(),
func(wfContext execution.Context, mutableState execution.MutableState) (*workflow.UpdateAction, error) {
// first deduplicate by request id for signal decision
// this is done before workflow running check so that already completed error
// won't be returned for duplicated signals even if the workflow is closed.
if requestID := request.GetRequestID(); requestID != "" {
if mutableState.IsSignalRequested(requestID) {
return &workflow.UpdateAction{
Noop: true,
CreateDecision: false,
}, nil
}
}

if !mutableState.IsWorkflowExecutionRunning() {
return nil, workflow.ErrAlreadyCompleted
}

executionInfo := mutableState.GetExecutionInfo()
createDecisionTask := true
// Do not create decision task when the workflow is cron and the cron has not been started yet
if mutableState.GetExecutionInfo().CronSchedule != "" && !mutableState.HasProcessedOrPendingDecision() {
createDecisionTask = false
}
postActions := &workflow.UpdateAction{
CreateDecision: createDecisionTask,
}

if !mutableState.IsWorkflowExecutionRunning() {
return nil, workflow.ErrAlreadyCompleted
}

maxAllowedSignals := e.config.MaximumSignalsPerExecution(domainEntry.GetInfo().Name)
if maxAllowedSignals > 0 && int(executionInfo.SignalCount) >= maxAllowedSignals {
Expand All @@ -2176,11 +2185,7 @@ func (e *historyEngineImpl) SignalWorkflowExecution(
}
}

// deduplicate by request id for signal decision
if requestID := request.GetRequestID(); requestID != "" {
if mutableState.IsSignalRequested(requestID) {
return postActions, nil
}
mutableState.AddSignalRequested(requestID)
}

Expand All @@ -2191,7 +2196,10 @@ func (e *historyEngineImpl) SignalWorkflowExecution(
return nil, &types.InternalServiceError{Message: "Unable to signal workflow execution."}
}

return postActions, nil
return &workflow.UpdateAction{
Noop: false,
CreateDecision: createDecisionTask,
}, nil
})
}

Expand Down
78 changes: 58 additions & 20 deletions service/history/historyEngine_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// Copyright (c) 2017-2020 Uber Technologies Inc.
// Copyright (c) 2017-2021 Uber Technologies Inc.
// Portions of the Software are attributed to Copyright (c) 2021 Temporal Technologies Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -4908,20 +4909,22 @@ func (s *engineSuite) TestCancelTimer_RespondDecisionTaskCompleted_TimerFired()
s.False(executionBuilder.HasBufferedEvents())
}

func (s *engineSuite) TestSignalWorkflowExecution() {
func (s *engineSuite) TestSignalWorkflowExecution_InvalidRequest() {
signalRequest := &types.HistorySignalWorkflowExecutionRequest{}
err := s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest)
s.Error(err)
}

func (s *engineSuite) TestSignalWorkflowExecution() {
we := types.WorkflowExecution{
WorkflowID: "wId",
WorkflowID: constants.TestWorkflowID,
RunID: constants.TestRunID,
}
tasklist := "testTaskList"
identity := "testIdentity"
signalName := "my signal name"
input := []byte("test input")
signalRequest = &types.HistorySignalWorkflowExecutionRequest{
signalRequest := &types.HistorySignalWorkflowExecutionRequest{
DomainUUID: constants.TestDomainID,
SignalRequest: &types.SignalWorkflowExecutionRequest{
Domain: constants.TestDomainID,
Expand All @@ -4948,26 +4951,22 @@ func (s *engineSuite) TestSignalWorkflowExecution() {
s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything, mock.Anything).Return(&p.AppendHistoryNodesResponse{Size: 0}, nil).Once()
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything, mock.Anything).Return(&p.UpdateWorkflowExecutionResponse{MutableStateUpdateSessionStats: &p.MutableStateUpdateSessionStats{}}, nil).Once()

err = s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest)
err := s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest)
s.Nil(err)
}

// Test signal decision by adding request ID
func (s *engineSuite) TestSignalWorkflowExecution_DuplicateRequest() {
signalRequest := &types.HistorySignalWorkflowExecutionRequest{}
err := s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest)
s.Error(err)

func (s *engineSuite) TestSignalWorkflowExecution_DuplicateRequest_WorkflowOpen() {
we := types.WorkflowExecution{
WorkflowID: "wId2",
WorkflowID: constants.TestWorkflowID,
RunID: constants.TestRunID,
}
tasklist := "testTaskList"
identity := "testIdentity"
signalName := "my signal name 2"
input := []byte("test input 2")
requestID := uuid.New()
signalRequest = &types.HistorySignalWorkflowExecutionRequest{
signalRequest := &types.HistorySignalWorkflowExecutionRequest{
DomainUUID: constants.TestDomainID,
SignalRequest: &types.SignalWorkflowExecutionRequest{
Domain: constants.TestDomainID,
Expand Down Expand Up @@ -4995,26 +4994,65 @@ func (s *engineSuite) TestSignalWorkflowExecution_DuplicateRequest() {
gwmsResponse := &persistence.GetWorkflowExecutionResponse{State: ms}

s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(gwmsResponse, nil).Once()
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything, mock.Anything).Return(&p.UpdateWorkflowExecutionResponse{MutableStateUpdateSessionStats: &p.MutableStateUpdateSessionStats{}}, nil).Once()

err = s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest)
err := s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest)
s.Nil(err)
}

func (s *engineSuite) TestSignalWorkflowExecution_Failed() {
signalRequest := &types.HistorySignalWorkflowExecutionRequest{}
func (s *engineSuite) TestSignalWorkflowExecution_DuplicateRequest_WorkflowCompleted() {
we := types.WorkflowExecution{
WorkflowID: constants.TestWorkflowID,
RunID: constants.TestRunID,
}
tasklist := "testTaskList"
identity := "testIdentity"
signalName := "my signal name 2"
input := []byte("test input 2")
requestID := uuid.New()
signalRequest := &types.HistorySignalWorkflowExecutionRequest{
DomainUUID: constants.TestDomainID,
SignalRequest: &types.SignalWorkflowExecutionRequest{
Domain: constants.TestDomainID,
WorkflowExecution: &we,
Identity: identity,
SignalName: signalName,
Input: input,
RequestID: requestID,
},
}

msBuilder := execution.NewMutableStateBuilderWithEventV2(
s.mockHistoryEngine.shard,
loggerimpl.NewLoggerForTest(s.Suite),
we.GetRunID(),
constants.TestLocalDomainEntry,
)
test.AddWorkflowExecutionStartedEvent(msBuilder, we, "wType", tasklist, []byte("input"), 100, 200, identity)
test.AddDecisionTaskScheduledEvent(msBuilder)
ms := execution.CreatePersistenceMutableState(msBuilder)
// assume duplicate request id
ms.SignalRequestedIDs = make(map[string]struct{})
ms.SignalRequestedIDs[requestID] = struct{}{}
ms.ExecutionInfo.DomainID = constants.TestDomainID
ms.ExecutionInfo.State = persistence.WorkflowStateCompleted
gwmsResponse := &persistence.GetWorkflowExecutionResponse{State: ms}

s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(gwmsResponse, nil).Once()

err := s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest)
s.Error(err)
s.Nil(err)
}

func (s *engineSuite) TestSignalWorkflowExecution_WorkflowCompleted() {
we := &types.WorkflowExecution{
WorkflowID: "wId",
WorkflowID: constants.TestWorkflowID,
RunID: constants.TestRunID,
}
tasklist := "testTaskList"
identity := "testIdentity"
signalName := "my signal name"
input := []byte("test input")
signalRequest = &types.HistorySignalWorkflowExecutionRequest{
signalRequest := &types.HistorySignalWorkflowExecutionRequest{
DomainUUID: constants.TestDomainID,
SignalRequest: &types.SignalWorkflowExecutionRequest{
Domain: constants.TestDomainID,
Expand All @@ -5039,7 +5077,7 @@ func (s *engineSuite) TestSignalWorkflowExecution_Failed() {

s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(gwmsResponse, nil).Once()

err = s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest)
err := s.mockHistoryEngine.SignalWorkflowExecution(context.Background(), signalRequest)
s.EqualError(err, "WorkflowExecutionAlreadyCompletedError{Message: workflow execution already completed}")
}

Expand Down

0 comments on commit 2fb5c02

Please sign in to comment.