Skip to content

Commit

Permalink
Parent close policy should apply to child workflow only (cadence-work…
Browse files Browse the repository at this point in the history
…flow#4612)

- Add childWorkflowOnly flag to history terminate workflow execution request
- Use childworkflowOnly flag when applying parent close policy
- In parent close policy workflow use history client and signal remote frontend if child domain is not active
  • Loading branch information
yycptt authored Nov 30, 2021
1 parent d3d0682 commit c7727c0
Show file tree
Hide file tree
Showing 19 changed files with 1,325 additions and 736 deletions.
136 changes: 130 additions & 6 deletions .gen/go/history/history.go

Large diffs are not rendered by default.

796 changes: 453 additions & 343 deletions .gen/proto/history/v1/service.pb.go

Large diffs are not rendered by default.

590 changes: 295 additions & 295 deletions .gen/proto/history/v1/service.pb.yarpc.go

Large diffs are not rendered by default.

22 changes: 20 additions & 2 deletions common/types/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -1787,8 +1787,10 @@ func (v *SyncShardStatusRequest) GetTimestamp() (o int64) {

// HistoryTerminateWorkflowExecutionRequest is an internal type (TBD...)
type HistoryTerminateWorkflowExecutionRequest struct {
DomainUUID string `json:"domainUUID,omitempty"`
TerminateRequest *TerminateWorkflowExecutionRequest `json:"terminateRequest,omitempty"`
DomainUUID string `json:"domainUUID,omitempty"`
TerminateRequest *TerminateWorkflowExecutionRequest `json:"terminateRequest,omitempty"`
ExternalWorkflowExecution *WorkflowExecution `json:"externalWorkflowExecution,omitempty"`
ChildWorkflowOnly bool `json:"childWorkflowOnly,omitempty"`
}

// GetDomainUUID is an internal getter (TBD...)
Expand All @@ -1807,6 +1809,22 @@ func (v *HistoryTerminateWorkflowExecutionRequest) GetTerminateRequest() (o *Ter
return
}

// GetExternalWorkflowExecution is an internal getter (TBD...)
func (v *HistoryTerminateWorkflowExecutionRequest) GetExternalWorkflowExecution() (o *WorkflowExecution) {
if v != nil && v.ExternalWorkflowExecution != nil {
return v.ExternalWorkflowExecution
}
return
}

// GetChildWorkflowOnly is an internal getter (TBD...)
func (v *HistoryTerminateWorkflowExecutionRequest) GetChildWorkflowOnly() (o bool) {
if v != nil {
return v.ChildWorkflowOnly
}
return
}

// GetFailoverInfoRequest is an internal type (TBD...)
type GetFailoverInfoRequest struct {
DomainID string `json:"domainID,omitempty"`
Expand Down
12 changes: 8 additions & 4 deletions common/types/mapper/proto/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -1416,8 +1416,10 @@ func FromHistoryTerminateWorkflowExecutionRequest(t *types.HistoryTerminateWorkf
return nil
}
return &historyv1.TerminateWorkflowExecutionRequest{
Request: FromTerminateWorkflowExecutionRequest(t.TerminateRequest),
DomainId: t.DomainUUID,
Request: FromTerminateWorkflowExecutionRequest(t.TerminateRequest),
DomainId: t.DomainUUID,
ExternalWorkflowExecution: FromWorkflowExecution(t.ExternalWorkflowExecution),
ChildWorkflowOnly: t.ChildWorkflowOnly,
}
}

Expand All @@ -1426,8 +1428,10 @@ func ToHistoryTerminateWorkflowExecutionRequest(t *historyv1.TerminateWorkflowEx
return nil
}
return &types.HistoryTerminateWorkflowExecutionRequest{
TerminateRequest: ToTerminateWorkflowExecutionRequest(t.Request),
DomainUUID: t.DomainId,
TerminateRequest: ToTerminateWorkflowExecutionRequest(t.Request),
DomainUUID: t.DomainId,
ExternalWorkflowExecution: ToWorkflowExecution(t.ExternalWorkflowExecution),
ChildWorkflowOnly: t.ChildWorkflowOnly,
}
}

Expand Down
12 changes: 8 additions & 4 deletions common/types/mapper/thrift/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -1158,8 +1158,10 @@ func FromHistoryTerminateWorkflowExecutionRequest(t *types.HistoryTerminateWorkf
return nil
}
return &history.TerminateWorkflowExecutionRequest{
DomainUUID: &t.DomainUUID,
TerminateRequest: FromTerminateWorkflowExecutionRequest(t.TerminateRequest),
DomainUUID: &t.DomainUUID,
TerminateRequest: FromTerminateWorkflowExecutionRequest(t.TerminateRequest),
ExternalWorkflowExecution: FromWorkflowExecution(t.ExternalWorkflowExecution),
ChildWorkflowOnly: &t.ChildWorkflowOnly,
}
}

