Skip to content

Commit

Permalink
Remove buffered replication task functionality in favor of resend (ca…
Browse files Browse the repository at this point in the history
  • Loading branch information
wxing1292 authored Apr 11, 2019
1 parent a5880aa commit e1883ef
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 358 deletions.
4 changes: 2 additions & 2 deletions .gen/go/history/idl.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion idl/github.com/uber/cadence/history.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ struct ReplicateEventsRequest {
70: optional map<string, shared.ReplicationInfo> replicationInfo
80: optional shared.History history
90: optional shared.History newRunHistory
100: optional bool forceBufferEvents
100: optional bool forceBufferEvents // this attribute is deprecated
110: optional i32 eventStoreVersion
120: optional i32 newRunEventStoreVersion
130: optional bool resetWorkflow
Expand Down
179 changes: 17 additions & 162 deletions service/history/historyReplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,6 @@ var (
ErrRetrySyncActivityMsg = "retry on applying sync activity"
// ErrRetryBufferEventsMsg is returned when events arrive out of order; the caller should retry, or specify force apply
ErrRetryBufferEventsMsg = "retry on applying buffer events"
// ErrRetryEmptyEventsMsg is returned when events size is 0
ErrRetryEmptyEventsMsg = "retry on applying empty events"
// ErrWorkflowNotFoundMsg is returned when workflow not found
ErrWorkflowNotFoundMsg = "retry on workflow not found"
// ErrRetryExistingWorkflowMsg is returned when events are arriving out of order, and there is another workflow with same version running
Expand Down Expand Up @@ -286,7 +284,6 @@ func (r *historyReplicator) ApplyRawEvents(ctx context.Context, requestIn *h.Rep
EventStoreVersion: requestIn.EventStoreVersion,
NewRunHistory: nil,
NewRunEventStoreVersion: nil,
ForceBufferEvents: common.BoolPtr(true),
}

if requestIn.NewRunHistory != nil {
Expand Down Expand Up @@ -387,11 +384,6 @@ func (r *historyReplicator) ApplyEvents(ctx context.Context, request *h.Replicat
}

logger.WithField(logging.TagCurrentVersion, msBuilder.GetReplicationState().LastWriteVersion)
err = r.flushReplicationBuffer(ctx, context, msBuilder, logger)
if err != nil {
logError(logger, "Fail to pre-flush buffer.", err)
return err
}
msBuilder, err = r.ApplyOtherEventsVersionChecking(ctx, context, msBuilder, request, logger)
if err != nil || msBuilder == nil {
return err
Expand Down Expand Up @@ -423,6 +415,8 @@ func (r *historyReplicator) ApplyOtherEventsMissingMutableState(ctx context.Cont
return newRetryTaskErrorWithHint(ErrWorkflowNotFoundMsg, domainID, workflowID, runID, common.FirstEventID)
}
currentRunID := currentMutableState.GetExecutionInfo().RunID
currentLastEventTaskID := currentMutableState.GetExecutionInfo().LastEventTaskID
currentNextEventID := currentMutableState.GetNextEventID()
currentLastWriteVersion := currentMutableState.GetLastWriteVersion()
currentStillRunning := currentMutableState.IsWorkflowExecutionRunning()
currentRelease(nil)
Expand All @@ -431,7 +425,8 @@ func (r *historyReplicator) ApplyOtherEventsMissingMutableState(ctx context.Cont
logger.Info("Dropping replication task.")
r.metricsClient.IncCounter(metrics.ReplicateHistoryEventsScope, metrics.StaleReplicationEventsCounter)
return nil
} else if currentLastWriteVersion < lastEvent.GetVersion() {
}
if currentLastWriteVersion < lastEvent.GetVersion() {
if currentStillRunning {
err = r.terminateWorkflow(ctx, domainID, workflowID, currentRunID, lastEvent.GetVersion(), lastEvent.GetTimestamp(), logger)
if err != nil {
Expand All @@ -448,18 +443,9 @@ func (r *historyReplicator) ApplyOtherEventsMissingMutableState(ctx context.Cont
return r.resetor.ApplyResetEvent(ctx, request, domainID, workflowID, currentRunID)
}
return newRetryTaskErrorWithHint(ErrWorkflowNotFoundMsg, domainID, workflowID, runID, common.FirstEventID)

}
// currentLastWriteVersion == incomingVersion
logger.Debugf("Retrying replication task. Current RunID: %v, Current LastWriteVersion: %v, Incoming Version: %v.",
currentRunID, currentLastWriteVersion, lastEvent.GetVersion())

// try flush the current workflow buffer
currentRunID, currentNextEventID, currentStillRunning, currentLastEventTaskID, err := r.flushCurrentWorkflowBuffer(ctx, domainID, workflowID, logger)
if err != nil {
return err
}

// currentLastWriteVersion == incomingVersion
if currentStillRunning {
if lastEvent.GetTaskId() < currentLastEventTaskID {
return nil
Expand All @@ -483,7 +469,6 @@ func (r *historyReplicator) ApplyOtherEventsVersionChecking(ctx context.Context,
rState := msBuilder.GetReplicationState()
if rState.LastWriteVersion > incomingVersion {
// Replication state is already on a higher version, we can drop this event
// TODO: We need to replay external events like signal to the new version
logger.Info("Dropping stale replication task.")
r.metricsClient.IncCounter(metrics.ReplicateHistoryEventsScope, metrics.StaleReplicationEventsCounter)
_, err = r.garbageCollectSignals(context, msBuilder, request.History.Events)
Expand Down Expand Up @@ -601,53 +586,20 @@ func (r *historyReplicator) ApplyOtherEvents(ctx context.Context, context workfl
return nil
}

// out of order replication task and store it in the buffer
logger.Debugf("Buffer out of order replication task. NextEvent: %v, FirstEvent: %v",
msBuilder.GetNextEventID(), firstEventID)

if !request.GetForceBufferEvents() {
return newRetryTaskErrorWithHint(
ErrRetryBufferEventsMsg,
context.getDomainID(),
context.getExecution().GetWorkflowId(),
context.getExecution().GetRunId(),
msBuilder.GetNextEventID(),
)
}

r.metricsClient.RecordTimer(
metrics.ReplicateHistoryEventsScope,
metrics.BufferReplicationTaskTimer,
time.Duration(len(request.History.Events)),
return newRetryTaskErrorWithHint(
ErrRetryBufferEventsMsg,
context.getDomainID(),
context.getExecution().GetWorkflowId(),
context.getExecution().GetRunId(),
msBuilder.GetNextEventID(),
)

bt, ok := msBuilder.GetAllBufferedReplicationTasks()[request.GetFirstEventId()]
if ok && bt.Version >= request.GetVersion() {
// Have an existing replication task
return nil
}

err = msBuilder.BufferReplicationTask(request)
if err != nil {
logError(logger, "Failed to buffer out of order replication task.", err)
return err
}
return r.updateMutableStateOnly(context, msBuilder)
}

// Apply the replication task
err = r.ApplyReplicationTask(ctx, context, msBuilder, request, logger)
if err != nil {
logError(logger, "Fail to Apply Replication task.", err)
return err
}

// Flush buffered replication tasks after applying the update
err = r.flushReplicationBuffer(ctx, context, msBuilder, logger)
if err != nil {
logError(logger, "Fail to flush buffer.", err)
}

return err
}

Expand Down Expand Up @@ -719,81 +671,6 @@ func (r *historyReplicator) ApplyReplicationTask(ctx context.Context, context wo
return err
}

// flushReplicationBuffer drains the buffered replication tasks held by msBuilder.
//
// It first deletes buffered tasks whose version is lower than the mutable state's last
// write version, then repeatedly applies the buffered task keyed by the current next
// event ID until the workflow stops running, the buffer is empty, or no buffered task
// matches the next event ID.
//
// It is a no-op (returns nil) when the workflow execution is not running. Any error from
// garbageCollectSignals, updateMutableStateOnly or ApplyReplicationTask is returned as is.
func (r *historyReplicator) flushReplicationBuffer(ctx context.Context, context workflowExecutionContext, msBuilder mutableState,
logger bark.Logger) error {

if !msBuilder.IsWorkflowExecutionRunning() {
return nil
}

domainID := msBuilder.GetExecutionInfo().DomainID
execution := shared.WorkflowExecution{
WorkflowId: common.StringPtr(msBuilder.GetExecutionInfo().WorkflowID),
RunId: common.StringPtr(msBuilder.GetExecutionInfo().RunID),
}

// flushedCount accumulates (NextEventID - FirstEventID) for every task applied below.
// It is reported on every exit path through the timer metric; note that the value passed
// as time.Duration is this count, not an elapsed time.
flushedCount := 0
defer func() {
r.metricsClient.RecordTimer(
metrics.ReplicateHistoryEventsScope,
metrics.UnbufferReplicationTaskTimer,
time.Duration(flushedCount),
)
}()

// remove all stale buffered replication tasks
for firstEventID, bt := range msBuilder.GetAllBufferedReplicationTasks() {
if msBuilder.IsWorkflowExecutionRunning() && bt.Version < msBuilder.GetLastWriteVersion() {
msBuilder.DeleteBufferedReplicationTask(firstEventID)
applied, err := r.garbageCollectSignals(context, msBuilder, bt.History)
if err != nil {
return err
}
// NOTE(review): presumably garbageCollectSignals persists the mutable state itself when it
// reports applied == true, so an explicit update is only needed otherwise - confirm.
if !applied {
err = r.updateMutableStateOnly(context, msBuilder)
if err != nil {
return err
}
}
}
}

// Keep applying buffered replication tasks in a loop
for msBuilder.IsWorkflowExecutionRunning() && msBuilder.HasBufferedReplicationTasks() {
nextEventID := msBuilder.GetNextEventID()
bt, ok := msBuilder.GetAllBufferedReplicationTasks()[nextEventID]
if !ok {
// Bail out if nextEventID is not in the buffer or version is stale
return nil
}

// We need to delete the task from buffer first to make sure delete update is queued up
// Applying replication task commits the transaction along with the delete
msBuilder.DeleteBufferedReplicationTask(nextEventID)
// Rebuild a replication request from the buffered task; the source cluster is derived
// from the task's failover version.
sourceCluster := r.clusterMetadata.ClusterNameForFailoverVersion(bt.Version)
req := &h.ReplicateEventsRequest{
SourceCluster: common.StringPtr(sourceCluster),
DomainUUID: common.StringPtr(domainID),
WorkflowExecution: &execution,
FirstEventId: common.Int64Ptr(bt.FirstEventID),
NextEventId: common.Int64Ptr(bt.NextEventID),
Version: common.Int64Ptr(bt.Version),
History: &workflow.History{Events: bt.History},
NewRunHistory: &workflow.History{Events: bt.NewRunHistory},
EventStoreVersion: &bt.EventStoreVersion,
NewRunEventStoreVersion: &bt.NewRunEventStoreVersion,
}

// Apply replication task to workflow execution
if err := r.ApplyReplicationTask(ctx, context, msBuilder, req, logger); err != nil {
return err
}
flushedCount += int(bt.NextEventID - bt.FirstEventID)
}

return nil
}

func (r *historyReplicator) replicateWorkflowStarted(ctx context.Context, context workflowExecutionContext,
msBuilder mutableState, di *decisionInfo,
sourceCluster string, history *shared.History, sBuilder stateBuilder, logger bark.Logger) error {
Expand Down Expand Up @@ -967,10 +844,15 @@ func (r *historyReplicator) replicateWorkflowStarted(ctx context.Context, contex
return nil
}
if currentLastWriteVersion == incomingVersion {
currentRunID, currentNextEventID, _, currentLastEventTaskID, err := r.flushCurrentWorkflowBuffer(ctx, domainID, execution.GetWorkflowId(), logger)
_, currentMutableState, currentRelease, err := r.getCurrentWorkflowMutableState(ctx, domainID, execution.GetWorkflowId())
if err != nil {
return err
}
currentRunID := currentMutableState.GetExecutionInfo().RunID
currentLastEventTaskID := currentMutableState.GetExecutionInfo().LastEventTaskID
currentNextEventID := currentMutableState.GetNextEventID()
currentRelease(nil)

if executionInfo.LastEventTaskID < currentLastEventTaskID {
return nil
}
Expand Down Expand Up @@ -999,33 +881,6 @@ func (r *historyReplicator) replicateWorkflowStarted(ctx context.Context, contex
return createWorkflow(isBrandNew, currentRunID, incomingVersion)
}

// flushCurrentWorkflowBuffer loads the mutable state of the current run of the given
// workflow, flushes that run's buffered replication tasks, and reports the run's state
// after the flush.
//
// A new run cannot make progress while an existing run is still open, so flushing the
// existing run's buffer is an attempt to move it forward.
//
// Returns the current run ID, its next event ID, whether it is still running, and the task
// ID of its last event. If loading the mutable state or flushing the buffer fails, the
// zero values are returned together with the error.
func (r *historyReplicator) flushCurrentWorkflowBuffer(ctx context.Context, domainID string, workflowID string,
	logger bark.Logger) (runID string, nextEventID int64, isRunning bool, lastEventTaskID int64, retError error) {
	currentContext, currentMutableState, currentRelease, err := r.getCurrentWorkflowMutableState(ctx, domainID,
		workflowID)
	if err != nil {
		return "", 0, false, 0, err
	}
	// Release exactly once, on every exit path, with the error this function returns.
	// The release is deferred (rather than issued right after the flush) so that the
	// mutable state is still held while it is read below and so that currentRelease is
	// never invoked a second time.
	defer func() { currentRelease(retError) }()

	// Flush whatever the current run has buffered before reading its state.
	err = r.flushReplicationBuffer(ctx, currentContext, currentMutableState, logger)
	if err != nil {
		logError(logger, "Fail to flush buffer for current workflow.", err)
		return "", 0, false, 0, err
	}

	runID = currentContext.getExecution().GetRunId()
	nextEventID = currentMutableState.GetNextEventID()
	lastEventTaskID = currentMutableState.GetExecutionInfo().LastEventTaskID
	isRunning = currentMutableState.IsWorkflowExecutionRunning()

	return runID, nextEventID, isRunning, lastEventTaskID, nil
}

func (r *historyReplicator) conflictResolutionTerminateCurrentRunningIfNotSelf(ctx context.Context,
msBuilder mutableState, incomingVersion int64, incomingTimestamp int64, logger bark.Logger) (currentRunID string, retError error) {
// this function aims to solve the edge case when this workflow, when going through
Expand Down
Loading

0 comments on commit e1883ef

Please sign in to comment.