Skip to content

Commit

Permalink
gRPC for cross DC traffic (cadence-workflow#4390)
Browse files Browse the repository at this point in the history
* Create remote thrift/grpc client depending on outbound transport

* Support GRPC in DispatcherProvider

* Allow configuring replication transport

* Create replication dispatcher based on config

* Updated config to use grpc for replication

* Update changelog

* Renamed Get to GetTChannel

* Panic on non-implemented transport.Namer
  • Loading branch information
vytautas-karpavicius authored Aug 25, 2021
1 parent 76573a2 commit 59c8f0e
Show file tree
Hide file tree
Showing 14 changed files with 147 additions and 39 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
You can find a list of previous releases on the [github releases](https://github.com/uber/cadence/releases) page.

## [Unreleased]
### Added
- Added gRPC support for cross-DC traffic. This can be enabled in `ClusterGroupMetadata` config section with `rpcTransport: "grpc"` option. By default, tchannel is used. (#4390)

## [0.21.3] - 2021-07-17
### Added
Expand Down
37 changes: 32 additions & 5 deletions client/clientBean.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"go.uber.org/yarpc/api/peer"
"go.uber.org/yarpc/api/transport"
"go.uber.org/yarpc/peer/roundrobin"
"go.uber.org/yarpc/transport/grpc"
"go.uber.org/yarpc/transport/tchannel"

"github.com/uber/cadence/client/admin"
Expand Down Expand Up @@ -65,9 +66,10 @@ type (
SetRemoteFrontendClient(cluster string, client frontend.Client)
}

// DispatcherProvider provides a diapatcher to a given address
// DispatcherProvider provides a dispatcher to a given address
DispatcherProvider interface {
Get(name string, address string) (*yarpc.Dispatcher, error)
GetTChannel(name string, address string) (*yarpc.Dispatcher, error)
GetGRPC(name string, address string) (*yarpc.Dispatcher, error)
}

clientBeanImpl struct {
Expand Down Expand Up @@ -117,7 +119,14 @@ func NewClientBean(factory Factory, dispatcherProvider DispatcherProvider, clust
continue
}

dispatcher, err := dispatcherProvider.Get(info.RPCName, info.RPCAddress)
var dispatcher *yarpc.Dispatcher
var err error
switch info.RPCTransport {
case tchannel.TransportName:
dispatcher, err = dispatcherProvider.GetTChannel(info.RPCName, info.RPCAddress)
case grpc.TransportName:
dispatcher, err = dispatcherProvider.GetGRPC(info.RPCName, info.RPCAddress)
}
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -256,7 +265,7 @@ func NewDNSYarpcDispatcherProvider(logger log.Logger, interval time.Duration) Di
}
}

func (p *dnsDispatcherProvider) Get(serviceName string, address string) (*yarpc.Dispatcher, error) {
func (p *dnsDispatcherProvider) GetTChannel(serviceName string, address string) (*yarpc.Dispatcher, error) {
tchanTransport, err := tchannel.NewTransport(
tchannel.ServiceName(serviceName),
// this aim to get rid of the annoying popup about accepting incoming network connections
Expand All @@ -274,8 +283,26 @@ func (p *dnsDispatcherProvider) Get(serviceName string, address string) (*yarpc.
peerListUpdater.Start()
outbound := tchanTransport.NewOutbound(peerList)

p.logger.Info("Creating RPC dispatcher outbound", tag.Address(address))
p.logger.Info("Creating TChannel dispatcher outbound", tag.Address(address))
return p.createOutboundDispatcher(serviceName, outbound)
}

// GetGRPC builds a dispatcher whose outbound speaks gRPC to the given
// address. Peers are resolved via DNS and refreshed on p.interval, with
// requests balanced round-robin across the resolved peer list.
func (p *dnsDispatcherProvider) GetGRPC(serviceName string, address string) (*yarpc.Dispatcher, error) {
	grpcTrans := grpc.NewTransport()
	peers := roundrobin.New(grpcTrans)

	updater, err := newDNSUpdater(peers, address, p.interval, p.logger)
	if err != nil {
		return nil, err
	}
	updater.Start()

	p.logger.Info("Creating GRPC dispatcher outbound", tag.Address(address))
	return p.createOutboundDispatcher(serviceName, grpcTrans.NewOutbound(peers))
}

func (p *dnsDispatcherProvider) createOutboundDispatcher(serviceName string, outbound transport.UnaryOutbound) (*yarpc.Dispatcher, error) {
// Attach the outbound to the dispatcher (this will add middleware/logging/etc)
dispatcher := yarpc.NewDispatcher(yarpc.Config{
Name: crossDCCaller,
Expand Down
29 changes: 27 additions & 2 deletions client/clientfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,15 @@ import (
"time"

"go.uber.org/yarpc"
"go.uber.org/yarpc/api/transport"
"go.uber.org/yarpc/transport/grpc"

"github.com/uber/cadence/.gen/go/admin/adminserviceclient"
"github.com/uber/cadence/.gen/go/cadence/workflowserviceclient"
"github.com/uber/cadence/.gen/go/history/historyserviceclient"
"github.com/uber/cadence/.gen/go/matching/matchingserviceclient"

adminv1 "github.com/uber/cadence/.gen/proto/admin/v1"
apiv1 "github.com/uber/cadence/.gen/proto/api/v1"
historyv1 "github.com/uber/cadence/.gen/proto/history/v1"
matchingv1 "github.com/uber/cadence/.gen/proto/matching/v1"
Expand Down Expand Up @@ -268,7 +271,11 @@ func (cf *rpcClientFactory) NewAdminClientWithTimeoutAndDispatcher(
}

clientProvider := func(clientKey string) (interface{}, error) {
return admin.NewThriftClient(adminserviceclient.New(dispatcher.ClientConfig(rpcName))), nil
config := dispatcher.ClientConfig(rpcName)
if isGRPCOutbound(config) {
return admin.NewGRPCClient(adminv1.NewAdminAPIYARPCClient(config)), nil
}
return admin.NewThriftClient(adminserviceclient.New(config)), nil
}

client := admin.NewClient(timeout, largeTimeout, common.NewClientCache(keyResolver, clientProvider))
Expand All @@ -292,7 +299,16 @@ func (cf *rpcClientFactory) NewFrontendClientWithTimeoutAndDispatcher(
}

clientProvider := func(clientKey string) (interface{}, error) {
return frontend.NewThriftClient(workflowserviceclient.New(dispatcher.ClientConfig(rpcName))), nil
config := dispatcher.ClientConfig(rpcName)
if isGRPCOutbound(config) {
return frontend.NewGRPCClient(
apiv1.NewDomainAPIYARPCClient(config),
apiv1.NewWorkflowAPIYARPCClient(config),
apiv1.NewWorkerAPIYARPCClient(config),
apiv1.NewVisibilityAPIYARPCClient(config),
), nil
}
return frontend.NewThriftClient(workflowserviceclient.New(config)), nil
}

client := frontend.NewClient(timeout, longPollTimeout, common.NewClientCache(keyResolver, clientProvider))
Expand Down Expand Up @@ -358,3 +374,12 @@ func (cf *rpcClientFactory) newFrontendGRPCClient(hostAddress string) (frontend.
apiv1.NewVisibilityAPIYARPCClient(config),
), nil
}

// isGRPCOutbound reports whether the client config's unary outbound uses the
// gRPC transport. It relies on the outbound implementing transport.Namer,
// which every yarpc outbound does as of v1.43.0.
func isGRPCOutbound(config transport.ClientConfig) bool {
	outbound := config.GetUnaryOutbound()
	namer, ok := outbound.(transport.Namer)
	if !ok {
		// This should not happen, unless yarpc older than v1.43.0 is used
		panic("Outbound does not implement transport.Namer")
	}
	return namer.TransportName() == grpc.TransportName
}
2 changes: 1 addition & 1 deletion cmd/server/cadence/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (s *server) startService() common.Daemon {
}
}

dispatcher, err := params.DispatcherProvider.Get(common.FrontendServiceName, s.cfg.PublicClient.HostPort)
dispatcher, err := params.DispatcherProvider.GetTChannel(common.FrontendServiceName, s.cfg.PublicClient.HostPort)
if err != nil {
log.Fatalf("failed to construct dispatcher: %v", err)
}
Expand Down
16 changes: 15 additions & 1 deletion common/config/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ import (
"fmt"
"log"

"go.uber.org/yarpc/transport/grpc"
"go.uber.org/yarpc/transport/tchannel"

"go.uber.org/multierr"
)

Expand Down Expand Up @@ -59,6 +62,10 @@ type (
// Address indicate the remote service address(Host:Port). Host can be DNS name.
// For currentCluster, it's usually the same as publicClient.hostPort
RPCAddress string `yaml:"rpcAddress" validate:"nonzero"`
// RPCTransport specifies transport to use for replication traffic.
// Allowed values: tchannel|grpc
// Default: tchannel
RPCTransport string `yaml:"rpcTransport"`
}
)

Expand Down Expand Up @@ -112,6 +119,10 @@ func (m *ClusterGroupMetadata) validate() error {
if info.Enabled && (len(info.RPCName) == 0 || len(info.RPCAddress) == 0) {
errs = multierr.Append(errs, fmt.Errorf("cluster %v: rpc name / address is empty", clusterName))
}
if info.RPCTransport != tchannel.TransportName && info.RPCTransport != grpc.TransportName {
errs = multierr.Append(errs, fmt.Errorf("cluster %v: rpc transport must %v or %v",
clusterName, tchannel.TransportName, grpc.TransportName))
}
}
if len(versionToClusterName) != len(m.ClusterGroup) {
errs = multierr.Append(errs, errors.New("initial versions of the cluster group have duplicates"))
Expand Down Expand Up @@ -141,7 +152,10 @@ func (m *ClusterGroupMetadata) fillDefaults() {
if cluster.RPCName == "" {
// filling RPCName with a default value if empty
cluster.RPCName = "cadence-frontend"
m.ClusterGroup[name] = cluster
}
if cluster.RPCTransport == "" {
cluster.RPCTransport = tchannel.TransportName
}
m.ClusterGroup[name] = cluster
}
}
12 changes: 12 additions & 0 deletions common/config/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func TestClusterGroupMetadataDefaults(t *testing.T) {

assert.Equal(t, "active", config.PrimaryClusterName)
assert.Equal(t, "cadence-frontend", config.ClusterGroup["active"].RPCName)
assert.Equal(t, "tchannel", config.ClusterGroup["active"].RPCTransport)
}

func TestClusterGroupMetadataValidate(t *testing.T) {
Expand Down Expand Up @@ -134,6 +135,15 @@ func TestClusterGroupMetadataValidate(t *testing.T) {
}),
err: "cluster active: rpc name / address is empty",
},
{
msg: "invalid rpc transport",
config: modify(validClusterGroupMetadata(), func(m *ClusterGroupMetadata) {
active := m.ClusterGroup["active"]
active.RPCTransport = "invalid"
m.ClusterGroup["active"] = active
}),
err: "cluster active: rpc transport must tchannel or grpc",
},
{
msg: "initial version duplicated",
config: modify(validClusterGroupMetadata(), func(m *ClusterGroupMetadata) {
Expand Down Expand Up @@ -176,12 +186,14 @@ func validClusterGroupMetadata() *ClusterGroupMetadata {
InitialFailoverVersion: 0,
RPCName: "cadence-frontend",
RPCAddress: "localhost:7933",
RPCTransport: "grpc",
},
"standby": {
Enabled: true,
InitialFailoverVersion: 2,
RPCName: "cadence-frontend",
RPCAddress: "localhost:7933",
RPCTransport: "grpc",
},
},
}
Expand Down
9 changes: 6 additions & 3 deletions config/development_active.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -78,17 +78,20 @@ clusterGroupMetadata:
enabled: true
initialFailoverVersion: 1
rpcName: "cadence-frontend"
rpcAddress: "localhost:7933" # this is to let worker service and XDC replicator connected to the frontend service. In cluster setup, localhost will not work
rpcAddress: "localhost:7833" # this is to let worker service and XDC replicator connected to the frontend service. In cluster setup, localhost will not work
rpcTransport: "grpc"
standby:
enabled: true
initialFailoverVersion: 0
rpcName: "cadence-frontend"
rpcAddress: "localhost:8933" # this is to let worker service and XDC replicator connected to the frontend service. In cluster setup, localhost will not work
rpcAddress: "localhost:8833" # this is to let worker service and XDC replicator connected to the frontend service. In cluster setup, localhost will not work
rpcTransport: "grpc"
other:
enabled: true
initialFailoverVersion: 2
rpcName: "cadence-frontend"
rpcAddress: "localhost:9933" # this is to let worker service and XDC replicator connected to the frontend service. In cluster setup, localhost will not work
rpcAddress: "localhost:9833" # this is to let worker service and XDC replicator connected to the frontend service. In cluster setup, localhost will not work
rpcTransport: "grpc"

dcRedirectionPolicy:
policy: "selected-apis-forwarding"
Expand Down
9 changes: 6 additions & 3 deletions config/development_other.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -78,17 +78,20 @@ clusterGroupMetadata:
enabled: true
initialFailoverVersion: 1
rpcName: "cadence-frontend"
rpcAddress: "localhost:7933" # this is to let worker service and XDC replicator connected to the frontend service. In cluster setup, localhost will not work
rpcAddress: "localhost:7833" # this is to let worker service and XDC replicator connected to the frontend service. In cluster setup, localhost will not work
rpcTransport: "grpc"
standby:
enabled: true
initialFailoverVersion: 0
rpcName: "cadence-frontend"
rpcAddress: "localhost:8933" # this is to let worker service and XDC replicator connected to the frontend service. In cluster setup, localhost will not work
rpcAddress: "localhost:8833" # this is to let worker service and XDC replicator connected to the frontend service. In cluster setup, localhost will not work
rpcTransport: "grpc"
other:
enabled: true
initialFailoverVersion: 2
rpcName: "cadence-frontend"
rpcAddress: "localhost:9933" # this is to let worker service and XDC replicator connected to the frontend service. In cluster setup, localhost will not work
rpcAddress: "localhost:9833" # this is to let worker service and XDC replicator connected to the frontend service. In cluster setup, localhost will not work
rpcTransport: "grpc"

dcRedirectionPolicy:
policy: "selected-apis-forwarding"
Expand Down
9 changes: 6 additions & 3 deletions config/development_standby.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -78,17 +78,20 @@ clusterGroupMetadata:
enabled: true
initialFailoverVersion: 1
rpcName: "cadence-frontend"
rpcAddress: "localhost:7933" # this is to let worker service and XDC replicator connected to the frontend service. In cluster setup, localhost will not work
rpcAddress: "localhost:7833" # this is to let worker service and XDC replicator connected to the frontend service. In cluster setup, localhost will not work
rpcTransport: "grpc"
standby:
enabled: true
initialFailoverVersion: 0
rpcName: "cadence-frontend"
rpcAddress: "localhost:8933" # this is to let worker service and XDC replicator connected to the frontend service. In cluster setup, localhost will not work
rpcAddress: "localhost:8833" # this is to let worker service and XDC replicator connected to the frontend service. In cluster setup, localhost will not work
rpcTransport: "grpc"
other:
enabled: true
initialFailoverVersion: 2
rpcName: "cadence-frontend"
rpcAddress: "localhost:9933" # this is to let worker service and XDC replicator connected to the frontend service. In cluster setup, localhost will not work
rpcAddress: "localhost:9833" # this is to let worker service and XDC replicator connected to the frontend service. In cluster setup, localhost will not work
rpcTransport: "grpc"

dcRedirectionPolicy:
policy: "selected-apis-forwarding"
Expand Down
6 changes: 4 additions & 2 deletions docker/config_template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -214,13 +214,15 @@ clusterGroupMetadata:
enabled: true
initialFailoverVersion: 0
rpcName: "cadence-frontend"
rpcAddress: {{ default .Env.PRIMARY_FRONTEND_SERVICE "cadence" }}:{{ default .Env.FRONTEND_PORT "7933" }}
rpcAddress: {{ default .Env.PRIMARY_FRONTEND_SERVICE "cadence" }}:{{ default .Env.FRONTEND_PORT "7833" }}
rpcTransport: "grpc"
{{- if .Env.ENABLE_GLOBAL_DOMAIN }}
secondary:
enabled: true
initialFailoverVersion: 2
rpcName: "cadence-frontend"
rpcAddress: {{ default .Env.SECONDARY_FRONTEND_SERVICE "cadence-secondary" }}:{{ default .Env.FRONTEND_PORT "7933" }}
rpcAddress: {{ default .Env.SECONDARY_FRONTEND_SERVICE "cadence-secondary" }}:{{ default .Env.FRONTEND_PORT "7833" }}
rpcTransport: "grpc"
{{- end }}


Expand Down
4 changes: 2 additions & 2 deletions host/onebox.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ func (c *cadenceImpl) startHistory(
integrationClient := newIntegrationConfigClient(dynamicconfig.NewNopClient())
c.overrideHistoryDynamicConfig(integrationClient)
params.DynamicConfig = integrationClient
dispatcher, err := params.DispatcherProvider.Get(common.FrontendServiceName, c.FrontendAddress())
dispatcher, err := params.DispatcherProvider.GetTChannel(common.FrontendServiceName, c.FrontendAddress())
if err != nil {
c.logger.Fatal("Failed to get dispatcher for history", tag.Error(err))
}
Expand Down Expand Up @@ -590,7 +590,7 @@ func (c *cadenceImpl) startWorker(hosts map[string][]string, startWG *sync.WaitG
c.logger.Fatal("Failed to copy persistence config for worker", tag.Error(err))
}

dispatcher, err := params.DispatcherProvider.Get(common.FrontendServiceName, c.FrontendAddress())
dispatcher, err := params.DispatcherProvider.GetTChannel(common.FrontendServiceName, c.FrontendAddress())
if err != nil {
c.logger.Fatal("Failed to get dispatcher for worker", tag.Error(err))
}
Expand Down
Loading

0 comments on commit 59c8f0e

Please sign in to comment.