Skip to content

Commit

Permalink
Use common dispatcher for public client outbound cadence-workflow#2 (c…
Browse files Browse the repository at this point in the history
  • Loading branch information
vytautas-karpavicius authored Oct 5, 2021
1 parent d19cae1 commit b1e3001
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 26 deletions.
25 changes: 3 additions & 22 deletions cmd/server/cadence/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/archiver"
"github.com/uber/cadence/common/archiver/provider"
"github.com/uber/cadence/common/authorization"
"github.com/uber/cadence/common/blobstore/filestore"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/config"
Expand Down Expand Up @@ -151,9 +150,10 @@ func (s *server) startService() common.Daemon {
log.Fatalf("error creating rpc factory params: %v", err)
}
params.RPCFactory = rpc.NewFactory(params.Logger, rpcParams)
dispatcher := params.RPCFactory.GetDispatcher()

params.MembershipFactory, err = s.cfg.Ringpop.NewFactory(
params.RPCFactory.GetDispatcher(),
dispatcher,
params.Name,
params.Logger,
)
Expand All @@ -175,10 +175,6 @@ func (s *server) startService() common.Daemon {
clusterGroupMetadata.ClusterGroup,
)

if len(s.cfg.PublicClient.HostPort) == 0 {
log.Fatalf("need to provide an endpoint config for PublicClient")
}

params.DispatcherProvider = rpc.NewDispatcherProvider(params.Logger, rpc.NewDNSPeerChooserFactory(s.cfg.PublicClient.RefreshInterval, params.Logger))

advancedVisMode := dc.GetStringProperty(
Expand Down Expand Up @@ -215,22 +211,7 @@ func (s *server) startService() common.Daemon {
}
}

var options *rpc.DispatcherOptions
if s.cfg.Authorization.OAuthAuthorizer.Enable {
clusterName := s.cfg.ClusterGroupMetadata.CurrentClusterName
authProvider, err := authorization.GetAuthProviderClient(s.cfg.ClusterGroupMetadata.ClusterGroup[clusterName].AuthorizationProvider.PrivateKey)
if err != nil {
log.Fatalf("failed to create AuthProvider: %v", err.Error())
}
options = &rpc.DispatcherOptions{
AuthProvider: authProvider,
}
}
dispatcher, err := params.DispatcherProvider.GetTChannel(service.Frontend, s.cfg.PublicClient.HostPort, options)
if err != nil {
log.Fatalf("failed to construct dispatcher: %v", err)
}
params.PublicClient = workflowserviceclient.New(dispatcher.ClientConfig(service.Frontend))
params.PublicClient = workflowserviceclient.New(dispatcher.ClientConfig(rpc.OutboundPublicClient))

