Skip to content

Commit

Permalink
Provide Channel for Ringpop (cadence-workflow#4597)
Browse files Browse the repository at this point in the history
  • Loading branch information
mantas-sidlauskas authored Oct 25, 2021
1 parent f182b87 commit 9a072ca
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 36 deletions.
9 changes: 4 additions & 5 deletions cmd/server/cadence/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,10 @@ func (s *server) startService() common.Daemon {
rpcParams.OutboundsBuilder,
rpc.NewCrossDCOutbounds(clusterGroupMetadata.ClusterGroup, rpc.NewDNSPeerChooserFactory(s.cfg.PublicClient.RefreshInterval, params.Logger)),
)
params.RPCFactory = rpc.NewFactory(params.Logger, rpcParams)
dispatcher := params.RPCFactory.GetDispatcher()

rpcFactory := rpc.NewFactory(params.Logger, rpcParams)
params.RPCFactory = rpcFactory
params.MembershipFactory, err = s.cfg.Ringpop.NewFactory(
dispatcher,
rpcFactory.GetChannel(),
params.Name,
params.Logger,
)
Expand Down Expand Up @@ -213,7 +212,7 @@ func (s *server) startService() common.Daemon {
}
}

params.PublicClient = workflowserviceclient.New(dispatcher.ClientConfig(rpc.OutboundPublicClient))
params.PublicClient = workflowserviceclient.New(params.RPCFactory.GetDispatcher().ClientConfig(rpc.OutboundPublicClient))

params.ArchivalMetadata = archiver.NewArchivalMetadata(
dc,
Expand Down
41 changes: 11 additions & 30 deletions common/config/ringpop.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,18 @@ import (
"errors"
"fmt"
"net"
"reflect"
"strings"
"sync"
"time"

"go.uber.org/yarpc/transport/tchannel"

"github.com/uber/ringpop-go"
"github.com/uber/ringpop-go/discovery"
"github.com/uber/ringpop-go/discovery/jsonfile"
"github.com/uber/ringpop-go/discovery/statichosts"
"github.com/uber/ringpop-go/swim"
tcg "github.com/uber/tchannel-go"
"go.uber.org/yarpc"
"go.uber.org/yarpc/transport/tchannel"

"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
Expand Down Expand Up @@ -66,7 +65,7 @@ const (
// RingpopFactory implements the RingpopFactory interface
type RingpopFactory struct {
config *Ringpop
dispatcher *yarpc.Dispatcher
channel tchannel.Channel
serviceName string
logger log.Logger

Expand All @@ -78,12 +77,12 @@ type RingpopFactory struct {
// NewFactory builds a ringpop factory conforming
// to the underlying configuration
func (rpConfig *Ringpop) NewFactory(
dispatcher *yarpc.Dispatcher,
channel tchannel.Channel,
serviceName string,
logger log.Logger,
) (*RingpopFactory, error) {

return newRingpopFactory(rpConfig, dispatcher, serviceName, logger)
return newRingpopFactory(rpConfig, channel, serviceName, logger)
}

func (rpConfig *Ringpop) validate() error {
Expand Down Expand Up @@ -151,7 +150,7 @@ func validateBootstrapMode(

func newRingpopFactory(
rpConfig *Ringpop,
dispatcher *yarpc.Dispatcher,
channel tchannel.Channel,
serviceName string,
logger log.Logger,
) (*RingpopFactory, error) {
Expand All @@ -164,7 +163,7 @@ func newRingpopFactory(
}
return &RingpopFactory{
config: rpConfig,
dispatcher: dispatcher,
channel: channel,
serviceName: serviceName,
logger: logger,
}, nil
Expand Down Expand Up @@ -216,14 +215,10 @@ func (factory *RingpopFactory) getRingpop() (*membership.RingPop, error) {
}

func (factory *RingpopFactory) createRingpop() (*membership.RingPop, error) {

var ch *tcg.Channel
var err error
if ch, err = factory.getChannel(factory.dispatcher); err != nil {
return nil, err
}

rp, err := ringpop.New(factory.config.Name, ringpop.Channel(ch))
rp, err := ringpop.New(
factory.config.Name,
ringpop.Channel(factory.channel.(*tcg.Channel)),
)
if err != nil {
return nil, err
}
Expand All @@ -239,20 +234,6 @@ func (factory *RingpopFactory) createRingpop() (*membership.RingPop, error) {
return membership.NewRingPop(rp, bootstrapOpts, factory.logger), nil
}

func (factory *RingpopFactory) getChannel(
dispatcher *yarpc.Dispatcher,
) (*tcg.Channel, error) {

t := dispatcher.Inbounds()[0].Transports()[0].(*tchannel.ChannelTransport)
ty := reflect.ValueOf(t.Channel())
var ch *tcg.Channel
var ok bool
if ch, ok = ty.Interface().(*tcg.Channel); !ok {
return nil, errors.New("unable to get tchannel out of the dispatcher")
}
return ch, nil
}

type dnsHostResolver interface {
LookupHost(ctx context.Context, host string) (addrs []string, err error)
}
Expand Down
9 changes: 8 additions & 1 deletion common/rpc/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type Factory struct {
logger log.Logger
hostAddressMapper HostAddressMapper
tchannel *tchannel.Transport
channel tchannel.Channel
grpc *grpc.Transport
dispatcher *yarpc.Dispatcher
}
Expand All @@ -50,7 +51,7 @@ func NewFactory(logger log.Logger, p Params) *Factory {
inbounds := yarpc.Inbounds{}

// Create TChannel transport
// This is here only because ringpop extracts inbound from the dispatcher and expects tchannel.ChannelTransport,
// This is here only because ringpop expects tchannel.ChannelTransport,
// everywhere else we use regular tchannel.Transport.
ch, err := tchannel.NewChannelTransport(
tchannel.ServiceName(p.ServiceName),
Expand Down Expand Up @@ -107,6 +108,7 @@ func NewFactory(logger log.Logger, p Params) *Factory {
tchannel: tchannel,
grpc: grpc,
dispatcher: dispatcher,
channel: ch.Channel(),
}
}

Expand All @@ -115,6 +117,11 @@ func (d *Factory) GetDispatcher() *yarpc.Dispatcher {
return d.dispatcher
}

// GetChannel returns Tchannel Channel used by Ringpop
func (d *Factory) GetChannel() tchannel.Channel {
return d.channel
}

// CreateDispatcherForOutbound creates a dispatcher for outbound connection
func (d *Factory) CreateDispatcherForOutbound(
callerName string,
Expand Down

0 comments on commit 9a072ca

Please sign in to comment.