From 671bfd031573c97d0215617a7a3665df51dd1f5c Mon Sep 17 00:00:00 2001 From: Quanzheng Long Date: Thu, 27 Jun 2019 12:49:07 -0700 Subject: [PATCH] Support DNS mode for public client (#2119) --- Gopkg.lock | 4 + client/clientBean.go | 185 +++++++++++++++++++++++++---- cmd/server/server.go | 11 +- common/service/config/config.go | 8 +- config/development.yaml | 4 +- config/development_active.yaml | 6 +- config/development_mysql.yaml | 4 +- config/development_prometheus.yaml | 4 +- config/development_standby.yaml | 6 +- docker/Dockerfile | 2 +- host/testcluster.go | 2 +- 11 files changed, 193 insertions(+), 43 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index 29168a6fd87..f69bf10dcc6 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -887,6 +887,8 @@ "internal/yarpcerrors", "peer", "peer/hostport", + "peer/peerlist/v2", + "peer/roundrobin", "pkg/encoding", "pkg/errors", "pkg/lifecycle", @@ -1116,8 +1118,10 @@ "go.uber.org/thriftrw/thriftreflect", "go.uber.org/thriftrw/wire", "go.uber.org/yarpc", + "go.uber.org/yarpc/api/peer", "go.uber.org/yarpc/api/transport", "go.uber.org/yarpc/encoding/thrift", + "go.uber.org/yarpc/peer/roundrobin", "go.uber.org/yarpc/transport/tchannel", "go.uber.org/yarpc/yarpcerrors", "go.uber.org/zap", diff --git a/client/clientBean.go b/client/clientBean.go index 13eee22699f..304f2b2f10e 100644 --- a/client/clientBean.go +++ b/client/clientBean.go @@ -21,22 +21,28 @@ package client import ( - "errors" + "context" "fmt" - "regexp" - - "go.uber.org/yarpc" - "go.uber.org/yarpc/transport/tchannel" + "net" + "strings" + "time" "github.com/uber/cadence/client/admin" "github.com/uber/cadence/client/frontend" "github.com/uber/cadence/client/history" "github.com/uber/cadence/client/matching" "github.com/uber/cadence/common/cluster" + "github.com/uber/cadence/common/log" + "github.com/uber/cadence/common/log/tag" + "go.uber.org/yarpc" + "go.uber.org/yarpc/api/peer" + "go.uber.org/yarpc/api/transport" + "go.uber.org/yarpc/peer/roundrobin" + 
"go.uber.org/yarpc/transport/tchannel" ) const ( - ipPortRegex = `\b(?:(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\.){3}(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9]):[1-9]\d*\b` + defaultRefreshInterval = time.Second * 10 ) type ( @@ -62,7 +68,25 @@ type ( remoteFrontendClients map[string]frontend.Client } - ipDispatcherProvider struct { + dnsDispatcherProvider struct { + interval time.Duration + logger log.Logger + } + dnsUpdater struct { + interval time.Duration + dnsAddress string + port string + currentPeers map[string]struct{} + list peer.List + logger log.Logger + } + dnsRefreshResult struct { + updates peer.ListUpdates + newPeers map[string]struct{} + changed bool + } + aPeer struct { + addrPort string } ) @@ -160,22 +184,20 @@ func (h *clientBeanImpl) GetRemoteFrontendClient(cluster string) frontend.Client return client } -// NewIPYarpcDispatcherProvider create a dispatcher provider which handles with IP address -func NewIPYarpcDispatcherProvider() DispatcherProvider { - return &ipDispatcherProvider{} -} - -func (p *ipDispatcherProvider) Get(name string, address string) (*yarpc.Dispatcher, error) { - match, err := regexp.MatchString(ipPortRegex, address) - if err != nil { - return nil, err +// NewDNSYarpcDispatcherProvider creates a dispatcher provider that resolves host:port addresses through periodic DNS lookups +func NewDNSYarpcDispatcherProvider(logger log.Logger, interval time.Duration) DispatcherProvider { + if interval <= 0 { + interval = defaultRefreshInterval } - if !match { - return nil, errors.New("invalid ip:port address") + return &dnsDispatcherProvider{ + interval: interval, + logger: logger, } +} - channel, err := tchannel.NewChannelTransport( - tchannel.ServiceName(crossDCCaller), +func (p *dnsDispatcherProvider) Get(serviceName string, address string) (*yarpc.Dispatcher, error) { + tchanTransport, err := tchannel.NewTransport( + tchannel.ServiceName(serviceName), // this aim to get rid of the annoying popup about accepting incoming network connections 
tchannel.ListenAddr("127.0.0.1:0"), ) @@ -183,15 +205,132 @@ func (p *ipDispatcherProvider) Get(name string, address string) (*yarpc.Dispatch return nil, err } + peerList := roundrobin.New(tchanTransport) + peerListUpdater, err := newDNSUpdater(peerList, address, p.interval, p.logger) + if err != nil { + return nil, err + } + peerListUpdater.Start() + outbound := tchanTransport.NewOutbound(peerList) + + p.logger.Info("Creating RPC dispatcher outbound", tag.Service(serviceName), tag.Address(address)) + + // Attach the outbound to the dispatcher (this will add middleware/logging/etc) dispatcher := yarpc.NewDispatcher(yarpc.Config{ Name: crossDCCaller, Outbounds: yarpc.Outbounds{ - name: {Unary: channel.NewSingleOutbound(address)}, + serviceName: transport.Outbounds{ + Unary: outbound, + ServiceName: serviceName, + }, }, }) - err = dispatcher.Start() - if err != nil { + + if err := dispatcher.Start(); err != nil { return nil, err } return dispatcher, nil } + +func newDNSUpdater(list peer.List, dnsPort string, interval time.Duration, logger log.Logger) (*dnsUpdater, error) { + ss := strings.Split(dnsPort, ":") + if len(ss) != 2 { + return nil, fmt.Errorf("incorrect DNS:Port format") + } + return &dnsUpdater{ + interval: interval, + logger: logger, + list: list, + dnsAddress: ss[0], + port: ss[1], + currentPeers: make(map[string]struct{}), + }, nil +} + +func (d *dnsUpdater) Start() { + go func() { + for { + now := time.Now() + res, err := d.refresh() + if err != nil { + d.logger.Error("Failed to update DNS", tag.Error(err), tag.Address(d.dnsAddress)) + } + if err == nil && res.changed { // res is nil when refresh fails + if len(res.updates.Additions) > 0 { + d.logger.Info("Add new peers by DNS lookup", tag.Address(d.dnsAddress), tag.Addresses(identifiersToStringList(res.updates.Additions))) + } + if len(res.updates.Removals) > 0 { + d.logger.Info("Remove stale peers by DNS lookup", tag.Address(d.dnsAddress), tag.Addresses(identifiersToStringList(res.updates.Removals))) + } + + err := d.list.Update(res.updates) + if 
err != nil { + d.logger.Error("Failed to update peerList", tag.Error(err), tag.Address(d.dnsAddress)) + } + d.currentPeers = res.newPeers + } else { + d.logger.Debug("No change in DNS lookup", tag.Address(d.dnsAddress)) + } + sleepDu := now.Add(d.interval).Sub(time.Now()) // subtract the time spent refreshing + time.Sleep(sleepDu) + } + }() +} + +func (d *dnsUpdater) refresh() (*dnsRefreshResult, error) { + resolver := net.DefaultResolver + ips, err := resolver.LookupHost(context.Background(), d.dnsAddress) + if err != nil { + return nil, err + } + newPeers := map[string]struct{}{} + for _, ip := range ips { + adr := net.JoinHostPort(ip, d.port) + newPeers[adr] = struct{}{} + } + + updates := peer.ListUpdates{ + Additions: make([]peer.Identifier, 0), + Removals: make([]peer.Identifier, 0), + } + changed := false + // remove if it doesn't exist anymore + for addr := range d.currentPeers { + if _, ok := newPeers[addr]; !ok { + changed = true + updates.Removals = append( + updates.Removals, + aPeer{addrPort: addr}, + ) + } + } + + // add if it doesn't exist before + for addr := range newPeers { + if _, ok := d.currentPeers[addr]; !ok { + changed = true + updates.Additions = append( + updates.Additions, + aPeer{addrPort: addr}, + ) + } + } + + return &dnsRefreshResult{ + updates: updates, + newPeers: newPeers, + changed: changed, + }, nil +} + +func (a aPeer) Identifier() string { + return a.addrPort +} + +func identifiersToStringList(ids []peer.Identifier) []string { + ss := make([]string, 0, len(ids)) + for _, id := range ids { + ss = append(ss, id.Identifier()) + } + return ss +} diff --git a/cmd/server/server.go b/cmd/server/server.go index 7f4c202c1cb..9ae9928bf70 100644 --- a/cmd/server/server.go +++ b/cmd/server/server.go @@ -24,12 +24,11 @@ import ( "log" "time" - "github.com/uber/cadence/common/cluster" - "github.com/uber/cadence/client" "github.com/uber/cadence/common" "github.com/uber/cadence/common/blobstore/filestore" "github.com/uber/cadence/common/blobstore/s3store" + 
"github.com/uber/cadence/common/cluster" "github.com/uber/cadence/common/elasticsearch" "github.com/uber/cadence/common/log/loggerimpl" "github.com/uber/cadence/common/log/tag" @@ -152,7 +151,13 @@ func (s *server) startService() common.Daemon { s.cfg.Archival.DefaultBucket, enableReadFromArchival(), ) - params.DispatcherProvider = client.NewIPYarpcDispatcherProvider() + + if s.cfg.PublicClient.HostPort != "" { + params.DispatcherProvider = client.NewDNSYarpcDispatcherProvider(params.Logger, s.cfg.PublicClient.RefreshInterval) + } else { + log.Fatalf("need to provide an endpoint config for PublicClient") + } + params.ESConfig = &s.cfg.ElasticSearch params.ESConfig.Enable = dc.GetBoolProperty(dynamicconfig.EnableVisibilityToKafka, params.ESConfig.Enable)() // force override with dynamic config if params.ClusterMetadata.IsGlobalDomainEnabled() { diff --git a/common/service/config/config.go b/common/service/config/config.go index 4bc6a449a28..4025da76b59 100644 --- a/common/service/config/config.go +++ b/common/service/config/config.go @@ -252,7 +252,7 @@ type ( Address struct { // RPCName indicate the remote service name RPCName string `yaml:"rpcName"` - // Address indicate the remote service IP address + // RPCAddress indicates the remote service address (Host:Port). Host can be a DNS name. RPCAddress string `yaml:"rpcAddress"` } @@ -276,7 +276,7 @@ type ( InitialFailoverVersion int64 `yaml:"initialFailoverVersion"` // RPCName indicate the remote service name RPCName string `yaml:"rpcName"` - // Address indicate the remote service IP address + // RPCAddress indicates the remote service address (Host:Port). Host can be a DNS name. RPCAddress string `yaml:"rpcAddress"` } @@ -330,8 +330,10 @@ type ( // PublicClient is config for connecting to cadence frontend PublicClient struct { - // HostPort is the host port to connect on + // HostPort is the host port to connect on. Host can be a DNS name. 
HostPort string `yaml:"hostPort" validate:"nonzero"` + // RefreshInterval is the interval between DNS refreshes. 
Defaults to 10s + RefreshInterval time.Duration `yaml:"RefreshInterval"` } // BootstrapMode is an enum type for ringpop bootstrap mode diff --git a/config/development.yaml b/config/development.yaml index caadd9a2bcb..112aef6cf49 100644 --- a/config/development.yaml +++ b/config/development.yaml @@ -89,7 +89,7 @@ clusterMetadata: enabled: true initialFailoverVersion: 0 rpcName: "cadence-frontend" - rpcAddress: "127.0.0.1:7933" + rpcAddress: "localhost:7933" dcRedirectionPolicy: policy: "noop" @@ -136,7 +136,7 @@ elasticsearch: visibility: cadence-visibility-dev publicClient: - hostPort: "127.0.0.1:7933" + hostPort: "localhost:7933" dynamicConfigClient: filepath: "config/dynamicconfig/development.yaml" diff --git a/config/development_active.yaml b/config/development_active.yaml index cb7da6ca3bb..5384507ba0d 100644 --- a/config/development_active.yaml +++ b/config/development_active.yaml @@ -75,12 +75,12 @@ clusterMetadata: enabled: true initialFailoverVersion: 1 rpcName: "cadence-frontend" - rpcAddress: "127.0.0.1:7933" + rpcAddress: "localhost:7933" standby: enabled: true initialFailoverVersion: 0 rpcName: "cadence-frontend" - rpcAddress: "127.0.0.1:8933" + rpcAddress: "localhost:8933" dcRedirectionPolicy: policy: "selected-apis-forwarding" @@ -125,4 +125,4 @@ archival: - "custom-bucket-2" publicClient: - hostPort: "127.0.0.1:7933" + hostPort: "localhost:7933" diff --git a/config/development_mysql.yaml b/config/development_mysql.yaml index 795ce129b4b..5c9daeec1dd 100644 --- a/config/development_mysql.yaml +++ b/config/development_mysql.yaml @@ -87,7 +87,7 @@ clustersInfo: clusterAddress: active: rpcName: "cadence-frontend" - rpcAddress: "127.0.0.1:7933" + rpcAddress: "localhost:7933" dcRedirectionPolicy: policy: "noop" @@ -126,4 +126,4 @@ elasticsearch: visibility: cadence-visibility-dev publicClient: - hostPort: "127.0.0.1:7933" + hostPort: "localhost:7933" diff --git a/config/development_prometheus.yaml b/config/development_prometheus.yaml index 
8b4aba4d809..93634840606 100644 --- a/config/development_prometheus.yaml +++ b/config/development_prometheus.yaml @@ -75,7 +75,7 @@ clusterMetadata: enabled: true initialFailoverVersion: 0 rpcName: "cadence-frontend" - rpcAddress: "127.0.0.1:7933" + rpcAddress: "localhost:7933" dcRedirectionPolicy: policy: "noop" @@ -116,5 +116,5 @@ elasticsearch: visibility: cadence-visibility-dev publicClient: - hostPort: "127.0.0.1:7933" + hostPort: "localhost:7933" diff --git a/config/development_standby.yaml b/config/development_standby.yaml index ed7b40f1be1..8d2bec76ab1 100644 --- a/config/development_standby.yaml +++ b/config/development_standby.yaml @@ -75,12 +75,12 @@ clusterMetadata: enabled: true initialFailoverVersion: 1 rpcName: "cadence-frontend" - rpcAddress: "127.0.0.1:7933" + rpcAddress: "localhost:7933" standby: enabled: true initialFailoverVersion: 0 rpcName: "cadence-frontend" - rpcAddress: "127.0.0.1:8933" + rpcAddress: "localhost:8933" dcRedirectionPolicy: policy: "selected-apis-forwarding" @@ -125,5 +125,5 @@ archival: - "custom-bucket-2" publicClient: - hostPort: "127.0.0.1:8933" + hostPort: "localhost:8933" diff --git a/docker/Dockerfile b/docker/Dockerfile index a727c1463ac..47a52909467 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -31,7 +31,7 @@ RUN go get -u golang.org/x/lint/golint # get and compile cadence-server RUN git clone https://github.com/uber/cadence.git /go/src/github.com/uber/cadence -RUN cd /go/src/github.com/uber/cadence && git fetch && git checkout $git_branch && make bins_nothrift +RUN cd /go/src/github.com/uber/cadence && git fetch && git checkout $git_branch && git rev-parse HEAD && make bins_nothrift # Final Cadence image diff --git a/host/testcluster.go b/host/testcluster.go index 0f58908124b..88460f44fbc 100644 --- a/host/testcluster.go +++ b/host/testcluster.go @@ -142,7 +142,7 @@ func NewCluster(options *TestClusterConfig, logger log.Logger) (*TestCluster, er cadenceParams := &CadenceParams{ ClusterMetadata: 
clusterMetadata, PersistenceConfig: pConfig, - DispatcherProvider: client.NewIPYarpcDispatcherProvider(), + DispatcherProvider: client.NewDNSYarpcDispatcherProvider(logger, 0), MessagingClient: messagingClient, MetadataMgr: testBase.MetadataProxy, MetadataMgrV2: testBase.MetadataManagerV2,