Skip to content

Commit

Permalink
Add frontend DC redirection functionality and policy (cadence-workflo…
Browse files Browse the repository at this point in the history
…w#1409)

* Add new frontend API redirection layer
* Wire DC redirection policy config
  • Loading branch information
wxing1292 authored Jan 26, 2019
1 parent d27f279 commit dc2d9bf
Show file tree
Hide file tree
Showing 36 changed files with 2,520 additions and 58 deletions.
12 changes: 12 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ install-schema-cdc: bins
./cadence-cassandra-tool --ep 127.0.0.1 create -k cadence_visibility_active --rf 1
./cadence-cassandra-tool -ep 127.0.0.1 -k cadence_visibility_active setup-schema -v 0.0
./cadence-cassandra-tool -ep 127.0.0.1 -k cadence_visibility_active update-schema -d ./schema/cassandra/visibility/versioned

@echo Setting up cadence_standby key space
./cadence-cassandra-tool --ep 127.0.0.1 create -k cadence_standby --rf 1
./cadence-cassandra-tool -ep 127.0.0.1 -k cadence_standby setup-schema -v 0.0
Expand All @@ -202,8 +203,19 @@ install-schema-cdc: bins
./cadence-cassandra-tool -ep 127.0.0.1 -k cadence_visibility_standby setup-schema -v 0.0
./cadence-cassandra-tool -ep 127.0.0.1 -k cadence_visibility_standby update-schema -d ./schema/cassandra/visibility/versioned

@echo Setting up cadence_other key space
./cadence-cassandra-tool --ep 127.0.0.1 create -k cadence_other --rf 1
./cadence-cassandra-tool -ep 127.0.0.1 -k cadence_other setup-schema -v 0.0
./cadence-cassandra-tool -ep 127.0.0.1 -k cadence_other update-schema -d ./schema/cassandra/cadence/versioned
./cadence-cassandra-tool --ep 127.0.0.1 create -k cadence_visibility_other --rf 1
./cadence-cassandra-tool -ep 127.0.0.1 -k cadence_visibility_other setup-schema -v 0.0
./cadence-cassandra-tool -ep 127.0.0.1 -k cadence_visibility_other update-schema -d ./schema/cassandra/visibility/versioned

start-cdc-active: bins
./cadence-server --zone active start

start-cdc-standby: bins
./cadence-server --zone standby start

start-cdc-other: bins
./cadence-server --zone other start
13 changes: 13 additions & 0 deletions client/clientBean.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/uber/cadence/client/frontend"
"github.com/uber/cadence/client/history"
"github.com/uber/cadence/client/matching"
"github.com/uber/cadence/client/public"
"github.com/uber/cadence/common/cluster"
)

Expand All @@ -45,6 +46,7 @@ type (
GetHistoryClient() history.Client
GetMatchingClient() matching.Client
GetFrontendClient() frontend.Client
GetPublicClient() public.Client
GetRemoteAdminClient(cluster string) admin.Client
GetRemoteFrontendClient(cluster string) frontend.Client
}
Expand All @@ -58,6 +60,7 @@ type (
historyClient history.Client
matchingClient matching.Client
frontendClient frontend.Client
publicClient public.Client
remoteAdminClients map[string]admin.Client
remoteFrontendClients map[string]frontend.Client
}
Expand All @@ -84,6 +87,11 @@ func NewClientBean(factory Factory, dispatcherProvider DispatcherProvider, clust
return nil, err
}

publicClient, err := factory.NewPublicClient()
if err != nil {
return nil, err
}

remoteAdminClients := map[string]admin.Client{}
remoteFrontendClients := map[string]frontend.Client{}
for cluster, address := range clusterMetadata.GetAllClientAddress() {
Expand Down Expand Up @@ -119,6 +127,7 @@ func NewClientBean(factory Factory, dispatcherProvider DispatcherProvider, clust
historyClient: historyClient,
matchingClient: matchingClient,
frontendClient: frontendClient,
publicClient: publicClient,
remoteAdminClients: remoteAdminClients,
remoteFrontendClients: remoteFrontendClients,
}, nil
Expand All @@ -136,6 +145,10 @@ func (h *clientBeanImpl) GetFrontendClient() frontend.Client {
return h.frontendClient
}

func (h *clientBeanImpl) GetPublicClient() public.Client {
return h.publicClient
}

func (h *clientBeanImpl) GetRemoteAdminClient(cluster string) admin.Client {
client, ok := h.remoteAdminClients[cluster]
if !ok {
Expand Down
17 changes: 17 additions & 0 deletions client/clientBean_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/uber/cadence/client/frontend"
"github.com/uber/cadence/client/history"
"github.com/uber/cadence/client/matching"
"github.com/uber/cadence/client/public"
)

// MockClientBean is an autogenerated mock type for the MockClientBean type
Expand Down Expand Up @@ -83,6 +84,22 @@ func (_m *MockClientBean) GetFrontendClient() frontend.Client {
return r0
}

// GetPublicClient provides a mock function with given fields:
func (_m *MockClientBean) GetPublicClient() public.Client {
ret := _m.Called()

var r0 public.Client
if rf, ok := ret.Get(0).(func() public.Client); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(public.Client)
}
}

return r0
}

