Skip to content

Commit

Permalink
Use named port to select transport for outbound calls (cadence-workfl…
Browse files Browse the repository at this point in the history
  • Loading branch information
mantas-sidlauskas authored Mar 3, 2022
1 parent f1a0983 commit 4dab59a
Show file tree
Hide file tree
Showing 29 changed files with 367 additions and 371 deletions.
17 changes: 7 additions & 10 deletions client/clientfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,18 +104,17 @@ func (cf *rpcClientFactory) NewMatchingClient(domainIDToName DomainIDToNameFunc)

func (cf *rpcClientFactory) NewHistoryClientWithTimeout(timeout time.Duration) (history.Client, error) {
var rawClient history.Client
var addressMapper history.AddressMapperFn
var namedPort = membership.PortTchannel

outboundConfig := cf.rpcFactory.GetDispatcher().ClientConfig(service.History)
if rpc.IsGRPCOutbound(outboundConfig) {
rawClient = history.NewGRPCClient(historyv1.NewHistoryAPIYARPCClient(outboundConfig))
addressMapper = func(address string) (string, error) {
return cf.rpcFactory.ReplaceGRPCPort(service.History, address)
}
namedPort = membership.PortGRPC
} else {
rawClient = history.NewThriftClient(historyserviceclient.New(outboundConfig))
}

peerResolver := history.NewPeerResolver(cf.numberOfHistoryShards, cf.resolver, addressMapper)
peerResolver := history.NewPeerResolver(cf.numberOfHistoryShards, cf.resolver, namedPort)

supportedMessageSize := cf.rpcFactory.GetMaxMessageSize()
maxSizeConfig := cf.dynConfig.GetIntProperty(dynamicconfig.GRPCMaxSizeInByte, supportedMessageSize)
Expand Down Expand Up @@ -149,18 +148,16 @@ func (cf *rpcClientFactory) NewMatchingClientWithTimeout(
longPollTimeout time.Duration,
) (matching.Client, error) {
var rawClient matching.Client
var addressMapper matching.AddressMapperFn
var namedPort = membership.PortTchannel
outboundConfig := cf.rpcFactory.GetDispatcher().ClientConfig(service.Matching)
if rpc.IsGRPCOutbound(outboundConfig) {
rawClient = matching.NewGRPCClient(matchingv1.NewMatchingAPIYARPCClient(outboundConfig))
addressMapper = func(address string) (string, error) {
return cf.rpcFactory.ReplaceGRPCPort(service.Matching, address)
}
namedPort = membership.PortGRPC
} else {
rawClient = matching.NewThriftClient(matchingserviceclient.New(outboundConfig))
}

peerResolver := matching.NewPeerResolver(cf.resolver, addressMapper)
peerResolver := matching.NewPeerResolver(cf.resolver, namedPort)

client := matching.NewClient(
timeout,
Expand Down
20 changes: 9 additions & 11 deletions client/history/peerResolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,15 @@ import (
type PeerResolver struct {
numberOfShards int
resolver membership.Resolver
addressMapper AddressMapperFn
namedPort string // grpc or tchannel, depends on yarpc configuration
}

type AddressMapperFn func(string) (string, error)

// NewPeerResolver creates a new history peer resolver.
func NewPeerResolver(numberOfShards int, resolver membership.Resolver, addressMapper AddressMapperFn) PeerResolver {
func NewPeerResolver(numberOfShards int, resolver membership.Resolver, namedPort string) PeerResolver {
return PeerResolver{
numberOfShards: numberOfShards,
resolver: resolver,
addressMapper: addressMapper,
namedPort: namedPort,
}
}

Expand Down Expand Up @@ -71,15 +69,15 @@ func (pr PeerResolver) FromShardID(shardID int) (string, error) {
if err != nil {
return "", err
}
return pr.FromHostAddress(host.GetAddress())
return host.GetNamedAddress(pr.namedPort)
}

// FromHostAddress resolves the final history peer responsible for the given host address.
// The address may be used as is, or processed with additional address mapper.
// In case of gRPC transport, the port within the address is replaced with gRPC port.
// The address is formed by adding port for specified transport
func (pr PeerResolver) FromHostAddress(hostAddress string) (string, error) {
if pr.addressMapper == nil {
return hostAddress, nil
host, err := pr.resolver.LookupByAddress(service.History, hostAddress)
if err != nil {
return "", err
}
return pr.addressMapper(hostAddress)
return host.GetNamedAddress(pr.namedPort)
}
53 changes: 30 additions & 23 deletions client/history/peerResolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package history

import (
"errors"
"testing"

gomock "github.com/golang/mock/gomock"
Expand All @@ -35,45 +36,51 @@ func TestPeerResolver(t *testing.T) {
numShards := 123
controller := gomock.NewController(t)
serviceResolver := membership.NewMockResolver(controller)
serviceResolver.EXPECT().Lookup(service.History, string(rune(common.DomainIDToHistoryShard("domainID", numShards)))).Return(membership.NewHostInfo("domainHost:thriftPort"), nil)
serviceResolver.EXPECT().Lookup(service.History, string(rune(common.WorkflowIDToHistoryShard("workflowID", numShards)))).Return(membership.NewHostInfo("workflowHost:thriftPort"), nil)
serviceResolver.EXPECT().Lookup(service.History, string(rune(99))).Return(membership.NewHostInfo("shardHost:thriftPort"), nil)
serviceResolver.EXPECT().Lookup(
service.History, string(rune(common.DomainIDToHistoryShard("domainID", numShards)))).Return(
membership.NewDetailedHostInfo(
"domainHost:123",
"domainHost_123",
membership.PortMap{membership.PortTchannel: 1234}),
nil)
serviceResolver.EXPECT().Lookup(service.History, string(rune(common.WorkflowIDToHistoryShard("workflowID", numShards)))).Return(
membership.NewDetailedHostInfo(
"workflowHost:123",
"workflow",
membership.PortMap{membership.PortTchannel: 1235, membership.PortGRPC: 1666}), nil)

serviceResolver.EXPECT().Lookup(service.History, string(rune(99))).Return(
membership.NewDetailedHostInfo(
"shardHost:123",
"shard_123",
membership.PortMap{membership.PortTchannel: 1235}),
nil)

serviceResolver.EXPECT().LookupByAddress(service.History, "invalid address").Return(
membership.HostInfo{},
errors.New("host not found"),
)

serviceResolver.EXPECT().Lookup(service.History, string(rune(11))).Return(membership.HostInfo{}, assert.AnError)

r := NewPeerResolver(numShards, serviceResolver, fakeAddressMapper)
r := NewPeerResolver(numShards, serviceResolver, membership.PortTchannel)

peer, err := r.FromDomainID("domainID")
assert.NoError(t, err)
assert.Equal(t, "domainHost:grpcPort", peer)
assert.Equal(t, "domainHost:1234", peer)

peer, err = r.FromWorkflowID("workflowID")
assert.NoError(t, err)
assert.Equal(t, "workflowHost:grpcPort", peer)
assert.Equal(t, "workflowHost:1235", peer)

peer, err = r.FromShardID(99)
assert.NoError(t, err)
assert.Equal(t, "shardHost:grpcPort", peer)
assert.Equal(t, "shardHost:1235", peer)

_, err = r.FromShardID(11)
assert.Error(t, err)

_, err = r.FromHostAddress("invalid address")
assert.Error(t, err)

r = NewPeerResolver(numShards, nil, nil)
peer, err = r.FromHostAddress("no mapper")
assert.NoError(t, err)
assert.Equal(t, "no mapper", peer)
}

func fakeAddressMapper(address string) (string, error) {
switch address {
case "domainHost:thriftPort":
return "domainHost:grpcPort", nil
case "workflowHost:thriftPort":
return "workflowHost:grpcPort", nil
case "shardHost:thriftPort":
return "shardHost:grpcPort", nil
}
return "", assert.AnError
}
22 changes: 13 additions & 9 deletions client/matching/peerResolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,16 @@ import (
// Those are deployed instances of Cadence matching services that participate in the cluster ring.
// The resulting peer is simply an address of form ip:port where RPC calls can be routed to.
type PeerResolver struct {
resolver membership.Resolver
addressMapper AddressMapperFn
resolver membership.Resolver
namedPort string // grpc or tchannel, depends on yarpc configuration
}

type AddressMapperFn func(string) (string, error)

// NewPeerResolver creates a new matching peer resolver.
func NewPeerResolver(membership membership.Resolver, addressMapper AddressMapperFn) PeerResolver {
return PeerResolver{membership, addressMapper}
func NewPeerResolver(membership membership.Resolver, namedPort string) PeerResolver {
return PeerResolver{
resolver: membership,
namedPort: namedPort,
}
}

// FromTaskList resolves the matching peer responsible for the given task list name.
Expand Down Expand Up @@ -73,8 +74,11 @@ func (pr PeerResolver) GetAllPeers() ([]string, error) {
// The address may be used as is, or processed with additional address mapper.
// In case of gRPC transport, the port within the address is replaced with gRPC port.
func (pr PeerResolver) FromHostAddress(hostAddress string) (string, error) {
if pr.addressMapper == nil {
return hostAddress, nil
host, err := pr.resolver.LookupByAddress(service.Matching, hostAddress)
if err != nil {
return "", err
}
return pr.addressMapper(hostAddress)

return host.GetNamedAddress(pr.namedPort)

}
54 changes: 34 additions & 20 deletions client/matching/peerResolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,46 @@ import (
func TestPeerResolver(t *testing.T) {
controller := gomock.NewController(t)
serviceResolver := membership.NewMockResolver(controller)
serviceResolver.EXPECT().Lookup(service.Matching, "taskListA").Return(membership.NewHostInfo("taskListA:thriftPort"), nil)
host1 := membership.NewDetailedHostInfo(
"tasklistHost:1234",
"tasklistHost_1234",
membership.PortMap{
membership.PortTchannel: 1234,
membership.PortGRPC: 1244,
},
)
host2 := membership.NewDetailedHostInfo(
"tasklistHost2:1235",
"tasklistHost2_1235",
membership.PortMap{
membership.PortTchannel: 1235,
membership.PortGRPC: 1245,
},
)
serviceResolver.EXPECT().Lookup(service.Matching, "taskListA").Return(
host1,
nil)
serviceResolver.EXPECT().Lookup(service.Matching, "invalid").Return(membership.HostInfo{}, assert.AnError)
serviceResolver.EXPECT().LookupByAddress(service.Matching, "invalid address").Return(membership.HostInfo{}, assert.AnError)

serviceResolver.EXPECT().Members(service.Matching).Return([]membership.HostInfo{
membership.NewHostInfo("taskListA:thriftPort"),
membership.NewHostInfo("taskListB:thriftPort"),
host1,
host2,
}, nil)

r := NewPeerResolver(serviceResolver, fakeAddressMapper)
serviceResolver.EXPECT().LookupByAddress(service.Matching, "tasklistHost2:1235").Return(
host2,
nil,
).AnyTimes()
serviceResolver.EXPECT().LookupByAddress(service.Matching, "tasklistHost:1234").Return(
host1,
nil,
).AnyTimes()
r := NewPeerResolver(serviceResolver, membership.PortGRPC)

peer, err := r.FromTaskList("taskListA")
assert.NoError(t, err)
assert.Equal(t, "taskListA:grpcPort", peer)
assert.Equal(t, "tasklistHost:1244", peer)

_, err = r.FromTaskList("invalid")
assert.Error(t, err)
Expand All @@ -54,20 +82,6 @@ func TestPeerResolver(t *testing.T) {

peers, err := r.GetAllPeers()
assert.NoError(t, err)
assert.Equal(t, []string{"taskListA:grpcPort", "taskListB:grpcPort"}, peers)

r = NewPeerResolver(nil, nil)
peer, err = r.FromHostAddress("no mapper")
assert.NoError(t, err)
assert.Equal(t, "no mapper", peer)
}
assert.Equal(t, []string{"tasklistHost:1244", "tasklistHost2:1245"}, peers)

func fakeAddressMapper(address string) (string, error) {
switch address {
case "taskListA:thriftPort":
return "taskListA:grpcPort", nil
case "taskListB:thriftPort":
return "taskListB:grpcPort", nil
}
return "", assert.AnError
}
4 changes: 2 additions & 2 deletions cmd/server/cadence/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ func (s *server) startService() common.Daemon {
if err != nil {
log.Fatal("failed to create the zap logger, err: ", err.Error())
}
params.Logger = loggerimpl.NewLogger(zapLogger)
params.UpdateLoggerWithServiceName(params.Name)
params.Logger = loggerimpl.NewLogger(zapLogger).WithTags(tag.Service(params.Name))

params.PersistenceConfig = s.cfg.Persistence

err = nil
Expand Down
42 changes: 41 additions & 1 deletion common/membership/hostinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ package membership

import (
"fmt"
"net"
"strconv"
"strings"
)

Expand All @@ -37,15 +39,18 @@ type PortMap map[string]uint16

// HostInfo is a type that contains the info about a cadence host
type HostInfo struct {
addr string // ip:port
addr string // ip:port returned by peer provider
ip string // @todo should we set this to net.IP ?
identity string
portMap PortMap // ports host is listening to
}

// NewHostInfo creates a new HostInfo instance
func NewHostInfo(addr string) HostInfo {
ip, _, _ := net.SplitHostPort(addr)
return HostInfo{
addr: addr,
ip: ip,
}
}

Expand All @@ -60,8 +65,10 @@ func (m PortMap) String() string {

// NewDetailedHostInfo creates a new HostInfo instance with identity and portmap information
func NewDetailedHostInfo(addr string, identity string, portMap PortMap) HostInfo {
ip, _, _ := net.SplitHostPort(addr)
return HostInfo{
addr: addr,
ip: ip,
identity: identity,
portMap: portMap,
}
Expand All @@ -72,6 +79,39 @@ func (hi HostInfo) GetAddress() string {
return hi.addr
}

// GetNamedAddress returns the ip:port address
func (hi HostInfo) GetNamedAddress(port string) (string, error) {
if port, set := hi.portMap[port]; set {
return net.JoinHostPort(hi.ip, strconv.Itoa(int(port))), nil
}

return "", fmt.Errorf("port %q is not set for %+v", port, hi)
}

// Belongs tells if ip:port is assigned to this member
func (hi HostInfo) Belongs(address string) (bool, error) {

if hi.addr == address {
return true, nil
}

ip, port, err := net.SplitHostPort(address)
if err != nil {
return false, err
}

if ip != hi.ip {
return false, nil
}

for _, number := range hi.portMap {
if port == strconv.Itoa(int(number)) {
return true, nil
}
}
return false, nil
}

// Identity implements ringpop's Membership interface
func (hi HostInfo) Identity() string {
// if identity is not set, return address
Expand Down
Loading

0 comments on commit 4dab59a

Please sign in to comment.