params.ArchivalMetadata = archiver.NewArchivalMetadata(
dc,
Expand Down
45 changes: 45 additions & 0 deletions common/rpc/outbounds.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,57 @@
package rpc

import (
"fmt"

"github.com/uber/cadence/common/authorization"
"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/service"

"go.uber.org/yarpc"
"go.uber.org/yarpc/api/middleware"
"go.uber.org/yarpc/transport/grpc"
"go.uber.org/yarpc/transport/tchannel"
)

const (
// OutboundPublicClient is the name of configured public client outbound
OutboundPublicClient = "public-client"
)

// OutboundsBuilder allows defining outbounds for the dispatcher
type OutboundsBuilder interface {
Build(*grpc.Transport, *tchannel.Transport) (yarpc.Outbounds, error)
}

type publicClientOutbound struct {
address string
authMiddleware middleware.UnaryOutbound
}

func newPublicClientOutbound(config *config.Config) (publicClientOutbound, error) {
if len(config.PublicClient.HostPort) == 0 {
return publicClientOutbound{}, fmt.Errorf("need to provide an endpoint config for PublicClient")
}

var authMiddleware middleware.UnaryOutbound
if config.Authorization.OAuthAuthorizer.Enable {
clusterName := config.ClusterGroupMetadata.CurrentClusterName
clusterInfo := config.ClusterGroupMetadata.ClusterGroup[clusterName]
authProvider, err := authorization.GetAuthProviderClient(clusterInfo.AuthorizationProvider.PrivateKey)
if err != nil {
return publicClientOutbound{}, fmt.Errorf("create AuthProvider: %v", err)
}
authMiddleware = &authOutboundMiddleware{authProvider}
}

return publicClientOutbound{config.PublicClient.HostPort, authMiddleware}, nil
}

func (b publicClientOutbound) Build(_ *grpc.Transport, tchannel *tchannel.Transport) (yarpc.Outbounds, error) {
return yarpc.Outbounds{
OutboundPublicClient: {
ServiceName: service.Frontend,
Unary: middleware.ApplyUnaryOutbound(tchannel.NewSingleOutbound(b.address), b.authMiddleware),
},
}, nil
}
91 changes: 91 additions & 0 deletions common/rpc/outbounds_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright (c) 2021 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package rpc

import (
"io/ioutil"
"testing"

"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/service"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/yarpc/transport/grpc"
"go.uber.org/yarpc/transport/tchannel"
)

func TestPublicClientOutbound(t *testing.T) {
makeConfig := func(hostPort string, enableAuth bool, keyPath string) *config.Config {
return &config.Config{
PublicClient: config.PublicClient{HostPort: hostPort},
Authorization: config.Authorization{OAuthAuthorizer: config.OAuthAuthorizer{Enable: enableAuth}},
ClusterGroupMetadata: &config.ClusterGroupMetadata{
CurrentClusterName: "cluster-A",
ClusterGroup: map[string]config.ClusterInformation{
"cluster-A": {
AuthorizationProvider: config.AuthorizationProvider{
PrivateKey: keyPath,
},
},
},
},
}
}

_, err := newPublicClientOutbound(&config.Config{})
require.EqualError(t, err, "need to provide an endpoint config for PublicClient")

builder, err := newPublicClientOutbound(makeConfig("localhost:1234", false, ""))
require.NoError(t, err)
require.NotNil(t, builder)
require.Equal(t, "localhost:1234", builder.address)
require.Equal(t, nil, builder.authMiddleware)

builder, err = newPublicClientOutbound(makeConfig("localhost:1234", true, "invalid"))
require.EqualError(t, err, "create AuthProvider: invalid private key path invalid")

builder, err = newPublicClientOutbound(makeConfig("localhost:1234", true, tempFile(t, "private-key")))
require.NoError(t, err)
require.NotNil(t, builder)
require.Equal(t, "localhost:1234", builder.address)
require.NotNil(t, builder.authMiddleware)

grpc := &grpc.Transport{}
tchannel := &tchannel.Transport{}
outbounds, err := builder.Build(grpc, tchannel)
require.NoError(t, err)
assert.Equal(t, outbounds[OutboundPublicClient].ServiceName, service.Frontend)
assert.NotNil(t, outbounds[OutboundPublicClient].Unary)
}

func tempFile(t *testing.T, content string) string {
f, err := ioutil.TempFile("", "")
require.NoError(t, err)

f.Write([]byte(content))
require.NoError(t, err)

err = f.Close()
require.NoError(t, err)

return f.Name()
}
9 changes: 8 additions & 1 deletion common/rpc/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,21 @@ func NewParams(serviceName string, config *config.Config) (Params, error) {

listenIP, err := getListenIP(serviceConfig.RPC)
if err != nil {
return Params{}, fmt.Errorf("failed to get listen IP: %v", err)
return Params{}, fmt.Errorf("get listen IP: %v", err)
}

publicClientOutbound, err := newPublicClientOutbound(config)
if err != nil {
return Params{}, fmt.Errorf("public client outbound: %v", err)
}

return Params{
ServiceName: serviceName,
TChannelAddress: fmt.Sprintf("%v:%v", listenIP, serviceConfig.RPC.Port),
GRPCAddress: fmt.Sprintf("%v:%v", listenIP, serviceConfig.RPC.GRPCPort),
GRPCMaxMsgSize: serviceConfig.RPC.GRPCMaxMsgSize,
HostAddressMapper: NewGRPCPorts(config),
OutboundsBuilder: publicClientOutbound,
}, nil
}

Expand Down
11 changes: 8 additions & 3 deletions common/rpc/params_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,22 @@ import (
func TestNewParams(t *testing.T) {
serviceName := service.Frontend
makeConfig := func(svc config.Service) *config.Config {
return &config.Config{Services: map[string]config.Service{"frontend": svc}}
return &config.Config{
PublicClient: config.PublicClient{HostPort: "localhost:9999"},
Services: map[string]config.Service{"frontend": svc}}
}

_, err := NewParams(serviceName, &config.Config{})
assert.EqualError(t, err, "no config section for service: frontend")

_, err = NewParams(serviceName, makeConfig(config.Service{RPC: config.RPC{BindOnLocalHost: true, BindOnIP: "1.2.3.4"}}))
assert.EqualError(t, err, "failed to get listen IP: bindOnLocalHost and bindOnIP are mutually exclusive")
assert.EqualError(t, err, "get listen IP: bindOnLocalHost and bindOnIP are mutually exclusive")

_, err = NewParams(serviceName, makeConfig(config.Service{RPC: config.RPC{BindOnIP: "invalidIP"}}))
assert.EqualError(t, err, "failed to get listen IP: unable to parse bindOnIP value or it is not an IPv4 address: invalidIP")
assert.EqualError(t, err, "get listen IP: unable to parse bindOnIP value or it is not an IPv4 address: invalidIP")

_, err = NewParams(serviceName, &config.Config{Services: map[string]config.Service{"frontend": {}}})
assert.EqualError(t, err, "public client outbound: need to provide an endpoint config for PublicClient")

params, err := NewParams(serviceName, makeConfig(config.Service{RPC: config.RPC{BindOnLocalHost: true, Port: 1111, GRPCPort: 2222, GRPCMaxMsgSize: 3333}}))
assert.NoError(t, err)
Expand Down

0 comments on commit b1e3001

Please sign in to comment.