Skip to content

Commit

Permalink
Provide portmap to ringpop (cadence-workflow#4745)
Browse files Browse the repository at this point in the history
  • Loading branch information
mantas-sidlauskas authored Feb 22, 2022
1 parent 71c2774 commit 9b50717
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 34 deletions.
4 changes: 4 additions & 0 deletions cmd/server/cadence/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ func (s *server) startService() common.Daemon {
params.Name,
&s.cfg.Ringpop,
rpcFactory.GetChannel(),
membership.PortMap{
membership.PortGRPC: svcCfg.RPC.GRPCPort,
membership.PortTchannel: svcCfg.RPC.Port,
},
params.Logger,
)

Expand Down
4 changes: 2 additions & 2 deletions common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,9 @@ type (
// RPC contains the rpc config items
RPC struct {
// Port is the port on which the Thrift TChannel will bind to
Port int `yaml:"port"`
Port uint16 `yaml:"port"`
// GRPCPort is the port on which the grpc listener will bind to
GRPCPort int `yaml:"grpcPort"`
GRPCPort uint16 `yaml:"grpcPort"`
// BindOnLocalHost is true if localhost is the bind address
BindOnLocalHost bool `yaml:"bindOnLocalHost"`
// BindOnIP can be used to bind service on specific ip (eg. `0.0.0.0`) -
Expand Down
5 changes: 5 additions & 0 deletions common/membership/hostinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ import (
"strings"
)

const (
PortTchannel = "tchannel"
PortGRPC = "grpc"
)

// PortMap is a map of port names to port numbers.
type PortMap map[string]uint16

Expand Down
46 changes: 20 additions & 26 deletions common/peerprovider/ringpopprovider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ package ringpopprovider

import (
"fmt"
"net"
"strconv"
"sync"
"sync/atomic"
Expand All @@ -46,13 +45,12 @@ import (
type (
// Provider use ringpop to announce membership changes
Provider struct {
status int32
service string
ringpop *ringpop.Ringpop
bootParams *swim.BootstrapOptions
logger log.Logger
channel tchannel.Channel

status int32
service string
ringpop *ringpop.Ringpop
bootParams *swim.BootstrapOptions
logger log.Logger
portmap membership.PortMap
mu sync.RWMutex
subscribers map[string]chan<- *membership.ChangedEvent
}
Expand All @@ -61,9 +59,7 @@ type (
const (
// roleKey label is set by every single service as soon as it bootstraps its
// ringpop instance. The data for this key is the service name
roleKey = "serviceName"
portTchannel = "tchannel"
portgRPC = "grpc"
roleKey = "serviceName"
)

var _ membership.PeerProvider = (*Provider)(nil)
Expand All @@ -72,6 +68,7 @@ func New(
service string,
config *Config,
channel tchannel.Channel,
portMap membership.PortMap,
logger log.Logger,
) (*Provider, error) {
if err := config.validate(); err != nil {
Expand All @@ -93,23 +90,23 @@ func New(
return nil, fmt.Errorf("ringpop instance creation: %w", err)
}

return NewRingpopProvider(service, rp, bootstrapOpts, channel, logger), nil
return NewRingpopProvider(service, rp, portMap, bootstrapOpts, logger), nil
}

// NewRingpopProvider sets up ringpop based peer provider
func NewRingpopProvider(
service string,
rp *ringpop.Ringpop,
portMap membership.PortMap,
bootstrapOpts *swim.BootstrapOptions,
channel tchannel.Channel,
logger log.Logger,
) *Provider {
return &Provider{
service: service,
status: common.DaemonStatusInitialized,
bootParams: bootstrapOpts,
logger: logger,
channel: channel,
portmap: portMap,
ringpop: rp,
subscribers: map[string]chan<- *membership.ChangedEvent{},
}
Expand Down Expand Up @@ -138,14 +135,11 @@ func (r *Provider) Start() {
r.logger.Fatal("unable to get ring pop labels", tag.Error(err))
}

// set tchannel port to labels
_, port, err := net.SplitHostPort(r.channel.PeerInfo().HostPort)
if err != nil {
r.logger.Fatal("unable get tchannel port", tag.Error(err))
}

if err = labels.Set(portTchannel, port); err != nil {
r.logger.Fatal("unable to set ringpop tchannel label", tag.Error(err))
// set port labels
for name, port := range r.portmap {
if err = labels.Set(name, strconv.Itoa(int(port))); err != nil {
r.logger.Fatal("unable to set port label", tag.Error(err))
}
}

if err = labels.Set(roleKey, r.service); err != nil {
Expand Down Expand Up @@ -200,21 +194,21 @@ func (r *Provider) GetMembers(service string) ([]membership.HostInfo, error) {
return false
}

if v, ok := member.Label(portTchannel); ok {
if v, ok := member.Label(membership.PortTchannel); ok {
port, err := labelToPort(v)
if err != nil {
r.logger.Warn("tchannel port cannot be converted", tag.Error(err), tag.Value(v))
} else {
portMap[portTchannel] = port
portMap[membership.PortTchannel] = port
}
}

if v, ok := member.Label(portgRPC); ok {
if v, ok := member.Label(membership.PortGRPC); ok {
port, err := labelToPort(v)
if err != nil {
r.logger.Warn("grpc port cannot be converted", tag.Error(err), tag.Value(v))
} else {
portMap[portgRPC] = port
portMap[membership.PortGRPC] = port
}
}

Expand Down
3 changes: 2 additions & 1 deletion common/peerprovider/ringpopprovider/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (

"github.com/uber/cadence/common/log/loggerimpl"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/membership"
)

type HostInfo struct {
Expand Down Expand Up @@ -99,7 +100,7 @@ func NewTestRingpopCluster(ringPopApp string, size int, ipAddr string, seed stri
return nil
}

NewRingpopProvider(ringPopApp, ringPop, bOptions, cluster.channels[i], logger)
NewRingpopProvider(ringPopApp, ringPop, membership.PortMap{}, bOptions, logger)

}
return cluster
Expand Down
6 changes: 3 additions & 3 deletions common/rpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ type (
GetGRPCAddress(service, hostAddress string) (string, error)
}

GRPCPorts map[string]int
GRPCPorts map[string]uint16
)

func NewGRPCPorts(c *config.Config) GRPCPorts {
grpcPorts := map[string]int{}
grpcPorts := map[string]uint16{}
for name, config := range c.Services {
grpcPorts[service.FullName(name)] = config.RPC.GRPCPort
}
Expand All @@ -59,5 +59,5 @@ func (p GRPCPorts) GetGRPCAddress(service, hostAddress string) (string, error) {
hostAddress = newHostAddress
}

return net.JoinHostPort(hostAddress, strconv.Itoa(port)), nil
return net.JoinHostPort(hostAddress, strconv.Itoa(int(port))), nil
}
4 changes: 2 additions & 2 deletions common/rpc/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ func NewParams(serviceName string, config *config.Config, dc *dynamicconfig.Coll

return Params{
ServiceName: serviceName,
TChannelAddress: net.JoinHostPort(listenIP.String(), strconv.Itoa(serviceConfig.RPC.Port)),
GRPCAddress: net.JoinHostPort(listenIP.String(), strconv.Itoa(serviceConfig.RPC.GRPCPort)),
TChannelAddress: net.JoinHostPort(listenIP.String(), strconv.Itoa(int(serviceConfig.RPC.Port))),
GRPCAddress: net.JoinHostPort(listenIP.String(), strconv.Itoa(int(serviceConfig.RPC.GRPCPort))),
GRPCMaxMsgSize: serviceConfig.RPC.GRPCMaxMsgSize,
HostAddressMapper: NewGRPCPorts(config),
OutboundsBuilder: CombineOutbounds(
Expand Down

0 comments on commit 9b50717

Please sign in to comment.