Skip to content

Commit

Permalink
Replication support for all history events (cadence-workflow#659)
Browse files Browse the repository at this point in the history
IDL changes to replicator and history to also include history of next
run with the same replication task.
Mutable state changes to have replication api directly invoked by
replicator when it is applying history events on the passive
cluster.
Fixed ContinuedAsNew processing to lookup parentDomainName from cache
and correctly create history events using ParentDomainInfo.
  • Loading branch information
samarabbas authored Apr 18, 2018
1 parent c4a6e1d commit 93dc0ae
Show file tree
Hide file tree
Showing 13 changed files with 630 additions and 280 deletions.
4 changes: 2 additions & 2 deletions .gen/go/history/idl.go

Large diffs are not rendered by default.

28 changes: 26 additions & 2 deletions .gen/go/history/types.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions .gen/go/replicator/idl.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 33 additions & 9 deletions .gen/go/replicator/types.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion host/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4059,6 +4059,7 @@ func (s *integrationSuite) TestChildWorkflowWithContinueAsNew() {

// Process ChildExecution Started event and all generations of child executions
for i := 0; i < 11; i++ {
s.logger.Warnf("decision: %v", i)
_, err = poller.pollAndProcessDecisionTask(false, false)
s.logger.Infof("pollAndProcessDecisionTask: %v", err)
s.Nil(err)
Expand All @@ -4068,7 +4069,7 @@ func (s *integrationSuite) TestChildWorkflowWithContinueAsNew() {
s.NotNil(startedEvent)

// Process Child Execution final decision to complete it
_, err = poller.pollAndProcessDecisionTask(false, false)
_, err = poller.pollAndProcessDecisionTask(true, false)
s.logger.Infof("pollAndProcessDecisionTask: %v", err)
s.Nil(err)
s.True(childComplete)
Expand Down
1 change: 1 addition & 0 deletions idl/github.com/uber/cadence/history.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ struct ReplicateEventsRequest {
40: optional i64 (js.type = "Long") nextEventId
50: optional i64 (js.type = "Long") version
60: optional shared.History history
70: optional shared.History newRunHistory
}

/**
Expand Down
1 change: 1 addition & 0 deletions idl/github.com/uber/cadence/replicator.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ struct HistoryTaskAttributes {
50: optional i64 (js.type = "Long") nextEventId
60: optional i64 (js.type = "Long") version
70: optional shared.History history
80: optional shared.History newRunHistory
}

struct ReplicationTask {
Expand Down
14 changes: 13 additions & 1 deletion service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1141,8 +1141,20 @@ Update_History_Loop:
}
domainName := domainEntry.GetInfo().Name

// Extract parentDomainName so it can be passed down to next run of workflow execution
var parentDomainName string
if msBuilder.hasParentExecution() {
parentDomainID := msBuilder.executionInfo.ParentDomainID
parentDomainEntry, err := e.shard.GetDomainCache().GetDomainByID(parentDomainID)
if err != nil {
return err
}
parentDomainName = parentDomainEntry.GetInfo().Name
}

runID := uuid.New()
_, newStateBuilder, err := msBuilder.AddContinueAsNewEvent(completedID, domainID, domainName, runID, attributes)
_, newStateBuilder, err := msBuilder.AddContinueAsNewEvent(completedID, domainID, domainName, runID,
parentDomainName, attributes)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 93dc0ae

Please sign in to comment.