Skip to content

Commit

Permalink
Add ReplicateEventsV2 API for NDC (cadence-workflow#2472)
Browse files Browse the repository at this point in the history
  • Loading branch information
wxing1292 authored Sep 4, 2019
1 parent 9984022 commit ce0b4d4
Show file tree
Hide file tree
Showing 20 changed files with 4,395 additions and 2,707 deletions.
6,406 changes: 3,817 additions & 2,589 deletions .gen/go/history/history.go

Large diffs are not rendered by default.

29 changes: 29 additions & 0 deletions .gen/go/history/historyserviceclient/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 36 additions & 1 deletion .gen/go/history/historyserviceserver/server.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 31 additions & 0 deletions .gen/go/history/historyservicetest/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

138 changes: 136 additions & 2 deletions .gen/go/shared/shared.go

Large diffs are not rendered by default.

18 changes: 18 additions & 0 deletions client/history/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,24 @@ func (c *clientImpl) ReplicateRawEvents(
return err
}

func (c *clientImpl) ReplicateEventsV2(
ctx context.Context,
request *h.ReplicateEventsV2Request,
opts ...yarpc.CallOption) error {
client, err := c.getClientForWorkflowID(request.WorkflowExecution.GetWorkflowId())
if err != nil {
return err
}
opts = common.AggregateYarpcOptions(ctx, opts...)
op := func(ctx context.Context, client historyserviceclient.Interface) error {
ctx, cancel := c.createContext(ctx)
defer cancel()
return client.ReplicateEventsV2(ctx, request, opts...)
}
err = c.executeWithRedirect(ctx, client, op)
return err
}

func (c *clientImpl) SyncShardStatus(
ctx context.Context,
request *h.SyncShardStatusRequest,
Expand Down
17 changes: 17 additions & 0 deletions client/history/metricClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,23 @@ func (c *metricClient) ReplicateRawEvents(
return err
}

func (c *metricClient) ReplicateEventsV2(
context context.Context,
request *h.ReplicateEventsV2Request,
opts ...yarpc.CallOption) error {
c.metricsClient.IncCounter(metrics.HistoryClientReplicateEventsV2Scope, metrics.CadenceClientRequests)

sw := c.metricsClient.StartTimer(metrics.HistoryClientReplicateEventsV2Scope, metrics.CadenceClientLatency)
err := c.client.ReplicateEventsV2(context, request, opts...)
sw.Stop()

if err != nil {
c.metricsClient.IncCounter(metrics.HistoryClientReplicateEventsV2Scope, metrics.CadenceClientFailures)
}

return err
}

func (c *metricClient) SyncShardStatus(
context context.Context,
request *h.SyncShardStatusRequest,
Expand Down
12 changes: 12 additions & 0 deletions client/history/retryableClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,18 @@ func (c *retryableClient) ReplicateRawEvents(
return backoff.Retry(op, c.policy, c.isRetryable)
}

func (c *retryableClient) ReplicateEventsV2(
ctx context.Context,
request *h.ReplicateEventsV2Request,
opts ...yarpc.CallOption) error {

op := func() error {
return c.client.ReplicateEventsV2(ctx, request, opts...)
}

return backoff.Retry(op, c.policy, c.isRetryable)
}

func (c *retryableClient) SyncShardStatus(
ctx context.Context,
request *h.SyncShardStatusRequest,
Expand Down
6 changes: 6 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,8 @@ const (
// HistoryClientReplicateRawEventsScope tracks RPC calls to history service
HistoryClientReplicateRawEventsScope
// HistoryClientSyncShardStatusScope tracks RPC calls to history service
HistoryClientReplicateEventsV2Scope
// HistoryClientReplicateRawEventsV2Scope tracks RPC calls to history service
HistoryClientSyncShardStatusScope
// HistoryClientSyncActivityScope tracks RPC calls to history service
HistoryClientSyncActivityScope
Expand Down Expand Up @@ -674,6 +676,8 @@ const (
HistoryReplicateEventsScope
// HistoryReplicateRawEventsScope tracks ReplicateEvents API calls received by service
HistoryReplicateRawEventsScope
// HistoryReplicateEventsV2Scope tracks ReplicateEvents API calls received by service
HistoryReplicateEventsV2Scope
// HistorySyncShardStatusScope tracks HistorySyncShardStatus API calls received by service
HistorySyncShardStatusScope
// HistorySyncActivityScope tracks HistoryActivity API calls received by service
Expand Down Expand Up @@ -960,6 +964,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
HistoryClientRecordChildExecutionCompletedScope: {operation: "HistoryClientRecordChildExecutionCompleted", tags: map[string]string{CadenceRoleTagName: HistoryRoleTagValue}},
HistoryClientReplicateEventsScope: {operation: "HistoryClientReplicateEvents", tags: map[string]string{CadenceRoleTagName: HistoryRoleTagValue}},
HistoryClientReplicateRawEventsScope: {operation: "HistoryClientReplicateRawEvents", tags: map[string]string{CadenceRoleTagName: HistoryRoleTagValue}},
HistoryClientReplicateEventsV2Scope: {operation: "HistoryClientReplicateEventsV2", tags: map[string]string{CadenceRoleTagName: HistoryRoleTagValue}},
HistoryClientSyncShardStatusScope: {operation: "HistoryClientSyncShardStatusScope", tags: map[string]string{CadenceRoleTagName: HistoryRoleTagValue}},
HistoryClientSyncActivityScope: {operation: "HistoryClientSyncActivityScope", tags: map[string]string{CadenceRoleTagName: HistoryRoleTagValue}},
HistoryClientGetReplicationTasksScope: {operation: "HistoryClientGetReplicationTasksScope", tags: map[string]string{CadenceRoleTagName: HistoryRoleTagValue}},
Expand Down Expand Up @@ -1152,6 +1157,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
HistoryRequestCancelWorkflowExecutionScope: {operation: "RequestCancelWorkflowExecution"},
HistoryReplicateEventsScope: {operation: "ReplicateEvents"},
HistoryReplicateRawEventsScope: {operation: "ReplicateRawEvents"},
HistoryReplicateEventsV2Scope: {operation: "ReplicateEventsV2"},
HistorySyncShardStatusScope: {operation: "SyncShardStatus"},
HistorySyncActivityScope: {operation: "SyncActivity"},
HistoryDescribeMutableStateScope: {operation: "DescribeMutableState"},
Expand Down
2 changes: 1 addition & 1 deletion common/testing/history_event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (s *historyEventTestSuit) Test_HistoryEvent_Generator() {
Batches: make([]NDCTestBatch, 0),
}
curr := root
//eventRanches := make([][]Vertex, 0, totalBranchNumber)
// eventRanches := make([][]Vertex, 0, totalBranchNumber)
for currentBranch > 0 {
for s.generator.HasNextVertex() {
events := s.generator.GetNextVertices()
Expand Down
24 changes: 12 additions & 12 deletions common/testing/history_event_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (h *HistoryAttributesGeneratorImpl) GenerateHistoryEvents(
history := make([]*shared.History, 0, len(batches))
eventID := startEventID

//TODO: Marker and EventTypeUpsertWorkflowSearchAttributes need to be added to the model and also to generate event attributes
// TODO: Marker and EventTypeUpsertWorkflowSearchAttributes need to be added to the model and also to generate event attributes
for _, batch := range batches {
historyEvents := make([]*shared.HistoryEvent, 0)
for _, event := range batch.Events {
Expand Down Expand Up @@ -705,7 +705,7 @@ func getDefaultHistoryEvent(eventID, version int64) *shared.HistoryEvent {
func InitializeHistoryEventGenerator() Generator {
generator := NewEventGenerator(time.Now().UnixNano())

//Functions
// Functions
notPendingDecisionTask := func() bool {
count := 0
for _, e := range generator.ListGeneratedVertices() {
Expand Down Expand Up @@ -799,7 +799,7 @@ func InitializeHistoryEventGenerator() Generator {
return true
}

//Setup decision task model
// Setup decision task model
decisionModel := NewHistoryEventModel()
decisionSchedule := NewHistoryEventVertex(shared.EventTypeDecisionTaskScheduled.String())
decisionStart := NewHistoryEventVertex(shared.EventTypeDecisionTaskStarted.String())
Expand All @@ -820,15 +820,15 @@ func InitializeHistoryEventGenerator() Generator {
decisionModel.AddEdge(decisionScheduleToStart, decisionStartToComplete, decisionStartToFail, decisionStartToTimedOut,
decisionFailToSchedule, decisionTimedOutToSchedule)

//Setup workflow model
// Setup workflow model
workflowModel := NewHistoryEventModel()
workflowStart := NewHistoryEventVertex(shared.EventTypeWorkflowExecutionStarted.String())
workflowSignal := NewHistoryEventVertex(shared.EventTypeWorkflowExecutionSignaled.String())
workflowComplete := NewHistoryEventVertex(shared.EventTypeWorkflowExecutionCompleted.String())
continueAsNew := NewHistoryEventVertex(shared.EventTypeWorkflowExecutionContinuedAsNew.String())
workflowFail := NewHistoryEventVertex(shared.EventTypeWorkflowExecutionFailed.String())
workflowCancel := NewHistoryEventVertex(shared.EventTypeWorkflowExecutionCanceled.String())
workflowCancelRequest := NewHistoryEventVertex(shared.EventTypeWorkflowExecutionCancelRequested.String()) //?
workflowCancelRequest := NewHistoryEventVertex(shared.EventTypeWorkflowExecutionCancelRequested.String())
workflowTerminate := NewHistoryEventVertex(shared.EventTypeWorkflowExecutionTerminated.String())
workflowTimedOut := NewHistoryEventVertex(shared.EventTypeWorkflowExecutionTimedOut.String())
workflowStartToSignal := NewHistoryEventEdge(workflowStart, workflowSignal)
Expand All @@ -846,14 +846,14 @@ func InitializeHistoryEventGenerator() Generator {
workflowModel.AddEdge(workflowStartToSignal, workflowStartToDecisionSchedule, workflowSignalToDecisionSchedule,
decisionCompleteToCAN, decisionCompleteToWorkflowComplete, decisionCompleteToWorkflowFailed, workflowCancelRequestToCancel)

//Setup activity model
// Setup activity model
activityModel := NewHistoryEventModel()
activitySchedule := NewHistoryEventVertex(shared.EventTypeActivityTaskScheduled.String())
activityStart := NewHistoryEventVertex(shared.EventTypeActivityTaskStarted.String())
activityComplete := NewHistoryEventVertex(shared.EventTypeActivityTaskCompleted.String())
activityFail := NewHistoryEventVertex(shared.EventTypeActivityTaskFailed.String())
activityTimedOut := NewHistoryEventVertex(shared.EventTypeActivityTaskTimedOut.String())
activityCancelRequest := NewHistoryEventVertex(shared.EventTypeActivityTaskCancelRequested.String()) //?
activityCancelRequest := NewHistoryEventVertex(shared.EventTypeActivityTaskCancelRequested.String())
activityCancel := NewHistoryEventVertex(shared.EventTypeActivityTaskCanceled.String())
activityCancelRequestFail := NewHistoryEventVertex(shared.EventTypeRequestCancelActivityTaskFailed.String())
decisionCompleteToATSchedule := NewHistoryEventEdge(decisionComplete, activitySchedule)
Expand Down Expand Up @@ -893,7 +893,7 @@ func InitializeHistoryEventGenerator() Generator {
activityFailToDecisionSchedule, activityTimedOutToDecisionSchedule, activityCancelReqToCancel, activityCancelReqToCancelFail,
activityCancelToDecisionSchedule, decisionCompleteToActivityCancelRequest, activityCancelRequestFailToDecisionSchedule)

//Setup timer model
// Setup timer model
timerModel := NewHistoryEventModel()
timerStart := NewHistoryEventVertex(shared.EventTypeTimerStarted.String())
timerFired := NewHistoryEventVertex(shared.EventTypeTimerFired.String())
Expand All @@ -910,7 +910,7 @@ func InitializeHistoryEventGenerator() Generator {
timerCancelToDecisionSchedule.SetCondition(notPendingDecisionTask)
timerModel.AddEdge(timerStartToFire, decisionCompleteToCancel, decisionCompleteToTimerStart, timerFiredToDecisionSchedule, timerCancelToDecisionSchedule)

//Setup child workflow model
// Setup child workflow model
childWorkflowModel := NewHistoryEventModel()
childWorkflowInitial := NewHistoryEventVertex(shared.EventTypeStartChildWorkflowExecutionInitiated.String())
childWorkflowInitialFail := NewHistoryEventVertex(shared.EventTypeStartChildWorkflowExecutionFailed.String())
Expand Down Expand Up @@ -946,7 +946,7 @@ func InitializeHistoryEventGenerator() Generator {
childWorkflowCompleteToDecisionSchedule, childWorkflowTerminateToDecisionSchedule, childWorkflowTimedOutToDecisionSchedule,
childWorkflowInitialFailToDecisionSchedule)

//Setup external workflow model
// Setup external workflow model
externalWorkflowModel := NewHistoryEventModel()
externalWorkflowSignal := NewHistoryEventVertex(shared.EventTypeSignalExternalWorkflowExecutionInitiated.String())
externalWorkflowSignalFailed := NewHistoryEventVertex(shared.EventTypeSignalExternalWorkflowExecutionFailed.String())
Expand All @@ -973,11 +973,11 @@ func InitializeHistoryEventGenerator() Generator {
externalWorkflowSignaledToDecisionSchedule, externalWorkflowSignalFailedToDecisionSchedule,
externalWorkflowCanceledToDecisionSchedule, externalWorkflowCancelFailToDecisionSchedule)

//Config event generator
// Config event generator
generator.SetBatchGenerationRule(canDoBatch)
generator.AddInitialEntryVertex(workflowStart)
generator.AddExitVertex(workflowComplete, workflowFail, workflowTerminate, workflowTimedOut, continueAsNew)
//generator.AddRandomEntryVertex(workflowSignal, workflowTerminate, workflowTimedOut)
// generator.AddRandomEntryVertex(workflowSignal, workflowTerminate, workflowTimedOut)
generator.AddModel(decisionModel)
generator.AddModel(workflowModel)
generator.AddModel(activityModel)
Expand Down
1 change: 1 addition & 0 deletions host/ndc/nDC_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package ndc

import (
Expand Down
Loading

0 comments on commit ce0b4d4

Please sign in to comment.