// GetRemoteAdminClient provides a mock function with given fields: _a0
func (_m *MockClientBean) GetRemoteAdminClient(_a0 string) admin.Client {
ret := _m.Called(_a0)
Expand Down
40 changes: 39 additions & 1 deletion client/clientfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,22 @@ import (
"go.uber.org/yarpc"

"github.com/uber/cadence/.gen/go/admin/adminserviceclient"
"github.com/uber/cadence/.gen/go/cadence/workflowserviceclient"
"github.com/uber/cadence/.gen/go/history/historyserviceclient"
"github.com/uber/cadence/.gen/go/matching/matchingserviceclient"
"github.com/uber/cadence/client/admin"
"github.com/uber/cadence/client/frontend"
"github.com/uber/cadence/client/history"
"github.com/uber/cadence/client/matching"
"github.com/uber/cadence/client/public"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/membership"
"github.com/uber/cadence/common/metrics"
"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
publicClientInterface "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
)

const (
publicCaller = "cadence-public-client"
frontendCaller = "cadence-frontend-client"
historyCaller = "history-service-client"
matchingCaller = "matching-service-client"
Expand All @@ -54,10 +57,12 @@ type Factory interface {
NewHistoryClient() (history.Client, error)
NewMatchingClient() (matching.Client, error)
NewFrontendClient() (frontend.Client, error)
NewPublicClient() (public.Client, error)

NewHistoryClientWithTimeout(timeout time.Duration) (history.Client, error)
NewMatchingClientWithTimeout(timeout time.Duration, longPollTimeout time.Duration) (matching.Client, error)
NewFrontendClientWithTimeout(timeout time.Duration, longPollTimeout time.Duration) (frontend.Client, error)
NewPublicClientWithTimeout(timeout time.Duration, longPollTimeout time.Duration) (public.Client, error)

NewAdminClientWithTimeoutAndDispatcher(rpcName string, timeout time.Duration, dispatcher *yarpc.Dispatcher) (admin.Client, error)
NewFrontendClientWithTimeoutAndDispatcher(rpcName string, timeout time.Duration, longPollTimeout time.Duration, dispatcher *yarpc.Dispatcher) (frontend.Client, error)
Expand Down Expand Up @@ -93,6 +98,10 @@ func (cf *rpcClientFactory) NewFrontendClient() (frontend.Client, error) {
return cf.NewFrontendClientWithTimeout(frontend.DefaultTimeout, frontend.DefaultLongPollTimeout)
}

func (cf *rpcClientFactory) NewPublicClient() (public.Client, error) {
return cf.NewPublicClientWithTimeout(public.DefaultTimeout, public.DefaultLongPollTimeout)
}

func (cf *rpcClientFactory) NewHistoryClientWithTimeout(timeout time.Duration) (history.Client, error) {
resolver, err := cf.monitor.GetResolver(common.HistoryServiceName)
if err != nil {
Expand Down Expand Up @@ -179,6 +188,35 @@ func (cf *rpcClientFactory) NewFrontendClientWithTimeout(
return client, nil
}

func (cf *rpcClientFactory) NewPublicClientWithTimeout(
timeout time.Duration,
longPollTimeout time.Duration,
) (public.Client, error) {

// public client and frontend client are essentially the same,
// except the interface definition
resolver, err := cf.monitor.GetResolver(common.FrontendServiceName)
if err != nil {
return nil, err
}

keyResolver := func(key string) (string, error) {
host, err := resolver.Lookup(key)
if err != nil {
return "", err
}
return host.GetAddress(), nil
}

clientProvider := func(clientKey string) (interface{}, error) {
dispatcher := cf.rpcFactory.CreateDispatcherForOutbound(publicCaller, common.FrontendServiceName, clientKey)
return publicClientInterface.New(dispatcher.ClientConfig(common.FrontendServiceName)), nil
}

client := public.NewClient(timeout, longPollTimeout, common.NewClientCache(keyResolver, clientProvider))
return client, nil
}

func (cf *rpcClientFactory) NewAdminClientWithTimeoutAndDispatcher(
rpcName string,
timeout time.Duration,
Expand Down
20 changes: 18 additions & 2 deletions client/frontend/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ import (
"go.uber.org/yarpc"

"github.com/pborman/uuid"
"github.com/uber/cadence/.gen/go/cadence/workflowserviceclient"
"github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
"go.uber.org/cadence/.gen/go/shared"
)

var _ Client = (*clientImpl)(nil)
Expand Down Expand Up @@ -316,6 +316,22 @@ func (c *clientImpl) ResetStickyTaskList(
return client.ResetStickyTaskList(ctx, request, opts...)
}

func (c *clientImpl) ResetWorkflowExecution(
ctx context.Context,
request *shared.ResetWorkflowExecutionRequest,
opts ...yarpc.CallOption,
) (*shared.ResetWorkflowExecutionResponse, error) {

opts = common.AggregateYarpcOptions(ctx, opts...)
client, err := c.getRandomClient()
if err != nil {
return nil, err
}
ctx, cancel := c.createContext(ctx)
defer cancel()
return client.ResetWorkflowExecution(ctx, request, opts...)
}

func (c *clientImpl) RespondActivityTaskCanceled(
ctx context.Context,
request *shared.RespondActivityTaskCanceledRequest,
Expand Down
2 changes: 1 addition & 1 deletion client/frontend/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
package frontend

import (
"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
"github.com/uber/cadence/.gen/go/cadence/workflowserviceclient"
)

// Client is the interface exposed by frontend service client
Expand Down
21 changes: 19 additions & 2 deletions client/frontend/metricClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@ package frontend

import (
"context"

"github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common/metrics"
"go.uber.org/cadence/.gen/go/shared"
"go.uber.org/yarpc"
)

Expand Down Expand Up @@ -331,6 +330,24 @@ func (c *metricClient) ResetStickyTaskList(
return resp, err
}

func (c *metricClient) ResetWorkflowExecution(
ctx context.Context,
request *shared.ResetWorkflowExecutionRequest,
opts ...yarpc.CallOption,
) (*shared.ResetWorkflowExecutionResponse, error) {

c.metricsClient.IncCounter(metrics.FrontendClientResetWorkflowExecutionScope, metrics.CadenceClientRequests)

sw := c.metricsClient.StartTimer(metrics.FrontendClientResetWorkflowExecutionScope, metrics.CadenceClientLatency)
resp, err := c.client.ResetWorkflowExecution(ctx, request, opts...)
sw.Stop()

if err != nil {
c.metricsClient.IncCounter(metrics.FrontendClientResetWorkflowExecutionScope, metrics.CadenceClientFailures)
}
return resp, err
}

func (c *metricClient) RespondActivityTaskCanceled(
ctx context.Context,
request *shared.RespondActivityTaskCanceledRequest,
Expand Down
18 changes: 17 additions & 1 deletion client/frontend/retryableClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import (

"go.uber.org/yarpc"

"github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common/backoff"
"go.uber.org/cadence/.gen/go/shared"
)

var _ Client = (*retryableClient)(nil)
Expand Down Expand Up @@ -290,6 +290,22 @@ func (c *retryableClient) ResetStickyTaskList(
return resp, err
}

func (c *retryableClient) ResetWorkflowExecution(
ctx context.Context,
request *shared.ResetWorkflowExecutionRequest,
opts ...yarpc.CallOption,
) (*shared.ResetWorkflowExecutionResponse, error) {

var resp *shared.ResetWorkflowExecutionResponse
op := func() error {
var err error
resp, err = c.client.ResetWorkflowExecution(ctx, request, opts...)
return err
}
err := backoff.Retry(op, c.policy, c.isRetryable)
return resp, err
}

func (c *retryableClient) RespondActivityTaskCanceled(
ctx context.Context,
request *shared.RespondActivityTaskCanceledRequest,
Expand Down
Loading

0 comments on commit dc2d9bf

Please sign in to comment.