Skip to content

Commit

Permalink
Switch system worker to gRPC (cadence-workflow#4679)
Browse files Browse the repository at this point in the history
  • Loading branch information
vytautas-karpavicius authored Dec 13, 2021
1 parent e0a1d20 commit 3865361
Show file tree
Hide file tree
Showing 13 changed files with 107 additions and 51 deletions.
3 changes: 2 additions & 1 deletion canary/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@ import (
"time"

"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
apiv1 "go.uber.org/cadence/.gen/proto/api/v1"
"go.uber.org/cadence/compatibility"
"go.uber.org/yarpc"
"go.uber.org/yarpc/transport/grpc"
"go.uber.org/yarpc/transport/tchannel"
"go.uber.org/zap"

apiv1 "github.com/uber/cadence-idl/go/proto/api/v1"

"github.com/uber/cadence/common/log/loggerimpl"
)

Expand Down
19 changes: 5 additions & 14 deletions client/clientfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"time"

"go.uber.org/yarpc/api/transport"
"go.uber.org/yarpc/transport/grpc"

"github.com/uber/cadence/.gen/go/admin/adminserviceclient"
"github.com/uber/cadence/.gen/go/cadence/workflowserviceclient"
Expand All @@ -46,6 +45,7 @@ import (
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/membership"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/rpc"
"github.com/uber/cadence/common/service"
)

Expand Down Expand Up @@ -106,7 +106,7 @@ func (cf *rpcClientFactory) NewHistoryClientWithTimeout(timeout time.Duration) (
var rawClient history.Client
var addressMapper history.AddressMapperFn
outboundConfig := cf.rpcFactory.GetDispatcher().ClientConfig(service.History)
if isGRPCOutbound(outboundConfig) {
if rpc.IsGRPCOutbound(outboundConfig) {
rawClient = history.NewGRPCClient(historyv1.NewHistoryAPIYARPCClient(outboundConfig))
addressMapper = func(address string) (string, error) {
return cf.rpcFactory.ReplaceGRPCPort(service.History, address)
Expand Down Expand Up @@ -151,7 +151,7 @@ func (cf *rpcClientFactory) NewMatchingClientWithTimeout(
var rawClient matching.Client
var addressMapper matching.AddressMapperFn
outboundConfig := cf.rpcFactory.GetDispatcher().ClientConfig(service.Matching)
if isGRPCOutbound(outboundConfig) {
if rpc.IsGRPCOutbound(outboundConfig) {
rawClient = matching.NewGRPCClient(matchingv1.NewMatchingAPIYARPCClient(outboundConfig))
addressMapper = func(address string) (string, error) {
return cf.rpcFactory.ReplaceGRPCPort(service.Matching, address)
Expand Down Expand Up @@ -185,7 +185,7 @@ func (cf *rpcClientFactory) NewAdminClientWithTimeoutAndConfig(
largeTimeout time.Duration,
) (admin.Client, error) {
var client admin.Client
if isGRPCOutbound(config) {
if rpc.IsGRPCOutbound(config) {
client = admin.NewGRPCClient(adminv1.NewAdminAPIYARPCClient(config))
} else {
client = admin.NewThriftClient(adminserviceclient.New(config))
Expand All @@ -207,7 +207,7 @@ func (cf *rpcClientFactory) NewFrontendClientWithTimeoutAndConfig(
longPollTimeout time.Duration,
) (frontend.Client, error) {
var client frontend.Client
if isGRPCOutbound(config) {
if rpc.IsGRPCOutbound(config) {
client = frontend.NewGRPCClient(
apiv1.NewDomainAPIYARPCClient(config),
apiv1.NewWorkflowAPIYARPCClient(config),
Expand All @@ -227,12 +227,3 @@ func (cf *rpcClientFactory) NewFrontendClientWithTimeoutAndConfig(
}
return client, nil
}

func isGRPCOutbound(config transport.ClientConfig) bool {
namer, ok := config.GetUnaryOutbound().(transport.Namer)
if !ok {
// This should not happen, unless yarpc older than v1.43.0 is used
panic("Outbound does not implement transport.Namer")
}
return namer.TransportName() == grpc.TransportName
}
15 changes: 14 additions & 1 deletion cmd/server/cadence/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ import (
"time"

"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
"go.uber.org/cadence/compatibility"

apiv1 "github.com/uber/cadence-idl/go/proto/api/v1"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/archiver"
Expand Down Expand Up @@ -225,7 +228,17 @@ func (s *server) startService() common.Daemon {
}
}

params.PublicClient = workflowserviceclient.New(params.RPCFactory.GetDispatcher().ClientConfig(rpc.OutboundPublicClient))
publicClientConfig := params.RPCFactory.GetDispatcher().ClientConfig(rpc.OutboundPublicClient)
if rpc.IsGRPCOutbound(publicClientConfig) {
params.PublicClient = compatibility.NewThrift2ProtoAdapter(
apiv1.NewDomainAPIYARPCClient(publicClientConfig),
apiv1.NewWorkflowAPIYARPCClient(publicClientConfig),
apiv1.NewWorkerAPIYARPCClient(publicClientConfig),
apiv1.NewVisibilityAPIYARPCClient(publicClientConfig),
)
} else {
params.PublicClient = workflowserviceclient.New(publicClientConfig)
}

params.ArchivalMetadata = archiver.NewArchivalMetadata(
dc,
Expand Down
4 changes: 2 additions & 2 deletions common/config/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,14 +185,14 @@ func validClusterGroupMetadata() *ClusterGroupMetadata {
Enabled: true,
InitialFailoverVersion: 0,
RPCName: "cadence-frontend",
RPCAddress: "localhost:7933",
RPCAddress: "localhost:7833",
RPCTransport: "grpc",
},
"standby": {
Enabled: true,
InitialFailoverVersion: 2,
RPCName: "cadence-frontend",
RPCAddress: "localhost:7933",
RPCAddress: "localhost:7833",
RPCTransport: "grpc",
},
},
Expand Down
24 changes: 11 additions & 13 deletions common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"encoding/json"
"fmt"
"log"
"net"
"time"

"github.com/uber-go/tally/m3"
Expand Down Expand Up @@ -463,6 +462,11 @@ type (
// HostPort is the host port to connect on. Host can be DNS name
// Default to currentCluster's RPCAddress in ClusterInformation
HostPort string `yaml:"hostPort"`
// Transport is the tranport to use when communicating using the SDK client.
// Defaults to:
// - currentCluster's RPCTransport in ClusterInformation (if HostPort is not provided)
// - grpc (if HostPort is provided)
Transport string `yaml:"transport"`
// interval to refresh DNS. Default to 10s
RefreshInterval time.Duration `yaml:"RefreshInterval"`
}
Expand Down Expand Up @@ -533,18 +537,12 @@ func (c *Config) fillDefaults() {
if c.PublicClient.HostPort == "" && c.ClusterGroupMetadata != nil {
name := c.ClusterGroupMetadata.CurrentClusterName
currentCluster := c.ClusterGroupMetadata.ClusterGroup[name]
if currentCluster.RPCTransport != "grpc" {
c.PublicClient.HostPort = currentCluster.RPCAddress
} else {
// public client cannot use gRPC because GoSDK hasn't supported it. we need to fallback to Thrift
// TODO: remove this fallback after GoSDK supporting gRPC
thriftPort := c.Services["frontend"].RPC.Port // use the Thrift port from RPC config
host, _, err := net.SplitHostPort(currentCluster.RPCAddress)
if err != nil {
log.Fatalf("RPCAddress is invalid for cluster %v", name)
}
c.PublicClient.HostPort = fmt.Sprintf("%v:%v", host, thriftPort)
}
c.PublicClient.HostPort = currentCluster.RPCAddress
c.PublicClient.Transport = currentCluster.RPCTransport

}
if c.PublicClient.Transport == "" {
c.PublicClient.Transport = "grpc"
}

if c.ClusterGroupMetadata.ClusterRedirectionPolicy == nil && c.DCRedirectionPolicy != nil {
Expand Down
2 changes: 1 addition & 1 deletion common/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func TestConfigFallbacks(t *testing.T) {
assert.NotEmpty(t, cfg.Persistence.DataStores["cass"].Cassandra, "cassandra config should remain after update")
assert.NotEmpty(t, cfg.Persistence.DataStores["cass"].NoSQL, "nosql config should contain cassandra config / not be empty")
assert.NotZero(t, cfg.Persistence.DataStores["default"].SQL.NumShards, "num shards should be nonzero")
assert.Equal(t, "localhost:7900", cfg.PublicClient.HostPort)
assert.Equal(t, "localhost:7833", cfg.PublicClient.HostPort)
}

func TestConfigErrorInAuthorizationConfig(t *testing.T) {
Expand Down
24 changes: 21 additions & 3 deletions common/rpc/outbounds.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func (b multiOutbounds) Build(grpc *grpc.Transport, tchannel *tchannel.Transport

type publicClientOutbound struct {
address string
isGRPC bool
authMiddleware middleware.UnaryOutbound
}

Expand All @@ -100,14 +101,22 @@ func newPublicClientOutbound(config *config.Config) (publicClientOutbound, error
authMiddleware = &authOutboundMiddleware{authProvider}
}

return publicClientOutbound{config.PublicClient.HostPort, authMiddleware}, nil
isGrpc := config.PublicClient.Transport == grpc.TransportName

return publicClientOutbound{config.PublicClient.HostPort, isGrpc, authMiddleware}, nil
}

func (b publicClientOutbound) Build(_ *grpc.Transport, tchannel *tchannel.Transport) (yarpc.Outbounds, error) {
func (b publicClientOutbound) Build(grpc *grpc.Transport, tchannel *tchannel.Transport) (yarpc.Outbounds, error) {
var outbound transport.UnaryOutbound
if b.isGRPC {
outbound = grpc.NewSingleOutbound(b.address)
} else {
outbound = tchannel.NewSingleOutbound(b.address)
}
return yarpc.Outbounds{
OutboundPublicClient: {
ServiceName: service.Frontend,
Unary: middleware.ApplyUnaryOutbound(tchannel.NewSingleOutbound(b.address), b.authMiddleware),
Unary: middleware.ApplyUnaryOutbound(outbound, b.authMiddleware),
},
}, nil
}
Expand Down Expand Up @@ -203,3 +212,12 @@ func (o directOutbound) Build(grpc *grpc.Transport, tchannel *tchannel.Transport
},
}, nil
}

func IsGRPCOutbound(config transport.ClientConfig) bool {
namer, ok := config.GetUnaryOutbound().(transport.Namer)
if !ok {
// This should not happen, unless yarpc older than v1.43.0 is used
panic("Outbound does not implement transport.Namer")
}
return namer.TransportName() == grpc.TransportName
}
22 changes: 17 additions & 5 deletions common/rpc/outbounds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/yarpc"
"go.uber.org/yarpc/api/peer"
"go.uber.org/yarpc/api/transport"
"go.uber.org/yarpc/peer/direct"
"go.uber.org/yarpc/transport/grpc"
"go.uber.org/yarpc/transport/tchannel"
Expand Down Expand Up @@ -78,9 +79,9 @@ func TestCombineOutbounds(t *testing.T) {
}

func TestPublicClientOutbound(t *testing.T) {
makeConfig := func(hostPort string, enableAuth bool, keyPath string) *config.Config {
makeConfig := func(hostPort string, transport string, enableAuth bool, keyPath string) *config.Config {
return &config.Config{
PublicClient: config.PublicClient{HostPort: hostPort},
PublicClient: config.PublicClient{HostPort: hostPort, Transport: transport},
Authorization: config.Authorization{OAuthAuthorizer: config.OAuthAuthorizer{Enable: enableAuth}},
ClusterGroupMetadata: &config.ClusterGroupMetadata{
CurrentClusterName: "cluster-A",
Expand All @@ -98,20 +99,23 @@ func TestPublicClientOutbound(t *testing.T) {
_, err := newPublicClientOutbound(&config.Config{})
require.EqualError(t, err, "need to provide an endpoint config for PublicClient")

builder, err := newPublicClientOutbound(makeConfig("localhost:1234", false, ""))
builder, err := newPublicClientOutbound(makeConfig("localhost:1234", "tchannel", false, ""))
require.NoError(t, err)
require.NotNil(t, builder)
require.Equal(t, "localhost:1234", builder.address)
require.Equal(t, nil, builder.authMiddleware)
require.False(t, builder.isGRPC)

builder, err = newPublicClientOutbound(makeConfig("localhost:1234", true, "invalid"))
builder, err = newPublicClientOutbound(makeConfig("localhost:1234", "tchannel", true, "invalid"))
require.EqualError(t, err, "create AuthProvider: invalid private key path invalid")
require.False(t, builder.isGRPC)

builder, err = newPublicClientOutbound(makeConfig("localhost:1234", true, tempFile(t, "private-key")))
builder, err = newPublicClientOutbound(makeConfig("localhost:1234", "grpc", true, tempFile(t, "private-key")))
require.NoError(t, err)
require.NotNil(t, builder)
require.Equal(t, "localhost:1234", builder.address)
require.NotNil(t, builder.authMiddleware)
require.True(t, builder.isGRPC)

grpc := &grpc.Transport{}
tchannel := &tchannel.Transport{}
Expand Down Expand Up @@ -166,6 +170,14 @@ func TestDirectOutbound(t *testing.T) {
assert.NotNil(t, outbounds["cadence-history"].Unary)
}

func TestIsGRPCOutboud(t *testing.T) {
assert.True(t, IsGRPCOutbound(&transport.OutboundConfig{Outbounds: transport.Outbounds{Unary: (&grpc.Transport{}).NewSingleOutbound("localhost:1234")}}))
assert.False(t, IsGRPCOutbound(&transport.OutboundConfig{Outbounds: transport.Outbounds{Unary: (&tchannel.Transport{}).NewSingleOutbound("localhost:1234")}}))
assert.Panics(t, func() {
IsGRPCOutbound(&transport.OutboundConfig{Outbounds: transport.Outbounds{Unary: fakeOutbound{}}})
})
}

func tempFile(t *testing.T, content string) string {
f, err := ioutil.TempFile("", "")
require.NoError(t, err)
Expand Down
3 changes: 2 additions & 1 deletion config/development.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ clusterGroupMetadata:
cluster0:
enabled: true
initialFailoverVersion: 0
rpcAddress: "localhost:7933" # this is to let worker service and XDC replicator connected to the frontend service. In cluster setup, localhost will not work
rpcAddress: "localhost:7833" # this is to let worker service and XDC replicator connected to the frontend service. In cluster setup, localhost will not work
rpcTransport: "grpc"

dcRedirectionPolicy:
policy: "noop"
Expand Down
5 changes: 3 additions & 2 deletions config/development_oauth.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ clusterGroupMetadata:
cluster0:
enabled: true
initialFailoverVersion: 0
rpcAddress: "localhost:7933" # this is to let worker service and XDC replicator connected to the frontend service. In cluster setup, localhost will not work
rpcAddress: "localhost:7833" # this is to let worker service and XDC replicator connected to the frontend service. In cluster setup, localhost will not work
rpcTransport: "grpc"
authorizationProvider:
enable: true
type: "OAuthAuthorization"
privateKey: "config/credentials/keytest"
privateKey: "config/credentials/keytest"
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ require (
go.mongodb.org/mongo-driver v1.7.3
go.opencensus.io v0.22.5 // indirect
go.uber.org/atomic v1.7.0
go.uber.org/cadence v0.17.1-0.20210820042115-b09692f6838f
go.uber.org/cadence v0.17.1-0.20211207091621-4cd144b0b16d
go.uber.org/config v1.4.0
go.uber.org/fx v1.13.1
go.uber.org/multierr v1.6.0
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,7 @@ github.com/uber-go/mapdecode v1.0.0/go.mod h1:b5nP15FwXTgpjTjeA9A2uTHXV5UJCl4arw
github.com/uber-go/tally v3.3.12+incompatible/go.mod h1:YDTIBxdXyOU/sCWilKB4bgyufu1cEi0jdVnRdxvjnmU=
github.com/uber-go/tally v3.3.15+incompatible h1:9hLSgNBP28CjIaDmAuRTq9qV+UZY+9PcvAkXO4nNMwg=
github.com/uber-go/tally v3.3.15+incompatible/go.mod h1:YDTIBxdXyOU/sCWilKB4bgyufu1cEi0jdVnRdxvjnmU=
github.com/uber/cadence-idl v0.0.0-20211111101836-d6b70b60eb8c/go.mod h1:oyUK7GCNCRHCCyWyzifSzXpVrRYVBbAMHAzF5dXiKws=
github.com/uber/cadence-idl v0.0.0-20211125000611-a6d3e3393ba6 h1:eH1W60bvlJCN0KFJMJuxNLxtR+hM73/FlwkaMlyV/D4=
github.com/uber/cadence-idl v0.0.0-20211125000611-a6d3e3393ba6/go.mod h1:oyUK7GCNCRHCCyWyzifSzXpVrRYVBbAMHAzF5dXiKws=
github.com/uber/jaeger-client-go v2.22.1+incompatible h1:NHcubEkVbahf9t3p75TOCR83gdUHXjRJvjoBh1yACsM=
Expand Down Expand Up @@ -480,6 +481,8 @@ go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/cadence v0.17.1-0.20210820042115-b09692f6838f h1:dYJaDf0DLTpUwoViuNCNpPbkx5gOz/hbql5RAfIblkI=
go.uber.org/cadence v0.17.1-0.20210820042115-b09692f6838f/go.mod h1:sGTCtpVbS/CSJtiEwi/a2dhhUvJ7hCloBUyVA7LzkZg=
go.uber.org/cadence v0.17.1-0.20211207091621-4cd144b0b16d h1:b+Tjvauk6sYz6yvlR/iYPMd0K955X3Tb6sg9sxfqJ9s=
go.uber.org/cadence v0.17.1-0.20211207091621-4cd144b0b16d/go.mod h1:s91dOf0kcJbumPscRIVFV/4Xq/exhefzpXmnDiRRTxs=
go.uber.org/config v1.4.0 h1:upnMPpMm6WlbZtXoasNkK4f0FhxwS+W4Iqz5oNznehQ=
go.uber.org/config v1.4.0/go.mod h1:aCyrMHmUAc/s2h9sv1koP84M9ZF/4K+g2oleyESO/Ig=
go.uber.org/dig v1.8.0/go.mod h1:X34SnWGr8Fyla9zQNO2GSO2D+TIuqB14OS8JhYocIyw=
Expand Down
Loading

0 comments on commit 3865361

Please sign in to comment.