Skip to content

Commit

Permalink
Handle NDC events reapplication to current workflow (cadence-workflow…
Browse files Browse the repository at this point in the history
  • Loading branch information
wxing1292 authored Sep 25, 2019
1 parent 1736590 commit 6d5494c
Show file tree
Hide file tree
Showing 11 changed files with 290 additions and 96 deletions.
8 changes: 4 additions & 4 deletions service/history/MockWorkflowExecutionContext.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,12 +338,12 @@ func (_m *mockWorkflowExecutionContext) getQueryRegistry() QueryRegistry {
return r0
}

func (_m *mockWorkflowExecutionContext) reapplyEvents(_a0 context.Context, _a1 string, _a2 string, _a3 []*workflow.HistoryEvent) error {
ret := _m.Called(_a0, _a1, _a2, _a3)
func (_m *mockWorkflowExecutionContext) reapplyEvents(_a0 []*persistence.WorkflowEvents) error {
ret := _m.Called(_a0)

var r0 error
if rf, ok := ret.Get(0).(func(context.Context, string, string, []*workflow.HistoryEvent) error); ok {
r0 = rf(_a0, _a1, _a2, _a3)
if rf, ok := ret.Get(0).(func([]*persistence.WorkflowEvents) error); ok {
r0 = rf(_a0)
} else {
r0 = ret.Error(0)
}
Expand Down
8 changes: 6 additions & 2 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ func NewEngineWithShardContext(

historyEngImpl.txProcessor = newTransferQueueProcessor(shard, historyEngImpl, visibilityMgr, matching, historyClient, logger)
historyEngImpl.timerProcessor = newTimerQueueProcessor(shard, historyEngImpl, matching, logger)
historyEngImpl.eventsReapplier = newNDCEventsReapplier(shard.GetMetricsClient(), logger)

// Only start the replicator processor if valid publisher is passed in
if publisher != nil {
Expand All @@ -208,6 +209,7 @@ func NewEngineWithShardContext(
historyEngImpl.nDCReplicator = newNDCHistoryReplicator(
shard,
historyCache,
historyEngImpl.eventsReapplier,
logger,
)
}
Expand All @@ -223,7 +225,6 @@ func NewEngineWithShardContext(
historyEngImpl.replicationTaskProcessors = replicationTaskProcessors

shard.SetEngine(historyEngImpl)
historyEngImpl.eventsReapplier = newNDCEventsReapplier(shard.GetMetricsClient(), logger)
return historyEngImpl
}

Expand Down Expand Up @@ -2460,7 +2461,10 @@ func (e *historyEngineImpl) ReapplyEvents(
// reset to workflow finish event
// ignore this case for now
if !msBuilder.IsWorkflowExecutionRunning() {
e.logger.Warn("failed to reapply event to a finished workflow", tag.WorkflowDomainID(domainID), tag.WorkflowID(workflowID))
e.logger.Warn("failed to reapply event to a finished workflow",
tag.WorkflowDomainID(domainID),
tag.WorkflowID(workflowID),
)
e.metricsClient.IncCounter(metrics.HistoryReapplyEventsScope, metrics.EventReapplySkippedCount)
return nil, nil
}
Expand Down
12 changes: 6 additions & 6 deletions service/history/nDCEventsReapplier.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ package history
import (
ctx "context"

"github.com/uber/cadence/.gen/go/shared"
workflow "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/metrics"
)
Expand All @@ -35,7 +35,7 @@ type (
reapplyEvents(
ctx ctx.Context,
msBuilder mutableState,
historyEvents []*shared.HistoryEvent,
historyEvents []*workflow.HistoryEvent,
) error
}

Expand All @@ -48,7 +48,7 @@ type (
func newNDCEventsReapplier(
metricsClient metrics.Client,
logger log.Logger,
) nDCEventsReapplier {
) *nDCEventsReapplierImpl {

return &nDCEventsReapplierImpl{
metricsClient: metricsClient,
Expand All @@ -59,14 +59,14 @@ func newNDCEventsReapplier(
func (r *nDCEventsReapplierImpl) reapplyEvents(
ctx ctx.Context,
msBuilder mutableState,
historyEvents []*shared.HistoryEvent,
historyEvents []*workflow.HistoryEvent,
) error {

reapplyEvents := []*shared.HistoryEvent{}
var reapplyEvents []*workflow.HistoryEvent
// TODO: need to implement Reapply policy
for _, event := range historyEvents {
switch event.GetEventType() {
case shared.EventTypeWorkflowExecutionSignaled:
case workflow.EventTypeWorkflowExecutionSignaled:
reapplyEvents = append(reapplyEvents, event)
}
}
Expand Down
72 changes: 72 additions & 0 deletions service/history/nDCEventsReapplier_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion service/history/nDCHistoryReplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ type (
metricsClient metrics.Client
domainCache cache.DomainCache
historyCache *historyCache
eventsReapplier nDCEventsReapplier
transactionMgr nDCTransactionMgr
logger log.Logger

Expand All @@ -91,10 +92,11 @@ var errPanic = errors.NewInternalFailureError("encounter panic")
func newNDCHistoryReplicator(
shard ShardContext,
historyCache *historyCache,
eventsReapplier nDCEventsReapplier,
logger log.Logger,
) *nDCHistoryReplicatorImpl {

transactionMgr := newNDCTransactionMgr(shard, historyCache, logger)
transactionMgr := newNDCTransactionMgr(shard, historyCache, eventsReapplier, logger)
replicator := &nDCHistoryReplicatorImpl{
shard: shard,
clusterMetadata: shard.GetService().GetClusterMetadata(),
Expand All @@ -104,6 +106,7 @@ func newNDCHistoryReplicator(
domainCache: shard.GetDomainCache(),
historyCache: historyCache,
transactionMgr: transactionMgr,
eventsReapplier: eventsReapplier,
logger: logger.WithTags(tag.ComponentHistoryReplicator),

newBranchMgr: func(
Expand Down
44 changes: 43 additions & 1 deletion service/history/nDCTransactionMgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ type (
historyV2Mgr persistence.HistoryV2Manager
serializer persistence.PayloadSerializer
metricsClient metrics.Client
eventsReapplier nDCEventsReapplier
logger log.Logger

createMgr nDCTransactionMgrForNewWorkflow
Expand All @@ -150,17 +151,19 @@ var _ nDCTransactionMgr = (*nDCTransactionMgrImpl)(nil)
func newNDCTransactionMgr(
shard ShardContext,
historyCache *historyCache,
eventsReapplier nDCEventsReapplier,
logger log.Logger,
) *nDCTransactionMgrImpl {

transactionMgr := &nDCTransactionMgrImpl{
shard: shard,
domainCache: shard.GetDomainCache(),
historyCache: historyCache,
clusterMetadata: shard.GetService().GetClusterMetadata(),
clusterMetadata: shard.GetClusterMetadata(),
historyV2Mgr: shard.GetHistoryV2Manager(),
serializer: shard.GetService().GetPayloadSerializer(),
metricsClient: shard.GetMetricsClient(),
eventsReapplier: eventsReapplier,
logger: logger.WithTags(tag.ComponentHistoryReplicator),

createMgr: nil,
Expand Down Expand Up @@ -224,6 +227,8 @@ func (r *nDCTransactionMgrImpl) backfillWorkflow(
}

mode := persistence.UpdateWorkflowModeUpdateCurrent
// since we are not rebuilding the mutable state, we
// can trust the result from IsCurrentWorkflowGuaranteed
if !targetWorkflow.getMutableState().IsCurrentWorkflowGuaranteed() {
executionInfo := targetWorkflow.getMutableState().GetExecutionInfo()
domainID := executionInfo.DomainID
Expand All @@ -243,6 +248,43 @@ func (r *nDCTransactionMgrImpl) backfillWorkflow(
}
}

targetWorkflowActiveCluster := r.clusterMetadata.ClusterNameForFailoverVersion(
targetWorkflow.getMutableState().GetCurrentVersion(),
)
currentCluster := r.clusterMetadata.GetCurrentClusterName()

// workflow events reapplication to self (self workflow being current workflow)
// we need to handle 2 cases
// 1. workflow still running -> just reapply
// 2. workflow closed -> TODO wait until https://github.com/uber/cadence/issues/2420
// NOTE: also remember that workflow reset will acquire a lock on current workflow (this)

// simple case workflow still running (implies current workflow),
// no workflow reset necessary
if targetWorkflowActiveCluster == currentCluster {
// target workflow is active && target workflow is current workflow;
// we need to reapply events here, rather than using reapplyEvents
// within the workflow execution context, otherwise a deadlock will occur

if targetWorkflow.getMutableState().IsCurrentWorkflowGuaranteed() {
// case 1
if err := r.eventsReapplier.reapplyEvents(
ctx,
targetWorkflow.getMutableState(),
targetWorkflowEvents.Events,
); err != nil {
return err
}
} else if mode == persistence.UpdateWorkflowModeUpdateCurrent {
// case 2
r.logger.Warn("failed to reapply event to a finished workflow",
tag.WorkflowDomainID(targetWorkflowEvents.DomainID),
tag.WorkflowID(targetWorkflowEvents.WorkflowID),
)
r.metricsClient.IncCounter(metrics.HistoryReapplyEventsScope, metrics.EventReapplySkippedCount)
}
}

return targetWorkflow.getContext().updateWorkflowExecutionWithNew(
now,
mode,
Expand Down
10 changes: 10 additions & 0 deletions service/history/nDCTransactionMgrForExistingWorkflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,11 @@ func (r *nDCTransactionMgrForExistingWorkflowImpl) updateAsZombie(
newTransactionPolicy = transactionPolicyPassive.ptr()
}

// release lock on current workflow, since the current cluster may be the active cluster
// and events may be reapplied to the current workflow
currentWorkflow.getReleaseFn()(nil)
currentWorkflow = nil

return targetWorkflow.getContext().updateWorkflowExecutionWithNew(
now,
persistence.UpdateWorkflowModeBypassCurrent,
Expand Down Expand Up @@ -396,6 +401,11 @@ func (r *nDCTransactionMgrForExistingWorkflowImpl) conflictResolveAsZombie(
newMutableState = newWorkflow.getMutableState()
}

// release lock on current workflow, since the current cluster may be the active cluster
// and events may be reapplied to the current workflow
currentWorkflow.getReleaseFn()(nil)
currentWorkflow = nil

return targetWorkflow.getContext().conflictResolveWorkflowExecution(
now,
persistence.ConflictResolveWorkflowModeBypassCurrent,
Expand Down
5 changes: 1 addition & 4 deletions service/history/nDCTransactionMgrForNewWorkflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,10 +231,7 @@ func (r *nDCTransactionMgrForNewWorkflowImpl) createAsZombie(
}

if err := targetWorkflow.getContext().reapplyEvents(
ctx,
targetWorkflowSnapshot.ExecutionInfo.DomainID,
targetWorkflowSnapshot.ExecutionInfo.WorkflowID,
targetWorkflowEventsSeq[0].Events,
targetWorkflowEventsSeq,
); err != nil {
return err
}
Expand Down
5 changes: 1 addition & 4 deletions service/history/nDCTransactionMgrForNewWorkflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"github.com/uber-go/tally"

Expand Down Expand Up @@ -301,9 +300,7 @@ func (s *nDCTransactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_Create
"",
int64(0),
).Return(nil).Once()
targetContext.On("reapplyEvents", mock.Anything, domainID, workflowID, targetWorkflowEventsSeq[0].Events).
Return(nil).
Times(1)
targetContext.On("reapplyEvents", targetWorkflowEventsSeq).Return(nil).Times(1)

err := s.createMgr.dispatchForNewWorkflow(ctx, now, targetWorkflow)
s.NoError(err)
Expand Down
Loading

0 comments on commit 6d5494c

Please sign in to comment.