Expand All @@ -1169,8 +1171,10 @@ func ToHistoryTerminateWorkflowExecutionRequest(t *history.TerminateWorkflowExec
return nil
}
return &types.HistoryTerminateWorkflowExecutionRequest{
DomainUUID: t.GetDomainUUID(),
TerminateRequest: ToTerminateWorkflowExecutionRequest(t.TerminateRequest),
DomainUUID: t.GetDomainUUID(),
TerminateRequest: ToTerminateWorkflowExecutionRequest(t.TerminateRequest),
ExternalWorkflowExecution: ToWorkflowExecution(t.ExternalWorkflowExecution),
ChildWorkflowOnly: t.GetChildWorkflowOnly(),
}
}

Expand Down
37 changes: 37 additions & 0 deletions common/types/mapper/thrift/thrift-tests/history_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright (c) 2021 Uber Technologies Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package thrifttests

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/uber/cadence/common/types"
"github.com/uber/cadence/common/types/mapper/thrift"
"github.com/uber/cadence/common/types/testdata"
)

func TestHistoryTerminateWorkflowExecutionRequest(t *testing.T) {
for _, item := range []*types.HistoryTerminateWorkflowExecutionRequest{nil, {}, &testdata.HistoryTerminateWorkflowExecutionRequest} {
assert.Equal(t, item, thrift.ToHistoryTerminateWorkflowExecutionRequest(thrift.FromHistoryTerminateWorkflowExecutionRequest(item)))
}
}
6 changes: 4 additions & 2 deletions common/types/testdata/service_history.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,10 @@ var (
Timestamp: &Timestamp1,
}
HistoryTerminateWorkflowExecutionRequest = types.HistoryTerminateWorkflowExecutionRequest{
DomainUUID: DomainID,
TerminateRequest: &TerminateWorkflowExecutionRequest,
DomainUUID: DomainID,
TerminateRequest: &TerminateWorkflowExecutionRequest,
ExternalWorkflowExecution: &WorkflowExecution,
ChildWorkflowOnly: true,
}
HistoryGetCrossClusterTasksRequest = GetCrossClusterTasksRequest
HistoryGetCrossClusterTasksResponse = GetCrossClusterTasksResponse
Expand Down
2 changes: 1 addition & 1 deletion idls
Submodule idls updated from a6d3e3 to 6ccf14
11 changes: 11 additions & 0 deletions proto/internal/uber/cadence/history/v1/service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,9 @@ message StartWorkflowExecutionResponse {
message SignalWorkflowExecutionRequest {
api.v1.SignalWorkflowExecutionRequest request = 1;
string domain_id = 2;
// workflow execution that requests this signal, for making sure
// the workflow being signaled is actually a child of the workflow
// making the request
api.v1.WorkflowExecution external_workflow_execution = 3;
bool child_workflow_only = 4;
}
Expand Down Expand Up @@ -274,6 +277,11 @@ message ResetWorkflowExecutionResponse {
message TerminateWorkflowExecutionRequest {
api.v1.TerminateWorkflowExecutionRequest request = 1;
string domain_id = 2;
// workflow execution that requests this termination, for making sure
// the workflow being terminated is actually a child of the workflow
// making the request
api.v1.WorkflowExecution external_workflow_execution = 3;
bool child_workflow_only = 4;
}

message TerminateWorkflowExecutionResponse {
Expand Down Expand Up @@ -472,6 +480,9 @@ message RemoveSignalMutableStateResponse {
message RequestCancelWorkflowExecutionRequest {
string domain_id = 1;
api.v1.RequestCancelWorkflowExecutionRequest cancel_request = 2;
// workflow execution that requests this cancellation, for making sure
// the workflow being cancelled is actually a child of the workflow
// making the request
api.v1.ExternalExecutionInfo external_execution_info = 3;
bool child_workflow_only = 4;
}
Expand Down
12 changes: 12 additions & 0 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2434,6 +2434,8 @@ func (e *historyEngineImpl) TerminateWorkflowExecution(
domainID := domainEntry.GetInfo().ID

request := terminateRequest.TerminateRequest
parentExecution := terminateRequest.ExternalWorkflowExecution
childWorkflowOnly := terminateRequest.GetChildWorkflowOnly()
workflowExecution := types.WorkflowExecution{
WorkflowID: request.WorkflowExecution.WorkflowID,
RunID: request.WorkflowExecution.RunID,
Expand All @@ -2451,6 +2453,16 @@ func (e *historyEngineImpl) TerminateWorkflowExecution(
return nil, workflow.ErrAlreadyCompleted
}

if childWorkflowOnly {
executionInfo := mutableState.GetExecutionInfo()
parentWorkflowID := executionInfo.ParentWorkflowID
parentRunID := executionInfo.ParentRunID
if parentExecution.GetWorkflowID() != parentWorkflowID ||
parentExecution.GetRunID() != parentRunID {
return nil, workflow.ErrParentMismatch
}
}

eventBatchFirstEventID := mutableState.GetNextEventID()
return workflow.UpdateWithoutDecision, execution.TerminateWorkflow(
mutableState,
Expand Down
4 changes: 4 additions & 0 deletions service/history/historyEngine2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1476,6 +1476,10 @@ func (s *engine2Suite) TestSignalWithStartWorkflowExecution_Start_WorkflowAlread
s.NotNil(err)
}

func (s *engine2Suite) TestTerminateWorkflowExecution_Success() {

}

func (s *engine2Suite) TestNewChildContext() {
ctx := context.Background()
childCtx, childCancel := s.historyEngine.newChildContext(ctx)
Expand Down
10 changes: 9 additions & 1 deletion service/history/task/cross_cluster_target_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,14 +249,22 @@ func (t *crossClusterTargetTaskExecutor) executeApplyParentClosePolicyTask(
err = applyParentClosePolicy(
ctx,
t.historyClient,
&types.WorkflowExecution{
WorkflowID: task.GetWorkflowID(),
RunID: task.GetRunID(),
},
childAttrs.ChildDomainID,
targetDomainName,
childAttrs.ChildWorkflowID,
childAttrs.ChildRunID,
*childAttrs.ParentClosePolicy,
)
switch err.(type) {
case *types.EntityNotExistsError, *types.WorkflowExecutionAlreadyCompletedError, *types.CancellationAlreadyRequestedError:
case nil:
continue
case *types.EntityNotExistsError,
*types.WorkflowExecutionAlreadyCompletedError,
*types.CancellationAlreadyRequestedError:
// expected error, no-op
break
default:
Expand Down
101 changes: 98 additions & 3 deletions service/history/task/cross_cluster_target_task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@
package task

import (
"context"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"go.uber.org/yarpc"

"github.com/uber/cadence/client/history"
"github.com/uber/cadence/common"
Expand Down Expand Up @@ -331,6 +333,60 @@ func (s *crossClusterTargetTaskExecutorSuite) TestSignalExecutionTask_RemoveSign
s.NotNil(task.response.SignalExecutionAttributes)
}

func (s *crossClusterTargetTaskExecutorSuite) TestApplyParentClosePolicyTask_Success() {
task := s.getTestApplyParentClosePolicyTask(processingStateInitialized)

taskInfo := task.GetInfo().(*persistence.CrossClusterTaskInfo)
for _, childAttr := range task.request.ApplyParentClosePolicyAttributes.ApplyParentClosePolicyAttributes {
switch *childAttr.GetParentClosePolicy() {
case types.ParentClosePolicyRequestCancel:
s.mockHistoryClient.EXPECT().RequestCancelWorkflowExecution(gomock.Any(), gomock.Any()).DoAndReturn(
func(_ context.Context, request *types.HistoryRequestCancelWorkflowExecutionRequest, option ...yarpc.CallOption) error {
s.Equal(childAttr.ChildDomainID, request.GetDomainUUID())
s.Equal(childAttr.ChildWorkflowID, request.GetCancelRequest().GetWorkflowExecution().GetWorkflowID())
s.Equal(childAttr.ChildRunID, request.GetCancelRequest().GetWorkflowExecution().GetRunID())
s.True(request.GetChildWorkflowOnly())
s.Equal(taskInfo.GetWorkflowID(), request.GetExternalWorkflowExecution().GetWorkflowID())
s.Equal(taskInfo.GetRunID(), request.GetExternalWorkflowExecution().GetRunID())
return nil
}).Times(1)
case types.ParentClosePolicyTerminate:
s.mockHistoryClient.EXPECT().TerminateWorkflowExecution(gomock.Any(), gomock.Any()).DoAndReturn(
func(_ context.Context, request *types.HistoryTerminateWorkflowExecutionRequest, option ...yarpc.CallOption) error {
s.Equal(childAttr.ChildDomainID, request.GetDomainUUID())
s.Equal(childAttr.ChildWorkflowID, request.GetTerminateRequest().GetWorkflowExecution().GetWorkflowID())
s.Equal(childAttr.ChildRunID, request.GetTerminateRequest().GetWorkflowExecution().GetRunID())
s.True(request.GetChildWorkflowOnly())
s.Equal(taskInfo.GetWorkflowID(), request.GetExternalWorkflowExecution().GetWorkflowID())
s.Equal(taskInfo.GetRunID(), request.GetExternalWorkflowExecution().GetRunID())
return nil
}).Times(1)
}
}

err := s.executor.Execute(task, true)
s.NoError(err)

s.Equal(task.GetTaskID(), task.response.GetTaskID())
s.Equal(types.CrossClusterTaskTypeApplyParentPolicy, task.response.GetTaskType())
s.Nil(task.response.FailedCause)
s.NotNil(task.response.ApplyParentClosePolicyAttributes)
}

func (s *crossClusterTargetTaskExecutorSuite) TestApplyParentClosePolicyTask_Failed() {
task := s.getTestApplyParentClosePolicyTask(processingStateInitialized)

s.mockHistoryClient.EXPECT().RequestCancelWorkflowExecution(gomock.Any(), gomock.Any()).Return(&types.DomainNotActiveError{}).AnyTimes()
s.mockHistoryClient.EXPECT().TerminateWorkflowExecution(gomock.Any(), gomock.Any()).Return(&types.DomainNotActiveError{}).AnyTimes()

err := s.executor.Execute(task, true)
s.NoError(err)

s.Equal(task.GetTaskID(), task.response.GetTaskID())
s.Equal(types.CrossClusterTaskTypeApplyParentPolicy, task.response.GetTaskType())
s.Equal(types.CrossClusterTaskFailedCauseDomainNotActive, task.response.GetFailedCause())
}

func (s *crossClusterTargetTaskExecutorSuite) getTestStartChildExecutionTask(
processingState processingState,
targetRunID *string,
Expand All @@ -353,6 +409,7 @@ func (s *crossClusterTargetTaskExecutorSuite) getTestStartChildExecutionTask(
},
nil,
nil,
nil,
)
}

Expand All @@ -372,6 +429,7 @@ func (s *crossClusterTargetTaskExecutorSuite) getTestCancelExecutionTask(
ChildWorkflowOnly: false,
},
nil,
nil,
)
}

Expand All @@ -394,6 +452,41 @@ func (s *crossClusterTargetTaskExecutorSuite) getTestSignalExecutionTask(
SignalInput: []byte("some random signal input"),
Control: []byte("some random control"),
},
nil,
)
}

func (s *crossClusterTargetTaskExecutorSuite) getTestApplyParentClosePolicyTask(
processingState processingState,
) *crossClusterTargetTask {
return s.getTestCrossClusterTargetTask(
types.CrossClusterTaskTypeApplyParentPolicy,
processingState,
nil,
nil,
nil,
&types.CrossClusterApplyParentClosePolicyRequestAttributes{
ApplyParentClosePolicyAttributes: []*types.ApplyParentClosePolicyAttributes{
{
ChildDomainID: constants.TestTargetDomainID,
ChildWorkflowID: "some random target workflowID",
ChildRunID: "some random target runID",
ParentClosePolicy: types.ParentClosePolicyAbandon.Ptr(),
},
{
ChildDomainID: constants.TestTargetDomainID,
ChildWorkflowID: "some random target workflowID",
ChildRunID: "some random target runID",
ParentClosePolicy: types.ParentClosePolicyRequestCancel.Ptr(),
},
{
ChildDomainID: constants.TestTargetDomainID,
ChildWorkflowID: "some random target workflowID",
ChildRunID: "some random target runID",
ParentClosePolicy: types.ParentClosePolicyTerminate.Ptr(),
},
},
},
)
}

Expand All @@ -403,6 +496,7 @@ func (s *crossClusterTargetTaskExecutorSuite) getTestCrossClusterTargetTask(
startChildAttributes *types.CrossClusterStartChildExecutionRequestAttributes,
cancelAttributes *types.CrossClusterCancelExecutionRequestAttributes,
signalAttributes *types.CrossClusterSignalExecutionRequestAttributes,
parentClosePolicyAttributes *types.CrossClusterApplyParentClosePolicyRequestAttributes,
) *crossClusterTargetTask {
task, _ := NewCrossClusterTargetTask(
s.mockShard,
Expand All @@ -416,9 +510,10 @@ func (s *crossClusterTargetTaskExecutorSuite) getTestCrossClusterTargetTask(
TaskID: int64(1234),
VisibilityTimestamp: common.Int64Ptr(time.Now().UnixNano()),
},
StartChildExecutionAttributes: startChildAttributes,
CancelExecutionAttributes: cancelAttributes,
SignalExecutionAttributes: signalAttributes,
StartChildExecutionAttributes: startChildAttributes,
CancelExecutionAttributes: cancelAttributes,
SignalExecutionAttributes: signalAttributes,
ApplyParentClosePolicyAttributes: parentClosePolicyAttributes,
},
s.executor,
nil,
Expand Down
Loading

0 comments on commit c7727c0

Please sign in to comment.