Skip to content

Commit

Permalink
Refactor service naming constants (cadence-workflow#4516)
Browse files Browse the repository at this point in the history
  • Loading branch information
vytautas-karpavicius authored Sep 28, 2021
1 parent 467824e commit 5428e35
Show file tree
Hide file tree
Showing 31 changed files with 234 additions and 149 deletions.
35 changes: 18 additions & 17 deletions client/clientfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/membership"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/service"
)

const (
Expand Down Expand Up @@ -143,7 +144,7 @@ func (cf *rpcClientFactory) createKeyResolver(serviceName string) (func(key stri
}

func (cf *rpcClientFactory) NewHistoryClientWithTimeout(timeout time.Duration) (history.Client, error) {
keyResolver, err := cf.createKeyResolver(common.HistoryServiceName)
keyResolver, err := cf.createKeyResolver(service.History)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -175,7 +176,7 @@ func (cf *rpcClientFactory) NewMatchingClientWithTimeout(
timeout time.Duration,
longPollTimeout time.Duration,
) (matching.Client, error) {
keyResolver, err := cf.createKeyResolver(common.MatchingServiceName)
keyResolver, err := cf.createKeyResolver(service.Matching)
if err != nil {
return nil, err
}
Expand All @@ -188,7 +189,7 @@ func (cf *rpcClientFactory) NewMatchingClientWithTimeout(
}

clientFetcher := func() ([]matching.Client, error) {
resolver, err := cf.monitor.GetResolver(common.MatchingServiceName)
resolver, err := cf.monitor.GetResolver(service.Matching)
if err != nil {
return nil, err
}
Expand All @@ -197,7 +198,7 @@ func (cf *rpcClientFactory) NewMatchingClientWithTimeout(
for _, host := range resolver.Members() {
hostAddress := host.GetAddress()
if cf.enableGRPCOutbound {
hostAddress, err = cf.rpcFactory.ReplaceGRPCPort(common.MatchingServiceName, hostAddress)
hostAddress, err = cf.rpcFactory.ReplaceGRPCPort(service.Matching, hostAddress)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -232,7 +233,7 @@ func (cf *rpcClientFactory) NewFrontendClientWithTimeout(
timeout time.Duration,
longPollTimeout time.Duration,
) (frontend.Client, error) {
keyResolver, err := cf.createKeyResolver(common.FrontendServiceName)
keyResolver, err := cf.createKeyResolver(service.Frontend)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -316,51 +317,51 @@ func (cf *rpcClientFactory) NewFrontendClientWithTimeoutAndDispatcher(
}

func (cf *rpcClientFactory) newHistoryThriftClient(hostAddress string) (history.Client, error) {
dispatcher, err := cf.rpcFactory.CreateDispatcherForOutbound(historyCaller, common.HistoryServiceName, hostAddress)
dispatcher, err := cf.rpcFactory.CreateDispatcherForOutbound(historyCaller, service.History, hostAddress)
if err != nil {
return nil, err
}
return history.NewThriftClient(historyserviceclient.New(dispatcher.ClientConfig(common.HistoryServiceName))), nil
return history.NewThriftClient(historyserviceclient.New(dispatcher.ClientConfig(service.History))), nil
}

func (cf *rpcClientFactory) newMatchingThriftClient(hostAddress string) (matching.Client, error) {
dispatcher, err := cf.rpcFactory.CreateDispatcherForOutbound(matchingCaller, common.MatchingServiceName, hostAddress)
dispatcher, err := cf.rpcFactory.CreateDispatcherForOutbound(matchingCaller, service.Matching, hostAddress)
if err != nil {
return nil, err
}
return matching.NewThriftClient(matchingserviceclient.New(dispatcher.ClientConfig(common.MatchingServiceName))), nil
return matching.NewThriftClient(matchingserviceclient.New(dispatcher.ClientConfig(service.Matching))), nil
}

func (cf *rpcClientFactory) newFrontendThriftClient(hostAddress string) (frontend.Client, error) {
dispatcher, err := cf.rpcFactory.CreateDispatcherForOutbound(frontendCaller, common.FrontendServiceName, hostAddress)
dispatcher, err := cf.rpcFactory.CreateDispatcherForOutbound(frontendCaller, service.Frontend, hostAddress)
if err != nil {
return nil, err
}
return frontend.NewThriftClient(workflowserviceclient.New(dispatcher.ClientConfig(common.FrontendServiceName))), nil
return frontend.NewThriftClient(workflowserviceclient.New(dispatcher.ClientConfig(service.Frontend))), nil
}

func (cf *rpcClientFactory) newHistoryGRPCClient(hostAddress string) (history.Client, error) {
dispatcher, err := cf.rpcFactory.CreateGRPCDispatcherForOutbound(historyCaller, common.HistoryServiceName, hostAddress)
dispatcher, err := cf.rpcFactory.CreateGRPCDispatcherForOutbound(historyCaller, service.History, hostAddress)
if err != nil {
return nil, err
}
return history.NewGRPCClient(historyv1.NewHistoryAPIYARPCClient(dispatcher.ClientConfig(common.HistoryServiceName))), nil
return history.NewGRPCClient(historyv1.NewHistoryAPIYARPCClient(dispatcher.ClientConfig(service.History))), nil
}

func (cf *rpcClientFactory) newMatchingGRPCClient(hostAddress string) (matching.Client, error) {
dispatcher, err := cf.rpcFactory.CreateGRPCDispatcherForOutbound(matchingCaller, common.MatchingServiceName, hostAddress)
dispatcher, err := cf.rpcFactory.CreateGRPCDispatcherForOutbound(matchingCaller, service.Matching, hostAddress)
if err != nil {
return nil, err
}
return matching.NewGRPCClient(matchingv1.NewMatchingAPIYARPCClient(dispatcher.ClientConfig(common.MatchingServiceName))), nil
return matching.NewGRPCClient(matchingv1.NewMatchingAPIYARPCClient(dispatcher.ClientConfig(service.Matching))), nil
}

func (cf *rpcClientFactory) newFrontendGRPCClient(hostAddress string) (frontend.Client, error) {
dispatcher, err := cf.rpcFactory.CreateGRPCDispatcherForOutbound(frontendCaller, common.FrontendServiceName, hostAddress)
dispatcher, err := cf.rpcFactory.CreateGRPCDispatcherForOutbound(frontendCaller, service.Frontend, hostAddress)
if err != nil {
return nil, err
}
config := dispatcher.ClientConfig(common.FrontendServiceName)
config := dispatcher.ClientConfig(service.Frontend)
return frontend.NewGRPCClient(
apiv1.NewDomainAPIYARPCClient(config),
apiv1.NewWorkflowAPIYARPCClient(config),
Expand Down
3 changes: 2 additions & 1 deletion cmd/server/cadence/cadence.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,13 @@ import (
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/client"
"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/service"
"github.com/uber/cadence/tools/cassandra"
"github.com/uber/cadence/tools/sql"
)

// validServices is the list of all valid cadence services
var validServices = []string{frontendService, historyService, matchingService, workerService}
var validServices = service.ShortNames(service.List)

// startHandler is the handler for the cli start command
func startHandler(c *cli.Context) {
Expand Down
23 changes: 8 additions & 15 deletions cmd/server/cadence/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,6 @@ type (
}
)

const (
frontendService = "frontend"
historyService = "history"
matchingService = "matching"
workerService = "worker"
)

// newServer returns a new instance of a daemon
// that represents a cadence service
func newServer(service string, cfg *config.Config) common.Daemon {
Expand Down Expand Up @@ -105,7 +98,7 @@ func (s *server) Stop() {
// startService starts a service with the given name and config
func (s *server) startService() common.Daemon {
params := resource.Params{}
params.Name = "cadence-" + s.name
params.Name = service.FullName(s.name)

zapLogger, err := s.cfg.Log.NewZapLogger()
if err != nil {
Expand Down Expand Up @@ -226,11 +219,11 @@ func (s *server) startService() common.Daemon {
AuthProvider: authProvider,
}
}
dispatcher, err := params.DispatcherProvider.GetTChannel(common.FrontendServiceName, s.cfg.PublicClient.HostPort, options)
dispatcher, err := params.DispatcherProvider.GetTChannel(service.Frontend, s.cfg.PublicClient.HostPort, options)
if err != nil {
log.Fatalf("failed to construct dispatcher: %v", err)
}
params.PublicClient = workflowserviceclient.New(dispatcher.ClientConfig(common.FrontendServiceName))
params.PublicClient = workflowserviceclient.New(dispatcher.ClientConfig(service.Frontend))

params.ArchivalMetadata = archiver.NewArchivalMetadata(
dc,
Expand All @@ -255,14 +248,14 @@ func (s *server) startService() common.Daemon {

var daemon common.Daemon

switch s.name {
case frontendService:
switch params.Name {
case service.Frontend:
daemon, err = frontend.NewService(&params)
case historyService:
case service.History:
daemon, err = history.NewService(&params)
case matchingService:
case service.Matching:
daemon, err = matching.NewService(&params)
case workerService:
case service.Worker:
daemon, err = worker.NewService(&params)
}
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions common/cluster/metadataTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
package cluster

import (
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/log/loggerimpl"
"github.com/uber/cadence/common/service"
)

const (
Expand Down Expand Up @@ -56,13 +56,13 @@ var (
TestCurrentClusterName: {
Enabled: true,
InitialFailoverVersion: TestCurrentClusterInitialFailoverVersion,
RPCName: common.FrontendServiceName,
RPCName: service.Frontend,
RPCAddress: TestCurrentClusterFrontendAddress,
},
TestAlternativeClusterName: {
Enabled: true,
InitialFailoverVersion: TestAlternativeClusterInitialFailoverVersion,
RPCName: common.FrontendServiceName,
RPCName: service.Frontend,
RPCAddress: TestAlternativeClusterFrontendAddress,
},
TestDisabledClusterName: {
Expand All @@ -78,7 +78,7 @@ var (
TestCurrentClusterName: {
Enabled: true,
InitialFailoverVersion: TestCurrentClusterInitialFailoverVersion,
RPCName: common.FrontendServiceName,
RPCName: service.Frontend,
RPCAddress: TestCurrentClusterFrontendAddress,
},
}
Expand Down
12 changes: 2 additions & 10 deletions common/config/ringpop.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ import (
"go.uber.org/yarpc"
"go.uber.org/yarpc/transport/tchannel"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/membership"
"github.com/uber/cadence/common/service"
)

const (
Expand All @@ -63,14 +63,6 @@ const (
defaultMaxJoinDuration = 10 * time.Second
)

// CadenceServices indicate the list of cadence services
var CadenceServices = []string{
common.FrontendServiceName,
common.HistoryServiceName,
common.MatchingServiceName,
common.WorkerServiceName,
}

// RingpopFactory implements the RingpopFactory interface
type RingpopFactory struct {
config *Ringpop
Expand Down Expand Up @@ -206,7 +198,7 @@ func (factory *RingpopFactory) createMembership() (membership.Monitor, error) {
return nil, fmt.Errorf("ringpop creation failed: %v", err)
}

membershipMonitor := membership.NewRingpopMonitor(factory.serviceName, CadenceServices, rp, factory.logger)
membershipMonitor := membership.NewRingpopMonitor(factory.serviceName, service.List, rp, factory.logger)
return membershipMonitor, nil
}

Expand Down
11 changes: 0 additions & 11 deletions common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,6 @@ const (
EmptyUUID = "emptyUuid"
)

const (
// FrontendServiceName is the name of the frontend service
FrontendServiceName = "cadence-frontend"
// HistoryServiceName is the name of the history service
HistoryServiceName = "cadence-history"
// MatchingServiceName is the name of the matching service
MatchingServiceName = "cadence-matching"
// WorkerServiceName is the name of the worker service
WorkerServiceName = "cadence-worker"
)

// Data encoding types
const (
EncodingTypeJSON EncodingType = "json"
Expand Down
5 changes: 3 additions & 2 deletions common/domain/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/service"
"github.com/uber/cadence/common/types"
)

Expand Down Expand Up @@ -761,7 +762,7 @@ func (d *handlerImpl) validateHistoryArchivalURI(URIString string) error {
return err
}

archiver, err := d.archiverProvider.GetHistoryArchiver(URI.Scheme(), common.FrontendServiceName)
archiver, err := d.archiverProvider.GetHistoryArchiver(URI.Scheme(), service.Frontend)
if err != nil {
return err
}
Expand All @@ -775,7 +776,7 @@ func (d *handlerImpl) validateVisibilityArchivalURI(URIString string) error {
return err
}

archiver, err := d.archiverProvider.GetVisibilityArchiver(URI.Scheme(), common.FrontendServiceName)
archiver, err := d.archiverProvider.GetVisibilityArchiver(URI.Scheme(), service.Frontend)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions common/persistence/persistence-tests/persistenceTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,8 @@ func (s *TestBase) Setup() {
s.DefaultTestCluster.SetupTestDatabase()

cfg := s.DefaultTestCluster.Config()
scope := tally.NewTestScope(common.HistoryServiceName, make(map[string]string))
metricsClient := metrics.NewClient(scope, service.GetMetricsServiceIdx(common.HistoryServiceName, s.Logger))
scope := tally.NewTestScope(service.History, make(map[string]string))
metricsClient := metrics.NewClient(scope, service.GetMetricsServiceIdx(service.History, s.Logger))
factory := client.NewFactory(&cfg, nil, clusterName, metricsClient, s.Logger)

s.TaskMgr, err = factory.NewTaskManager()
Expand Down
8 changes: 4 additions & 4 deletions common/resource/resourceImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,22 +174,22 @@ func New(
return nil, err
}

frontendServiceResolver, err := membershipMonitor.GetResolver(common.FrontendServiceName)
frontendServiceResolver, err := membershipMonitor.GetResolver(service.Frontend)
if err != nil {
return nil, err
}

matchingServiceResolver, err := membershipMonitor.GetResolver(common.MatchingServiceName)
matchingServiceResolver, err := membershipMonitor.GetResolver(service.Matching)
if err != nil {
return nil, err
}

historyServiceResolver, err := membershipMonitor.GetResolver(common.HistoryServiceName)
historyServiceResolver, err := membershipMonitor.GetResolver(service.History)
if err != nil {
return nil, err
}

workerServiceResolver, err := membershipMonitor.GetResolver(common.WorkerServiceName)
workerServiceResolver, err := membershipMonitor.GetResolver(service.Worker)
if err != nil {
return nil, err
}
Expand Down
10 changes: 5 additions & 5 deletions common/resource/resourceTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/uber/cadence/client/frontend"
"github.com/uber/cadence/client/history"
"github.com/uber/cadence/client/matching"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/archiver"
"github.com/uber/cadence/common/archiver/provider"
"github.com/uber/cadence/common/blobstore"
Expand All @@ -48,6 +47,7 @@ import (
"github.com/uber/cadence/common/mocks"
"github.com/uber/cadence/common/persistence"
persistenceClient "github.com/uber/cadence/common/persistence/client"
"github.com/uber/cadence/common/service"

"go.uber.org/yarpc"
"go.uber.org/zap"
Expand Down Expand Up @@ -159,10 +159,10 @@ func NewTest(
matchingServiceResolver := membership.NewMockServiceResolver(controller)
historyServiceResolver := membership.NewMockServiceResolver(controller)
workerServiceResolver := membership.NewMockServiceResolver(controller)
membershipMonitor.EXPECT().GetResolver(common.FrontendServiceName).Return(frontendServiceResolver, nil).AnyTimes()
membershipMonitor.EXPECT().GetResolver(common.MatchingServiceName).Return(matchingServiceResolver, nil).AnyTimes()
membershipMonitor.EXPECT().GetResolver(common.HistoryServiceName).Return(historyServiceResolver, nil).AnyTimes()
membershipMonitor.EXPECT().GetResolver(common.WorkerServiceName).Return(workerServiceResolver, nil).AnyTimes()
membershipMonitor.EXPECT().GetResolver(service.Frontend).Return(frontendServiceResolver, nil).AnyTimes()
membershipMonitor.EXPECT().GetResolver(service.Matching).Return(matchingServiceResolver, nil).AnyTimes()
membershipMonitor.EXPECT().GetResolver(service.History).Return(historyServiceResolver, nil).AnyTimes()
membershipMonitor.EXPECT().GetResolver(service.Worker).Return(workerServiceResolver, nil).AnyTimes()

scope := tally.NewTestScope("test", nil)

Expand Down
5 changes: 3 additions & 2 deletions common/rpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@ import (
"strings"

"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/service"
)

type GRPCPorts map[string]int

func NewGRPCPorts(c *config.Config) GRPCPorts {
grpcPorts := map[string]int{}
for service, config := range c.Services {
grpcPorts["cadence-"+service] = config.RPC.GRPCPort
for name, config := range c.Services {
grpcPorts[service.FullName(name)] = config.RPC.GRPCPort
}
return grpcPorts
}
Expand Down
Loading

0 comments on commit 5428e35

Please sign in to comment.