Skip to content

Commit

Permalink
Support DNS mode for public client (cadence-workflow#2119)
Browse files Browse the repository at this point in the history
  • Loading branch information
longquanzheng authored Jun 27, 2019
1 parent 56c39fd commit 671bfd0
Show file tree
Hide file tree
Showing 11 changed files with 193 additions and 43 deletions.
4 changes: 4 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

185 changes: 162 additions & 23 deletions client/clientBean.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,28 @@
package client

import (
"errors"
"context"
"fmt"
"regexp"

"go.uber.org/yarpc"
"go.uber.org/yarpc/transport/tchannel"
"net"
"strings"
"time"

"github.com/uber/cadence/client/admin"
"github.com/uber/cadence/client/frontend"
"github.com/uber/cadence/client/history"
"github.com/uber/cadence/client/matching"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"go.uber.org/yarpc"
"go.uber.org/yarpc/api/peer"
"go.uber.org/yarpc/api/transport"
"go.uber.org/yarpc/peer/roundrobin"
"go.uber.org/yarpc/transport/tchannel"
)

const (
ipPortRegex = `\b(?:(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\.){3}(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9]):[1-9]\d*\b`
defaultRefreshInterval = time.Second * 10
)

type (
Expand All @@ -62,7 +68,25 @@ type (
remoteFrontendClients map[string]frontend.Client
}

ipDispatcherProvider struct {
dnsDispatcherProvider struct {
interval time.Duration
logger log.Logger
}
dnsUpdater struct {
interval time.Duration
dnsAddress string
port string
currentPeers map[string]struct{}
list peer.List
logger log.Logger
}
dnsRefreshResult struct {
updates peer.ListUpdates
newPeers map[string]struct{}
changed bool
}
aPeer struct {
addrPort string
}
)

Expand Down Expand Up @@ -160,38 +184,153 @@ func (h *clientBeanImpl) GetRemoteFrontendClient(cluster string) frontend.Client
return client
}

// NewIPYarpcDispatcherProvider create a dispatcher provider which handles with IP address
func NewIPYarpcDispatcherProvider() DispatcherProvider {
return &ipDispatcherProvider{}
}

func (p *ipDispatcherProvider) Get(name string, address string) (*yarpc.Dispatcher, error) {
match, err := regexp.MatchString(ipPortRegex, address)
if err != nil {
return nil, err
// NewDNSYarpcDispatcherProvider create a dispatcher provider which handles with IP address
func NewDNSYarpcDispatcherProvider(logger log.Logger, interval time.Duration) DispatcherProvider {
if interval <= 0 {
interval = defaultRefreshInterval
}
if !match {
return nil, errors.New("invalid ip:port address")
return &dnsDispatcherProvider{
interval: interval,
logger: logger,
}
}

channel, err := tchannel.NewChannelTransport(
tchannel.ServiceName(crossDCCaller),
func (p *dnsDispatcherProvider) Get(serviceName string, address string) (*yarpc.Dispatcher, error) {
tchanTransport, err := tchannel.NewTransport(
tchannel.ServiceName(serviceName),
// this aim to get rid of the annoying popup about accepting incoming network connections
tchannel.ListenAddr("127.0.0.1:0"),
)
if err != nil {
return nil, err
}

peerList := roundrobin.New(tchanTransport)
peerListUpdater, err := newDNSUpdater(peerList, address, p.interval, p.logger)
if err != nil {
return nil, err
}
peerListUpdater.Start()
outbound := tchanTransport.NewOutbound(peerList)

p.logger.Info("Creating RPC dispatcher outbound", tag.Service(serviceName), tag.Address(address))

// Attach the outbound to the dispatcher (this will add middleware/logging/etc)
dispatcher := yarpc.NewDispatcher(yarpc.Config{
Name: crossDCCaller,
Outbounds: yarpc.Outbounds{
name: {Unary: channel.NewSingleOutbound(address)},
serviceName: transport.Outbounds{
Unary: outbound,
ServiceName: serviceName,
},
},
})
err = dispatcher.Start()
if err != nil {

if err := dispatcher.Start(); err != nil {
return nil, err
}
return dispatcher, nil
}

func newDNSUpdater(list peer.List, dnsPort string, interval time.Duration, logger log.Logger) (*dnsUpdater, error) {
ss := strings.Split(dnsPort, ":")
if len(ss) != 2 {
return nil, fmt.Errorf("incorrect DNS:Port format")
}
return &dnsUpdater{
interval: interval,
logger: logger,
list: list,
dnsAddress: ss[0],
port: ss[1],
currentPeers: make(map[string]struct{}),
}, nil
}

func (d *dnsUpdater) Start() {
go func() {
for {
now := time.Now()
res, err := d.refresh()
if err != nil {
d.logger.Error("Failed to update DNS", tag.Error(err), tag.Address(d.dnsAddress))
}
if res.changed {
if len(res.updates.Additions) > 0 {
d.logger.Info("Add new peers by DNS lookup", tag.Address(d.dnsAddress), tag.Addresses(identifiersToStringList(res.updates.Additions)))
}
if len(res.updates.Removals) > 0 {
d.logger.Info("Remove stale peers by DNS lookup", tag.Address(d.dnsAddress), tag.Addresses(identifiersToStringList(res.updates.Removals)))
}

err := d.list.Update(res.updates)
if err != nil {
d.logger.Error("Failed to update peerList", tag.Error(err), tag.Address(d.dnsAddress))
}
d.currentPeers = res.newPeers
} else {
d.logger.Debug("No change in DNS lookup", tag.Address(d.dnsAddress))
}
sleepDu := now.Add(d.interval).Sub(now)
time.Sleep(sleepDu)
}
}()
}

func (d *dnsUpdater) refresh() (*dnsRefreshResult, error) {
resolver := net.DefaultResolver
ips, err := resolver.LookupHost(context.Background(), d.dnsAddress)
if err != nil {
return nil, err
}
newPeers := map[string]struct{}{}
for _, ip := range ips {
adr := fmt.Sprintf("%v:%v", ip, d.port)
newPeers[adr] = struct{}{}
}

updates := peer.ListUpdates{
Additions: make([]peer.Identifier, 0),
Removals: make([]peer.Identifier, 0),
}
changed := false
// remove if it doesn't exist anymore
for addr := range d.currentPeers {
if _, ok := newPeers[addr]; !ok {
changed = true
updates.Removals = append(
updates.Removals,
aPeer{addrPort: addr},
)
}
}

// add if it doesn't exist before
for addr := range newPeers {
if _, ok := d.currentPeers[addr]; !ok {
changed = true
updates.Additions = append(
updates.Additions,
aPeer{addrPort: addr},
)
}
}

return &dnsRefreshResult{
updates: updates,
newPeers: newPeers,
changed: changed,
}, nil
}

func (a aPeer) Identifier() string {
return a.addrPort
}

func identifiersToStringList(ids []peer.Identifier) []string {
ss := make([]string, 0, len(ids))
for _, id := range ids {
ss = append(ss, id.Identifier())
}
return ss
}
11 changes: 8 additions & 3 deletions cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,11 @@ import (
"log"
"time"

"github.com/uber/cadence/common/cluster"

"github.com/uber/cadence/client"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/blobstore/filestore"
"github.com/uber/cadence/common/blobstore/s3store"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/elasticsearch"
"github.com/uber/cadence/common/log/loggerimpl"
"github.com/uber/cadence/common/log/tag"
Expand Down Expand Up @@ -152,7 +151,13 @@ func (s *server) startService() common.Daemon {
s.cfg.Archival.DefaultBucket,
enableReadFromArchival(),
)
params.DispatcherProvider = client.NewIPYarpcDispatcherProvider()

if s.cfg.PublicClient.HostPort != "" {
params.DispatcherProvider = client.NewDNSYarpcDispatcherProvider(params.Logger, s.cfg.PublicClient.RefreshInterval)
} else {
log.Fatalf("need to provide an endpoint config for PublicClient")
}

params.ESConfig = &s.cfg.ElasticSearch
params.ESConfig.Enable = dc.GetBoolProperty(dynamicconfig.EnableVisibilityToKafka, params.ESConfig.Enable)() // force override with dynamic config
if params.ClusterMetadata.IsGlobalDomainEnabled() {
Expand Down
8 changes: 5 additions & 3 deletions common/service/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ type (
Address struct {
// RPCName indicate the remote service name
RPCName string `yaml:"rpcName"`
// Address indicate the remote service IP address
// Address indicate the remote service address(Host:Port). Host can be DNS name.
RPCAddress string `yaml:"rpcAddress"`
}

Expand All @@ -276,7 +276,7 @@ type (
InitialFailoverVersion int64 `yaml:"initialFailoverVersion"`
// RPCName indicate the remote service name
RPCName string `yaml:"rpcName"`
// Address indicate the remote service IP address
// Address indicate the remote service address(Host:Port). Host can be DNS name.
RPCAddress string `yaml:"rpcAddress"`
}

Expand Down Expand Up @@ -330,8 +330,10 @@ type (

// PublicClient is config for connecting to cadence frontend
PublicClient struct {
// HostPort is the host port to connect on
// HostPort is the host port to connect on. Host can be DNS name
HostPort string `yaml:"hostPort" validate:"nonzero"`
// interval to refresh DNS. Default to 10s
RefreshInterval time.Duration `yaml:"RefreshInterval"`
}

// BootstrapMode is an enum type for ringpop bootstrap mode
Expand Down
4 changes: 2 additions & 2 deletions config/development.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ clusterMetadata:
enabled: true
initialFailoverVersion: 0
rpcName: "cadence-frontend"
rpcAddress: "127.0.0.1:7933"
rpcAddress: "localhost:7933"

dcRedirectionPolicy:
policy: "noop"
Expand Down Expand Up @@ -136,7 +136,7 @@ elasticsearch:
visibility: cadence-visibility-dev

publicClient:
hostPort: "127.0.0.1:7933"
hostPort: "localhost:7933"

dynamicConfigClient:
filepath: "config/dynamicconfig/development.yaml"
Expand Down
6 changes: 3 additions & 3 deletions config/development_active.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,12 @@ clusterMetadata:
enabled: true
initialFailoverVersion: 1
rpcName: "cadence-frontend"
rpcAddress: "127.0.0.1:7933"
rpcAddress: "localhost:7933"
standby:
enabled: true
initialFailoverVersion: 0
rpcName: "cadence-frontend"
rpcAddress: "127.0.0.1:8933"
rpcAddress: "localhost:8933"

dcRedirectionPolicy:
policy: "selected-apis-forwarding"
Expand Down Expand Up @@ -125,4 +125,4 @@ archival:
- "custom-bucket-2"

publicClient:
hostPort: "127.0.0.1:7933"
hostPort: "localhost:7933"
4 changes: 2 additions & 2 deletions config/development_mysql.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ clustersInfo:
clusterAddress:
active:
rpcName: "cadence-frontend"
rpcAddress: "127.0.0.1:7933"
rpcAddress: "localhost:7933"

dcRedirectionPolicy:
policy: "noop"
Expand Down Expand Up @@ -126,4 +126,4 @@ elasticsearch:
visibility: cadence-visibility-dev

publicClient:
hostPort: "127.0.0.1:7933"
hostPort: "localhost:7933"
4 changes: 2 additions & 2 deletions config/development_prometheus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ clusterMetadata:
enabled: true
initialFailoverVersion: 0
rpcName: "cadence-frontend"
rpcAddress: "127.0.0.1:7933"
rpcAddress: "localhost:7933"

dcRedirectionPolicy:
policy: "noop"
Expand Down Expand Up @@ -116,5 +116,5 @@ elasticsearch:
visibility: cadence-visibility-dev

publicClient:
hostPort: "127.0.0.1:7933"
hostPort: "localhost:7933"

Loading

0 comments on commit 671bfd0

Please sign in to comment.