From e1883ef71e7e6054bfca4c24da94820a84a71f4a Mon Sep 17 00:00:00 2001 From: wxing1292 Date: Thu, 11 Apr 2019 11:26:42 -0700 Subject: [PATCH] Remove buffered replication task functionality in favor of resend (#1693) --- .gen/go/history/idl.go | 4 +- idl/github.com/uber/cadence/history.thrift | 2 +- service/history/historyReplicator.go | 179 ++--------------- service/history/historyReplicator_test.go | 187 +----------------- service/worker/replicator/replicationTask.go | 3 - .../worker/replicator/replicationTask_test.go | 4 - 6 files changed, 21 insertions(+), 358 deletions(-) diff --git a/.gen/go/history/idl.go b/.gen/go/history/idl.go index 3b6f1c4cc0e..53805d2513c 100644 --- a/.gen/go/history/idl.go +++ b/.gen/go/history/idl.go @@ -33,11 +33,11 @@ var ThriftModule = &thriftreflect.ThriftModule{ Name: "history", Package: "github.com/uber/cadence/.gen/go/history", FilePath: "history.thrift", - SHA1: "042f3d16f7903228e09e78b317668b9f9626002e", + SHA1: "8dd092478b36a1e29277dc4753b9370b5f363fa3", Includes: []*thriftreflect.ThriftModule{ shared.ThriftModule, }, Raw: rawIDL, } -const rawIDL = "// Copyright (c) 2017 Uber Technologies, Inc.\n//\n// Permission is hereby granted, free of charge, to any person obtaining a copy\n// of this software and associated documentation files (the \"Software\"), to deal\n// in the Software without restriction, including without limitation the rights\n// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell\n// copies of the Software, and to permit persons to whom the Software is\n// furnished to do so, subject to the following conditions:\n//\n// The above copyright notice and this permission notice shall be included in\n// all copies or substantial portions of the Software.\n//\n// THE SOFTWARE IS PROVIDED \"AS IS\", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\n// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,\n// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE\n// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER\n// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,\n// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN\n// THE SOFTWARE.\n\ninclude \"shared.thrift\"\n\nnamespace java com.uber.cadence.history\n\nexception EventAlreadyStartedError {\n 1: required string message\n}\n\nexception ShardOwnershipLostError {\n 10: optional string message\n 20: optional string owner\n}\n\nstruct ParentExecutionInfo {\n 10: optional string domainUUID\n 15: optional string domain\n 20: optional shared.WorkflowExecution execution\n 30: optional i64 (js.type = \"Long\") initiatedId\n}\n\nstruct StartWorkflowExecutionRequest {\n 10: optional string domainUUID\n 20: optional shared.StartWorkflowExecutionRequest startRequest\n 30: optional ParentExecutionInfo parentExecutionInfo\n 40: optional i32 attempt\n 50: optional i64 (js.type = \"Long\") expirationTimestamp\n 55: optional shared.ContinueAsNewInitiator continueAsNewInitiator\n 56: optional string continuedFailureReason\n 57: optional binary continuedFailureDetails\n 58: optional binary lastCompletionResult\n 60: optional i32 firstDecisionTaskBackoffSeconds\n}\n\nstruct DescribeMutableStateRequest{\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution execution\n}\n\nstruct DescribeMutableStateResponse{\n 30: optional string mutableStateInCache\n 40: optional string mutableStateInDatabase\n}\n\nstruct GetMutableStateRequest {\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution execution\n 30: optional i64 (js.type = \"Long\") expectedNextEventId\n}\n\nstruct GetMutableStateResponse {\n 10: optional shared.WorkflowExecution execution\n 20: optional shared.WorkflowType workflowType\n 30: optional i64 (js.type = \"Long\") NextEventId\n 35: optional i64 (js.type = \"Long\") PreviousStartedEventId\n 40: optional i64 (js.type = \"Long\") LastFirstEventId\n 50: optional shared.TaskList taskList\n 60: optional shared.TaskList stickyTaskList\n 70: optional string clientLibraryVersion\n 80: optional string clientFeatureVersion\n 90: optional string clientImpl\n 100: optional bool isWorkflowRunning\n 110: optional i32 stickyTaskListScheduleToStartTimeout\n 120: optional i32 eventStoreVersion\n 130: optional binary branchToken\n 140: optional map replicationInfo\n}\n\nstruct ResetStickyTaskListRequest {\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution execution\n}\n\nstruct ResetStickyTaskListResponse {\n // The reason to keep this response is to allow returning\n // information in the future.\n}\n\nstruct RespondDecisionTaskCompletedRequest {\n 10: optional string domainUUID\n 20: optional shared.RespondDecisionTaskCompletedRequest completeRequest\n}\n\nstruct RespondDecisionTaskCompletedResponse {\n 10: optional RecordDecisionTaskStartedResponse startedResponse\n}\n\nstruct RespondDecisionTaskFailedRequest {\n 10: optional string domainUUID\n 20: optional shared.RespondDecisionTaskFailedRequest failedRequest\n}\n\nstruct RecordActivityTaskHeartbeatRequest {\n 10: optional string domainUUID\n 20: optional shared.RecordActivityTaskHeartbeatRequest heartbeatRequest\n}\n\nstruct RespondActivityTaskCompletedRequest {\n 10: optional string domainUUID\n 20: optional shared.RespondActivityTaskCompletedRequest completeRequest\n}\n\nstruct RespondActivityTaskFailedRequest {\n 10: optional string domainUUID\n 20: optional shared.RespondActivityTaskFailedRequest failedRequest\n}\n\nstruct RespondActivityTaskCanceledRequest {\n 10: optional string domainUUID\n 20: optional shared.RespondActivityTaskCanceledRequest cancelRequest\n}\n\nstruct RecordActivityTaskStartedRequest {\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution workflowExecution\n 30: optional i64 (js.type = \"Long\") scheduleId\n 40: optional i64 (js.type = \"Long\") taskId\n 45: optional string requestId // Unique id of each poll request. Used to ensure at most once delivery of tasks.\n 50: optional shared.PollForActivityTaskRequest pollRequest\n}\n\nstruct RecordActivityTaskStartedResponse {\n 20: optional shared.HistoryEvent scheduledEvent\n 30: optional i64 (js.type = \"Long\") startedTimestamp\n 40: optional i64 (js.type = \"Long\") attempt\n 50: optional i64 (js.type = \"Long\") scheduledTimestampOfThisAttempt\n 60: optional binary heartbeatDetails\n 70: optional shared.WorkflowType workflowType\n 80: optional string workflowDomain\n}\n\nstruct RecordDecisionTaskStartedRequest {\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution workflowExecution\n 30: optional i64 (js.type = \"Long\") scheduleId\n 40: optional i64 (js.type = \"Long\") taskId\n 45: optional string requestId // Unique id of each poll request. Used to ensure at most once delivery of tasks.\n 50: optional shared.PollForDecisionTaskRequest pollRequest\n}\n\nstruct RecordDecisionTaskStartedResponse {\n 10: optional shared.WorkflowType workflowType\n 20: optional i64 (js.type = \"Long\") previousStartedEventId\n 30: optional i64 (js.type = \"Long\") scheduledEventId\n 40: optional i64 (js.type = \"Long\") startedEventId\n 50: optional i64 (js.type = \"Long\") nextEventId\n 60: optional i64 (js.type = \"Long\") attempt\n 70: optional bool stickyExecutionEnabled\n 80: optional shared.TransientDecisionInfo decisionInfo\n 90: optional shared.TaskList WorkflowExecutionTaskList\n 100: optional i32 eventStoreVersion\n 110: optional binary branchToken\n}\n\nstruct SignalWorkflowExecutionRequest {\n 10: optional string domainUUID\n 20: optional shared.SignalWorkflowExecutionRequest signalRequest\n 30: optional shared.WorkflowExecution externalWorkflowExecution\n 40: optional bool childWorkflowOnly\n}\n\nstruct SignalWithStartWorkflowExecutionRequest {\n 10: optional string domainUUID\n 20: optional shared.SignalWithStartWorkflowExecutionRequest signalWithStartRequest\n}\n\nstruct RemoveSignalMutableStateRequest {\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution workflowExecution\n 30: optional string requestId\n}\n\nstruct TerminateWorkflowExecutionRequest {\n 10: optional string domainUUID\n 20: optional shared.TerminateWorkflowExecutionRequest terminateRequest\n}\n\nstruct ResetWorkflowExecutionRequest {\n 10: optional string domainUUID\n 20: optional shared.ResetWorkflowExecutionRequest resetRequest\n}\n\nstruct RequestCancelWorkflowExecutionRequest {\n 10: optional string domainUUID\n 20: optional shared.RequestCancelWorkflowExecutionRequest cancelRequest\n 30: optional i64 (js.type = \"Long\") externalInitiatedEventId\n 40: optional shared.WorkflowExecution externalWorkflowExecution\n 50: optional bool childWorkflowOnly\n}\n\nstruct ScheduleDecisionTaskRequest {\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution workflowExecution\n 30: optional bool isFirstDecision\n}\n\nstruct DescribeWorkflowExecutionRequest {\n 10: optional string domainUUID\n 20: optional shared.DescribeWorkflowExecutionRequest request\n}\n\n/**\n* RecordChildExecutionCompletedRequest is used for reporting the completion of child execution to parent workflow\n* execution which started it. When a child execution is completed it creates this request and calls the\n* RecordChildExecutionCompleted API with the workflowExecution of parent. It also sets the completedExecution of the\n* child as it could potentially be different than the ChildExecutionStartedEvent of parent in the situation when\n* child creates multiple runs through ContinueAsNew before finally completing.\n**/\nstruct RecordChildExecutionCompletedRequest {\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution workflowExecution\n 30: optional i64 (js.type = \"Long\") initiatedId\n 40: optional shared.WorkflowExecution completedExecution\n 50: optional shared.HistoryEvent completionEvent\n}\n\nstruct ReplicateEventsRequest {\n 10: optional string sourceCluster\n 20: optional string domainUUID\n 30: optional shared.WorkflowExecution workflowExecution\n 40: optional i64 (js.type = \"Long\") firstEventId\n 50: optional i64 (js.type = \"Long\") nextEventId\n 60: optional i64 (js.type = \"Long\") version\n 70: optional map replicationInfo\n 80: optional shared.History history\n 90: optional shared.History newRunHistory\n 100: optional bool forceBufferEvents\n 110: optional i32 eventStoreVersion\n 120: optional i32 newRunEventStoreVersion\n 130: optional bool resetWorkflow\n}\n\nstruct ReplicateRawEventsRequest {\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution workflowExecution\n 30: optional map replicationInfo\n 40: optional shared.DataBlob history\n 50: optional shared.DataBlob newRunHistory\n 60: optional i32 eventStoreVersion\n 70: optional i32 newRunEventStoreVersion\n}\n\nstruct SyncShardStatusRequest {\n 10: optional string sourceCluster\n 20: optional i64 (js.type = \"Long\") shardId\n 30: optional i64 (js.type = \"Long\") timestamp\n}\n\nstruct SyncActivityRequest {\n 10: optional string domainId\n 20: optional string workflowId\n 30: optional string runId\n 40: optional i64 (js.type = \"Long\") version\n 50: optional i64 (js.type = \"Long\") scheduledId\n 60: optional i64 (js.type = \"Long\") scheduledTime\n 70: optional i64 (js.type = \"Long\") startedId\n 80: optional i64 (js.type = \"Long\") startedTime\n 90: optional i64 (js.type = \"Long\") lastHeartbeatTime\n 100: optional binary details\n 110: optional i32 attempt\n}\n\n/**\n* HistoryService provides API to start a new long running workflow instance, as well as query and update the history\n* of workflow instances already created.\n**/\nservice HistoryService {\n /**\n * StartWorkflowExecution starts a new long running workflow instance. It will create the instance with\n * 'WorkflowExecutionStarted' event in history and also schedule the first DecisionTask for the worker to make the\n * first decision for this instance. It will return 'WorkflowExecutionAlreadyStartedError', if an instance already\n * exists with same workflowId.\n **/\n shared.StartWorkflowExecutionResponse StartWorkflowExecution(1: StartWorkflowExecutionRequest startRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.WorkflowExecutionAlreadyStartedError sessionAlreadyExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.DomainNotActiveError domainNotActiveError,\n 6: shared.LimitExceededError limitExceededError,\n 7: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * Returns the information from mutable state of workflow execution.\n * It fails with 'EntityNotExistError' if specified workflow execution in unknown to the service.\n **/\n GetMutableStateResponse GetMutableState(1: GetMutableStateRequest getRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.LimitExceededError limitExceededError,\n 6: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * Reset the sticky tasklist related information in mutable state of a given workflow.\n * Things cleared are:\n * 1. StickyTaskList\n * 2. StickyScheduleToStartTimeout\n * 3. ClientLibraryVersion\n * 4. ClientFeatureVersion\n * 5. ClientImpl\n **/\n ResetStickyTaskListResponse ResetStickyTaskList(1: ResetStickyTaskListRequest resetRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.LimitExceededError limitExceededError,\n 6: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * RecordDecisionTaskStarted is called by the Matchingservice before it hands a decision task to the application worker in response to\n * a PollForDecisionTask call. It records in the history the event that the decision task has started. It will return 'EventAlreadyStartedError',\n * if the workflow's execution history already includes a record of the event starting.\n **/\n RecordDecisionTaskStartedResponse RecordDecisionTaskStarted(1: RecordDecisionTaskStartedRequest addRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: EventAlreadyStartedError eventAlreadyStartedError,\n 4: shared.EntityNotExistsError entityNotExistError,\n 5: ShardOwnershipLostError shardOwnershipLostError,\n 6: shared.DomainNotActiveError domainNotActiveError,\n 7: shared.LimitExceededError limitExceededError,\n 8: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * RecordActivityTaskStarted is called by the Matchingservice before it hands a decision task to the application worker in response to\n * a PollForActivityTask call. It records in the history the event that the decision task has started. It will return 'EventAlreadyStartedError',\n * if the workflow's execution history already includes a record of the event starting.\n **/\n RecordActivityTaskStartedResponse RecordActivityTaskStarted(1: RecordActivityTaskStartedRequest addRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: EventAlreadyStartedError eventAlreadyStartedError,\n 4: shared.EntityNotExistsError entityNotExistError,\n 5: ShardOwnershipLostError shardOwnershipLostError,\n 6: shared.DomainNotActiveError domainNotActiveError,\n 7: shared.LimitExceededError limitExceededError,\n 8: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * RespondDecisionTaskCompleted is called by application worker to complete a DecisionTask handed as a result of\n * 'PollForDecisionTask' API call. Completing a DecisionTask will result in new events for the workflow execution and\n * potentially new ActivityTask being created for corresponding decisions. It will also create a DecisionTaskCompleted\n * event in the history for that session. Use the 'taskToken' provided as response of PollForDecisionTask API call\n * for completing the DecisionTask.\n **/\n RespondDecisionTaskCompletedResponse RespondDecisionTaskCompleted(1: RespondDecisionTaskCompletedRequest completeRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.DomainNotActiveError domainNotActiveError,\n 6: shared.LimitExceededError limitExceededError,\n 7: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * RespondDecisionTaskFailed is called by application worker to indicate failure. This results in\n * DecisionTaskFailedEvent written to the history and a new DecisionTask created. This API can be used by client to\n * either clear sticky tasklist or report ny panics during DecisionTask processing.\n **/\n void RespondDecisionTaskFailed(1: RespondDecisionTaskFailedRequest failedRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.DomainNotActiveError domainNotActiveError,\n 6: shared.LimitExceededError limitExceededError,\n 7: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * RecordActivityTaskHeartbeat is called by application worker while it is processing an ActivityTask. If worker fails\n * to heartbeat within 'heartbeatTimeoutSeconds' interval for the ActivityTask, then it will be marked as timedout and\n * 'ActivityTaskTimedOut' event will be written to the workflow history. Calling 'RecordActivityTaskHeartbeat' will\n * fail with 'EntityNotExistsError' in such situations. Use the 'taskToken' provided as response of\n * PollForActivityTask API call for heartbeating.\n **/\n shared.RecordActivityTaskHeartbeatResponse RecordActivityTaskHeartbeat(1: RecordActivityTaskHeartbeatRequest heartbeatRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.DomainNotActiveError domainNotActiveError,\n 6: shared.LimitExceededError limitExceededError,\n 7: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * RespondActivityTaskCompleted is called by application worker when it is done processing an ActivityTask. It will\n * result in a new 'ActivityTaskCompleted' event being written to the workflow history and a new DecisionTask\n * created for the workflow so new decisions could be made. Use the 'taskToken' provided as response of\n * PollForActivityTask API call for completion. It fails with 'EntityNotExistsError' if the taskToken is not valid\n * anymore due to activity timeout.\n **/\n void RespondActivityTaskCompleted(1: RespondActivityTaskCompletedRequest completeRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.DomainNotActiveError domainNotActiveError,\n 6: shared.LimitExceededError limitExceededError,\n 7: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * RespondActivityTaskFailed is called by application worker when it is done processing an ActivityTask. It will\n * result in a new 'ActivityTaskFailed' event being written to the workflow history and a new DecisionTask\n * created for the workflow instance so new decisions could be made. Use the 'taskToken' provided as response of\n * PollForActivityTask API call for completion. It fails with 'EntityNotExistsError' if the taskToken is not valid\n * anymore due to activity timeout.\n **/\n void RespondActivityTaskFailed(1: RespondActivityTaskFailedRequest failRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.DomainNotActiveError domainNotActiveError,\n 6: shared.LimitExceededError limitExceededError,\n 7: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * RespondActivityTaskCanceled is called by application worker when it is successfully canceled an ActivityTask. It will\n * result in a new 'ActivityTaskCanceled' event being written to the workflow history and a new DecisionTask\n * created for the workflow instance so new decisions could be made. Use the 'taskToken' provided as response of\n * PollForActivityTask API call for completion. It fails with 'EntityNotExistsError' if the taskToken is not valid\n * anymore due to activity timeout.\n **/\n void RespondActivityTaskCanceled(1: RespondActivityTaskCanceledRequest canceledRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.DomainNotActiveError domainNotActiveError,\n 6: shared.LimitExceededError limitExceededError,\n 7: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * SignalWorkflowExecution is used to send a signal event to running workflow execution. This results in\n * WorkflowExecutionSignaled event recorded in the history and a decision task being created for the execution.\n **/\n void SignalWorkflowExecution(1: SignalWorkflowExecutionRequest signalRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.DomainNotActiveError domainNotActiveError,\n 6: shared.ServiceBusyError serviceBusyError,\n 7: shared.LimitExceededError limitExceededError,\n )\n\n /**\n * SignalWithStartWorkflowExecution is used to ensure sending a signal event to a workflow execution.\n * If workflow is running, this results in WorkflowExecutionSignaled event recorded in the history\n * and a decision task being created for the execution.\n * If workflow is not running or not found, it will first try start workflow with given WorkflowIDResuePolicy,\n * and record WorkflowExecutionStarted and WorkflowExecutionSignaled event in case of success.\n * It will return `WorkflowExecutionAlreadyStartedError` if start workflow failed with given policy.\n **/\n shared.StartWorkflowExecutionResponse SignalWithStartWorkflowExecution(1: SignalWithStartWorkflowExecutionRequest signalWithStartRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: ShardOwnershipLostError shardOwnershipLostError,\n 4: shared.DomainNotActiveError domainNotActiveError,\n 5: shared.LimitExceededError limitExceededError,\n 6: shared.ServiceBusyError serviceBusyError,\n 7: shared.WorkflowExecutionAlreadyStartedError workflowAlreadyStartedError,\n )\n\n /**\n * RemoveSignalMutableState is used to remove a signal request ID that was previously recorded. This is currently\n * used to clean execution info when signal decision finished.\n **/\n void RemoveSignalMutableState(1: RemoveSignalMutableStateRequest removeRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.DomainNotActiveError domainNotActiveError,\n 6: shared.LimitExceededError limitExceededError,\n 7: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * TerminateWorkflowExecution terminates an existing workflow execution by recording WorkflowExecutionTerminated event\n * in the history and immediately terminating the execution instance.\n **/\n void TerminateWorkflowExecution(1: TerminateWorkflowExecutionRequest terminateRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.DomainNotActiveError domainNotActiveError,\n 6: shared.LimitExceededError limitExceededError,\n 7: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * ResetWorkflowExecution reset an existing workflow execution by a firstEventID of a existing event batch\n * in the history and immediately terminating the current execution instance.\n * After reset, the history will grow from nextFirstEventID.\n **/\n shared.ResetWorkflowExecutionResponse ResetWorkflowExecution(1: ResetWorkflowExecutionRequest resetRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.DomainNotActiveError domainNotActiveError,\n 6: shared.LimitExceededError limitExceededError,\n 7: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * RequestCancelWorkflowExecution is called by application worker when it wants to request cancellation of a workflow instance.\n * It will result in a new 'WorkflowExecutionCancelRequested' event being written to the workflow history and a new DecisionTask\n * created for the workflow instance so new decisions could be made. It fails with 'EntityNotExistsError' if the workflow is not valid\n * anymore due to completion or doesn't exist.\n **/\n void RequestCancelWorkflowExecution(1: RequestCancelWorkflowExecutionRequest cancelRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.CancellationAlreadyRequestedError cancellationAlreadyRequestedError,\n 6: shared.DomainNotActiveError domainNotActiveError,\n 7: shared.LimitExceededError limitExceededError,\n 8: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * ScheduleDecisionTask is used for creating a decision task for already started workflow execution. This is mainly\n * used by transfer queue processor during the processing of StartChildWorkflowExecution task, where it first starts\n * child execution without creating the decision task and then calls this API after updating the mutable state of\n * parent execution.\n **/\n void ScheduleDecisionTask(1: ScheduleDecisionTaskRequest scheduleRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.DomainNotActiveError domainNotActiveError,\n 6: shared.LimitExceededError limitExceededError,\n 7: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * RecordChildExecutionCompleted is used for reporting the completion of child workflow execution to parent.\n * This is mainly called by transfer queue processor during the processing of DeleteExecution task.\n **/\n void RecordChildExecutionCompleted(1: RecordChildExecutionCompletedRequest completionRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.DomainNotActiveError domainNotActiveError,\n 6: shared.LimitExceededError limitExceededError,\n 7: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * DescribeWorkflowExecution returns information about the specified workflow execution.\n **/\n shared.DescribeWorkflowExecutionResponse DescribeWorkflowExecution(1: DescribeWorkflowExecutionRequest describeRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.LimitExceededError limitExceededError,\n 6: shared.ServiceBusyError serviceBusyError,\n )\n\n void ReplicateEvents(1: ReplicateEventsRequest replicateRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.LimitExceededError limitExceededError,\n 6: shared.RetryTaskError retryTaskError,\n 7: shared.ServiceBusyError serviceBusyError,\n )\n\n void ReplicateRawEvents(1: ReplicateRawEventsRequest replicateRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.LimitExceededError limitExceededError,\n 6: shared.RetryTaskError retryTaskError,\n 7: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * SyncShardStatus sync the status between shards\n **/\n void SyncShardStatus(1: SyncShardStatusRequest syncShardStatusRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.LimitExceededError limitExceededError,\n 6: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * SyncActivity sync the activity status\n **/\n void SyncActivity(1: SyncActivityRequest syncActivityRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.ServiceBusyError serviceBusyError,\n 6: shared.RetryTaskError retryTaskError,\n )\n\n /**\n * DescribeMutableState returns information about the internal states of workflow mutable state.\n **/\n DescribeMutableStateResponse DescribeMutableState(1: DescribeMutableStateRequest request)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: shared.AccessDeniedError accessDeniedError,\n 5: ShardOwnershipLostError shardOwnershipLostError,\n 6: shared.LimitExceededError limitExceededError,\n )\n\n /**\n * DescribeHistoryHost returns information about the internal states of a history host\n **/\n shared.DescribeHistoryHostResponse DescribeHistoryHost(1: shared.DescribeHistoryHostRequest request)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.AccessDeniedError accessDeniedError,\n )\n}\n" +const rawIDL = "// Copyright (c) 2017 Uber Technologies, Inc.\n//\n// Permission is hereby granted, free of charge, to any person obtaining a copy\n// of this software and associated documentation files (the \"Software\"), to deal\n// in the Software without restriction, including without limitation the rights\n// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell\n// copies of the Software, and to permit persons to whom the Software is\n// furnished to do so, subject to the following conditions:\n//\n// The above copyright notice and this permission notice shall be included in\n// all copies or substantial portions of the Software.\n//\n// THE SOFTWARE IS PROVIDED \"AS IS\", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\n// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,\n// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE\n// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER\n// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,\n// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN\n// THE SOFTWARE.\n\ninclude \"shared.thrift\"\n\nnamespace java com.uber.cadence.history\n\nexception EventAlreadyStartedError {\n 1: required string message\n}\n\nexception ShardOwnershipLostError {\n 10: optional string message\n 20: optional string owner\n}\n\nstruct ParentExecutionInfo {\n 10: optional string domainUUID\n 15: optional string domain\n 20: optional shared.WorkflowExecution execution\n 30: optional i64 (js.type = \"Long\") initiatedId\n}\n\nstruct StartWorkflowExecutionRequest {\n 10: optional string domainUUID\n 20: optional shared.StartWorkflowExecutionRequest startRequest\n 30: optional ParentExecutionInfo parentExecutionInfo\n 40: optional i32 attempt\n 50: optional i64 (js.type = \"Long\") expirationTimestamp\n 55: optional shared.ContinueAsNewInitiator continueAsNewInitiator\n 56: optional string continuedFailureReason\n 57: optional binary continuedFailureDetails\n 58: optional binary lastCompletionResult\n 60: optional i32 firstDecisionTaskBackoffSeconds\n}\n\nstruct DescribeMutableStateRequest{\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution execution\n}\n\nstruct DescribeMutableStateResponse{\n 30: optional string mutableStateInCache\n 40: optional string mutableStateInDatabase\n}\n\nstruct GetMutableStateRequest {\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution execution\n 30: optional i64 (js.type = \"Long\") expectedNextEventId\n}\n\nstruct GetMutableStateResponse {\n 10: optional shared.WorkflowExecution execution\n 20: optional shared.WorkflowType workflowType\n 30: optional i64 (js.type = \"Long\") NextEventId\n 35: optional i64 (js.type = \"Long\") PreviousStartedEventId\n 40: optional i64 (js.type = \"Long\") LastFirstEventId\n 50: optional shared.TaskList taskList\n 60: optional shared.TaskList stickyTaskList\n 70: optional string clientLibraryVersion\n 80: optional string clientFeatureVersion\n 90: optional string clientImpl\n 100: optional bool isWorkflowRunning\n 110: optional i32 stickyTaskListScheduleToStartTimeout\n 120: optional i32 eventStoreVersion\n 130: optional binary branchToken\n 140: optional map replicationInfo\n}\n\nstruct ResetStickyTaskListRequest {\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution execution\n}\n\nstruct ResetStickyTaskListResponse {\n // The reason to keep this response is to allow returning\n // information in the future.\n}\n\nstruct RespondDecisionTaskCompletedRequest {\n 10: optional string domainUUID\n 20: optional shared.RespondDecisionTaskCompletedRequest completeRequest\n}\n\nstruct RespondDecisionTaskCompletedResponse {\n 10: optional RecordDecisionTaskStartedResponse startedResponse\n}\n\nstruct RespondDecisionTaskFailedRequest {\n 10: optional string domainUUID\n 20: optional shared.RespondDecisionTaskFailedRequest failedRequest\n}\n\nstruct RecordActivityTaskHeartbeatRequest {\n 10: optional string domainUUID\n 20: optional shared.RecordActivityTaskHeartbeatRequest heartbeatRequest\n}\n\nstruct RespondActivityTaskCompletedRequest {\n 10: optional string domainUUID\n 20: optional shared.RespondActivityTaskCompletedRequest completeRequest\n}\n\nstruct RespondActivityTaskFailedRequest {\n 10: optional string domainUUID\n 20: optional shared.RespondActivityTaskFailedRequest failedRequest\n}\n\nstruct RespondActivityTaskCanceledRequest {\n 10: optional string domainUUID\n 20: optional shared.RespondActivityTaskCanceledRequest cancelRequest\n}\n\nstruct RecordActivityTaskStartedRequest {\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution workflowExecution\n 30: optional i64 (js.type = \"Long\") scheduleId\n 40: optional i64 (js.type = \"Long\") taskId\n 45: optional string requestId // Unique id of each poll request. Used to ensure at most once delivery of tasks.\n 50: optional shared.PollForActivityTaskRequest pollRequest\n}\n\nstruct RecordActivityTaskStartedResponse {\n 20: optional shared.HistoryEvent scheduledEvent\n 30: optional i64 (js.type = \"Long\") startedTimestamp\n 40: optional i64 (js.type = \"Long\") attempt\n 50: optional i64 (js.type = \"Long\") scheduledTimestampOfThisAttempt\n 60: optional binary heartbeatDetails\n 70: optional shared.WorkflowType workflowType\n 80: optional string workflowDomain\n}\n\nstruct RecordDecisionTaskStartedRequest {\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution workflowExecution\n 30: optional i64 (js.type = \"Long\") scheduleId\n 40: optional i64 (js.type = \"Long\") taskId\n 45: optional string requestId // Unique id of each poll request. Used to ensure at most once delivery of tasks.\n 50: optional shared.PollForDecisionTaskRequest pollRequest\n}\n\nstruct RecordDecisionTaskStartedResponse {\n 10: optional shared.WorkflowType workflowType\n 20: optional i64 (js.type = \"Long\") previousStartedEventId\n 30: optional i64 (js.type = \"Long\") scheduledEventId\n 40: optional i64 (js.type = \"Long\") startedEventId\n 50: optional i64 (js.type = \"Long\") nextEventId\n 60: optional i64 (js.type = \"Long\") attempt\n 70: optional bool stickyExecutionEnabled\n 80: optional shared.TransientDecisionInfo decisionInfo\n 90: optional shared.TaskList WorkflowExecutionTaskList\n 100: optional i32 eventStoreVersion\n 110: optional binary branchToken\n}\n\nstruct SignalWorkflowExecutionRequest {\n 10: optional string domainUUID\n 20: optional shared.SignalWorkflowExecutionRequest signalRequest\n 30: optional shared.WorkflowExecution externalWorkflowExecution\n 40: optional bool childWorkflowOnly\n}\n\nstruct SignalWithStartWorkflowExecutionRequest {\n 10: optional string domainUUID\n 20: optional shared.SignalWithStartWorkflowExecutionRequest signalWithStartRequest\n}\n\nstruct RemoveSignalMutableStateRequest {\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution workflowExecution\n 30: optional string requestId\n}\n\nstruct TerminateWorkflowExecutionRequest {\n 10: optional string domainUUID\n 20: optional shared.TerminateWorkflowExecutionRequest terminateRequest\n}\n\nstruct ResetWorkflowExecutionRequest {\n 10: optional string domainUUID\n 20: optional shared.ResetWorkflowExecutionRequest resetRequest\n}\n\nstruct RequestCancelWorkflowExecutionRequest {\n 10: optional string domainUUID\n 20: optional shared.RequestCancelWorkflowExecutionRequest cancelRequest\n 30: optional i64 (js.type = \"Long\") externalInitiatedEventId\n 40: optional shared.WorkflowExecution externalWorkflowExecution\n 50: optional bool childWorkflowOnly\n}\n\nstruct ScheduleDecisionTaskRequest {\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution workflowExecution\n 30: optional bool isFirstDecision\n}\n\nstruct DescribeWorkflowExecutionRequest {\n 10: optional string domainUUID\n 20: optional shared.DescribeWorkflowExecutionRequest request\n}\n\n/**\n* RecordChildExecutionCompletedRequest is used for reporting the completion of child execution to parent workflow\n* execution which started it. When a child execution is completed it creates this request and calls the\n* RecordChildExecutionCompleted API with the workflowExecution of parent. It also sets the completedExecution of the\n* child as it could potentially be different than the ChildExecutionStartedEvent of parent in the situation when\n* child creates multiple runs through ContinueAsNew before finally completing.\n**/\nstruct RecordChildExecutionCompletedRequest {\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution workflowExecution\n 30: optional i64 (js.type = \"Long\") initiatedId\n 40: optional shared.WorkflowExecution completedExecution\n 50: optional shared.HistoryEvent completionEvent\n}\n\nstruct ReplicateEventsRequest {\n 10: optional string sourceCluster\n 20: optional string domainUUID\n 30: optional shared.WorkflowExecution workflowExecution\n 40: optional i64 (js.type = \"Long\") firstEventId\n 50: optional i64 (js.type = \"Long\") nextEventId\n 60: optional i64 (js.type = \"Long\") version\n 70: optional map replicationInfo\n 80: optional shared.History history\n 90: optional shared.History newRunHistory\n 100: optional bool forceBufferEvents // this attribute is deprecated\n 110: optional i32 eventStoreVersion\n 120: optional i32 newRunEventStoreVersion\n 130: optional bool resetWorkflow\n}\n\nstruct ReplicateRawEventsRequest {\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution workflowExecution\n 30: optional map replicationInfo\n 40: optional shared.DataBlob history\n 50: optional shared.DataBlob newRunHistory\n 60: optional i32 eventStoreVersion\n 70: optional i32 newRunEventStoreVersion\n}\n\nstruct SyncShardStatusRequest {\n 10: optional string sourceCluster\n 20: optional i64 (js.type = \"Long\") shardId\n 30: optional i64 (js.type = \"Long\") timestamp\n}\n\nstruct SyncActivityRequest {\n 10: optional string domainId\n 20: optional string workflowId\n 30: optional string runId\n 40: optional i64 (js.type = \"Long\") version\n 50: optional i64 (js.type = \"Long\") scheduledId\n 60: optional i64 (js.type = \"Long\") scheduledTime\n 70: optional i64 (js.type = \"Long\") startedId\n 80: optional i64 (js.type = \"Long\") startedTime\n 90: optional i64 (js.type = \"Long\") lastHeartbeatTime\n 100: optional binary details\n 110: optional i32 attempt\n}\n\n/**\n* HistoryService provides API to start a new long running workflow instance, as well as query and update the history\n* of workflow instances already created.\n**/\nservice HistoryService {\n /**\n * StartWorkflowExecution starts a new long running workflow instance. It will create the instance with\n * 'WorkflowExecutionStarted' event in history and also schedule the first DecisionTask for the worker to make the\n * first decision for this instance. It will return 'WorkflowExecutionAlreadyStartedError', if an instance already\n * exists with same workflowId.\n **/\n shared.StartWorkflowExecutionResponse StartWorkflowExecution(1: StartWorkflowExecutionRequest startRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.WorkflowExecutionAlreadyStartedError sessionAlreadyExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.DomainNotActiveError domainNotActiveError,\n 6: shared.LimitExceededError limitExceededError,\n 7: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * Returns the information from mutable state of workflow execution.\n * It fails with 'EntityNotExistError' if specified workflow execution in unknown to the service.\n **/\n GetMutableStateResponse GetMutableState(1: GetMutableStateRequest getRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.LimitExceededError limitExceededError,\n 6: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * Reset the sticky tasklist related information in mutable state of a given workflow.\n * Things cleared are:\n * 1. StickyTaskList\n * 2. StickyScheduleToStartTimeout\n * 3. ClientLibraryVersion\n * 4. ClientFeatureVersion\n * 5. ClientImpl\n **/\n ResetStickyTaskListResponse ResetStickyTaskList(1: ResetStickyTaskListRequest resetRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.LimitExceededError limitExceededError,\n 6: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * RecordDecisionTaskStarted is called by the Matchingservice before it hands a decision task to the application worker in response to\n * a PollForDecisionTask call. It records in the history the event that the decision task has started. It will return 'EventAlreadyStartedError',\n * if the workflow's execution history already includes a record of the event starting.\n **/\n RecordDecisionTaskStartedResponse RecordDecisionTaskStarted(1: RecordDecisionTaskStartedRequest addRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: EventAlreadyStartedError eventAlreadyStartedError,\n 4: shared.EntityNotExistsError entityNotExistError,\n 5: ShardOwnershipLostError shardOwnershipLostError,\n 6: shared.DomainNotActiveError domainNotActiveError,\n 7: shared.LimitExceededError limitExceededError,\n 8: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * RecordActivityTaskStarted is called by the Matchingservice before it hands a decision task to the application worker in response to\n * a PollForActivityTask call. It records in the history the event that the decision task has started. It will return 'EventAlreadyStartedError',\n * if the workflow's execution history already includes a record of the event starting.\n **/\n RecordActivityTaskStartedResponse RecordActivityTaskStarted(1: RecordActivityTaskStartedRequest addRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: EventAlreadyStartedError eventAlreadyStartedError,\n 4: shared.EntityNotExistsError entityNotExistError,\n 5: ShardOwnershipLostError shardOwnershipLostError,\n 6: shared.DomainNotActiveError domainNotActiveError,\n 7: shared.LimitExceededError limitExceededError,\n 8: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * RespondDecisionTaskCompleted is called by application worker to complete a DecisionTask handed as a result of\n * 'PollForDecisionTask' API call. Completing a DecisionTask will result in new events for the workflow execution and\n * potentially new ActivityTask being created for corresponding decisions. It will also create a DecisionTaskCompleted\n * event in the history for that session. Use the 'taskToken' provided as response of PollForDecisionTask API call\n * for completing the DecisionTask.\n **/\n RespondDecisionTaskCompletedResponse RespondDecisionTaskCompleted(1: RespondDecisionTaskCompletedRequest completeRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.DomainNotActiveError domainNotActiveError,\n 6: shared.LimitExceededError limitExceededError,\n 7: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * RespondDecisionTaskFailed is called by application worker to indicate failure. This results in\n * DecisionTaskFailedEvent written to the history and a new DecisionTask created. This API can be used by client to\n * either clear sticky tasklist or report ny panics during DecisionTask processing.\n **/\n void RespondDecisionTaskFailed(1: RespondDecisionTaskFailedRequest failedRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.DomainNotActiveError domainNotActiveError,\n 6: shared.LimitExceededError limitExceededError,\n 7: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * RecordActivityTaskHeartbeat is called by application worker while it is processing an ActivityTask. If worker fails\n * to heartbeat within 'heartbeatTimeoutSeconds' interval for the ActivityTask, then it will be marked as timedout and\n * 'ActivityTaskTimedOut' event will be written to the workflow history. Calling 'RecordActivityTaskHeartbeat' will\n * fail with 'EntityNotExistsError' in such situations. Use the 'taskToken' provided as response of\n * PollForActivityTask API call for heartbeating.\n **/\n shared.RecordActivityTaskHeartbeatResponse RecordActivityTaskHeartbeat(1: RecordActivityTaskHeartbeatRequest heartbeatRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.DomainNotActiveError domainNotActiveError,\n 6: shared.LimitExceededError limitExceededError,\n 7: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * RespondActivityTaskCompleted is called by application worker when it is done processing an ActivityTask. It will\n * result in a new 'ActivityTaskCompleted' event being written to the workflow history and a new DecisionTask\n * created for the workflow so new decisions could be made. Use the 'taskToken' provided as response of\n * PollForActivityTask API call for completion. It fails with 'EntityNotExistsError' if the taskToken is not valid\n * anymore due to activity timeout.\n **/\n void RespondActivityTaskCompleted(1: RespondActivityTaskCompletedRequest completeRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.DomainNotActiveError domainNotActiveError,\n 6: shared.LimitExceededError limitExceededError,\n 7: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * RespondActivityTaskFailed is called by application worker when it is done processing an ActivityTask. It will\n * result in a new 'ActivityTaskFailed' event being written to the workflow history and a new DecisionTask\n * created for the workflow instance so new decisions could be made. Use the 'taskToken' provided as response of\n * PollForActivityTask API call for completion. It fails with 'EntityNotExistsError' if the taskToken is not valid\n * anymore due to activity timeout.\n **/\n void RespondActivityTaskFailed(1: RespondActivityTaskFailedRequest failRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.DomainNotActiveError domainNotActiveError,\n 6: shared.LimitExceededError limitExceededError,\n 7: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * RespondActivityTaskCanceled is called by application worker when it is successfully canceled an ActivityTask. It will\n * result in a new 'ActivityTaskCanceled' event being written to the workflow history and a new DecisionTask\n * created for the workflow instance so new decisions could be made. Use the 'taskToken' provided as response of\n * PollForActivityTask API call for completion. It fails with 'EntityNotExistsError' if the taskToken is not valid\n * anymore due to activity timeout.\n **/\n void RespondActivityTaskCanceled(1: RespondActivityTaskCanceledRequest canceledRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.DomainNotActiveError domainNotActiveError,\n 6: shared.LimitExceededError limitExceededError,\n 7: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * SignalWorkflowExecution is used to send a signal event to running workflow execution. This results in\n * WorkflowExecutionSignaled event recorded in the history and a decision task being created for the execution.\n **/\n void SignalWorkflowExecution(1: SignalWorkflowExecutionRequest signalRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.DomainNotActiveError domainNotActiveError,\n 6: shared.ServiceBusyError serviceBusyError,\n 7: shared.LimitExceededError limitExceededError,\n )\n\n /**\n * SignalWithStartWorkflowExecution is used to ensure sending a signal event to a workflow execution.\n * If workflow is running, this results in WorkflowExecutionSignaled event recorded in the history\n * and a decision task being created for the execution.\n * If workflow is not running or not found, it will first try start workflow with given WorkflowIDResuePolicy,\n * and record WorkflowExecutionStarted and WorkflowExecutionSignaled event in case of success.\n * It will return `WorkflowExecutionAlreadyStartedError` if start workflow failed with given policy.\n **/\n shared.StartWorkflowExecutionResponse SignalWithStartWorkflowExecution(1: SignalWithStartWorkflowExecutionRequest signalWithStartRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: ShardOwnershipLostError shardOwnershipLostError,\n 4: shared.DomainNotActiveError domainNotActiveError,\n 5: shared.LimitExceededError limitExceededError,\n 6: shared.ServiceBusyError serviceBusyError,\n 7: shared.WorkflowExecutionAlreadyStartedError workflowAlreadyStartedError,\n )\n\n /**\n * RemoveSignalMutableState is used to remove a signal request ID that was previously recorded. This is currently\n * used to clean execution info when signal decision finished.\n **/\n void RemoveSignalMutableState(1: RemoveSignalMutableStateRequest removeRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.DomainNotActiveError domainNotActiveError,\n 6: shared.LimitExceededError limitExceededError,\n 7: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * TerminateWorkflowExecution terminates an existing workflow execution by recording WorkflowExecutionTerminated event\n * in the history and immediately terminating the execution instance.\n **/\n void TerminateWorkflowExecution(1: TerminateWorkflowExecutionRequest terminateRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.DomainNotActiveError domainNotActiveError,\n 6: shared.LimitExceededError limitExceededError,\n 7: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * ResetWorkflowExecution reset an existing workflow execution by a firstEventID of a existing event batch\n * in the history and immediately terminating the current execution instance.\n * After reset, the history will grow from nextFirstEventID.\n **/\n shared.ResetWorkflowExecutionResponse ResetWorkflowExecution(1: ResetWorkflowExecutionRequest resetRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.DomainNotActiveError domainNotActiveError,\n 6: shared.LimitExceededError limitExceededError,\n 7: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * RequestCancelWorkflowExecution is called by application worker when it wants to request cancellation of a workflow instance.\n * It will result in a new 'WorkflowExecutionCancelRequested' event being written to the workflow history and a new DecisionTask\n * created for the workflow instance so new decisions could be made. It fails with 'EntityNotExistsError' if the workflow is not valid\n * anymore due to completion or doesn't exist.\n **/\n void RequestCancelWorkflowExecution(1: RequestCancelWorkflowExecutionRequest cancelRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.CancellationAlreadyRequestedError cancellationAlreadyRequestedError,\n 6: shared.DomainNotActiveError domainNotActiveError,\n 7: shared.LimitExceededError limitExceededError,\n 8: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * ScheduleDecisionTask is used for creating a decision task for already started workflow execution. This is mainly\n * used by transfer queue processor during the processing of StartChildWorkflowExecution task, where it first starts\n * child execution without creating the decision task and then calls this API after updating the mutable state of\n * parent execution.\n **/\n void ScheduleDecisionTask(1: ScheduleDecisionTaskRequest scheduleRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.DomainNotActiveError domainNotActiveError,\n 6: shared.LimitExceededError limitExceededError,\n 7: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * RecordChildExecutionCompleted is used for reporting the completion of child workflow execution to parent.\n * This is mainly called by transfer queue processor during the processing of DeleteExecution task.\n **/\n void RecordChildExecutionCompleted(1: RecordChildExecutionCompletedRequest completionRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.DomainNotActiveError domainNotActiveError,\n 6: shared.LimitExceededError limitExceededError,\n 7: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * DescribeWorkflowExecution returns information about the specified workflow execution.\n **/\n shared.DescribeWorkflowExecutionResponse DescribeWorkflowExecution(1: DescribeWorkflowExecutionRequest describeRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.LimitExceededError limitExceededError,\n 6: shared.ServiceBusyError serviceBusyError,\n )\n\n void ReplicateEvents(1: ReplicateEventsRequest replicateRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.LimitExceededError limitExceededError,\n 6: shared.RetryTaskError retryTaskError,\n 7: shared.ServiceBusyError serviceBusyError,\n )\n\n void ReplicateRawEvents(1: ReplicateRawEventsRequest replicateRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.LimitExceededError limitExceededError,\n 6: shared.RetryTaskError retryTaskError,\n 7: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * SyncShardStatus sync the status between shards\n **/\n void SyncShardStatus(1: SyncShardStatusRequest syncShardStatusRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.LimitExceededError limitExceededError,\n 6: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * SyncActivity sync the activity status\n **/\n void SyncActivity(1: SyncActivityRequest syncActivityRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: ShardOwnershipLostError shardOwnershipLostError,\n 5: shared.ServiceBusyError serviceBusyError,\n 6: shared.RetryTaskError retryTaskError,\n )\n\n /**\n * DescribeMutableState returns information about the internal states of workflow mutable state.\n **/\n DescribeMutableStateResponse DescribeMutableState(1: DescribeMutableStateRequest request)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: shared.AccessDeniedError accessDeniedError,\n 5: ShardOwnershipLostError shardOwnershipLostError,\n 6: shared.LimitExceededError limitExceededError,\n )\n\n /**\n * DescribeHistoryHost returns information about the internal states of a history host\n **/\n shared.DescribeHistoryHostResponse DescribeHistoryHost(1: shared.DescribeHistoryHostRequest request)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.AccessDeniedError accessDeniedError,\n )\n}\n" diff --git a/idl/github.com/uber/cadence/history.thrift b/idl/github.com/uber/cadence/history.thrift index 60a445c3837..27cfb9d0611 100644 --- a/idl/github.com/uber/cadence/history.thrift +++ b/idl/github.com/uber/cadence/history.thrift @@ -243,7 +243,7 @@ struct ReplicateEventsRequest { 70: optional map replicationInfo 80: optional shared.History history 90: optional shared.History newRunHistory - 100: optional bool forceBufferEvents + 100: optional bool forceBufferEvents // this attribute is deprecated 110: optional i32 eventStoreVersion 120: optional i32 newRunEventStoreVersion 130: optional bool resetWorkflow diff --git a/service/history/historyReplicator.go b/service/history/historyReplicator.go index 9bee23d665c..48eb64757a2 100644 --- a/service/history/historyReplicator.go +++ b/service/history/historyReplicator.go @@ -77,8 +77,6 @@ var ( ErrRetrySyncActivityMsg = "retry on applying sync activity" // ErrRetryBufferEventsMsg is returned when events are arriving out of order, should retry, or specify force apply ErrRetryBufferEventsMsg = "retry on applying buffer events" - // ErrRetryEmptyEventsMsg is returned when events size is 0 - ErrRetryEmptyEventsMsg = "retry on applying empty events" // ErrWorkflowNotFoundMsg is returned when workflow not found ErrWorkflowNotFoundMsg = "retry on workflow not found" // ErrRetryExistingWorkflowMsg is returned when events are arriving out of order, and there is another workflow with same version running @@ -286,7 +284,6 @@ func (r *historyReplicator) ApplyRawEvents(ctx context.Context, requestIn *h.Rep EventStoreVersion: requestIn.EventStoreVersion, NewRunHistory: nil, NewRunEventStoreVersion: nil, - ForceBufferEvents: common.BoolPtr(true), } if requestIn.NewRunHistory != nil { @@ -387,11 +384,6 @@ func (r *historyReplicator) ApplyEvents(ctx context.Context, request *h.Replicat } logger.WithField(logging.TagCurrentVersion, msBuilder.GetReplicationState().LastWriteVersion) - err = r.flushReplicationBuffer(ctx, context, msBuilder, logger) - if err != nil { - logError(logger, "Fail to pre-flush buffer.", err) - return err - } msBuilder, err = r.ApplyOtherEventsVersionChecking(ctx, context, msBuilder, request, logger) if err != nil || msBuilder == nil { return err @@ -423,6 +415,8 @@ func (r *historyReplicator) ApplyOtherEventsMissingMutableState(ctx context.Cont return newRetryTaskErrorWithHint(ErrWorkflowNotFoundMsg, domainID, workflowID, runID, common.FirstEventID) } currentRunID := currentMutableState.GetExecutionInfo().RunID + currentLastEventTaskID := currentMutableState.GetExecutionInfo().LastEventTaskID + currentNextEventID := currentMutableState.GetNextEventID() currentLastWriteVersion := currentMutableState.GetLastWriteVersion() currentStillRunning := currentMutableState.IsWorkflowExecutionRunning() currentRelease(nil) @@ -431,7 +425,8 @@ func (r *historyReplicator) ApplyOtherEventsMissingMutableState(ctx context.Cont logger.Info("Dropping replication task.") r.metricsClient.IncCounter(metrics.ReplicateHistoryEventsScope, metrics.StaleReplicationEventsCounter) return nil - } else if currentLastWriteVersion < lastEvent.GetVersion() { + } + if currentLastWriteVersion < lastEvent.GetVersion() { if currentStillRunning { err = r.terminateWorkflow(ctx, domainID, workflowID, currentRunID, lastEvent.GetVersion(), lastEvent.GetTimestamp(), logger) if err != nil { @@ -448,18 +443,9 @@ func (r *historyReplicator) ApplyOtherEventsMissingMutableState(ctx context.Cont return r.resetor.ApplyResetEvent(ctx, request, domainID, workflowID, currentRunID) } return newRetryTaskErrorWithHint(ErrWorkflowNotFoundMsg, domainID, workflowID, runID, common.FirstEventID) - - } - // currentLastWriteVersion == incomingVersion - logger.Debugf("Retrying replication task. Current RunID: %v, Current LastWriteVersion: %v, Incoming Version: %v.", - currentRunID, currentLastWriteVersion, lastEvent.GetVersion()) - - // try flush the current workflow buffer - currentRunID, currentNextEventID, currentStillRunning, currentLastEventTaskID, err := r.flushCurrentWorkflowBuffer(ctx, domainID, workflowID, logger) - if err != nil { - return err } + // currentLastWriteVersion == incomingVersion if currentStillRunning { if lastEvent.GetTaskId() < currentLastEventTaskID { return nil @@ -483,7 +469,6 @@ func (r *historyReplicator) ApplyOtherEventsVersionChecking(ctx context.Context, rState := msBuilder.GetReplicationState() if rState.LastWriteVersion > incomingVersion { // Replication state is already on a higher version, we can drop this event - // TODO: We need to replay external events like signal to the new version logger.Info("Dropping stale replication task.") r.metricsClient.IncCounter(metrics.ReplicateHistoryEventsScope, metrics.StaleReplicationEventsCounter) _, err = r.garbageCollectSignals(context, msBuilder, request.History.Events) @@ -601,53 +586,20 @@ func (r *historyReplicator) ApplyOtherEvents(ctx context.Context, context workfl return nil } - // out of order replication task and store it in the buffer - logger.Debugf("Buffer out of order replication task. NextEvent: %v, FirstEvent: %v", - msBuilder.GetNextEventID(), firstEventID) - - if !request.GetForceBufferEvents() { - return newRetryTaskErrorWithHint( - ErrRetryBufferEventsMsg, - context.getDomainID(), - context.getExecution().GetWorkflowId(), - context.getExecution().GetRunId(), - msBuilder.GetNextEventID(), - ) - } - - r.metricsClient.RecordTimer( - metrics.ReplicateHistoryEventsScope, - metrics.BufferReplicationTaskTimer, - time.Duration(len(request.History.Events)), + return newRetryTaskErrorWithHint( + ErrRetryBufferEventsMsg, + context.getDomainID(), + context.getExecution().GetWorkflowId(), + context.getExecution().GetRunId(), + msBuilder.GetNextEventID(), ) - - bt, ok := msBuilder.GetAllBufferedReplicationTasks()[request.GetFirstEventId()] - if ok && bt.Version >= request.GetVersion() { - // Have an existing replication task - return nil - } - - err = msBuilder.BufferReplicationTask(request) - if err != nil { - logError(logger, "Failed to buffer out of order replication task.", err) - return err - } - return r.updateMutableStateOnly(context, msBuilder) } // Apply the replication task err = r.ApplyReplicationTask(ctx, context, msBuilder, request, logger) if err != nil { logError(logger, "Fail to Apply Replication task.", err) - return err } - - // Flush buffered replication tasks after applying the update - err = r.flushReplicationBuffer(ctx, context, msBuilder, logger) - if err != nil { - logError(logger, "Fail to flush buffer.", err) - } - return err } @@ -719,81 +671,6 @@ func (r *historyReplicator) ApplyReplicationTask(ctx context.Context, context wo return err } -func (r *historyReplicator) flushReplicationBuffer(ctx context.Context, context workflowExecutionContext, msBuilder mutableState, - logger bark.Logger) error { - - if !msBuilder.IsWorkflowExecutionRunning() { - return nil - } - - domainID := msBuilder.GetExecutionInfo().DomainID - execution := shared.WorkflowExecution{ - WorkflowId: common.StringPtr(msBuilder.GetExecutionInfo().WorkflowID), - RunId: common.StringPtr(msBuilder.GetExecutionInfo().RunID), - } - - flushedCount := 0 - defer func() { - r.metricsClient.RecordTimer( - metrics.ReplicateHistoryEventsScope, - metrics.UnbufferReplicationTaskTimer, - time.Duration(flushedCount), - ) - }() - - // remove all stale buffered replication tasks - for firstEventID, bt := range msBuilder.GetAllBufferedReplicationTasks() { - if msBuilder.IsWorkflowExecutionRunning() && bt.Version < msBuilder.GetLastWriteVersion() { - msBuilder.DeleteBufferedReplicationTask(firstEventID) - applied, err := r.garbageCollectSignals(context, msBuilder, bt.History) - if err != nil { - return err - } - if !applied { - err = r.updateMutableStateOnly(context, msBuilder) - if err != nil { - return err - } - } - } - } - - // Keep on applying on applying buffered replication tasks in a loop - for msBuilder.IsWorkflowExecutionRunning() && msBuilder.HasBufferedReplicationTasks() { - nextEventID := msBuilder.GetNextEventID() - bt, ok := msBuilder.GetAllBufferedReplicationTasks()[nextEventID] - if !ok { - // Bail out if nextEventID is not in the buffer or version is stale - return nil - } - - // We need to delete the task from buffer first to make sure delete update is queued up - // Applying replication task commits the transaction along with the delete - msBuilder.DeleteBufferedReplicationTask(nextEventID) - sourceCluster := r.clusterMetadata.ClusterNameForFailoverVersion(bt.Version) - req := &h.ReplicateEventsRequest{ - SourceCluster: common.StringPtr(sourceCluster), - DomainUUID: common.StringPtr(domainID), - WorkflowExecution: &execution, - FirstEventId: common.Int64Ptr(bt.FirstEventID), - NextEventId: common.Int64Ptr(bt.NextEventID), - Version: common.Int64Ptr(bt.Version), - History: &workflow.History{Events: bt.History}, - NewRunHistory: &workflow.History{Events: bt.NewRunHistory}, - EventStoreVersion: &bt.EventStoreVersion, - NewRunEventStoreVersion: &bt.NewRunEventStoreVersion, - } - - // Apply replication task to workflow execution - if err := r.ApplyReplicationTask(ctx, context, msBuilder, req, logger); err != nil { - return err - } - flushedCount += int(bt.NextEventID - bt.FirstEventID) - } - - return nil -} - func (r *historyReplicator) replicateWorkflowStarted(ctx context.Context, context workflowExecutionContext, msBuilder mutableState, di *decisionInfo, sourceCluster string, history *shared.History, sBuilder stateBuilder, logger bark.Logger) error { @@ -967,10 +844,15 @@ func (r *historyReplicator) replicateWorkflowStarted(ctx context.Context, contex return nil } if currentLastWriteVersion == incomingVersion { - currentRunID, currentNextEventID, _, currentLastEventTaskID, err := r.flushCurrentWorkflowBuffer(ctx, domainID, execution.GetWorkflowId(), logger) + _, currentMutableState, currentRelease, err := r.getCurrentWorkflowMutableState(ctx, domainID, execution.GetWorkflowId()) if err != nil { return err } + currentRunID := currentMutableState.GetExecutionInfo().RunID + currentLastEventTaskID := currentMutableState.GetExecutionInfo().LastEventTaskID + currentNextEventID := currentMutableState.GetNextEventID() + currentRelease(nil) + if executionInfo.LastEventTaskID < currentLastEventTaskID { return nil } @@ -999,33 +881,6 @@ func (r *historyReplicator) replicateWorkflowStarted(ctx context.Context, contex return createWorkflow(isBrandNew, currentRunID, incomingVersion) } -func (r *historyReplicator) flushCurrentWorkflowBuffer(ctx context.Context, domainID string, workflowID string, - logger bark.Logger) (runID string, nextEventID int64, isRunning bool, lastEventTaskID int64, retError error) { - currentContext, currentMutableState, currentRelease, err := r.getCurrentWorkflowMutableState(ctx, domainID, - workflowID) - if err != nil { - return "", 0, false, 0, err - } - defer func() { currentRelease(retError) }() - - // since this new workflow cannot make progress due to existing workflow being open - // try flush the existing workflow's buffer see if we can make it move forward - // First check if there are events which needs to be flushed before applying the update - err = r.flushReplicationBuffer(ctx, currentContext, currentMutableState, logger) - currentRelease(err) - if err != nil { - logError(logger, "Fail to flush buffer for current workflow.", err) - return "", 0, false, 0, err - } - - runID = currentContext.getExecution().GetRunId() - nextEventID = currentMutableState.GetNextEventID() - lastEventTaskID = currentMutableState.GetExecutionInfo().LastEventTaskID - isRunning = currentMutableState.IsWorkflowExecutionRunning() - - return runID, nextEventID, isRunning, lastEventTaskID, nil -} - func (r *historyReplicator) conflictResolutionTerminateCurrentRunningIfNotSelf(ctx context.Context, msBuilder mutableState, incomingVersion int64, incomingTimestamp int64, logger bark.Logger) (currentRunID string, retError error) { // this function aims to solve the edge case when this workflow, when going through diff --git a/service/history/historyReplicator_test.go b/service/history/historyReplicator_test.go index 173ff78107f..91ba30d2ee0 100644 --- a/service/history/historyReplicator_test.go +++ b/service/history/historyReplicator_test.go @@ -1641,7 +1641,7 @@ func (s *historyReplicatorSuite) TestApplyOtherEvents_IncomingEqualToCurrent() { // TODO } -func (s *historyReplicatorSuite) TestApplyOtherEvents_IncomingGreaterThanCurrent_NoForceBuffer() { +func (s *historyReplicatorSuite) TestApplyOtherEvents_IncomingGreaterThanCurrent() { domainID := validDomainID workflowID := "some random workflow ID" runID := uuid.New() @@ -1679,161 +1679,6 @@ func (s *historyReplicatorSuite) TestApplyOtherEvents_IncomingGreaterThanCurrent s.Equal(newRetryTaskErrorWithHint(ErrRetryBufferEventsMsg, domainID, workflowID, runID, currentNextEventID), err) } -func (s *historyReplicatorSuite) TestApplyOtherEvents_IncomingGreaterThanCurrent_ForceBuffer_NoExistingBuffer_WorkflowRunning() { - currentSourceCluster := "some random current source cluster" - currentVersion := int64(4096) - currentNextEventID := int64(10) - - incomingSourceCluster := "some random incoming source cluster" - incomingVersion := currentVersion * 2 - incomingFirstEventID := currentNextEventID + 4 - incomingNextEventID := incomingFirstEventID + 4 - - context := &mockWorkflowExecutionContext{} - defer context.AssertExpectations(s.T()) - msBuilder := &mockMutableState{} - defer msBuilder.AssertExpectations(s.T()) - - request := &h.ReplicateEventsRequest{ - SourceCluster: common.StringPtr(incomingSourceCluster), - Version: common.Int64Ptr(incomingVersion), - FirstEventId: common.Int64Ptr(incomingFirstEventID), - NextEventId: common.Int64Ptr(incomingNextEventID), - ForceBufferEvents: common.BoolPtr(true), - History: &shared.History{Events: []*shared.HistoryEvent{&shared.HistoryEvent{}}}, - NewRunHistory: &shared.History{Events: []*shared.HistoryEvent{&shared.HistoryEvent{}}}, - EventStoreVersion: common.Int32Ptr(233), - NewRunEventStoreVersion: common.Int32Ptr(2333), - } - - s.mockClusterMetadata.On("ClusterNameForFailoverVersion", currentVersion).Return(currentSourceCluster) - msBuilder.On("GetAllBufferedReplicationTasks").Return(map[int64]*persistence.BufferedReplicationTask{}).Once() - msBuilder.On("GetLastWriteVersion").Return(currentVersion) - msBuilder.On("GetNextEventID").Return(currentNextEventID) - msBuilder.On("BufferReplicationTask", request).Return(nil).Once() - msBuilder.On("IsWorkflowExecutionRunning").Return(true) - - context.On("updateHelper", ([]persistence.Task)(nil), ([]persistence.Task)(nil), - mock.Anything, mock.Anything, false, (*historyBuilder)(nil), currentSourceCluster).Return(nil).Once() - - err := s.historyReplicator.ApplyOtherEvents(ctx.Background(), context, msBuilder, request, s.logger) - s.Nil(err) -} - -func (s *historyReplicatorSuite) TestApplyOtherEvents_IncomingGreaterThanCurrent_ForceBuffer_NoExistingBuffer_WorkflowClosed() { - currentVersion := int64(4096) - currentNextEventID := int64(10) - - incomingSourceCluster := "some random incoming source cluster" - incomingVersion := currentVersion * 2 - incomingFirstEventID := currentNextEventID + 4 - incomingNextEventID := incomingFirstEventID + 4 - - context := &mockWorkflowExecutionContext{} - defer context.AssertExpectations(s.T()) - msBuilder := &mockMutableState{} - defer msBuilder.AssertExpectations(s.T()) - - request := &h.ReplicateEventsRequest{ - SourceCluster: common.StringPtr(incomingSourceCluster), - Version: common.Int64Ptr(incomingVersion), - FirstEventId: common.Int64Ptr(incomingFirstEventID), - NextEventId: common.Int64Ptr(incomingNextEventID), - ForceBufferEvents: common.BoolPtr(true), - History: &shared.History{Events: []*shared.HistoryEvent{&shared.HistoryEvent{}}}, - } - - msBuilder.On("GetNextEventID").Return(currentNextEventID) - msBuilder.On("IsWorkflowExecutionRunning").Return(false) - - err := s.historyReplicator.ApplyOtherEvents(ctx.Background(), context, msBuilder, request, s.logger) - s.Nil(err) -} - -func (s *historyReplicatorSuite) TestApplyOtherEvents_IncomingGreaterThanCurrent_ForceBuffer_StaleExistingBuffer() { - currentSourceCluster := "some random current source cluster" - currentVersion := int64(4096) - currentNextEventID := int64(10) - - incomingSourceCluster := "some random incoming source cluster" - incomingVersion := currentVersion * 2 - incomingFirstEventID := currentNextEventID + 4 - incomingNextEventID := incomingFirstEventID + 4 - - context := &mockWorkflowExecutionContext{} - defer context.AssertExpectations(s.T()) - msBuilder := &mockMutableState{} - defer msBuilder.AssertExpectations(s.T()) - - request := &h.ReplicateEventsRequest{ - SourceCluster: common.StringPtr(incomingSourceCluster), - Version: common.Int64Ptr(incomingVersion), - FirstEventId: common.Int64Ptr(incomingFirstEventID), - NextEventId: common.Int64Ptr(incomingNextEventID), - ForceBufferEvents: common.BoolPtr(true), - History: &shared.History{Events: []*shared.HistoryEvent{&shared.HistoryEvent{}}}, - } - - staleBufferReplicationTask := &persistence.BufferedReplicationTask{ - FirstEventID: request.GetFirstEventId(), - NextEventID: request.GetNextEventId(), - Version: request.GetVersion() - 1, - } - - s.mockClusterMetadata.On("ClusterNameForFailoverVersion", currentVersion).Return(currentSourceCluster) - msBuilder.On("GetAllBufferedReplicationTasks").Return(map[int64]*persistence.BufferedReplicationTask{ - incomingFirstEventID: staleBufferReplicationTask, - }).Once() - msBuilder.On("GetLastWriteVersion").Return(currentVersion) - msBuilder.On("GetNextEventID").Return(currentNextEventID) - msBuilder.On("BufferReplicationTask", request).Return(nil).Once() - msBuilder.On("IsWorkflowExecutionRunning").Return(true) - - context.On("updateHelper", ([]persistence.Task)(nil), ([]persistence.Task)(nil), - mock.Anything, mock.Anything, false, (*historyBuilder)(nil), currentSourceCluster).Return(nil).Once() - - err := s.historyReplicator.ApplyOtherEvents(ctx.Background(), context, msBuilder, request, s.logger) - s.Nil(err) -} - -func (s *historyReplicatorSuite) TestApplyOtherEvents_IncomingGreaterThanCurrent_ForceBuffer_StaleIncoming() { - currentVersion := int64(4096) - currentNextEventID := int64(10) - - incomingSourceCluster := "some random incoming source cluster" - incomingVersion := currentVersion * 2 - incomingFirstEventID := currentNextEventID + 4 - incomingNextEventID := incomingFirstEventID + 4 - - context := &mockWorkflowExecutionContext{} - defer context.AssertExpectations(s.T()) - msBuilder := &mockMutableState{} - defer msBuilder.AssertExpectations(s.T()) - - request := &h.ReplicateEventsRequest{ - SourceCluster: common.StringPtr(incomingSourceCluster), - Version: common.Int64Ptr(incomingVersion), - FirstEventId: common.Int64Ptr(incomingFirstEventID), - NextEventId: common.Int64Ptr(incomingNextEventID), - ForceBufferEvents: common.BoolPtr(true), - History: &shared.History{Events: []*shared.HistoryEvent{&shared.HistoryEvent{}}}, - } - - bufferReplicationTask := &persistence.BufferedReplicationTask{ - FirstEventID: request.GetFirstEventId(), - NextEventID: request.GetNextEventId(), - Version: request.GetVersion() + 1, - } - msBuilder.On("GetNextEventID").Return(currentNextEventID) - msBuilder.On("GetAllBufferedReplicationTasks").Return(map[int64]*persistence.BufferedReplicationTask{ - incomingFirstEventID: bufferReplicationTask, - }).Once() - msBuilder.On("IsWorkflowExecutionRunning").Return(true) - - err := s.historyReplicator.ApplyOtherEvents(ctx.Background(), context, msBuilder, request, s.logger) - s.Nil(err) -} - func (s *historyReplicatorSuite) TestApplyReplicationTask() { // TODO } @@ -1867,22 +1712,6 @@ func (s *historyReplicatorSuite) TestApplyReplicationTask_WorkflowClosed() { s.Nil(err) } -func (s *historyReplicatorSuite) TestFlushReplicationBuffer() { - // TODO -} - -func (s *historyReplicatorSuite) TestFlushReplicationBuffer_AlreadyFinished() { - context := &mockWorkflowExecutionContext{} - defer context.AssertExpectations(s.T()) - msBuilder := &mockMutableState{} - defer msBuilder.AssertExpectations(s.T()) - - msBuilder.On("IsWorkflowExecutionRunning").Return(false).Once() - - err := s.historyReplicator.flushReplicationBuffer(ctx.Background(), context, msBuilder, s.logger) - s.Nil(err) -} - func (s *historyReplicatorSuite) TestReplicateWorkflowStarted_BrandNew() { domainID := validDomainID workflowID := "some random workflow ID" @@ -2903,10 +2732,6 @@ func (s *historyReplicatorSuite) TestReplicateWorkflowStarted_CurrentRunning_Inc defer contextCurrent.AssertExpectations(s.T()) contextCurrent.On("lock", mock.Anything).Return(nil) contextCurrent.On("unlock") - contextCurrent.On("getExecution").Return(&workflow.WorkflowExecution{ - WorkflowId: common.StringPtr(workflowID), - RunId: common.StringPtr(currentRunID), - }) msBuilderCurrent := &mockMutableState{} defer msBuilderCurrent.AssertExpectations(s.T()) @@ -2919,9 +2744,6 @@ func (s *historyReplicatorSuite) TestReplicateWorkflowStarted_CurrentRunning_Inc contextCurrentCacheKey := definition.NewWorkflowIdentifier(domainID, currentExecution.GetWorkflowId(), currentExecution.GetRunId()) s.historyReplicator.historyCache.PutIfNotExist(contextCurrentCacheKey, contextCurrent) - msBuilderCurrent.On("GetAllBufferedReplicationTasks").Return(map[int64]*persistence.BufferedReplicationTask{}) - msBuilderCurrent.On("IsWorkflowExecutionRunning").Return(true) - msBuilderCurrent.On("HasBufferedReplicationTasks").Return(false) msBuilderCurrent.On("GetExecutionInfo").Return(&persistence.WorkflowExecutionInfo{ DomainID: domainID, WorkflowID: workflowID, @@ -3044,10 +2866,6 @@ func (s *historyReplicatorSuite) TestReplicateWorkflowStarted_CurrentRunning_Inc defer contextCurrent.AssertExpectations(s.T()) contextCurrent.On("lock", mock.Anything).Return(nil) contextCurrent.On("unlock") - contextCurrent.On("getExecution").Return(&workflow.WorkflowExecution{ - WorkflowId: common.StringPtr(workflowID), - RunId: common.StringPtr(currentRunID), - }) msBuilderCurrent := &mockMutableState{} defer msBuilderCurrent.AssertExpectations(s.T()) @@ -3060,9 +2878,6 @@ func (s *historyReplicatorSuite) TestReplicateWorkflowStarted_CurrentRunning_Inc contextCurrentCacheKey := definition.NewWorkflowIdentifier(domainID, currentExecution.GetWorkflowId(), currentExecution.GetRunId()) s.historyReplicator.historyCache.PutIfNotExist(contextCurrentCacheKey, contextCurrent) - msBuilderCurrent.On("GetAllBufferedReplicationTasks").Return(map[int64]*persistence.BufferedReplicationTask{}) - msBuilderCurrent.On("IsWorkflowExecutionRunning").Return(true) - msBuilderCurrent.On("HasBufferedReplicationTasks").Return(false) msBuilderCurrent.On("GetExecutionInfo").Return(&persistence.WorkflowExecutionInfo{ DomainID: domainID, WorkflowID: workflowID, diff --git a/service/worker/replicator/replicationTask.go b/service/worker/replicator/replicationTask.go index 96c339b623e..9a8eae73701 100644 --- a/service/worker/replicator/replicationTask.go +++ b/service/worker/replicator/replicationTask.go @@ -244,9 +244,6 @@ func (t *historyReplicationTask) HandleErr(err error) error { func (t *historyReplicationTask) RetryErr(err error) bool { t.attempt++ - if t.attempt >= t.config.ReplicatorHistoryBufferRetryCount() { - t.req.ForceBufferEvents = common.BoolPtr(true) - } if t.attempt <= t.config.ReplicationTaskMaxRetry() && isTransientRetryableError(err) { time.Sleep(replicationTaskRetryDelay) diff --git a/service/worker/replicator/replicationTask_test.go b/service/worker/replicator/replicationTask_test.go index fcc008d1833..62d5f948d8f 100644 --- a/service/worker/replicator/replicationTask_test.go +++ b/service/worker/replicator/replicationTask_test.go @@ -415,10 +415,6 @@ func (s *historyReplicationTaskSuite) TestRetryErr_Retryable() { task.attempt = 0 s.True(task.RetryErr(err)) s.False(task.req.GetForceBufferEvents()) - - task.attempt = s.config.ReplicatorHistoryBufferRetryCount() - s.True(task.RetryErr(err)) - s.True(task.req.GetForceBufferEvents()) } func (s *historyReplicationTaskSuite) TestRetryErr_Retryable_ExceedAttempt() {