Skip to content

Commit

Permalink
Merge membership Monitor and ServiceResolver to membership.Resolver (c…
Browse files Browse the repository at this point in the history
  • Loading branch information
mantas-sidlauskas authored Nov 19, 2021
1 parent 0aa7494 commit 3557eb5
Show file tree
Hide file tree
Showing 39 changed files with 884 additions and 1,175 deletions.
18 changes: 5 additions & 13 deletions client/clientfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type (

rpcClientFactory struct {
rpcFactory common.RPCFactory
monitor membership.Monitor
resolver membership.Resolver
metricsClient metrics.Client
dynConfig *dynamicconfig.Collection
numberOfHistoryShards int
Expand All @@ -78,15 +78,15 @@ type (
// NewRPCClientFactory creates an instance of client factory that knows how to dispatch RPC calls.
func NewRPCClientFactory(
rpcFactory common.RPCFactory,
monitor membership.Monitor,
resolver membership.Resolver,
metricsClient metrics.Client,
dc *dynamicconfig.Collection,
numberOfHistoryShards int,
logger log.Logger,
) Factory {
return &rpcClientFactory{
rpcFactory: rpcFactory,
monitor: monitor,
resolver: resolver,
metricsClient: metricsClient,
dynConfig: dc,
numberOfHistoryShards: numberOfHistoryShards,
Expand Down Expand Up @@ -115,11 +115,7 @@ func (cf *rpcClientFactory) NewHistoryClientWithTimeout(timeout time.Duration) (
rawClient = history.NewThriftClient(historyserviceclient.New(outboundConfig))
}

resolver, err := cf.monitor.GetResolver(service.History)
if err != nil {
return nil, err
}
peerResolver := history.NewPeerResolver(cf.numberOfHistoryShards, resolver, addressMapper)
peerResolver := history.NewPeerResolver(cf.numberOfHistoryShards, cf.resolver, addressMapper)

supportedMessageSize := cf.rpcFactory.GetMaxMessageSize()
maxSizeConfig := cf.dynConfig.GetIntProperty(dynamicconfig.GRPCMaxSizeInByte, supportedMessageSize)
Expand Down Expand Up @@ -164,11 +160,7 @@ func (cf *rpcClientFactory) NewMatchingClientWithTimeout(
rawClient = matching.NewThriftClient(matchingserviceclient.New(outboundConfig))
}

resolver, err := cf.monitor.GetResolver(service.Matching)
if err != nil {
return nil, err
}
peerResolver := matching.NewPeerResolver(resolver, addressMapper)
peerResolver := matching.NewPeerResolver(cf.resolver, addressMapper)

client := matching.NewClient(
timeout,
Expand Down
9 changes: 5 additions & 4 deletions client/history/peerResolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,25 @@ package history
import (
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/membership"
"github.com/uber/cadence/common/service"
)

// PeerResolver is used to resolve history peers.
// Those are deployed instances of Cadence history services that participate in the cluster ring.
// The resulting peer is simply an address of form ip:port where RPC calls can be routed to.
type PeerResolver struct {
numberOfShards int
membership membership.ServiceResolver
resolver membership.Resolver
addressMapper AddressMapperFn
}

type AddressMapperFn func(string) (string, error)

// NewPeerResolver creates a new history peer resolver.
func NewPeerResolver(numberOfShards int, membership membership.ServiceResolver, addressMapper AddressMapperFn) PeerResolver {
func NewPeerResolver(numberOfShards int, resolver membership.Resolver, addressMapper AddressMapperFn) PeerResolver {
return PeerResolver{
numberOfShards: numberOfShards,
membership: membership,
resolver: resolver,
addressMapper: addressMapper,
}
}
Expand All @@ -66,7 +67,7 @@ func (pr PeerResolver) FromDomainID(domainID string) (string, error) {
// FromHostAddress is used for further resolving.
func (pr PeerResolver) FromShardID(shardID int) (string, error) {
shardIDString := string(rune(shardID))
host, err := pr.membership.Lookup(shardIDString)
host, err := pr.resolver.Lookup(service.History, shardIDString)
if err != nil {
return "", err
}
Expand Down
11 changes: 6 additions & 5 deletions client/history/peerResolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,17 @@ import (

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/membership"
"github.com/uber/cadence/common/service"
)

func TestPeerResolver(t *testing.T) {
numShards := 123
controller := gomock.NewController(t)
serviceResolver := membership.NewMockServiceResolver(controller)
serviceResolver.EXPECT().Lookup(string(rune(common.DomainIDToHistoryShard("domainID", numShards)))).Return(membership.NewHostInfo("domainHost:thriftPort", nil), nil)
serviceResolver.EXPECT().Lookup(string(rune(common.WorkflowIDToHistoryShard("workflowID", numShards)))).Return(membership.NewHostInfo("workflowHost:thriftPort", nil), nil)
serviceResolver.EXPECT().Lookup(string(rune(99))).Return(membership.NewHostInfo("shardHost:thriftPort", nil), nil)
serviceResolver.EXPECT().Lookup(string(rune(11))).Return(nil, assert.AnError)
serviceResolver := membership.NewMockResolver(controller)
serviceResolver.EXPECT().Lookup(service.History, string(rune(common.DomainIDToHistoryShard("domainID", numShards)))).Return(membership.NewHostInfo("domainHost:thriftPort", nil), nil)
serviceResolver.EXPECT().Lookup(service.History, string(rune(common.WorkflowIDToHistoryShard("workflowID", numShards)))).Return(membership.NewHostInfo("workflowHost:thriftPort", nil), nil)
serviceResolver.EXPECT().Lookup(service.History, string(rune(99))).Return(membership.NewHostInfo("shardHost:thriftPort", nil), nil)
serviceResolver.EXPECT().Lookup(service.History, string(rune(11))).Return(nil, assert.AnError)

r := NewPeerResolver(numShards, serviceResolver, fakeAddressMapper)

Expand Down
16 changes: 11 additions & 5 deletions client/matching/peerResolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,31 @@

package matching

import "github.com/uber/cadence/common/membership"
import (
"github.com/uber/cadence/common/membership"
"github.com/uber/cadence/common/service"
)

// PeerResolver is used to resolve matching peers.
// Those are deployed instances of Cadence matching services that participate in the cluster ring.
// The resulting peer is simply an address of form ip:port where RPC calls can be routed to.
type PeerResolver struct {
membership membership.ServiceResolver
resolver membership.Resolver
addressMapper AddressMapperFn
}

type AddressMapperFn func(string) (string, error)

// NewPeerResolver creates a new matching peer resolver.
func NewPeerResolver(membership membership.ServiceResolver, addressMapper AddressMapperFn) PeerResolver {
func NewPeerResolver(membership membership.Resolver, addressMapper AddressMapperFn) PeerResolver {
return PeerResolver{membership, addressMapper}
}

// FromTaskList resolves the matching peer responsible for the given task list name.
// It uses our membership provider to lookup which instance currently owns the given task list.
// FromHostAddress is used for further resolving.
func (pr PeerResolver) FromTaskList(taskListName string) (string, error) {
host, err := pr.membership.Lookup(taskListName)
host, err := pr.resolver.Lookup(service.Matching, taskListName)
if err != nil {
return "", err
}
Expand All @@ -51,7 +54,10 @@ func (pr PeerResolver) FromTaskList(taskListName string) (string, error) {

// GetAllPeers returns all matching service peers in the cluster ring.
func (pr PeerResolver) GetAllPeers() ([]string, error) {
hosts := pr.membership.Members()
hosts, err := pr.resolver.Members(service.Matching)
if err != nil {
return nil, err
}
peers := make([]string, 0, len(hosts))
for _, host := range hosts {
peer, err := pr.FromHostAddress(host.GetAddress())
Expand Down
11 changes: 6 additions & 5 deletions client/matching/peerResolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,18 @@ import (
"github.com/stretchr/testify/assert"

"github.com/uber/cadence/common/membership"
"github.com/uber/cadence/common/service"
)

func TestPeerResolver(t *testing.T) {
controller := gomock.NewController(t)
serviceResolver := membership.NewMockServiceResolver(controller)
serviceResolver.EXPECT().Lookup("taskListA").Return(membership.NewHostInfo("taskListA:thriftPort", nil), nil)
serviceResolver.EXPECT().Lookup("invalid").Return(nil, assert.AnError)
serviceResolver.EXPECT().Members().Return([]*membership.HostInfo{
serviceResolver := membership.NewMockResolver(controller)
serviceResolver.EXPECT().Lookup(service.Matching, "taskListA").Return(membership.NewHostInfo("taskListA:thriftPort", nil), nil)
serviceResolver.EXPECT().Lookup(service.Matching, "invalid").Return(nil, assert.AnError)
serviceResolver.EXPECT().Members(service.Matching).Return([]*membership.HostInfo{
membership.NewHostInfo("taskListA:thriftPort", nil),
membership.NewHostInfo("taskListB:thriftPort", nil),
})
}, nil)

r := NewPeerResolver(serviceResolver, fakeAddressMapper)

Expand Down
2 changes: 1 addition & 1 deletion cmd/server/cadence/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (s *server) startService() common.Daemon {
)
rpcFactory := rpc.NewFactory(params.Logger, rpcParams)
params.RPCFactory = rpcFactory
params.MembershipMonitor, err = membership.NewMonitor(
params.MembershipResolver, err = membership.NewResolver(
&s.cfg.Ringpop,
rpcFactory.GetChannel(),
params.Name,
Expand Down
6 changes: 3 additions & 3 deletions common/log/tag/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,9 +338,9 @@ func Addresses(ads []string) Tag {
return newObjectTag("addresses", ads)
}

// ListenerName returns tag for ListenerName
func ListenerName(name string) Tag {
return newStringTag("listener-name", name)
// Subscriber returns tag for Subscriber
func Subscriber(subscriber string) Tag {
return newStringTag("subscriber", subscriber)
}

// Address return tag for Address
Expand Down
Loading

0 comments on commit 3557eb5

Please sign in to comment.