diff --git a/client/clientfactory.go b/client/clientfactory.go index 5005725780b..e8a41ba69a8 100644 --- a/client/clientfactory.go +++ b/client/clientfactory.go @@ -129,7 +129,15 @@ func (cf *rpcClientFactory) NewHistoryClientWithTimeout(timeout time.Duration) ( return history.NewThriftClient(historyserviceclient.New(dispatcher.ClientConfig(common.HistoryServiceName))), nil } - client := history.NewClient(cf.numberOfHistoryShards, timeout, common.NewClientCache(keyResolver, clientProvider), cf.logger) + client := history.NewClient( + cf.numberOfHistoryShards, + timeout, + common.NewClientCache(keyResolver, clientProvider), + cf.logger, + ) + if errorRate := cf.dynConfig.GetFloat64Property(dynamicconfig.HistoryErrorInjectionRate, 0)(); errorRate != 0 { + client = history.NewErrorInjectionClient(client, errorRate, cf.logger) + } if cf.metricsClient != nil { client = history.NewMetricClient(client, cf.metricsClient) } diff --git a/client/history/errorInjectionClient.go b/client/history/errorInjectionClient.go new file mode 100644 index 00000000000..b066f16dbae --- /dev/null +++ b/client/history/errorInjectionClient.go @@ -0,0 +1,1052 @@ +// Copyright (c) 2017-2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package history + +import ( + "context" + + "go.uber.org/yarpc" + + "github.com/uber/cadence/common/errors" + "github.com/uber/cadence/common/log" + "github.com/uber/cadence/common/log/tag" + "github.com/uber/cadence/common/types" +) + +var _ Client = (*errorInjectionClient)(nil) + +const ( + msgInjectedFakeErr = "Injected fake history client error" +) + +type errorInjectionClient struct { + client Client + errorRate float64 + logger log.Logger +} + +// NewErrorInjectionClient creates a new instance of Client that injects fake error +func NewErrorInjectionClient( + client Client, + errorRate float64, + logger log.Logger, +) Client { + return &errorInjectionClient{ + client: client, + errorRate: errorRate, + logger: logger, + } +} + +func (c *errorInjectionClient) StartWorkflowExecution( + ctx context.Context, + request *types.HistoryStartWorkflowExecutionRequest, + opts ...yarpc.CallOption, +) (*types.StartWorkflowExecutionResponse, error) { + fakeErr := errors.GenerateFakeError(c.errorRate) + + var resp *types.StartWorkflowExecutionResponse + var clientErr error + var forwardCall bool + if forwardCall = errors.ShouldForwardCall(fakeErr); forwardCall { + resp, clientErr = c.client.StartWorkflowExecution(ctx, request, opts...) + } + + if fakeErr != nil { + c.logger.Error(msgInjectedFakeErr, + tag.HistoryClientOperationStartWorkflowExecution, + tag.Error(fakeErr), + tag.Bool(forwardCall), + tag.ClientError(clientErr), + ) + return nil, fakeErr + } + return resp, clientErr +} + +func (c *errorInjectionClient) DescribeHistoryHost( + ctx context.Context, + request *types.DescribeHistoryHostRequest, + opts ...yarpc.CallOption, +) (*types.DescribeHistoryHostResponse, error) { + fakeErr := errors.GenerateFakeError(c.errorRate) + + var resp *types.DescribeHistoryHostResponse + var clientErr error + var forwardCall bool + if forwardCall = errors.ShouldForwardCall(fakeErr); forwardCall { + resp, clientErr = c.client.DescribeHistoryHost(ctx, request, opts...) + } + + if fakeErr != nil { + c.logger.Error(msgInjectedFakeErr, + tag.HistoryClientOperationDescribeHistoryHost, + tag.Error(fakeErr), + tag.Bool(forwardCall), + tag.ClientError(clientErr), + ) + return nil, fakeErr + } + return resp, clientErr +} + +func (c *errorInjectionClient) CloseShard( + ctx context.Context, + request *types.CloseShardRequest, + opts ...yarpc.CallOption, +) error { + fakeErr := errors.GenerateFakeError(c.errorRate) + + var clientErr error + var forwardCall bool + if forwardCall = errors.ShouldForwardCall(fakeErr); forwardCall { + clientErr = c.client.CloseShard(ctx, request, opts...) + } + + if fakeErr != nil { + c.logger.Error(msgInjectedFakeErr, + tag.HistoryClientOperationCloseShard, + tag.Error(fakeErr), + tag.Bool(forwardCall), + tag.ClientError(clientErr), + ) + return fakeErr + } + return clientErr +} + +func (c *errorInjectionClient) ResetQueue( + ctx context.Context, + request *types.ResetQueueRequest, + opts ...yarpc.CallOption, +) error { + fakeErr := errors.GenerateFakeError(c.errorRate) + + var clientErr error + var forwardCall bool + if forwardCall = errors.ShouldForwardCall(fakeErr); forwardCall { + clientErr = c.client.ResetQueue(ctx, request, opts...) + } + + if fakeErr != nil { + c.logger.Error(msgInjectedFakeErr, + tag.HistoryClientOperationResetQueue, + tag.Error(fakeErr), + tag.Bool(forwardCall), + tag.ClientError(clientErr), + ) + return fakeErr + } + return clientErr +} + +func (c *errorInjectionClient) DescribeQueue( + ctx context.Context, + request *types.DescribeQueueRequest, + opts ...yarpc.CallOption, +) (*types.DescribeQueueResponse, error) { + fakeErr := errors.GenerateFakeError(c.errorRate) + + var resp *types.DescribeQueueResponse + var clientErr error + var forwardCall bool + if forwardCall = errors.ShouldForwardCall(fakeErr); forwardCall { + resp, clientErr = c.client.DescribeQueue(ctx, request, opts...) + } + + if fakeErr != nil { + c.logger.Error(msgInjectedFakeErr, + tag.HistoryClientOperationDescribeQueue, + tag.Error(fakeErr), + tag.Bool(forwardCall), + tag.ClientError(clientErr), + ) + return nil, fakeErr + } + return resp, clientErr +} + +func (c *errorInjectionClient) RemoveTask( + ctx context.Context, + request *types.RemoveTaskRequest, + opts ...yarpc.CallOption, +) error { + fakeErr := errors.GenerateFakeError(c.errorRate) + + var clientErr error + var forwardCall bool + if forwardCall = errors.ShouldForwardCall(fakeErr); forwardCall { + clientErr = c.client.RemoveTask(ctx, request, opts...) + } + + if fakeErr != nil { + c.logger.Error(msgInjectedFakeErr, + tag.HistoryClientOperationRemoveTask, + tag.Error(fakeErr), + tag.Bool(forwardCall), + tag.ClientError(clientErr), + ) + return fakeErr + } + return clientErr +} + +func (c *errorInjectionClient) DescribeMutableState( + ctx context.Context, + request *types.DescribeMutableStateRequest, + opts ...yarpc.CallOption, +) (*types.DescribeMutableStateResponse, error) { + fakeErr := errors.GenerateFakeError(c.errorRate) + + var resp *types.DescribeMutableStateResponse + var clientErr error + var forwardCall bool + if forwardCall = errors.ShouldForwardCall(fakeErr); forwardCall { + resp, clientErr = c.client.DescribeMutableState(ctx, request, opts...) + } + + if fakeErr != nil { + c.logger.Error(msgInjectedFakeErr, + tag.HistoryClientOperationDescribeMutableState, + tag.Error(fakeErr), + tag.Bool(forwardCall), + tag.ClientError(clientErr), + ) + return nil, fakeErr + } + return resp, clientErr +} + +func (c *errorInjectionClient) GetMutableState( + ctx context.Context, + request *types.GetMutableStateRequest, + opts ...yarpc.CallOption, +) (*types.GetMutableStateResponse, error) { + fakeErr := errors.GenerateFakeError(c.errorRate) + + var resp *types.GetMutableStateResponse + var clientErr error + var forwardCall bool + if forwardCall = errors.ShouldForwardCall(fakeErr); forwardCall { + resp, clientErr = c.client.GetMutableState(ctx, request, opts...) + } + + if fakeErr != nil { + c.logger.Error(msgInjectedFakeErr, + tag.HistoryClientOperationGetMutableState, + tag.Error(fakeErr), + tag.Bool(forwardCall), + tag.ClientError(clientErr), + ) + return nil, fakeErr + } + return resp, clientErr +} + +func (c *errorInjectionClient) PollMutableState( + ctx context.Context, + request *types.PollMutableStateRequest, + opts ...yarpc.CallOption, +) (*types.PollMutableStateResponse, error) { + fakeErr := errors.GenerateFakeError(c.errorRate) + + var resp *types.PollMutableStateResponse + var clientErr error + var forwardCall bool + if forwardCall = errors.ShouldForwardCall(fakeErr); forwardCall { + resp, clientErr = c.client.PollMutableState(ctx, request, opts...) + } + + if fakeErr != nil { + c.logger.Error(msgInjectedFakeErr, + tag.HistoryClientOperationPollMutableState, + tag.Error(fakeErr), + tag.Bool(forwardCall), + tag.ClientError(clientErr), + ) + return nil, fakeErr + } + return resp, clientErr +} + +func (c *errorInjectionClient) ResetStickyTaskList( + ctx context.Context, + request *types.HistoryResetStickyTaskListRequest, + opts ...yarpc.CallOption, +) (*types.HistoryResetStickyTaskListResponse, error) { + fakeErr := errors.GenerateFakeError(c.errorRate) + + var resp *types.HistoryResetStickyTaskListResponse + var clientErr error + var forwardCall bool + if forwardCall = errors.ShouldForwardCall(fakeErr); forwardCall { + resp, clientErr = c.client.ResetStickyTaskList(ctx, request, opts...) + } + + if fakeErr != nil { + c.logger.Error(msgInjectedFakeErr, + tag.HistoryClientOperationResetStickyTaskList, + tag.Error(fakeErr), + tag.Bool(forwardCall), + tag.ClientError(clientErr), + ) + return nil, fakeErr + } + return resp, clientErr +} + +func (c *errorInjectionClient) DescribeWorkflowExecution( + ctx context.Context, + request *types.HistoryDescribeWorkflowExecutionRequest, + opts ...yarpc.CallOption, +) (*types.DescribeWorkflowExecutionResponse, error) { + fakeErr := errors.GenerateFakeError(c.errorRate) + + var resp *types.DescribeWorkflowExecutionResponse + var clientErr error + var forwardCall bool + if forwardCall = errors.ShouldForwardCall(fakeErr); forwardCall { + resp, clientErr = c.client.DescribeWorkflowExecution(ctx, request, opts...) + } + + if fakeErr != nil { + c.logger.Error(msgInjectedFakeErr, + tag.HistoryClientOperationDescribeWorkflowExecution, + tag.Error(fakeErr), + tag.Bool(forwardCall), + tag.ClientError(clientErr), + ) + return nil, fakeErr + } + return resp, clientErr +} + +func (c *errorInjectionClient) RecordDecisionTaskStarted( + ctx context.Context, + request *types.RecordDecisionTaskStartedRequest, + opts ...yarpc.CallOption, +) (*types.RecordDecisionTaskStartedResponse, error) { + + fakeErr := errors.GenerateFakeError(c.errorRate) + + var resp *types.RecordDecisionTaskStartedResponse + var clientErr error + var forwardCall bool + if forwardCall = errors.ShouldForwardCall(fakeErr); forwardCall { + resp, clientErr = c.client.RecordDecisionTaskStarted(ctx, request, opts...) + } + + if fakeErr != nil { + c.logger.Error(msgInjectedFakeErr, + tag.HistoryClientOperationRecordDecisionTaskStarted, + tag.Error(fakeErr), + tag.Bool(forwardCall), + tag.ClientError(clientErr), + ) + return nil, fakeErr + } + return resp, clientErr +} + +func (c *errorInjectionClient) RecordActivityTaskStarted( + ctx context.Context, + request *types.RecordActivityTaskStartedRequest, + opts ...yarpc.CallOption, +) (*types.RecordActivityTaskStartedResponse, error) { + fakeErr := errors.GenerateFakeError(c.errorRate) + + var resp *types.RecordActivityTaskStartedResponse + var clientErr error + var forwardCall bool + if forwardCall = errors.ShouldForwardCall(fakeErr); forwardCall { + resp, clientErr = c.client.RecordActivityTaskStarted(ctx, request, opts...) + } + + if fakeErr != nil { + c.logger.Error(msgInjectedFakeErr, + tag.HistoryClientOperationRecordActivityTaskStarted, + tag.Error(fakeErr), + tag.Bool(forwardCall), + tag.ClientError(clientErr), + ) + return nil, fakeErr + } + return resp, clientErr +} + +func (c *errorInjectionClient) RespondDecisionTaskCompleted( + ctx context.Context, + request *types.HistoryRespondDecisionTaskCompletedRequest, + opts ...yarpc.CallOption, +) (*types.HistoryRespondDecisionTaskCompletedResponse, error) { + fakeErr := errors.GenerateFakeError(c.errorRate) + + var resp *types.HistoryRespondDecisionTaskCompletedResponse + var clientErr error + var forwardCall bool + if forwardCall = errors.ShouldForwardCall(fakeErr); forwardCall { + resp, clientErr = c.client.RespondDecisionTaskCompleted(ctx, request, opts...) + } + + if fakeErr != nil { + c.logger.Error(msgInjectedFakeErr, + tag.HistoryClientOperationRecordDecisionTaskCompleted, + tag.Error(fakeErr), + tag.Bool(forwardCall), + tag.ClientError(clientErr), + ) + return nil, fakeErr + } + return resp, clientErr +} + +func (c *errorInjectionClient) RespondDecisionTaskFailed( + ctx context.Context, + request *types.HistoryRespondDecisionTaskFailedRequest, + opts ...yarpc.CallOption, +) error { + fakeErr := errors.GenerateFakeError(c.errorRate) + + var clientErr error + var forwardCall bool + if forwardCall = errors.ShouldForwardCall(fakeErr); forwardCall { + clientErr = c.client.RespondDecisionTaskFailed(ctx, request, opts...) + } + + if fakeErr != nil { + c.logger.Error(msgInjectedFakeErr, + tag.HistoryClientOperationRecordDecisionTaskFailed, + tag.Error(fakeErr), + tag.Bool(forwardCall), + tag.ClientError(clientErr), + ) + return fakeErr + } + return clientErr +} + +func (c *errorInjectionClient) RespondActivityTaskCompleted( + ctx context.Context, + request *types.HistoryRespondActivityTaskCompletedRequest, + opts ...yarpc.CallOption, +) error { + fakeErr := errors.GenerateFakeError(c.errorRate) + + var clientErr error + var forwardCall bool + if forwardCall = errors.ShouldForwardCall(fakeErr); forwardCall { + clientErr = c.client.RespondActivityTaskCompleted(ctx, request, opts...) + } + + if fakeErr != nil { + c.logger.Error(msgInjectedFakeErr, + tag.HistoryClientOperationRecordActivityTaskCompleted, + tag.Error(fakeErr), + tag.Bool(forwardCall), + tag.ClientError(clientErr), + ) + return fakeErr + } + return clientErr +} + +func (c *errorInjectionClient) RespondActivityTaskFailed( + ctx context.Context, + request *types.HistoryRespondActivityTaskFailedRequest, + opts ...yarpc.CallOption, +) error { + fakeErr := errors.GenerateFakeError(c.errorRate) + + var clientErr error + var forwardCall bool + if forwardCall = errors.ShouldForwardCall(fakeErr); forwardCall { + clientErr = c.client.RespondActivityTaskFailed(ctx, request, opts...) + } + + if fakeErr != nil { + c.logger.Error(msgInjectedFakeErr, + tag.HistoryClientOperationRecordActivityTaskFailed, + tag.Error(fakeErr), + tag.Bool(forwardCall), + tag.ClientError(clientErr), + ) + return fakeErr + } + return clientErr +} + +func (c *errorInjectionClient) RespondActivityTaskCanceled( + ctx context.Context, + request *types.HistoryRespondActivityTaskCanceledRequest, + opts ...yarpc.CallOption, +) error { + fakeErr := errors.GenerateFakeError(c.errorRate) + + var clientErr error + var forwardCall bool + if forwardCall = errors.ShouldForwardCall(fakeErr); forwardCall { + clientErr = c.client.RespondActivityTaskCanceled(ctx, request, opts...) + } + + if fakeErr != nil { + c.logger.Error(msgInjectedFakeErr, + tag.HistoryClientOperationRecordActivityTaskCanceled, + tag.Error(fakeErr), + tag.Bool(forwardCall), + tag.ClientError(clientErr), + ) + return fakeErr + } + return clientErr +} + +func (c *errorInjectionClient) RecordActivityTaskHeartbeat( + ctx context.Context, + request *types.HistoryRecordActivityTaskHeartbeatRequest, + opts ...yarpc.CallOption, +) (*types.RecordActivityTaskHeartbeatResponse, error) { + fakeErr := errors.GenerateFakeError(c.errorRate) + + var resp *types.RecordActivityTaskHeartbeatResponse + var clientErr error + var forwardCall bool + if forwardCall = errors.ShouldForwardCall(fakeErr); forwardCall { + resp, clientErr = c.client.RecordActivityTaskHeartbeat(ctx, request, opts...) + } + + if fakeErr != nil { + c.logger.Error(msgInjectedFakeErr, + tag.HistoryClientOperationRecordActivityTaskHeartbeat, + tag.Error(fakeErr), + tag.Bool(forwardCall), + tag.ClientError(clientErr), + ) + return nil, fakeErr + } + return resp, clientErr +} + +func (c *errorInjectionClient) RequestCancelWorkflowExecution( + ctx context.Context, + request *types.HistoryRequestCancelWorkflowExecutionRequest, + opts ...yarpc.CallOption, +) error { + fakeErr := errors.GenerateFakeError(c.errorRate) + + var clientErr error + var forwardCall bool + if forwardCall = errors.ShouldForwardCall(fakeErr); forwardCall { + clientErr = c.client.RequestCancelWorkflowExecution(ctx, request, opts...) + } + + if fakeErr != nil { + c.logger.Error(msgInjectedFakeErr, + tag.HistoryClientOperationRequestCancelWorkflowExecution, + tag.Error(fakeErr), + tag.Bool(forwardCall), + tag.ClientError(clientErr), + ) + return fakeErr + } + return clientErr +} + +func (c *errorInjectionClient) SignalWorkflowExecution( + ctx context.Context, + request *types.HistorySignalWorkflowExecutionRequest, + opts ...yarpc.CallOption, +) error { + fakeErr := errors.GenerateFakeError(c.errorRate) + + var clientErr error + var forwardCall bool + if forwardCall = errors.ShouldForwardCall(fakeErr); forwardCall { + clientErr = c.client.SignalWorkflowExecution(ctx, request, opts...) + } + + if fakeErr != nil { + c.logger.Error(msgInjectedFakeErr, + tag.HistoryClientOperationSignalWorkflowExecution, + tag.Error(fakeErr), + tag.Bool(forwardCall), + tag.ClientError(clientErr), + ) + return fakeErr + } + return clientErr +} + +func (c *errorInjectionClient) SignalWithStartWorkflowExecution( + ctx context.Context, + request *types.HistorySignalWithStartWorkflowExecutionRequest, + opts ...yarpc.CallOption, +) (*types.StartWorkflowExecutionResponse, error) { + fakeErr := errors.GenerateFakeError(c.errorRate) + + var resp *types.StartWorkflowExecutionResponse + var clientErr error + var forwardCall bool + if forwardCall = errors.ShouldForwardCall(fakeErr); forwardCall { + resp, clientErr = c.client.SignalWithStartWorkflowExecution(ctx, request, opts...) + } + + if fakeErr != nil { + c.logger.Error(msgInjectedFakeErr, + tag.HistoryClientOperationSignalWithStartWorkflowExecution, + tag.Error(fakeErr), + tag.Bool(forwardCall), + tag.ClientError(clientErr), + ) + return nil, fakeErr + } + return resp, clientErr +} + +func (c *errorInjectionClient) RemoveSignalMutableState( + ctx context.Context, + request *types.RemoveSignalMutableStateRequest, + opts ...yarpc.CallOption, +) error { + fakeErr := errors.GenerateFakeError(c.errorRate) + + var clientErr error + var forwardCall bool + if forwardCall = errors.ShouldForwardCall(fakeErr); forwardCall { + clientErr = c.client.RemoveSignalMutableState(ctx, request, opts...) + } + + if fakeErr != nil { + c.logger.Error(msgInjectedFakeErr, + tag.HistoryClientOperationRemoveSignalMutableState, + tag.Error(fakeErr), + tag.Bool(forwardCall), + tag.ClientError(clientErr), + ) + return fakeErr + } + return clientErr +} + +func (c *errorInjectionClient) TerminateWorkflowExecution( + ctx context.Context, + request *types.HistoryTerminateWorkflowExecutionRequest, + opts ...yarpc.CallOption, +) error { + fakeErr := errors.GenerateFakeError(c.errorRate) + + var clientErr error + var forwardCall bool + if forwardCall = errors.ShouldForwardCall(fakeErr); forwardCall { + clientErr = c.client.TerminateWorkflowExecution(ctx, request, opts...) + } + + if fakeErr != nil { + c.logger.Error(msgInjectedFakeErr, + tag.HistoryClientOperationTerminateWorkflowExecution, + tag.Error(fakeErr), + tag.Bool(forwardCall), + tag.ClientError(clientErr), + ) + return fakeErr + } + return clientErr +} + +func (c *errorInjectionClient) ResetWorkflowExecution( + ctx context.Context, + request *types.HistoryResetWorkflowExecutionRequest, + opts ...yarpc.CallOption, +) (*types.ResetWorkflowExecutionResponse, error) { + fakeErr := errors.GenerateFakeError(c.errorRate) + + var resp *types.ResetWorkflowExecutionResponse + var clientErr error + var forwardCall bool + if forwardCall = errors.ShouldForwardCall(fakeErr); forwardCall { + resp, clientErr = c.client.ResetWorkflowExecution(ctx, request, opts...) + } + + if fakeErr != nil { + c.logger.Error(msgInjectedFakeErr, + tag.HistoryClientOperationResetWorkflowExecution, + tag.Error(fakeErr), + tag.Bool(forwardCall), + tag.ClientError(clientErr), + ) + return nil, fakeErr + } + return resp, clientErr +} + +func (c *errorInjectionClient) ScheduleDecisionTask( + ctx context.Context, + request *types.ScheduleDecisionTaskRequest, + opts ...yarpc.CallOption, +) error { + fakeErr := errors.GenerateFakeError(c.errorRate) + + var clientErr error + var forwardCall bool + if forwardCall = errors.ShouldForwardCall(fakeErr); forwardCall { + clientErr = c.client.ScheduleDecisionTask(ctx, request, opts...) + } + + if fakeErr != nil { + c.logger.Error(msgInjectedFakeErr, + tag.HistoryClientOperationScheduleDecisionTask, + tag.Error(fakeErr), + tag.Bool(forwardCall), + tag.ClientError(clientErr), + ) + return fakeErr + } + return clientErr +} + +func (c *errorInjectionClient) RecordChildExecutionCompleted( + ctx context.Context, + request *types.RecordChildExecutionCompletedRequest, + opts ...yarpc.CallOption, +) error { + fakeErr := errors.GenerateFakeError(c.errorRate) + + var clientErr error + var forwardCall bool + if forwardCall = errors.ShouldForwardCall(fakeErr); forwardCall { + clientErr = c.client.RecordChildExecutionCompleted(ctx, request, opts...) + } + + if fakeErr != nil { + c.logger.Error(msgInjectedFakeErr, + tag.HistoryClientOperationRecordChildExecutionCompleted, + tag.Error(fakeErr), + tag.Bool(forwardCall), + tag.ClientError(clientErr), + ) + return fakeErr + } + return clientErr +} + +func (c *errorInjectionClient) ReplicateEventsV2( + ctx context.Context, + request *types.ReplicateEventsV2Request, + opts ...yarpc.CallOption, +) error { + fakeErr := errors.GenerateFakeError(c.errorRate) + + var clientErr error + var forwardCall bool + if forwardCall = errors.ShouldForwardCall(fakeErr); forwardCall { + clientErr = c.client.ReplicateEventsV2(ctx, request, opts...) + } + + if fakeErr != nil { + c.logger.Error(msgInjectedFakeErr, + tag.HistoryClientOperationReplicateEventsV2, + tag.Error(fakeErr), + tag.Bool(forwardCall), + tag.ClientError(clientErr), + ) + return fakeErr + } + return clientErr +} + +func (c *errorInjectionClient) SyncShardStatus( + ctx context.Context, + request *types.SyncShardStatusRequest, + opts ...yarpc.CallOption, +) error { + fakeErr := errors.GenerateFakeError(c.errorRate) + + var clientErr error + var forwardCall bool + if forwardCall = errors.ShouldForwardCall(fakeErr); forwardCall { + clientErr = c.client.SyncShardStatus(ctx, request, opts...) + } + + if fakeErr != nil { + c.logger.Error(msgInjectedFakeErr, + tag.HistoryClientOperationSyncShardStatus, + tag.Error(fakeErr), + tag.Bool(forwardCall), + tag.ClientError(clientErr), + ) + return fakeErr + } + return clientErr +} + +func (c *errorInjectionClient) SyncActivity( + ctx context.Context, + request *types.SyncActivityRequest, + opts ...yarpc.CallOption, +) error { + fakeErr := errors.GenerateFakeError(c.errorRate) + + var clientErr error + var forwardCall bool + if forwardCall = errors.ShouldForwardCall(fakeErr); forwardCall { + clientErr = c.client.SyncActivity(ctx, request, opts...) + } + + if fakeErr != nil { + c.logger.Error(msgInjectedFakeErr, + tag.HistoryClientOperationSyncActivity, + tag.Error(fakeErr), + tag.Bool(forwardCall), + tag.ClientError(clientErr), + ) + return fakeErr + } + return clientErr +} + +func (c *errorInjectionClient) GetReplicationMessages( + ctx context.Context, + request *types.GetReplicationMessagesRequest, + opts ...yarpc.CallOption, +) (*types.GetReplicationMessagesResponse, error) { + fakeErr := errors.GenerateFakeError(c.errorRate) + + var resp *types.GetReplicationMessagesResponse + var clientErr error + var forwardCall bool + if forwardCall = errors.ShouldForwardCall(fakeErr); forwardCall { + resp, clientErr = c.client.GetReplicationMessages(ctx, request, opts...) + } + + if fakeErr != nil { + c.logger.Error(msgInjectedFakeErr, + tag.HistoryClientOperationGetReplicationMessages, + tag.Error(fakeErr), + tag.Bool(forwardCall), + tag.ClientError(clientErr), + ) + return nil, fakeErr + } + return resp, clientErr +} + +func (c *errorInjectionClient) GetDLQReplicationMessages( + ctx context.Context, + request *types.GetDLQReplicationMessagesRequest, + opts ...yarpc.CallOption, +) (*types.GetDLQReplicationMessagesResponse, error) { + fakeErr := errors.GenerateFakeError(c.errorRate) + + var resp *types.GetDLQReplicationMessagesResponse + var clientErr error + var forwardCall bool + if forwardCall = errors.ShouldForwardCall(fakeErr); forwardCall { + resp, clientErr = c.client.GetDLQReplicationMessages(ctx, request, opts...) + } + + if fakeErr != nil { + c.logger.Error(msgInjectedFakeErr, + tag.HistoryClientOperationGetDLQReplicationMessages, + tag.Error(fakeErr), + tag.Bool(forwardCall), + tag.ClientError(clientErr), + ) + return nil, fakeErr + } + return resp, clientErr +} + +func (c *errorInjectionClient) QueryWorkflow( + ctx context.Context, + request *types.HistoryQueryWorkflowRequest, + opts ...yarpc.CallOption, +) (*types.HistoryQueryWorkflowResponse, error) { + fakeErr := errors.GenerateFakeError(c.errorRate) + + var resp *types.HistoryQueryWorkflowResponse + var clientErr error + var forwardCall bool + if forwardCall = errors.ShouldForwardCall(fakeErr); forwardCall { + resp, clientErr = c.client.QueryWorkflow(ctx, request, opts...) + } + + if fakeErr != nil { + c.logger.Error(msgInjectedFakeErr, + tag.HistoryClientOperationQueryWorkflow, + tag.Error(fakeErr), + tag.Bool(forwardCall), + tag.ClientError(clientErr), + ) + return nil, fakeErr + } + return resp, clientErr +} + +func (c *errorInjectionClient) ReapplyEvents( + ctx context.Context, + request *types.HistoryReapplyEventsRequest, + opts ...yarpc.CallOption, +) error { + fakeErr := errors.GenerateFakeError(c.errorRate) + + var clientErr error + var forwardCall bool + if forwardCall = errors.ShouldForwardCall(fakeErr); forwardCall { + clientErr = c.client.ReapplyEvents(ctx, request, opts...) + } + + if fakeErr != nil { + c.logger.Error(msgInjectedFakeErr, + tag.HistoryClientOperationReapplyEvents, + tag.Error(fakeErr), + tag.Bool(forwardCall), + tag.ClientError(clientErr), + ) + return fakeErr + } + return clientErr +} + +func (c *errorInjectionClient) ReadDLQMessages( + ctx context.Context, + request *types.ReadDLQMessagesRequest, + opts ...yarpc.CallOption, +) (*types.ReadDLQMessagesResponse, error) { + fakeErr := errors.GenerateFakeError(c.errorRate) + + var resp *types.ReadDLQMessagesResponse + var clientErr error + var forwardCall bool + if forwardCall = errors.ShouldForwardCall(fakeErr); forwardCall { + resp, clientErr = c.client.ReadDLQMessages(ctx, request, opts...) + } + + if fakeErr != nil { + c.logger.Error(msgInjectedFakeErr, + tag.HistoryClientOperationReadDLQMessages, + tag.Error(fakeErr), + tag.Bool(forwardCall), + tag.ClientError(clientErr), + ) + return nil, fakeErr + } + return resp, clientErr +} + +func (c *errorInjectionClient) PurgeDLQMessages( + ctx context.Context, + request *types.PurgeDLQMessagesRequest, + opts ...yarpc.CallOption, +) error { + fakeErr := errors.GenerateFakeError(c.errorRate) + + var clientErr error + var forwardCall bool + if forwardCall = errors.ShouldForwardCall(fakeErr); forwardCall { + clientErr = c.client.PurgeDLQMessages(ctx, request, opts...) + } + + if fakeErr != nil { + c.logger.Error(msgInjectedFakeErr, + tag.HistoryClientOperationPurgeDLQMessages, + tag.Error(fakeErr), + tag.Bool(forwardCall), + tag.ClientError(clientErr), + ) + return fakeErr + } + return clientErr +} + +func (c *errorInjectionClient) MergeDLQMessages( + ctx context.Context, + request *types.MergeDLQMessagesRequest, + opts ...yarpc.CallOption, +) (*types.MergeDLQMessagesResponse, error) { + fakeErr := errors.GenerateFakeError(c.errorRate) + + var resp *types.MergeDLQMessagesResponse + var clientErr error + var forwardCall bool + if forwardCall = errors.ShouldForwardCall(fakeErr); forwardCall { + resp, clientErr = c.client.MergeDLQMessages(ctx, request, opts...) + } + + if fakeErr != nil { + c.logger.Error(msgInjectedFakeErr, + tag.HistoryClientOperationMergeDLQMessages, + tag.Error(fakeErr), + tag.Bool(forwardCall), + tag.ClientError(clientErr), + ) + return nil, fakeErr + } + return resp, clientErr +} + +func (c *errorInjectionClient) RefreshWorkflowTasks( + ctx context.Context, + request *types.HistoryRefreshWorkflowTasksRequest, + opts ...yarpc.CallOption, +) error { + fakeErr := errors.GenerateFakeError(c.errorRate) + + var clientErr error + var forwardCall bool + if forwardCall = errors.ShouldForwardCall(fakeErr); forwardCall { + clientErr = c.client.RefreshWorkflowTasks(ctx, request, opts...) + } + + if fakeErr != nil { + c.logger.Error(msgInjectedFakeErr, + tag.HistoryClientOperationRefreshWorkflowTasks, + tag.Error(fakeErr), + tag.Bool(forwardCall), + tag.ClientError(clientErr), + ) + return fakeErr + } + return clientErr +} + +func (c *errorInjectionClient) NotifyFailoverMarkers( + ctx context.Context, + request *types.NotifyFailoverMarkersRequest, + opts ...yarpc.CallOption, +) error { + fakeErr := errors.GenerateFakeError(c.errorRate) + + var clientErr error + var forwardCall bool + if forwardCall = errors.ShouldForwardCall(fakeErr); forwardCall { + clientErr = c.client.NotifyFailoverMarkers(ctx, request, opts...) + } + + if fakeErr != nil { + c.logger.Error(msgInjectedFakeErr, + tag.HistoryClientOperationNotifyFailoverMarkers, + tag.Error(fakeErr), + tag.Bool(forwardCall), + tag.ClientError(clientErr), + ) + return fakeErr + } + return clientErr +} diff --git a/client/history/metricClient.go b/client/history/metricClient.go index e77e9bb80bd..d9e7f0e99ec 100644 --- a/client/history/metricClient.go +++ b/client/history/metricClient.go @@ -47,7 +47,8 @@ func NewMetricClient(client Client, metricsClient metrics.Client) Client { func (c *metricClient) StartWorkflowExecution( context context.Context, request *types.HistoryStartWorkflowExecutionRequest, - opts ...yarpc.CallOption) (*types.StartWorkflowExecutionResponse, error) { + opts ...yarpc.CallOption, +) (*types.StartWorkflowExecutionResponse, error) { c.metricsClient.IncCounter(metrics.HistoryClientStartWorkflowExecutionScope, metrics.CadenceClientRequests) sw := c.metricsClient.StartTimer(metrics.HistoryClientStartWorkflowExecutionScope, metrics.CadenceClientLatency) @@ -172,7 +173,8 @@ func (c *metricClient) DescribeMutableState( func (c *metricClient) GetMutableState( context context.Context, request *types.GetMutableStateRequest, - opts ...yarpc.CallOption) (*types.GetMutableStateResponse, error) { + opts ...yarpc.CallOption, +) (*types.GetMutableStateResponse, error) { c.metricsClient.IncCounter(metrics.HistoryClientGetMutableStateScope, metrics.CadenceClientRequests) sw := c.metricsClient.StartTimer(metrics.HistoryClientGetMutableStateScope, metrics.CadenceClientLatency) @@ -189,7 +191,8 @@ func (c *metricClient) GetMutableState( func (c *metricClient) PollMutableState( context context.Context, request *types.PollMutableStateRequest, - opts ...yarpc.CallOption) (*types.PollMutableStateResponse, error) { + opts ...yarpc.CallOption, +) (*types.PollMutableStateResponse, error) { c.metricsClient.IncCounter(metrics.HistoryClientPollMutableStateScope, metrics.CadenceClientRequests) sw := c.metricsClient.StartTimer(metrics.HistoryClientPollMutableStateScope, metrics.CadenceClientLatency) @@ -206,7 +209,8 @@ func (c *metricClient) PollMutableState( func (c *metricClient) ResetStickyTaskList( context context.Context, request *types.HistoryResetStickyTaskListRequest, - opts ...yarpc.CallOption) (*types.HistoryResetStickyTaskListResponse, error) { + opts ...yarpc.CallOption, +) (*types.HistoryResetStickyTaskListResponse, error) { c.metricsClient.IncCounter(metrics.HistoryClientResetStickyTaskListScope, metrics.CadenceClientRequests) sw := c.metricsClient.StartTimer(metrics.HistoryClientResetStickyTaskListScope, metrics.CadenceClientLatency) @@ -223,7 +227,8 @@ func (c *metricClient) ResetStickyTaskList( func (c *metricClient) DescribeWorkflowExecution( context context.Context, request *types.HistoryDescribeWorkflowExecutionRequest, - opts ...yarpc.CallOption) (*types.DescribeWorkflowExecutionResponse, error) { + opts ...yarpc.CallOption, +) (*types.DescribeWorkflowExecutionResponse, error) { c.metricsClient.IncCounter(metrics.HistoryClientDescribeWorkflowExecutionScope, metrics.CadenceClientRequests) sw := c.metricsClient.StartTimer(metrics.HistoryClientDescribeWorkflowExecutionScope, metrics.CadenceClientLatency) @@ -240,7 +245,8 @@ func (c *metricClient) DescribeWorkflowExecution( func (c *metricClient) RecordDecisionTaskStarted( context context.Context, request *types.RecordDecisionTaskStartedRequest, - opts ...yarpc.CallOption) (*types.RecordDecisionTaskStartedResponse, error) { + opts ...yarpc.CallOption, +) (*types.RecordDecisionTaskStartedResponse, error) { c.metricsClient.IncCounter(metrics.HistoryClientRecordDecisionTaskStartedScope, metrics.CadenceClientRequests) sw := c.metricsClient.StartTimer(metrics.HistoryClientRecordDecisionTaskStartedScope, metrics.CadenceClientLatency) @@ -257,7 +263,8 @@ func (c *metricClient) RecordDecisionTaskStarted( func (c *metricClient) RecordActivityTaskStarted( context context.Context, request *types.RecordActivityTaskStartedRequest, - opts ...yarpc.CallOption) (*types.RecordActivityTaskStartedResponse, error) { + opts ...yarpc.CallOption, +) (*types.RecordActivityTaskStartedResponse, error) { c.metricsClient.IncCounter(metrics.HistoryClientRecordActivityTaskStartedScope, metrics.CadenceClientRequests) sw := c.metricsClient.StartTimer(metrics.HistoryClientRecordActivityTaskStartedScope, metrics.CadenceClientLatency) @@ -274,7 +281,8 @@ func (c *metricClient) RecordActivityTaskStarted( func (c *metricClient) RespondDecisionTaskCompleted( context context.Context, request *types.HistoryRespondDecisionTaskCompletedRequest, - opts ...yarpc.CallOption) (*types.HistoryRespondDecisionTaskCompletedResponse, error) { + opts ...yarpc.CallOption, +) (*types.HistoryRespondDecisionTaskCompletedResponse, error) { c.metricsClient.IncCounter(metrics.HistoryClientRespondDecisionTaskCompletedScope, metrics.CadenceClientRequests) sw := c.metricsClient.StartTimer(metrics.HistoryClientRespondDecisionTaskCompletedScope, metrics.CadenceClientLatency) @@ -291,7 +299,8 @@ func (c *metricClient) RespondDecisionTaskCompleted( func (c *metricClient) RespondDecisionTaskFailed( context context.Context, request *types.HistoryRespondDecisionTaskFailedRequest, - opts ...yarpc.CallOption) error { + opts ...yarpc.CallOption, +) error { c.metricsClient.IncCounter(metrics.HistoryClientRespondDecisionTaskFailedScope, metrics.CadenceClientRequests) sw := c.metricsClient.StartTimer(metrics.HistoryClientRespondDecisionTaskFailedScope, metrics.CadenceClientLatency) @@ -308,7 +317,8 @@ func (c *metricClient) RespondDecisionTaskFailed( func (c *metricClient) RespondActivityTaskCompleted( context context.Context, request *types.HistoryRespondActivityTaskCompletedRequest, - opts ...yarpc.CallOption) error { + opts ...yarpc.CallOption, +) error { c.metricsClient.IncCounter(metrics.HistoryClientRespondActivityTaskCompletedScope, metrics.CadenceClientRequests) sw := c.metricsClient.StartTimer(metrics.HistoryClientRespondActivityTaskCompletedScope, metrics.CadenceClientLatency) @@ -325,7 +335,8 @@ func (c *metricClient) RespondActivityTaskCompleted( func (c *metricClient) RespondActivityTaskFailed( context context.Context, request *types.HistoryRespondActivityTaskFailedRequest, - opts ...yarpc.CallOption) error { + opts ...yarpc.CallOption, +) error { c.metricsClient.IncCounter(metrics.HistoryClientRespondActivityTaskFailedScope, metrics.CadenceClientRequests) sw := c.metricsClient.StartTimer(metrics.HistoryClientRespondActivityTaskFailedScope, metrics.CadenceClientLatency) @@ -342,7 +353,8 @@ func (c *metricClient) RespondActivityTaskFailed( func (c *metricClient) RespondActivityTaskCanceled( context context.Context, request *types.HistoryRespondActivityTaskCanceledRequest, - opts ...yarpc.CallOption) error { + opts ...yarpc.CallOption, +) error { c.metricsClient.IncCounter(metrics.HistoryClientRespondActivityTaskCanceledScope, metrics.CadenceClientRequests) sw := c.metricsClient.StartTimer(metrics.HistoryClientRespondActivityTaskCanceledScope, metrics.CadenceClientLatency) @@ -359,7 +371,8 @@ func (c *metricClient) RespondActivityTaskCanceled( func (c *metricClient) RecordActivityTaskHeartbeat( context context.Context, request *types.HistoryRecordActivityTaskHeartbeatRequest, - opts ...yarpc.CallOption) (*types.RecordActivityTaskHeartbeatResponse, error) { + opts ...yarpc.CallOption, +) (*types.RecordActivityTaskHeartbeatResponse, error) { c.metricsClient.IncCounter(metrics.HistoryClientRecordActivityTaskHeartbeatScope, metrics.CadenceClientRequests) sw := c.metricsClient.StartTimer(metrics.HistoryClientRecordActivityTaskHeartbeatScope, metrics.CadenceClientLatency) @@ -376,7 +389,8 @@ func (c *metricClient) RecordActivityTaskHeartbeat( func (c *metricClient) RequestCancelWorkflowExecution( context context.Context, request *types.HistoryRequestCancelWorkflowExecutionRequest, - opts ...yarpc.CallOption) error { + opts ...yarpc.CallOption, +) error { c.metricsClient.IncCounter(metrics.HistoryClientRequestCancelWorkflowExecutionScope, metrics.CadenceClientRequests) sw := c.metricsClient.StartTimer(metrics.HistoryClientRequestCancelWorkflowExecutionScope, metrics.CadenceClientLatency) @@ -393,7 +407,8 @@ func (c *metricClient) RequestCancelWorkflowExecution( func (c *metricClient) SignalWorkflowExecution( context context.Context, request *types.HistorySignalWorkflowExecutionRequest, - opts ...yarpc.CallOption) error { + opts ...yarpc.CallOption, +) error { c.metricsClient.IncCounter(metrics.HistoryClientSignalWorkflowExecutionScope, metrics.CadenceClientRequests) sw := c.metricsClient.StartTimer(metrics.HistoryClientSignalWorkflowExecutionScope, metrics.CadenceClientLatency) @@ -410,7 +425,8 @@ func (c *metricClient) SignalWorkflowExecution( func (c *metricClient) SignalWithStartWorkflowExecution( context context.Context, request *types.HistorySignalWithStartWorkflowExecutionRequest, - opts ...yarpc.CallOption) (*types.StartWorkflowExecutionResponse, error) { + opts ...yarpc.CallOption, +) (*types.StartWorkflowExecutionResponse, error) { c.metricsClient.IncCounter(metrics.HistoryClientSignalWithStartWorkflowExecutionScope, metrics.CadenceClientRequests) sw := c.metricsClient.StartTimer(metrics.HistoryClientSignalWithStartWorkflowExecutionScope, metrics.CadenceClientLatency) @@ -427,7 +443,8 @@ func (c *metricClient) SignalWithStartWorkflowExecution( func (c *metricClient) RemoveSignalMutableState( context context.Context, request *types.RemoveSignalMutableStateRequest, - opts ...yarpc.CallOption) error { + opts ...yarpc.CallOption, +) error { c.metricsClient.IncCounter(metrics.HistoryClientRemoveSignalMutableStateScope, metrics.CadenceClientRequests) sw := c.metricsClient.StartTimer(metrics.HistoryClientRemoveSignalMutableStateScope, metrics.CadenceClientLatency) @@ -444,7 +461,8 @@ func (c *metricClient) RemoveSignalMutableState( func (c *metricClient) TerminateWorkflowExecution( context context.Context, request *types.HistoryTerminateWorkflowExecutionRequest, - opts ...yarpc.CallOption) error { + opts ...yarpc.CallOption, +) error { c.metricsClient.IncCounter(metrics.HistoryClientTerminateWorkflowExecutionScope, metrics.CadenceClientRequests) sw := c.metricsClient.StartTimer(metrics.HistoryClientTerminateWorkflowExecutionScope, metrics.CadenceClientLatency) @@ -461,7 +479,8 @@ func (c *metricClient) TerminateWorkflowExecution( func (c *metricClient) ResetWorkflowExecution( context context.Context, request *types.HistoryResetWorkflowExecutionRequest, - opts ...yarpc.CallOption) (*types.ResetWorkflowExecutionResponse, error) { + opts ...yarpc.CallOption, +) (*types.ResetWorkflowExecutionResponse, error) { c.metricsClient.IncCounter(metrics.HistoryClientResetWorkflowExecutionScope, metrics.CadenceClientRequests) sw := c.metricsClient.StartTimer(metrics.HistoryClientResetWorkflowExecutionScope, metrics.CadenceClientLatency) @@ -478,7 +497,8 @@ func (c *metricClient) ResetWorkflowExecution( func (c *metricClient) ScheduleDecisionTask( context context.Context, request *types.ScheduleDecisionTaskRequest, - opts ...yarpc.CallOption) error { + opts ...yarpc.CallOption, +) error { c.metricsClient.IncCounter(metrics.HistoryClientScheduleDecisionTaskScope, metrics.CadenceClientRequests) sw := c.metricsClient.StartTimer(metrics.HistoryClientScheduleDecisionTaskScope, metrics.CadenceClientLatency) @@ -495,7 +515,8 @@ func (c *metricClient) ScheduleDecisionTask( func (c *metricClient) RecordChildExecutionCompleted( context context.Context, request *types.RecordChildExecutionCompletedRequest, - opts ...yarpc.CallOption) error { + opts ...yarpc.CallOption, +) error { c.metricsClient.IncCounter(metrics.HistoryClientRecordChildExecutionCompletedScope, metrics.CadenceClientRequests) sw := c.metricsClient.StartTimer(metrics.HistoryClientRecordChildExecutionCompletedScope, metrics.CadenceClientLatency) @@ -512,7 +533,8 @@ func (c *metricClient) RecordChildExecutionCompleted( func (c *metricClient) ReplicateEventsV2( context context.Context, request *types.ReplicateEventsV2Request, - opts ...yarpc.CallOption) error { + opts ...yarpc.CallOption, +) error { c.metricsClient.IncCounter(metrics.HistoryClientReplicateEventsV2Scope, metrics.CadenceClientRequests) sw := c.metricsClient.StartTimer(metrics.HistoryClientReplicateEventsV2Scope, metrics.CadenceClientLatency) @@ -529,7 +551,8 @@ func (c *metricClient) ReplicateEventsV2( func (c *metricClient) SyncShardStatus( context context.Context, request *types.SyncShardStatusRequest, - opts ...yarpc.CallOption) error { + opts ...yarpc.CallOption, +) error { c.metricsClient.IncCounter(metrics.HistoryClientSyncShardStatusScope, metrics.CadenceClientRequests) sw := c.metricsClient.StartTimer(metrics.HistoryClientSyncShardStatusScope, metrics.CadenceClientLatency) @@ -546,7 +569,8 @@ func (c *metricClient) SyncShardStatus( func (c *metricClient) SyncActivity( context context.Context, request *types.SyncActivityRequest, - opts ...yarpc.CallOption) error { + opts ...yarpc.CallOption, +) error { c.metricsClient.IncCounter(metrics.HistoryClientSyncActivityScope, metrics.CadenceClientRequests) sw := c.metricsClient.StartTimer(metrics.HistoryClientSyncActivityScope, metrics.CadenceClientLatency) diff --git a/client/history/retryableClient.go b/client/history/retryableClient.go index 48e93f00ff4..289814d06fe 100644 --- a/client/history/retryableClient.go +++ b/client/history/retryableClient.go @@ -38,7 +38,11 @@ type retryableClient struct { } // NewRetryableClient creates a new instance of Client with retry policy -func NewRetryableClient(client Client, policy backoff.RetryPolicy, isRetryable backoff.IsRetryable) Client { +func NewRetryableClient( + client Client, + policy backoff.RetryPolicy, + isRetryable backoff.IsRetryable, +) Client { return &retryableClient{ client: client, policy: policy, @@ -49,7 +53,8 @@ func NewRetryableClient(client Client, policy backoff.RetryPolicy, isRetryable b func (c *retryableClient) StartWorkflowExecution( ctx context.Context, request *types.HistoryStartWorkflowExecutionRequest, - opts ...yarpc.CallOption) (*types.StartWorkflowExecutionResponse, error) { + opts ...yarpc.CallOption, +) (*types.StartWorkflowExecutionResponse, error) { var resp *types.StartWorkflowExecutionResponse op := func() error { @@ -65,7 +70,8 @@ func (c *retryableClient) StartWorkflowExecution( func (c *retryableClient) DescribeHistoryHost( ctx context.Context, request *types.DescribeHistoryHostRequest, - opts ...yarpc.CallOption) (*types.DescribeHistoryHostResponse, error) { + opts ...yarpc.CallOption, +) (*types.DescribeHistoryHostResponse, error) { var resp *types.DescribeHistoryHostResponse op := func() error { @@ -81,7 +87,8 @@ func (c *retryableClient) DescribeHistoryHost( func (c *retryableClient) CloseShard( ctx context.Context, request *types.CloseShardRequest, - opts ...yarpc.CallOption) error { + opts ...yarpc.CallOption, +) error { op := func() error { err := c.client.CloseShard(ctx, request, opts...) @@ -127,7 +134,8 @@ func (c *retryableClient) DescribeQueue( func (c *retryableClient) RemoveTask( ctx context.Context, request *types.RemoveTaskRequest, - opts ...yarpc.CallOption) error { + opts ...yarpc.CallOption, +) error { op := func() error { err := c.client.RemoveTask(ctx, request, opts...) @@ -141,7 +149,8 @@ func (c *retryableClient) RemoveTask( func (c *retryableClient) DescribeMutableState( ctx context.Context, request *types.DescribeMutableStateRequest, - opts ...yarpc.CallOption) (*types.DescribeMutableStateResponse, error) { + opts ...yarpc.CallOption, +) (*types.DescribeMutableStateResponse, error) { var resp *types.DescribeMutableStateResponse op := func() error { @@ -157,7 +166,8 @@ func (c *retryableClient) DescribeMutableState( func (c *retryableClient) GetMutableState( ctx context.Context, request *types.GetMutableStateRequest, - opts ...yarpc.CallOption) (*types.GetMutableStateResponse, error) { + opts ...yarpc.CallOption, +) (*types.GetMutableStateResponse, error) { var resp *types.GetMutableStateResponse op := func() error { @@ -173,7 +183,8 @@ func (c *retryableClient) GetMutableState( func (c *retryableClient) PollMutableState( ctx context.Context, request *types.PollMutableStateRequest, - opts ...yarpc.CallOption) (*types.PollMutableStateResponse, error) { + opts ...yarpc.CallOption, +) (*types.PollMutableStateResponse, error) { var resp *types.PollMutableStateResponse op := func() error { @@ -189,7 +200,8 @@ func (c *retryableClient) PollMutableState( func (c *retryableClient) ResetStickyTaskList( ctx context.Context, request *types.HistoryResetStickyTaskListRequest, - opts ...yarpc.CallOption) (*types.HistoryResetStickyTaskListResponse, error) { + opts ...yarpc.CallOption, +) (*types.HistoryResetStickyTaskListResponse, error) { var resp *types.HistoryResetStickyTaskListResponse op := func() error { @@ -205,7 +217,8 @@ func (c *retryableClient) ResetStickyTaskList( func (c *retryableClient) DescribeWorkflowExecution( ctx context.Context, request *types.HistoryDescribeWorkflowExecutionRequest, - opts ...yarpc.CallOption) (*types.DescribeWorkflowExecutionResponse, error) { + opts ...yarpc.CallOption, +) (*types.DescribeWorkflowExecutionResponse, error) { var resp *types.DescribeWorkflowExecutionResponse op := func() error { @@ -221,7 +234,8 @@ func (c *retryableClient) DescribeWorkflowExecution( func (c *retryableClient) RecordDecisionTaskStarted( ctx context.Context, request *types.RecordDecisionTaskStartedRequest, - opts ...yarpc.CallOption) (*types.RecordDecisionTaskStartedResponse, error) { + opts ...yarpc.CallOption, +) (*types.RecordDecisionTaskStartedResponse, error) { var resp *types.RecordDecisionTaskStartedResponse op := func() error { @@ -237,7 +251,8 @@ func (c *retryableClient) RecordDecisionTaskStarted( func (c *retryableClient) RecordActivityTaskStarted( ctx context.Context, request *types.RecordActivityTaskStartedRequest, - opts ...yarpc.CallOption) (*types.RecordActivityTaskStartedResponse, error) { + opts ...yarpc.CallOption, +) (*types.RecordActivityTaskStartedResponse, error) { var resp *types.RecordActivityTaskStartedResponse op := func() error { @@ -253,7 +268,8 @@ func (c *retryableClient) RecordActivityTaskStarted( func (c *retryableClient) RespondDecisionTaskCompleted( ctx context.Context, request *types.HistoryRespondDecisionTaskCompletedRequest, - opts ...yarpc.CallOption) (*types.HistoryRespondDecisionTaskCompletedResponse, error) { + opts ...yarpc.CallOption, +) (*types.HistoryRespondDecisionTaskCompletedResponse, error) { var resp *types.HistoryRespondDecisionTaskCompletedResponse op := func() error { @@ -269,7 +285,8 @@ func (c *retryableClient) RespondDecisionTaskCompleted( func (c *retryableClient) RespondDecisionTaskFailed( ctx context.Context, request *types.HistoryRespondDecisionTaskFailedRequest, - opts ...yarpc.CallOption) error { + opts ...yarpc.CallOption, +) error { op := func() error { return c.client.RespondDecisionTaskFailed(ctx, request, opts...) @@ -281,7 +298,8 @@ func (c *retryableClient) RespondDecisionTaskFailed( func (c *retryableClient) RespondActivityTaskCompleted( ctx context.Context, request *types.HistoryRespondActivityTaskCompletedRequest, - opts ...yarpc.CallOption) error { + opts ...yarpc.CallOption, +) error { op := func() error { return c.client.RespondActivityTaskCompleted(ctx, request, opts...) @@ -293,7 +311,8 @@ func (c *retryableClient) RespondActivityTaskCompleted( func (c *retryableClient) RespondActivityTaskFailed( ctx context.Context, request *types.HistoryRespondActivityTaskFailedRequest, - opts ...yarpc.CallOption) error { + opts ...yarpc.CallOption, +) error { op := func() error { return c.client.RespondActivityTaskFailed(ctx, request, opts...) @@ -305,7 +324,8 @@ func (c *retryableClient) RespondActivityTaskFailed( func (c *retryableClient) RespondActivityTaskCanceled( ctx context.Context, request *types.HistoryRespondActivityTaskCanceledRequest, - opts ...yarpc.CallOption) error { + opts ...yarpc.CallOption, +) error { op := func() error { return c.client.RespondActivityTaskCanceled(ctx, request, opts...) @@ -317,7 +337,8 @@ func (c *retryableClient) RespondActivityTaskCanceled( func (c *retryableClient) RecordActivityTaskHeartbeat( ctx context.Context, request *types.HistoryRecordActivityTaskHeartbeatRequest, - opts ...yarpc.CallOption) (*types.RecordActivityTaskHeartbeatResponse, error) { + opts ...yarpc.CallOption, +) (*types.RecordActivityTaskHeartbeatResponse, error) { var resp *types.RecordActivityTaskHeartbeatResponse op := func() error { @@ -333,7 +354,8 @@ func (c *retryableClient) RecordActivityTaskHeartbeat( func (c *retryableClient) RequestCancelWorkflowExecution( ctx context.Context, request *types.HistoryRequestCancelWorkflowExecutionRequest, - opts ...yarpc.CallOption) error { + opts ...yarpc.CallOption, +) error { op := func() error { return c.client.RequestCancelWorkflowExecution(ctx, request, opts...) @@ -345,7 +367,8 @@ func (c *retryableClient) RequestCancelWorkflowExecution( func (c *retryableClient) SignalWorkflowExecution( ctx context.Context, request *types.HistorySignalWorkflowExecutionRequest, - opts ...yarpc.CallOption) error { + opts ...yarpc.CallOption, +) error { op := func() error { return c.client.SignalWorkflowExecution(ctx, request, opts...) @@ -357,7 +380,8 @@ func (c *retryableClient) SignalWorkflowExecution( func (c *retryableClient) SignalWithStartWorkflowExecution( ctx context.Context, request *types.HistorySignalWithStartWorkflowExecutionRequest, - opts ...yarpc.CallOption) (*types.StartWorkflowExecutionResponse, error) { + opts ...yarpc.CallOption, +) (*types.StartWorkflowExecutionResponse, error) { var resp *types.StartWorkflowExecutionResponse op := func() error { @@ -373,7 +397,8 @@ func (c *retryableClient) SignalWithStartWorkflowExecution( func (c *retryableClient) RemoveSignalMutableState( ctx context.Context, request *types.RemoveSignalMutableStateRequest, - opts ...yarpc.CallOption) error { + opts ...yarpc.CallOption, +) error { op := func() error { return c.client.RemoveSignalMutableState(ctx, request, opts...) @@ -385,7 +410,8 @@ func (c *retryableClient) RemoveSignalMutableState( func (c *retryableClient) TerminateWorkflowExecution( ctx context.Context, request *types.HistoryTerminateWorkflowExecutionRequest, - opts ...yarpc.CallOption) error { + opts ...yarpc.CallOption, +) error { op := func() error { return c.client.TerminateWorkflowExecution(ctx, request, opts...) @@ -397,7 +423,8 @@ func (c *retryableClient) TerminateWorkflowExecution( func (c *retryableClient) ResetWorkflowExecution( ctx context.Context, request *types.HistoryResetWorkflowExecutionRequest, - opts ...yarpc.CallOption) (*types.ResetWorkflowExecutionResponse, error) { + opts ...yarpc.CallOption, +) (*types.ResetWorkflowExecutionResponse, error) { var resp *types.ResetWorkflowExecutionResponse op := func() error { @@ -413,7 +440,8 @@ func (c *retryableClient) ResetWorkflowExecution( func (c *retryableClient) ScheduleDecisionTask( ctx context.Context, request *types.ScheduleDecisionTaskRequest, - opts ...yarpc.CallOption) error { + opts ...yarpc.CallOption, +) error { op := func() error { return c.client.ScheduleDecisionTask(ctx, request, opts...) @@ -425,7 +453,8 @@ func (c *retryableClient) ScheduleDecisionTask( func (c *retryableClient) RecordChildExecutionCompleted( ctx context.Context, request *types.RecordChildExecutionCompletedRequest, - opts ...yarpc.CallOption) error { + opts ...yarpc.CallOption, +) error { op := func() error { return c.client.RecordChildExecutionCompleted(ctx, request, opts...) @@ -437,7 +466,8 @@ func (c *retryableClient) RecordChildExecutionCompleted( func (c *retryableClient) ReplicateEventsV2( ctx context.Context, request *types.ReplicateEventsV2Request, - opts ...yarpc.CallOption) error { + opts ...yarpc.CallOption, +) error { op := func() error { return c.client.ReplicateEventsV2(ctx, request, opts...) @@ -449,7 +479,8 @@ func (c *retryableClient) ReplicateEventsV2( func (c *retryableClient) SyncShardStatus( ctx context.Context, request *types.SyncShardStatusRequest, - opts ...yarpc.CallOption) error { + opts ...yarpc.CallOption, +) error { op := func() error { return c.client.SyncShardStatus(ctx, request, opts...) @@ -461,7 +492,8 @@ func (c *retryableClient) SyncShardStatus( func (c *retryableClient) SyncActivity( ctx context.Context, request *types.SyncActivityRequest, - opts ...yarpc.CallOption) error { + opts ...yarpc.CallOption, +) error { op := func() error { return c.client.SyncActivity(ctx, request, opts...) diff --git a/common/log/tag/values.go b/common/log/tag/values.go index cedca58cdf8..e963bbb048e 100644 --- a/common/log/tag/values.go +++ b/common/log/tag/values.go @@ -320,6 +320,46 @@ var ( FrontendClientOperationGetClusterInfo = clientOperation("frontend-get-cluster-info") FrontendClientOperationListTaskListPartitions = clientOperation("frontend-list-task-list-partitions") + HistoryClientOperationStartWorkflowExecution = clientOperation("history-start-wf-execution") + HistoryClientOperationDescribeHistoryHost = clientOperation("history-describe-history-host") + HistoryClientOperationCloseShard = clientOperation("history-close-shard") + HistoryClientOperationResetQueue = clientOperation("history-reset-queue") + HistoryClientOperationDescribeQueue = clientOperation("history-describe-queue") + HistoryClientOperationRemoveTask = clientOperation("history-remove-task") + HistoryClientOperationDescribeMutableState = clientOperation("history-describe-mutable-state") + HistoryClientOperationGetMutableState = clientOperation("history-get-mutable-state") + HistoryClientOperationPollMutableState = clientOperation("history-poll-mutable-state") + HistoryClientOperationResetStickyTaskList = clientOperation("history-reset-task-list") + HistoryClientOperationDescribeWorkflowExecution = clientOperation("history-describe-wf-execution") + HistoryClientOperationRecordDecisionTaskStarted = clientOperation("history-record-decision-task-started") + HistoryClientOperationRecordActivityTaskStarted = clientOperation("history-record-activity-task-started") + HistoryClientOperationRecordDecisionTaskCompleted = clientOperation("history-record-decision-task-completed") + HistoryClientOperationRecordDecisionTaskFailed = clientOperation("history-record-decision-task-failed") + HistoryClientOperationRecordActivityTaskCompleted = clientOperation("history-record-activity-task-completed") + HistoryClientOperationRecordActivityTaskFailed = clientOperation("history-record-activity-task-failed") + HistoryClientOperationRecordActivityTaskCanceled = clientOperation("history-record-activity-task-canceled") + HistoryClientOperationRecordActivityTaskHeartbeat = clientOperation("history-record-activity-task-heartbeat") + HistoryClientOperationRequestCancelWorkflowExecution = clientOperation("history-request-cancel-wf-execution") + HistoryClientOperationSignalWorkflowExecution = clientOperation("history-signal-wf-execution") + HistoryClientOperationSignalWithStartWorkflowExecution = clientOperation("history-signal-with-start-wf-execution") + HistoryClientOperationRemoveSignalMutableState = clientOperation("history-remove-signal-mutable-state") + HistoryClientOperationTerminateWorkflowExecution = clientOperation("history-terminate-wf-execution") + HistoryClientOperationResetWorkflowExecution = clientOperation("history-reset-wf-execution") + HistoryClientOperationScheduleDecisionTask = clientOperation("history-schedule-decision-task") + HistoryClientOperationRecordChildExecutionCompleted = clientOperation("history-record-child-execution-completed") + HistoryClientOperationReplicateEventsV2 = clientOperation("history-replicate-events-v2") + HistoryClientOperationSyncShardStatus = clientOperation("history-sync-shard-status") + HistoryClientOperationSyncActivity = clientOperation("history-sync-activity") + HistoryClientOperationGetReplicationMessages = clientOperation("history-get-replication-messages") + HistoryClientOperationGetDLQReplicationMessages = clientOperation("history-get-dlq-replication-messages") + HistoryClientOperationQueryWorkflow = clientOperation("history-query-wf") + HistoryClientOperationReapplyEvents = clientOperation("history-reapply-events") + HistoryClientOperationReadDLQMessages = clientOperation("history-read-dlq-messages") + HistoryClientOperationPurgeDLQMessages = clientOperation("history-purge-dlq-messages") + HistoryClientOperationMergeDLQMessages = clientOperation("history-merge-dlq-messages") + HistoryClientOperationRefreshWorkflowTasks = clientOperation("history-refresh-wf-tasks") + HistoryClientOperationNotifyFailoverMarkers = clientOperation("history-notify-failover-markers") + MatchingClientOperationAddActivityTask = clientOperation("matching-add-activity-task") MatchingClientOperationAddDecisionTask = clientOperation("matching-add-decision-task") MatchingClientOperationPollForActivityTask = clientOperation("matching-poll-for-activity-task") diff --git a/common/service/dynamicconfig/constants.go b/common/service/dynamicconfig/constants.go index 4078fcb4963..83d6244ba75 100644 --- a/common/service/dynamicconfig/constants.go +++ b/common/service/dynamicconfig/constants.go @@ -292,6 +292,7 @@ var keys = map[Key]string{ NotifyFailoverMarkerTimerJitterCoefficient: "history.NotifyFailoverMarkerTimerJitterCoefficient", EnableDropStuckTaskByDomainID: "history.DropStuckTaskByDomain", EnableActivityLocalDispatchByDomain: "history.enableActivityLocalDispatchByDomain", + HistoryErrorInjectionRate: "history.errorInjectionRate", WorkerPersistenceMaxQPS: "worker.persistenceMaxQPS", WorkerPersistenceGlobalMaxQPS: "worker.persistenceGlobalMaxQPS", @@ -782,6 +783,9 @@ const ( // EnableDropStuckTaskByDomainID is whether stuck timer/transfer task should be dropped for a domain EnableDropStuckTaskByDomainID + // HistoryErrorInjectionRate is the rate for injecting random error in history client + HistoryErrorInjectionRate + // key for worker // WorkerPersistenceMaxQPS is the max qps worker host can query DB @@ -942,11 +946,11 @@ const ( // NotifyFailoverMarkerTimerJitterCoefficient is the jitter for failover marker notifier timer NotifyFailoverMarkerTimerJitterCoefficient - // lastKeyForTest must be the last one in this const group for testing purpose - lastKeyForTest - // EnableActivityLocalDispatchByDomain allows worker to dispatch activity tasks through local tunnel after decisions are made. This is an performance optimization to skip activity scheduling efforts. EnableActivityLocalDispatchByDomain + + // lastKeyForTest must be the last one in this const group for testing purpose + lastKeyForTest ) // Filter represents a filter on the dynamic config key