From 9a072ca10dc0f6ce27e7f6f169b9d97541411181 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mantas=20=C5=A0idlauskas?= Date: Mon, 25 Oct 2021 18:27:55 +0300 Subject: [PATCH] Provide Channel for Ringpop (#4597) --- cmd/server/cadence/server.go | 9 ++++---- common/config/ringpop.go | 41 ++++++++++-------------------------- common/rpc/factory.go | 9 +++++++- 3 files changed, 23 insertions(+), 36 deletions(-) diff --git a/cmd/server/cadence/server.go b/cmd/server/cadence/server.go index 507e13116d6..094af808521 100644 --- a/cmd/server/cadence/server.go +++ b/cmd/server/cadence/server.go @@ -153,11 +153,10 @@ func (s *server) startService() common.Daemon { rpcParams.OutboundsBuilder, rpc.NewCrossDCOutbounds(clusterGroupMetadata.ClusterGroup, rpc.NewDNSPeerChooserFactory(s.cfg.PublicClient.RefreshInterval, params.Logger)), ) - params.RPCFactory = rpc.NewFactory(params.Logger, rpcParams) - dispatcher := params.RPCFactory.GetDispatcher() - + rpcFactory := rpc.NewFactory(params.Logger, rpcParams) + params.RPCFactory = rpcFactory params.MembershipFactory, err = s.cfg.Ringpop.NewFactory( - dispatcher, + rpcFactory.GetChannel(), params.Name, params.Logger, ) @@ -213,7 +212,7 @@ func (s *server) startService() common.Daemon { } } - params.PublicClient = workflowserviceclient.New(dispatcher.ClientConfig(rpc.OutboundPublicClient)) + params.PublicClient = workflowserviceclient.New(params.RPCFactory.GetDispatcher().ClientConfig(rpc.OutboundPublicClient)) params.ArchivalMetadata = archiver.NewArchivalMetadata( dc, diff --git a/common/config/ringpop.go b/common/config/ringpop.go index 9d5f7b8ce83..31b50b666aa 100644 --- a/common/config/ringpop.go +++ b/common/config/ringpop.go @@ -25,19 +25,18 @@ import ( "errors" "fmt" "net" - "reflect" "strings" "sync" "time" + "go.uber.org/yarpc/transport/tchannel" + "github.com/uber/ringpop-go" "github.com/uber/ringpop-go/discovery" "github.com/uber/ringpop-go/discovery/jsonfile" "github.com/uber/ringpop-go/discovery/statichosts" "github.com/uber/ringpop-go/swim" tcg "github.com/uber/tchannel-go" - "go.uber.org/yarpc" - "go.uber.org/yarpc/transport/tchannel" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" @@ -66,7 +65,7 @@ const ( // RingpopFactory implements the RingpopFactory interface type RingpopFactory struct { config *Ringpop - dispatcher *yarpc.Dispatcher + channel tchannel.Channel serviceName string logger log.Logger @@ -78,12 +77,12 @@ type RingpopFactory struct { // NewFactory builds a ringpop factory conforming // to the underlying configuration func (rpConfig *Ringpop) NewFactory( - dispatcher *yarpc.Dispatcher, + channel tchannel.Channel, serviceName string, logger log.Logger, ) (*RingpopFactory, error) { - return newRingpopFactory(rpConfig, dispatcher, serviceName, logger) + return newRingpopFactory(rpConfig, channel, serviceName, logger) } func (rpConfig *Ringpop) validate() error { @@ -151,7 +150,7 @@ func validateBootstrapMode( func newRingpopFactory( rpConfig *Ringpop, - dispatcher *yarpc.Dispatcher, + channel tchannel.Channel, serviceName string, logger log.Logger, ) (*RingpopFactory, error) { @@ -164,7 +163,7 @@ func newRingpopFactory( } return &RingpopFactory{ config: rpConfig, - dispatcher: dispatcher, + channel: channel, serviceName: serviceName, logger: logger, }, nil @@ -216,14 +215,10 @@ func (factory *RingpopFactory) getRingpop() (*membership.RingPop, error) { } func (factory *RingpopFactory) createRingpop() (*membership.RingPop, error) { - - var ch *tcg.Channel - var err error - if ch, err = factory.getChannel(factory.dispatcher); err != nil { - return nil, err - } - - rp, err := ringpop.New(factory.config.Name, ringpop.Channel(ch)) + rp, err := ringpop.New( + factory.config.Name, + ringpop.Channel(factory.channel.(*tcg.Channel)), + ) if err != nil { return nil, err } @@ -239,20 +234,6 @@ func (factory *RingpopFactory) createRingpop() (*membership.RingPop, error) { return membership.NewRingPop(rp, bootstrapOpts, factory.logger), nil } -func (factory *RingpopFactory) getChannel( - dispatcher *yarpc.Dispatcher, -) (*tcg.Channel, error) { - - t := dispatcher.Inbounds()[0].Transports()[0].(*tchannel.ChannelTransport) - ty := reflect.ValueOf(t.Channel()) - var ch *tcg.Channel - var ok bool - if ch, ok = ty.Interface().(*tcg.Channel); !ok { - return nil, errors.New("unable to get tchannel out of the dispatcher") - } - return ch, nil -} - type dnsHostResolver interface { LookupHost(ctx context.Context, host string) (addrs []string, err error) } diff --git a/common/rpc/factory.go b/common/rpc/factory.go index 1baccac4a21..ebff361a51c 100644 --- a/common/rpc/factory.go +++ b/common/rpc/factory.go @@ -41,6 +41,7 @@ type Factory struct { logger log.Logger hostAddressMapper HostAddressMapper tchannel *tchannel.Transport + channel tchannel.Channel grpc *grpc.Transport dispatcher *yarpc.Dispatcher } @@ -50,7 +51,7 @@ func NewFactory(logger log.Logger, p Params) *Factory { inbounds := yarpc.Inbounds{} // Create TChannel transport - // This is here only because ringpop extracts inbound from the dispatcher and expects tchannel.ChannelTransport, + // This is here only because ringpop expects tchannel.ChannelTransport, // everywhere else we use regular tchannel.Transport. ch, err := tchannel.NewChannelTransport( tchannel.ServiceName(p.ServiceName), @@ -107,6 +108,7 @@ func NewFactory(logger log.Logger, p Params) *Factory { tchannel: tchannel, grpc: grpc, dispatcher: dispatcher, + channel: ch.Channel(), } } @@ -115,6 +117,11 @@ func (d *Factory) GetDispatcher() *yarpc.Dispatcher { return d.dispatcher } +// GetChannel returns Tchannel Channel used by Ringpop +func (d *Factory) GetChannel() tchannel.Channel { + return d.channel +} + // CreateDispatcherForOutbound creates a dispatcher for outbound connection func (d *Factory) CreateDispatcherForOutbound( callerName string,