diff --git a/service/history/MockWorkflowExecutionContext.go b/service/history/MockWorkflowExecutionContext.go
index 89a4f32f34c..2c95f5189bd 100644
--- a/service/history/MockWorkflowExecutionContext.go
+++ b/service/history/MockWorkflowExecutionContext.go
@@ -203,12 +203,12 @@ func (_m *mockWorkflowExecutionContext) appendFirstBatchHistoryForContinueAsNew(
 	return r0
 }
 
-func (_m *mockWorkflowExecutionContext) replicateWorkflowExecution(_a0 *h.ReplicateEventsRequest, _a1 []persistence.Task, _a2 []persistence.Task, _a3 int64, _a4 int64, _a5 time.Time) error {
-	ret := _m.Called(_a0, _a1, _a2, _a3, _a4, _a5)
+func (_m *mockWorkflowExecutionContext) replicateWorkflowExecution(_a0 *h.ReplicateEventsRequest, _a1 []persistence.Task, _a2 []persistence.Task, _a3 int64, _a4 time.Time) error {
+	ret := _m.Called(_a0, _a1, _a2, _a3, _a4)
 
 	var r0 error
-	if rf, ok := ret.Get(0).(func(*h.ReplicateEventsRequest, []persistence.Task, []persistence.Task, int64, int64, time.Time) error); ok {
-		r0 = rf(_a0, _a1, _a2, _a3, _a4, _a5)
+	if rf, ok := ret.Get(0).(func(*h.ReplicateEventsRequest, []persistence.Task, []persistence.Task, int64, time.Time) error); ok {
+		r0 = rf(_a0, _a1, _a2, _a3, _a4)
 	} else {
 		r0 = ret.Error(0)
 	}
diff --git a/service/history/historyReplicationTask.go b/service/history/historyReplicationTask.go
new file mode 100644
index 00000000000..d92c573d9ea
--- /dev/null
+++ b/service/history/historyReplicationTask.go
@@ -0,0 +1,221 @@
+// Copyright (c) 2017 Uber Technologies, Inc.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+package history
+
+import (
+	"time"
+
+	"github.com/pborman/uuid"
+	h "github.com/uber/cadence/.gen/go/history"
+	"github.com/uber/cadence/.gen/go/shared"
+	"github.com/uber/cadence/common"
+	"github.com/uber/cadence/common/cluster"
+	"github.com/uber/cadence/common/log"
+	"github.com/uber/cadence/common/log/tag"
+)
+
+type (
+	historyReplicationTask interface {
+		getDomainID() string
+		getExecution() shared.WorkflowExecution
+		getWorkflowID() string
+		getRunID() string
+		getEventTime() time.Time
+		getFirstEvent() *shared.HistoryEvent
+		getLastEvent() *shared.HistoryEvent
+		getVersion() int64
+		getSourceCluster() string
+		getEvents() []*shared.HistoryEvent
+		getNewRunEvents() []*shared.HistoryEvent
+		getLogger() log.Logger
+		getRequest() *h.ReplicateEventsRequest
+	}
+
+	historyReplicationTaskImpl struct {
+		sourceCluster       string
+		domainID            string
+		execution           shared.WorkflowExecution
+		version             int64
+		firstEvent          *shared.HistoryEvent
+		lastEvent           *shared.HistoryEvent
+		eventTime           time.Time
+		historyEvents       []*shared.HistoryEvent
+		newRunHistoryEvents []*shared.HistoryEvent
+
+		request *h.ReplicateEventsRequest
+
+		startTime time.Time
+		logger    log.Logger
+	}
+)
+
+func newReplicationTask(clusterMetadata cluster.Metadata, now time.Time, logger log.Logger,
+	request *h.ReplicateEventsRequest) (*historyReplicationTaskImpl, error) {
+
+	if err := validateReplicateEventsRequest(request); err != nil {
+		return nil, err
+	}
+
+	domainID := request.GetDomainUUID()
+	execution := *request.WorkflowExecution
+
+	version := request.GetVersion()
+	sourceCluster := clusterMetadata.ClusterNameForFailoverVersion(version)
+
+	history := request.History
+	var newRunHistoryEvents []*shared.HistoryEvent
+	if request.NewRunHistory != nil {
+		newRunHistoryEvents = request.NewRunHistory.Events
+	}
+	historyEvents := history.Events
+	firstEvent := history.Events[0]
+	lastEvent := history.Events[len(history.Events)-1]
+
+	eventTime := int64(0)
+	for _, event := range historyEvents {
+		if event.GetTimestamp() > eventTime {
+			eventTime = event.GetTimestamp()
+		}
+	}
+
+	logger = logger.WithTags(
+		tag.WorkflowID(execution.GetWorkflowId()),
+		tag.WorkflowRunID(execution.GetRunId()),
+		tag.SourceCluster(sourceCluster),
+		tag.IncomingVersion(version),
+		tag.WorkflowFirstEventID(firstEvent.GetEventId()),
+		tag.WorkflowNextEventID(lastEvent.GetEventId()+1),
+	)
+
+	return &historyReplicationTaskImpl{
+		sourceCluster: sourceCluster,
+		domainID:      domainID,
+		execution: shared.WorkflowExecution{
+			WorkflowId: common.StringPtr(execution.GetWorkflowId()),
+			RunId:      common.StringPtr(execution.GetRunId()),
+		},
+		version:             version,
+		firstEvent:          firstEvent,
+		lastEvent:           lastEvent,
+		eventTime:           time.Unix(0, eventTime),
+		historyEvents:       historyEvents,
+		newRunHistoryEvents: newRunHistoryEvents,
+
+		request: request,
+
+		startTime: now, // TODO use time source
+		logger:    logger,
+	}, nil
+}
+
+func (t *historyReplicationTaskImpl) getDomainID() string {
+	return t.domainID
+}
+
+func (t *historyReplicationTaskImpl) getExecution() shared.WorkflowExecution {
+	return t.execution
+}
+
+func (t *historyReplicationTaskImpl) getWorkflowID() string {
+	return t.execution.GetWorkflowId()
+}
+
+func (t *historyReplicationTaskImpl) getRunID() string {
+	return t.execution.GetRunId()
+}
+
+func (t *historyReplicationTaskImpl) getEventTime() time.Time {
+	return t.eventTime
+}
+
+func (t *historyReplicationTaskImpl) getFirstEvent() *shared.HistoryEvent {
+	return t.firstEvent
+}
+
+func (t *historyReplicationTaskImpl) getLastEvent() *shared.HistoryEvent {
+	return t.lastEvent
+}
+
+func (t *historyReplicationTaskImpl) getVersion() int64 {
+	return t.version
+}
+
+func (t *historyReplicationTaskImpl) getSourceCluster() string {
+	return t.sourceCluster
+}
+
+func (t *historyReplicationTaskImpl) getEvents() []*shared.HistoryEvent {
+	return t.historyEvents
+}
+
+func (t *historyReplicationTaskImpl) getNewRunEvents() []*shared.HistoryEvent {
+	return t.newRunHistoryEvents
+}
+
+func (t *historyReplicationTaskImpl) getLogger() log.Logger {
+	return t.logger
+}
+
+func (t *historyReplicationTaskImpl) getRequest() *h.ReplicateEventsRequest {
+	return t.request
+}
+
+func validateReplicateEventsRequest(request *h.ReplicateEventsRequest) error {
+	if valid := validateUUID(request.GetDomainUUID()); !valid {
+		return ErrInvalidDomainID
+	}
+	if request.WorkflowExecution == nil {
+		return ErrInvalidExecution
+	}
+	if valid := validateUUID(request.WorkflowExecution.GetRunId()); !valid {
+		return ErrInvalidRunID
+	}
+	if request.History == nil || len(request.History.Events) == 0 {
+		return ErrEmptyHistoryRawEventBatch
+	}
+	if request.GetFirstEventId() != request.History.Events[0].GetEventId() ||
+		request.GetNextEventId() != request.History.Events[len(request.History.Events)-1].GetEventId()+1 {
+		return ErrEventIDMismatch
+	}
+
+	for _, event := range request.History.Events {
+		if event.GetVersion() != request.GetVersion() {
+			return ErrEventVersionMismatch
+		}
+	}
+
+	if request.NewRunHistory != nil {
+		for _, event := range request.NewRunHistory.Events {
+			if event.GetVersion() != request.GetVersion() {
+				return ErrEventVersionMismatch
+			}
+		}
+	}
+
+	return nil
+}
+
+func validateUUID(input string) bool {
+	if uuid.Parse(input) == nil {
+		return false
+	}
+	return true
+}
diff --git a/service/history/historyReplicator.go b/service/history/historyReplicator.go
index 99b05290db0..13619d9bcae 100644
--- a/service/history/historyReplicator.go
+++ b/service/history/historyReplicator.go
@@ -652,13 +652,8 @@ func (r *historyReplicator) ApplyReplicationTask(ctx ctx.Context, context workfl
 		err = r.replicateWorkflowStarted(ctx, context, msBuilder, request.GetSourceCluster(), request.History, sBuilder, logger)
 	default:
-		// Generate a transaction ID for appending events to history
-		transactionID, err2 := r.shard.GetNextTransferTaskID()
-		if err2 != nil {
-			return err2
-		}
 		now := time.Unix(0, lastEvent.GetTimestamp())
-		err = context.replicateWorkflowExecution(request, sBuilder.getTransferTasks(), sBuilder.getTimerTasks(), lastEvent.GetEventId(), transactionID, now)
+		err = context.replicateWorkflowExecution(request, sBuilder.getTransferTasks(), sBuilder.getTimerTasks(), lastEvent.GetEventId(), now)
 	}
 
 	if err == nil {
diff --git a/service/history/historyReplicatorV2.go b/service/history/historyReplicatorV2.go
new file mode 100644
index 00000000000..af448cb5eeb
--- /dev/null
+++ b/service/history/historyReplicatorV2.go
@@ -0,0 +1,488 @@
+// Copyright (c) 2017 Uber Technologies, Inc.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+package history
+
+import (
+	ctx "context"
+	"time"
+
+	"github.com/pborman/uuid"
+	h "github.com/uber/cadence/.gen/go/history"
+	"github.com/uber/cadence/.gen/go/shared"
+	"github.com/uber/cadence/common"
+	"github.com/uber/cadence/common/cache"
+	"github.com/uber/cadence/common/cluster"
+	"github.com/uber/cadence/common/log"
+	"github.com/uber/cadence/common/log/tag"
+	"github.com/uber/cadence/common/metrics"
+	"github.com/uber/cadence/common/persistence"
+)
+
+var (
+	// ErrInvalidDomainID is returned if domain ID is invalid
+	ErrInvalidDomainID = &shared.BadRequestError{Message: "invalid domain ID"}
+	// ErrInvalidExecution is returned if execution is invalid
+	ErrInvalidExecution = &shared.BadRequestError{Message: "invalid execution"}
+	// ErrInvalidRunID is returned if run ID is invalid
+	ErrInvalidRunID = &shared.BadRequestError{Message: "invalid run ID"}
+	// ErrEventIDMismatch is returned if event ID mis-matched
+	ErrEventIDMismatch = &shared.BadRequestError{Message: "event ID mismatch"}
+	// ErrEventVersionMismatch is returned if event version mis-matched
+	ErrEventVersionMismatch = &shared.BadRequestError{Message: "event version mismatch"}
+)
+
+type (
+	historyReplicatorV2 struct {
+		shard             ShardContext
+		historyEngine     *historyEngineImpl
+		historyCache      *historyCache
+		domainCache       cache.DomainCache
+		historySerializer persistence.PayloadSerializer
+		historyV2Mgr      persistence.HistoryV2Manager
+		clusterMetadata   cluster.Metadata
+		metricsClient     metrics.Client
+		logger            log.Logger
+		resetor           workflowResetor
+
+		getNewConflictResolver conflictResolverProvider
+		getNewStateBuilder     stateBuilderProvider
+		getNewMutableState     mutableStateProvider
+	}
+)
+
+func newHistoryReplicatorV2(shard ShardContext, historyEngine *historyEngineImpl, historyCache *historyCache, domainCache cache.DomainCache,
+	historyV2Mgr persistence.HistoryV2Manager, logger log.Logger) *historyReplicatorV2 {
+	replicator := &historyReplicatorV2{
+		shard:             shard,
+		historyEngine:     historyEngine,
+		historyCache:      historyCache,
+		domainCache:       domainCache,
+		historySerializer: persistence.NewPayloadSerializer(),
+		historyV2Mgr:      historyV2Mgr,
+		clusterMetadata:   shard.GetService().GetClusterMetadata(),
+		metricsClient:     shard.GetMetricsClient(),
+		logger:            logger.WithTags(tag.ComponentHistoryReplicator),
+
+		getNewConflictResolver: func(context workflowExecutionContext, logger log.Logger) conflictResolver {
+			return nil // TODO this is a skeleton implementation
+		},
+		getNewStateBuilder: func(msBuilder mutableState, logger log.Logger) stateBuilder {
+			return nil // TODO this is a skeleton implementation
+		},
+		getNewMutableState: func(version int64, logger log.Logger) mutableState {
+			return newMutableStateBuilderWithReplicationState(
+				shard.GetService().GetClusterMetadata().GetCurrentClusterName(),
+				shard,
+				shard.GetEventsCache(),
+				logger,
+				version,
+			)
+		},
+	}
+	replicator.resetor = nil // TODO wire v2 history replicator with workflow resetor
+
+	return replicator
+}
+
+func (r *historyReplicatorV2) ApplyEvents(ctx ctx.Context, request *h.ReplicateEventsRequest) (retError error) {
+
+	startTime := time.Now()
+	task, err := newReplicationTask(r.clusterMetadata, startTime, r.logger, request)
+	if err != nil {
+		return err
+	}
+
+	context, release, err := r.historyCache.getOrCreateWorkflowExecutionWithTimeout(
+		ctx,
+		task.domainID,
+		task.execution,
+	)
+	if err != nil {
+		// for get workflow execution context, with valid run id
+		// err will not be of type EntityNotExistsError
+		return err
+	}
+	defer func() { release(retError) }()
+
+	return r.applyEvents(ctx, context, task)
+}
+
+func (r *historyReplicatorV2) applyEvents(ctx ctx.Context, context workflowExecutionContext,
+	task historyReplicationTask) (retError error) {
+
+	switch task.getFirstEvent().GetEventType() {
+	case shared.EventTypeWorkflowExecutionStarted:
+		_, err := context.loadWorkflowExecution()
+		switch err.(type) {
+		case nil:
+			r.metricsClient.IncCounter(metrics.ReplicateHistoryEventsScope, metrics.DuplicateReplicationEventsCounter)
+			return nil
+		case *shared.EntityNotExistsError:
+			// mutable state not created, proceed
+			return r.applyStartEvents(ctx, context, task)
+		default:
+			// unable to get mutable state, return err so we can retry the task later
+			return err
+		}
+
+	default:
+		// apply events, other than simple start workflow execution
+		// the continue as new + start workflow execution combination will also be processed here
+		mutableState, err := context.loadWorkflowExecution()
+		switch err.(type) {
+		case nil:
+			mutableState, err = r.applyNonStartEventsBranchChecking(ctx, context, mutableState, task)
+			if err != nil || mutableState == nil {
+				return err
+			}
+			doContinue, err := r.applyNonStartEventsEventIDChecking(ctx, context, mutableState, task)
+			if err != nil || !doContinue {
+				return err
+			}
+			return r.applyNonStartEvents(ctx, context, mutableState, task)
+		case *shared.EntityNotExistsError:
+			// mutable state not created, proceed
+			return r.applyNonStartEventsMissingMutableState(ctx, context, task)
+		default:
+			// unable to get mutable state, return err so we can retry the task later
+			return err
+		}
+	}
+}
+
+func (r *historyReplicatorV2) applyStartEvents(ctx ctx.Context, context workflowExecutionContext,
+	task historyReplicationTask) (retError error) {
+
+	requestID := uuid.New() // requestID used for start workflow execution request. This is not on the history event.
+	msBuilder := r.getNewMutableState(task.getVersion(), task.getLogger())
+	sBuilder := r.getNewStateBuilder(msBuilder, task.getLogger())
+
+	// directly use stateBuilder to apply events for other events(including continueAsNew)
+	_, _, _, err := sBuilder.applyEvents(
+		task.getDomainID(),
+		requestID,
+		task.getExecution(),
+		task.getEvents(),
+		task.getNewRunEvents(),
+		persistence.EventStoreVersionV2, // 3+ DC will only use event store version 2
+		persistence.EventStoreVersionV2, // 3+ DC will only use event store version 2
+	)
+	if err != nil {
+		return err
+	}
+
+	_, err = context.appendFirstBatchEventsForStandby(msBuilder, task.getEvents())
+	if err != nil {
+		return err
+	}
+
+	// workflow passive side logic should not generate any replication task
+	createReplicationTask := false
+	transferTasks := sBuilder.getTransferTasks()
+	timerTasks := sBuilder.getTimerTasks()
+	defer func() {
+		if retError == nil {
+			r.notify(task.getSourceCluster(), task.getEventTime(), transferTasks, timerTasks)
+		}
+	}()
+
+	// try to create the workflow execution
+	createMode := persistence.CreateWorkflowModeBrandNew
+	prevRunID := ""
+	prevLastWriteVersion := int64(0)
+	err = context.createWorkflowExecution(
+		msBuilder, task.getSourceCluster(), createReplicationTask, task.getEventTime(), transferTasks, timerTasks,
+		createMode, prevRunID, prevLastWriteVersion,
+	)
+	if err == nil {
+		return nil
+	}
+	if _, ok := err.(*persistence.WorkflowExecutionAlreadyStartedError); !ok {
+		return err
+	}
+
+	// we have WorkflowExecutionAlreadyStartedError
+	errExist := err.(*persistence.WorkflowExecutionAlreadyStartedError)
+	currentRunID := errExist.RunID
+	currentState := errExist.State
+	currentLastWriteVersion := errExist.LastWriteVersion
+
+	if currentRunID == task.getRunID() {
+		return nil
+	}
+
+	// current workflow is completed
+	if currentState == persistence.WorkflowStateCompleted {
+		// allow the application of workflow creation if currentLastWriteVersion > incomingVersion
+		// because this can be caused by missing replication events
+		// proceed to create workflow
+		createMode = persistence.CreateWorkflowModeWorkflowIDReuse
+		prevRunID = currentRunID
+		prevLastWriteVersion = currentLastWriteVersion
+		return context.createWorkflowExecution(
+			msBuilder, task.getSourceCluster(), createReplicationTask, task.getEventTime(), transferTasks, timerTasks,
+			createMode, prevRunID, prevLastWriteVersion,
+		)
+	}
+
+	// current workflow is still running
+	if currentLastWriteVersion > task.getVersion() {
+		// TODO do backfill workflow, with zombie state
+		return nil
+	}
+	if currentLastWriteVersion == task.getVersion() {
+		// TODO should the LastEventTaskID be included in the current workflow execution record?
+
+		_, currentMutableState, currentRelease, err := r.getCurrentWorkflowMutableState(
+			ctx,
+			task.getDomainID(),
+			task.getWorkflowID(),
+		)
+		if err != nil {
+			return err
+		}
+		currentRunID := currentMutableState.GetExecutionInfo().RunID
+		currentLastEventTaskID := currentMutableState.GetExecutionInfo().LastEventTaskID
+		currentNextEventID := currentMutableState.GetNextEventID()
+		currentRelease(nil)
+
+		if msBuilder.GetExecutionInfo().LastEventTaskID < currentLastEventTaskID {
+			// TODO do backfill workflow, with zombie state
+			return nil
+		}
+		return newRetryTaskErrorWithHint(ErrRetryExistingWorkflowMsg, task.getDomainID(),
+			task.getWorkflowID(), currentRunID, currentNextEventID)
+	}
+
+	// currentStartVersion < incomingVersion && current workflow still running
+	// this can happen during the failover; since we have no idea
+	// whether the remote active cluster is aware of the current running workflow,
+	// the only thing we can do is to terminate the current workflow and
+	// start the new workflow from the request
+
+	// same workflow ID, same shard
+	_, err = r.terminateWorkflow(ctx, task.getDomainID(),
+		task.getWorkflowID(), currentRunID, task.getVersion(), task.getEventTime())
+	if err != nil {
+		if _, ok := err.(*shared.EntityNotExistsError); !ok {
+			return err
+		}
+		// if workflow is completed just when the call is made, will get EntityNotExistsError
+		// we are not sure whether the workflow to be terminated ends with continue as new or not
+		// so when encounter EntityNotExistsError, just continue to execute, if err occurs,
+		// there will be retry on the worker level
+	}
+	createMode = persistence.CreateWorkflowModeWorkflowIDReuse
+	prevRunID = currentRunID
+	prevLastWriteVersion = task.getVersion()
+	return context.createWorkflowExecution(
+		msBuilder, task.getSourceCluster(), createReplicationTask, task.getEventTime(), transferTasks, timerTasks,
+		createMode, prevRunID, prevLastWriteVersion,
+	)
+}
+
+func (r *historyReplicatorV2) applyNonStartEventsBranchChecking(ctx ctx.Context, context workflowExecutionContext, mutableState mutableState,
+	task historyReplicationTask) (mutableState, error) {
+
+	// TODO do workflow history lowest common ancestor checking
+	return nil, nil
+}
+
+func (r *historyReplicatorV2) applyNonStartEventsEventIDChecking(ctx ctx.Context, context workflowExecutionContext, mutableState mutableState,
+	task historyReplicationTask) (doContinue bool, retError error) {
+
+	nextEventID := mutableState.GetNextEventID()
+	if task.getFirstEvent().GetEventId() < nextEventID {
+		// duplicate replication task
+		r.metricsClient.IncCounter(metrics.ReplicateHistoryEventsScope, metrics.DuplicateReplicationEventsCounter)
+		return false, nil
+	}
+	if task.getFirstEvent().GetEventId() > nextEventID {
+		return false, newRetryTaskErrorWithHint(
+			ErrRetryBufferEventsMsg,
+			task.getDomainID(),
+			task.getWorkflowID(),
+			task.getRunID(),
+			nextEventID,
+		)
+	}
+	return true, nil
+}
+
+func (r *historyReplicatorV2) applyNonStartEvents(ctx ctx.Context, context workflowExecutionContext, mutableState mutableState,
+	task historyReplicationTask) error {
+
+	requestID := uuid.New() // requestID used for start workflow execution request. This is not on the history event.
+	sBuilder := r.getNewStateBuilder(mutableState, task.getLogger())
+
+	// directly use stateBuilder to apply events for other events(including continueAsNew)
+	_, _, newRunMutableState, err := sBuilder.applyEvents(
+		task.getDomainID(),
+		requestID,
+		task.getExecution(),
+		task.getEvents(),
+		task.getNewRunEvents(),
+		persistence.EventStoreVersionV2, // 3+ DC will only use event store version 2
+		persistence.EventStoreVersionV2, // 3+ DC will only use event store version 2
+	)
+	if err != nil {
+		return err
+	}
+
+	if newRunMutableState != nil {
+		// Generate a transaction ID for appending events to history
+		transactionID, err := r.shard.GetNextTransferTaskID()
+		if err != nil {
+			return err
+		}
+		// continueAsNew
+		err = context.appendFirstBatchHistoryForContinueAsNew(newRunMutableState, transactionID)
+		if err != nil {
+			return err
+		}
+	}
+
+	err = context.replicateWorkflowExecution(
+		task.getRequest(),
+		sBuilder.getTransferTasks(),
+		sBuilder.getTimerTasks(),
+		task.getLastEvent().GetEventId(),
+		task.getEventTime(),
+	)
+	if err == nil {
+		r.notify(task.getSourceCluster(), task.getEventTime(), sBuilder.getTransferTasks(), sBuilder.getTimerTasks())
+	}
+
+	return err
+}
+
+func (r *historyReplicatorV2) applyNonStartEventsMissingMutableState(ctx ctx.Context, context workflowExecutionContext,
+	task historyReplicationTask) error {
+
+	// we need to check the current workflow execution
+	_, currentMutableState, currentRelease, err := r.getCurrentWorkflowMutableState(
+		ctx,
+		task.getDomainID(),
+		task.getWorkflowID(),
+	)
+	if err != nil {
+		if _, ok := err.(*shared.EntityNotExistsError); !ok {
+			return err
+		}
+		// err is EntityNotExistsError
+		return newRetryTaskErrorWithHint(ErrWorkflowNotFoundMsg, task.getDomainID(), task.getWorkflowID(), task.getRunID(), common.FirstEventID)
+	}
+
+	currentRunID := currentMutableState.GetExecutionInfo().RunID
+	currentLastEventTaskID := currentMutableState.GetExecutionInfo().LastEventTaskID
+	currentNextEventID := currentMutableState.GetNextEventID()
+	currentLastWriteVersion := currentMutableState.GetLastWriteVersion()
+	currentStillRunning := currentMutableState.IsWorkflowExecutionRunning()
+	currentRelease(nil)
+
+	if currentLastWriteVersion > task.getVersion() {
+		// return retry task error for backfill
+		return newRetryTaskErrorWithHint(ErrWorkflowNotFoundMsg, task.getDomainID(), task.getWorkflowID(), task.getRunID(), common.FirstEventID)
+	}
+	if currentLastWriteVersion < task.getVersion() {
+		if currentStillRunning {
+			_, err := r.terminateWorkflow(ctx, task.getDomainID(), task.getWorkflowID(), currentRunID, task.getVersion(), task.getEventTime())
+			if err != nil {
+				if _, ok := err.(*shared.EntityNotExistsError); !ok {
+					return err
+				}
+				// if workflow is completed just when the call is made, will get EntityNotExistsError
+				// we are not sure whether the workflow to be terminated ends with continue as new or not
+				// so when encounter EntityNotExistsError, just continue to execute, if err occurs,
+				// there will be retry on the worker level
+			}
+			// TODO if local was not active, better return a retry task error backfill all history, in case kafka lost events.
+		}
+		if task.getRequest().GetResetWorkflow() {
+			return r.resetor.ApplyResetEvent(ctx, task.getRequest(), task.getDomainID(), task.getWorkflowID(), currentRunID)
+		}
+		return newRetryTaskErrorWithHint(ErrWorkflowNotFoundMsg, task.getDomainID(), task.getWorkflowID(), task.getRunID(), common.FirstEventID)
+	}
+
+	// currentLastWriteVersion == incomingVersion
+	if currentStillRunning {
+		if task.getLastEvent().GetTaskId() < currentLastEventTaskID {
+			// return retry task error for backfill
+			return newRetryTaskErrorWithHint(ErrWorkflowNotFoundMsg, task.getDomainID(), task.getWorkflowID(), task.getRunID(), common.FirstEventID)
+		}
+		return newRetryTaskErrorWithHint(ErrWorkflowNotFoundMsg, task.getDomainID(), task.getWorkflowID(), currentRunID, currentNextEventID)
+	}
+
+	if task.getRequest().GetResetWorkflow() {
+		// Note that at this point, current run is already closed and currentLastWriteVersion <= incomingVersion
+		return r.resetor.ApplyResetEvent(ctx, task.getRequest(), task.getDomainID(), task.getWorkflowID(), currentRunID)
+	}
+	return newRetryTaskErrorWithHint(ErrWorkflowNotFoundMsg, task.getDomainID(), task.getWorkflowID(), task.getRunID(), common.FirstEventID)
+}
+
+// func (r *historyReplicator) getCurrentWorkflowInfo(domainID string, workflowID string) (runID string, lastWriteVersion int64, closeStatus int, retError error) {
+func (r *historyReplicatorV2) getCurrentWorkflowMutableState(ctx ctx.Context, domainID string,
+	workflowID string) (workflowExecutionContext, mutableState, releaseWorkflowExecutionFunc, error) {
+
+	// TODO refactor for case if context is already current workflow
+
+	// we need to check the current workflow execution
+	context, release, err := r.historyCache.getOrCreateWorkflowExecutionWithTimeout(ctx,
+		domainID,
+		// only use the workflow ID, to get the current running one
+		shared.WorkflowExecution{WorkflowId: common.StringPtr(workflowID)},
+	)
+	if err != nil {
+		return nil, nil, nil, err
+	}
+
+	msBuilder, err := context.loadWorkflowExecution()
+	if err != nil {
+		// no matter what error happen, we need to retry
+		release(err)
+		return nil, nil, nil, err
+	}
+	return context, msBuilder, release, nil
+}
+
+func (r *historyReplicatorV2) terminateWorkflow(ctx ctx.Context, domainID string, workflowID string, runID string,
+	terminationEventVersion int64, now time.Time) (bool, error) {
+
+	// TODO should this workflow termination event be replicated?
+	// TODO 2DC vs 3+DC, different logic
+
+	// TODO if the workflow to be terminated has last write version being local active
+	// terminate with last write version and generate replication task
+	// TODO if the workflow to be terminated's last write version being remote active
+	// terminate with incoming version
+
+	return false, nil
+}
+
+func (r *historyReplicatorV2) notify(clusterName string, now time.Time,
+	transferTasks []persistence.Task, timerTasks []persistence.Task) {
+
+	now = now.Add(-r.shard.GetConfig().StandbyClusterDelay())
+
+	r.shard.SetCurrentTime(clusterName, now)
+	r.historyEngine.txProcessor.NotifyNewTask(clusterName, transferTasks)
+	r.historyEngine.timerProcessor.NotifyNewTimers(clusterName, now, timerTasks)
+}
diff --git a/service/history/stateBuilder.go b/service/history/stateBuilder.go
index d271863fe42..797312b6140 100644
--- a/service/history/stateBuilder.go
+++ b/service/history/stateBuilder.go
@@ -62,6 +62,8 @@ type (
 )
 
 const (
+	// ErrMessageHistorySizeZero indicates that history is empty
+	ErrMessageHistorySizeZero = "encounter history size being zero"
 	// ErrMessageNewRunHistorySizeZero indicate that new run history is empty
 	ErrMessageNewRunHistorySizeZero = "encounter new run history size being zero"
 )
@@ -102,20 +104,23 @@ func (b *stateBuilderImpl) getNewRunTimerTasks() []persistence.Task {
 func (b *stateBuilderImpl) applyEvents(domainID, requestID string, execution shared.WorkflowExecution,
 	history []*shared.HistoryEvent, newRunHistory []*shared.HistoryEvent, eventStoreVersion, newRunEventStoreVersion int32) (*shared.HistoryEvent, *decisionInfo, mutableState, error) {
-	var lastEvent *shared.HistoryEvent
+
+	if len(history) == 0 {
+		return nil, nil, nil, errors.NewInternalFailureError(ErrMessageHistorySizeZero)
+	}
+	firstEvent := history[0]
+	lastEvent := history[len(history)-1]
 	var lastDecision *decisionInfo
 	var newRunMutableStateBuilder mutableState
-	var firstEvent *shared.HistoryEvent
-	if len(history) > 0 {
-		firstEvent = history[0]
-	}
 
 	// need to clear the stickiness since workflow turned to passive
 	b.msBuilder.ClearStickyness()
+
 	for _, event := range history {
-		lastEvent = event
 		// NOTE: stateBuilder is also being used in the active side
 		if b.msBuilder.GetReplicationState() != nil {
+			// this function must be called within the for loop, in case
+			// history event version changed during for loop
 			b.msBuilder.UpdateReplicationStateVersion(event.GetVersion(), true)
 			sourceClusterName := b.clusterMetadata.ClusterNameForFailoverVersion(lastEvent.GetVersion())
 			b.msBuilder.UpdateReplicationStateLastEventID(sourceClusterName, lastEvent.GetVersion(), lastEvent.GetEventId())
@@ -464,6 +469,9 @@ func (b *stateBuilderImpl) applyEvents(domainID, requestID string, execution sha
 		}
 	}
 
+	b.msBuilder.GetExecutionInfo().SetLastFirstEventID(firstEvent.GetEventId())
+	b.msBuilder.GetExecutionInfo().SetNextEventID(lastEvent.GetEventId() + 1)
+
 	return lastEvent, lastDecision, newRunMutableStateBuilder, nil
 }
diff --git a/service/history/workflowExecutionContext.go b/service/history/workflowExecutionContext.go
index 3df1faece30..608798a1f54 100644
--- a/service/history/workflowExecutionContext.go
+++ b/service/history/workflowExecutionContext.go
@@ -60,7 +60,7 @@ type (
 		getLogger() log.Logger
 		loadWorkflowExecution() (mutableState, error)
 		lock(context.Context) error
-		replicateWorkflowExecution(request *h.ReplicateEventsRequest, transferTasks []persistence.Task, timerTasks []persistence.Task, lastEventID, transactionID int64, now time.Time) error
+		replicateWorkflowExecution(request *h.ReplicateEventsRequest, transferTasks []persistence.Task, timerTasks []persistence.Task, lastEventID int64, now time.Time) error
 		resetMutableState(prevRunID string, resetBuilder mutableState) (mutableState, error)
 		resetWorkflowExecution(currMutableState mutableState, updateCurr bool, closeTask, cleanupTask persistence.Task, newMutableState mutableState, transferTasks, timerTasks, currReplicationTasks, insertReplicationTasks []persistence.Task, baseRunID string, forkRunNextEventID, prevRunVersion int64) (retError error)
 		scheduleNewDecision(transferTasks []persistence.Task, timerTasks []persistence.Task) ([]persistence.Task, []persistence.Task, error)
@@ -406,7 +406,13 @@ func (c *workflowExecutionContextImpl) updateWorkflowExecutionWithNewRunAndConte
 }
 
 func (c *workflowExecutionContextImpl) replicateWorkflowExecution(request *h.ReplicateEventsRequest,
-	transferTasks []persistence.Task, timerTasks []persistence.Task, lastEventID, transactionID int64, now time.Time) error {
+	transferTasks []persistence.Task, timerTasks []persistence.Task, lastEventID int64, now time.Time) error {
+
+	transactionID, err := c.shard.GetNextTransferTaskID()
+	if err != nil {
+		return err
+	}
+
 	nextEventID := lastEventID + 1
 	c.msBuilder.GetExecutionInfo().SetNextEventID(nextEventID)
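
Note (illustrative, not part of the diff): newReplicationTask only accepts a batch whose metadata matches its events — the batch's FirstEventId must equal the first event's ID, NextEventId must equal the last event's ID plus one, and every event (including those of the new run, if any) must carry the batch's failover version. The standalone sketch below restates those checks over simplified stand-in types; the event/batch names are hypothetical and are not the generated shared.HistoryEvent / h.ReplicateEventsRequest structs.

package main

import (
	"errors"
	"fmt"
)

// event is a stand-in for shared.HistoryEvent.
type event struct {
	EventID int64
	Version int64
}

// batch is a stand-in for the relevant fields of h.ReplicateEventsRequest.
type batch struct {
	FirstEventID int64 // expected ID of the first event in the batch
	NextEventID  int64 // expected ID right after the last event in the batch
	Version      int64 // failover version the batch claims to be written at
	Events       []event
}

// validateBatch mirrors the shape of validateReplicateEventsRequest:
// non-empty history, consistent event ID range, and a single version.
func validateBatch(b batch) error {
	if len(b.Events) == 0 {
		return errors.New("empty history batch")
	}
	first := b.Events[0]
	last := b.Events[len(b.Events)-1]
	if b.FirstEventID != first.EventID || b.NextEventID != last.EventID+1 {
		return errors.New("event ID mismatch")
	}
	for _, e := range b.Events {
		if e.Version != b.Version {
			return errors.New("event version mismatch")
		}
	}
	return nil
}

func main() {
	ok := batch{FirstEventID: 4, NextEventID: 7, Version: 100,
		Events: []event{{4, 100}, {5, 100}, {6, 100}}}
	bad := batch{FirstEventID: 4, NextEventID: 7, Version: 100,
		Events: []event{{4, 100}, {5, 200}, {6, 100}}}
	fmt.Println(validateBatch(ok))  // <nil>
	fmt.Println(validateBatch(bad)) // event version mismatch
}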