Skip to content

Commit

Permalink
Implement NDC events reapplication (cadence-workflow#2577)
Browse files Browse the repository at this point in the history
* Wire up remaining events reapply logic.
* Add integration test
  • Loading branch information
yux0 authored and wxing1292 committed Sep 24, 2019
1 parent 5f7e5dc commit 1736590
Show file tree
Hide file tree
Showing 66 changed files with 1,024 additions and 135 deletions.
206 changes: 196 additions & 10 deletions .gen/go/history/history.go

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions .gen/go/history/historyserviceclient/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion .gen/go/history/historyservicefx/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion .gen/go/history/historyservicefx/doc.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion .gen/go/history/historyservicefx/server.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions .gen/go/history/historyserviceserver/server.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions .gen/go/history/historyservicetest/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion .gen/go/replicator/replicator.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions .gen/go/shared/shared.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion LICENSE
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
The MIT License (MIT)

Copyright (c) 2017 Uber Technologies, Inc.
Copyright (c) 2019 Uber Technologies, Inc.

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
Expand Down
42 changes: 42 additions & 0 deletions client/clientBean.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,15 @@ type (
// Bean in an collection of clients
Bean interface {
GetHistoryClient() history.Client
SetHistoryClient(client history.Client)
GetMatchingClient(domainIDToName DomainIDToNameFunc) (matching.Client, error)
SetMatchingClient(client matching.Client)
GetFrontendClient() frontend.Client
SetFrontendClient(client frontend.Client)
GetRemoteAdminClient(cluster string) admin.Client
SetRemoteAdminClient(cluster string, client admin.Client)
GetRemoteFrontendClient(cluster string) frontend.Client
SetRemoteFrontendClient(cluster string, client frontend.Client)
}

// DispatcherProvider provides a diapatcher to a given address
Expand Down Expand Up @@ -146,17 +151,38 @@ func (h *clientBeanImpl) GetHistoryClient() history.Client {
return h.historyClient
}

func (h *clientBeanImpl) SetHistoryClient(
client history.Client,
) {

h.historyClient = client
}

func (h *clientBeanImpl) GetMatchingClient(domainIDToName DomainIDToNameFunc) (matching.Client, error) {
if client := h.matchingClient.Load(); client != nil {
return client.(matching.Client), nil
}
return h.lazyInitMatchingClient(domainIDToName)
}

func (h *clientBeanImpl) SetMatchingClient(
client matching.Client,
) {

h.matchingClient.Store(client)
}

func (h *clientBeanImpl) GetFrontendClient() frontend.Client {
return h.frontendClient
}

func (h *clientBeanImpl) SetFrontendClient(
client frontend.Client,
) {

h.frontendClient = client
}

func (h *clientBeanImpl) GetRemoteAdminClient(cluster string) admin.Client {
client, ok := h.remoteAdminClients[cluster]
if !ok {
Expand All @@ -169,6 +195,14 @@ func (h *clientBeanImpl) GetRemoteAdminClient(cluster string) admin.Client {
return client
}

func (h *clientBeanImpl) SetRemoteAdminClient(
cluster string,
client admin.Client,
) {

h.remoteAdminClients[cluster] = client
}

func (h *clientBeanImpl) GetRemoteFrontendClient(cluster string) frontend.Client {
client, ok := h.remoteFrontendClients[cluster]
if !ok {
Expand All @@ -181,6 +215,14 @@ func (h *clientBeanImpl) GetRemoteFrontendClient(cluster string) frontend.Client
return client
}

func (h *clientBeanImpl) SetRemoteFrontendClient(
cluster string,
client frontend.Client,
) {

h.remoteFrontendClients[cluster] = client
}

func (h *clientBeanImpl) lazyInitMatchingClient(domainIDToName DomainIDToNameFunc) (matching.Client, error) {
h.Lock()
defer h.Unlock()
Expand Down
32 changes: 32 additions & 0 deletions client/clientBean_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ func (_m *MockClientBean) GetHistoryClient() history.Client {
return r0
}

// SetHistoryClient provides a mock function with given fields: _a0
func (_m *MockClientBean) SetHistoryClient(
_a0 history.Client,
) {
}

// GetMatchingClient provides a mock function with given fields: domainIDToName
func (_m *MockClientBean) GetMatchingClient(domainIDToName DomainIDToNameFunc) (matching.Client, error) {
ret := _m.Called(domainIDToName)
Expand All @@ -74,6 +80,12 @@ func (_m *MockClientBean) GetMatchingClient(domainIDToName DomainIDToNameFunc) (
return r0, r1
}

// SetMatchingClient provides a mock function with given fields: _a0
func (_m *MockClientBean) SetMatchingClient(
_a0 matching.Client,
) {
}

// GetFrontendClient provides a mock function with given fields:
func (_m *MockClientBean) GetFrontendClient() frontend.Client {
ret := _m.Called()
Expand All @@ -90,6 +102,12 @@ func (_m *MockClientBean) GetFrontendClient() frontend.Client {
return r0
}

// SetFrontendClient provides a mock function with given fields: _a0
func (_m *MockClientBean) SetFrontendClient(
_a0 frontend.Client,
) {
}

// GetRemoteAdminClient provides a mock function with given fields: _a0
func (_m *MockClientBean) GetRemoteAdminClient(_a0 string) admin.Client {
ret := _m.Called(_a0)
Expand All @@ -106,6 +124,13 @@ func (_m *MockClientBean) GetRemoteAdminClient(_a0 string) admin.Client {
return r0
}

// SetRemoteAdminClient provides a mock function with given fields: _a0, _a1
func (_m *MockClientBean) SetRemoteAdminClient(
_a0 string,
_a1 admin.Client,
) {
}

// GetRemoteFrontendClient provides a mock function with given fields: _a0
func (_m *MockClientBean) GetRemoteFrontendClient(_a0 string) frontend.Client {
ret := _m.Called(_a0)
Expand All @@ -121,3 +146,10 @@ func (_m *MockClientBean) GetRemoteFrontendClient(_a0 string) frontend.Client {

return r0
}

// SetRemoteFrontendClient provides a mock function with given fields: _a0, _a1
func (_m *MockClientBean) SetRemoteFrontendClient(
_a0 string,
_a1 frontend.Client,
) {
}
4 changes: 2 additions & 2 deletions client/history/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -812,10 +812,10 @@ func (c *clientImpl) GetReplicationMessages(

func (c *clientImpl) ReapplyEvents(
ctx context.Context,
request *workflow.ReapplyEventsRequest,
request *h.ReapplyEventsRequest,
opts ...yarpc.CallOption,
) error {
client, err := c.getClientForWorkflowID(request.GetWorkflowExecution().GetWorkflowId())
client, err := c.getClientForWorkflowID(request.GetRequest().GetWorkflowExecution().GetWorkflowId())
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion client/history/metricClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,7 @@ func (c *metricClient) QueryWorkflow(

func (c *metricClient) ReapplyEvents(
ctx context.Context,
request *shared.ReapplyEventsRequest,
request *h.ReapplyEventsRequest,
opts ...yarpc.CallOption,
) error {

Expand Down
2 changes: 1 addition & 1 deletion client/history/retryableClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ func (c *retryableClient) QueryWorkflow(

func (c *retryableClient) ReapplyEvents(
ctx context.Context,
request *shared.ReapplyEventsRequest,
request *h.ReapplyEventsRequest,
opts ...yarpc.CallOption,
) error {

Expand Down
5 changes: 5 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,8 @@ const (
HistoryGetReplicationMessagesScope
// HistoryShardControllerScope is the scope used by shard controller
HistoryShardControllerScope
// HistoryReapplyEventsScope is the scope used by event reapplication
HistoryReapplyEventsScope
// TransferQueueProcessorScope is the scope used by all metric emitted by transfer queue processor
TransferQueueProcessorScope
// TransferActiveQueueProcessorScope is the scope used by all metric emitted by transfer queue processor
Expand Down Expand Up @@ -1205,6 +1207,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
HistoryDescribeMutableStateScope: {operation: "DescribeMutableState"},
HistoryGetReplicationMessagesScope: {operation: "GetReplicationMessages"},
HistoryShardControllerScope: {operation: "ShardController"},
HistoryReapplyEventsScope: {operation: "EventReapplication"},
TransferQueueProcessorScope: {operation: "TransferQueueProcessor"},
TransferActiveQueueProcessorScope: {operation: "TransferActiveQueueProcessor"},
TransferStandbyQueueProcessorScope: {operation: "TransferStandbyQueueProcessor"},
Expand Down Expand Up @@ -1518,6 +1521,7 @@ const (
GetReplicationMessagesForShardLatency
ArchiveVisibilityAttemptCount
ArchiveVisibilityFailedCount
EventReapplySkippedCount

NumHistoryMetrics
)
Expand Down Expand Up @@ -1813,6 +1817,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
GetReplicationMessagesForShardLatency: {metricName: "get_replication_messages_for_shard", metricType: Timer},
ArchiveVisibilityAttemptCount: {metricName: "archive_visibility_attempt_count", metricType: Counter},
ArchiveVisibilityFailedCount: {metricName: "archive_visibility_failed_count", metricType: Counter},
EventReapplySkippedCount: {metricName: "event_reapply_skipped_count", metricType: Counter},
},
Matching: {
PollSuccessCounter: {metricName: "poll_success"},
Expand Down
7 changes: 7 additions & 0 deletions common/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/uber/cadence/common/membership"
"github.com/uber/cadence/common/messaging"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/service/config"
"github.com/uber/cadence/common/service/dynamicconfig"
"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
Expand Down Expand Up @@ -113,6 +114,7 @@ type (
dispatcherProvider client.DispatcherProvider
archivalMetadata archiver.ArchivalMetadata
archiverProvider provider.ArchiverProvider
serializer persistence.PayloadSerializer
}
)

Expand All @@ -139,6 +141,7 @@ func New(params *BootstrapParams) Service {
dynamicCollection: dynamicconfig.NewCollection(params.DynamicConfig, params.Logger),
archivalMetadata: params.ArchivalMetadata,
archiverProvider: params.ArchiverProvider,
serializer: persistence.NewPayloadSerializer(),
}

sVice.runtimeMetricsReporter = metrics.NewRuntimeMetricsReporter(params.MetricScope, time.Minute, sVice.GetLogger(), params.InstanceID)
Expand Down Expand Up @@ -284,6 +287,10 @@ func (h *serviceImpl) GetArchiverProvider() provider.ArchiverProvider {
return h.archiverProvider
}

func (h *serviceImpl) GetPayloadSerializer() persistence.PayloadSerializer {
return h.serializer
}

// GetMetricsServiceIdx returns the metrics name
func GetMetricsServiceIdx(serviceName string, logger log.Logger) metrics.ServiceIdx {
switch serviceName {
Expand Down
Loading

0 comments on commit 1736590

Please sign in to comment.