Skip to content

Commit

Permalink
Delay worker startup for service warmup (cadence-workflow#2827)
Browse files Browse the repository at this point in the history
* Delay the worker scanner's start-workflow call to give services time to warm up
* The Ringpop service resolver now triggers a forced refresh if it cannot find the target service
  • Loading branch information
wxing1292 authored Nov 15, 2019
1 parent 37c4e32 commit d553f55
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 57 deletions.
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -163,13 +163,15 @@ bins_nothrift: fmt lint copyright cadence-cassandra-tool cadence-sql-tool cadenc

bins: thriftc bins_nothrift

test: go-generate bins
test: bins
@rm -f test
@rm -f test.log
@for dir in $(TEST_DIRS); do \
go test -timeout $(TEST_TIMEOUT) -race -coverprofile=$@ "$$dir" $(TEST_TAG) | tee -a test.log; \
done;

release: go-generate test

# need to run xdc tests with race detector off because of ringpop bug causing data race issue
test_xdc: bins
@rm -f test
Expand Down
13 changes: 4 additions & 9 deletions common/membership/rpMonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,8 @@ func (rpo *ringpopMonitor) Start() {
rpo.logger.Fatal("unable to set ring pop labels", tag.Error(err))
}

for service, ring := range rpo.rings {
err := ring.Start()
if err != nil {
rpo.logger.Fatal("unable to start ring pop monitor", tag.Service(service), tag.Error(err))
}
for _, ring := range rpo.rings {
ring.Start()
}
}

Expand All @@ -99,10 +96,8 @@ func (rpo *ringpopMonitor) Stop() {
return
}

for service, ring := range rpo.rings {
if err := ring.Stop(); err != nil {
rpo.logger.Error("unable to stop ring pop monitor", tag.Service(service), tag.Error(err))
}
for _, ring := range rpo.rings {
ring.Stop()
}

rpo.rp.Stop()
Expand Down
103 changes: 58 additions & 45 deletions common/membership/rpServiceResolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package membership

import (
"sync"
"sync/atomic"
"time"

"github.com/dgryski/go-farm"
Expand All @@ -38,21 +39,23 @@ const (
// RoleKey label is set by every single service as soon as it bootstraps its
// ringpop instance. The data for this key is the service name
RoleKey = "serviceName"
minRefreshInternal = time.Second * 4
defaultRefreshInterval = time.Second * 10
replicaPoints = 100
)

type ringpopServiceResolver struct {
service string
isStarted bool
isStopped bool
rp *RingPop
shutdownCh chan struct{}
shutdownWG sync.WaitGroup
logger log.Logger

ringLock sync.RWMutex
ring *hashring.HashRing
status int32
service string
rp *RingPop
refreshChan chan struct{}
shutdownCh chan struct{}
shutdownWG sync.WaitGroup
logger log.Logger

ringLock sync.RWMutex
ringLastRefreshTime time.Time
ring *hashring.HashRing

listenerLock sync.RWMutex
listeners map[string]chan<- *ChangedEvent
Expand All @@ -67,60 +70,60 @@ func newRingpopServiceResolver(
) *ringpopServiceResolver {

return &ringpopServiceResolver{
service: service,
rp: rp,
logger: logger.WithTags(tag.ComponentServiceResolver, tag.Service(service)),
ring: hashring.New(farm.Fingerprint32, replicaPoints),
listeners: make(map[string]chan<- *ChangedEvent),
shutdownCh: make(chan struct{}),
status: common.DaemonStatusInitialized,
service: service,
rp: rp,
refreshChan: make(chan struct{}),
shutdownCh: make(chan struct{}),
logger: logger.WithTags(tag.ComponentServiceResolver, tag.Service(service)),

ring: hashring.New(farm.Fingerprint32, replicaPoints),

listeners: make(map[string]chan<- *ChangedEvent),
}
}

// Start starts the resolver
func (r *ringpopServiceResolver) Start() error {
r.ringLock.Lock()
defer r.ringLock.Unlock()

if r.isStarted {
return nil
func (r *ringpopServiceResolver) Start() {
if !atomic.CompareAndSwapInt32(
&r.status,
common.DaemonStatusInitialized,
common.DaemonStatusStarted,
) {
return
}

r.rp.AddListener(r)
if err := r.doRefresh(); err != nil {
return err
if err := r.refresh(); err != nil {
r.logger.Fatal("unable to start ring pop service resolver", tag.Error(err))
}

r.shutdownWG.Add(1)
go r.refreshRingWorker()

r.isStarted = true
return nil
}

// Stop stops the resolver
func (r *ringpopServiceResolver) Stop() error {
func (r *ringpopServiceResolver) Stop() {
if !atomic.CompareAndSwapInt32(
&r.status,
common.DaemonStatusStarted,
common.DaemonStatusStopped,
) {
return
}

r.ringLock.Lock()
defer r.ringLock.Unlock()
r.listenerLock.Lock()
defer r.listenerLock.Unlock()

if r.isStopped {
return nil
}

if r.isStarted {
r.rp.RemoveListener(r)
r.ring = hashring.New(farm.Fingerprint32, replicaPoints)
r.listeners = make(map[string]chan<- *ChangedEvent)
close(r.shutdownCh)
}
r.rp.RemoveListener(r)
r.ring = hashring.New(farm.Fingerprint32, replicaPoints)
r.listeners = make(map[string]chan<- *ChangedEvent)
close(r.shutdownCh)

if success := common.AwaitWaitGroup(&r.shutdownWG, time.Minute); !success {
r.logger.Warn("service resolver timed out on shutdown.")
}

r.isStopped = true
return nil
}

// Lookup finds the host in the ring responsible for serving the given key
Expand All @@ -132,6 +135,10 @@ func (r *ringpopServiceResolver) Lookup(
defer r.ringLock.RUnlock()
addr, found := r.ring.Lookup(key)
if !found {
select {
case r.refreshChan <- struct{}{}:
default:
}
return nil, ErrInsufficientHosts
}
return NewHostInfo(addr, r.getLabelsMap()), nil
Expand Down Expand Up @@ -189,10 +196,11 @@ func (r *ringpopServiceResolver) refresh() error {
r.ringLock.Lock()
defer r.ringLock.Unlock()

return r.doRefresh()
}
if r.ringLastRefreshTime.After(time.Now().Add(-minRefreshInternal)) {
// the ring was refreshed less than minRefreshInternal ago; skip this refresh
return nil
}

func (r *ringpopServiceResolver) doRefresh() error {
r.ring = hashring.New(farm.Fingerprint32, replicaPoints)

addrs, err := r.rp.GetReachableMembers(swim.MemberWithLabelAndValue(RoleKey, r.service))
Expand All @@ -205,6 +213,7 @@ func (r *ringpopServiceResolver) doRefresh() error {
r.ring.AddMembers(host)
}

r.ringLastRefreshTime = time.Now()
r.logger.Debug("Current reachable members", tag.Addresses(addrs))
return nil
}
Expand Down Expand Up @@ -248,6 +257,10 @@ func (r *ringpopServiceResolver) refreshRingWorker() {
select {
case <-r.shutdownCh:
return
case <-r.refreshChan:
if err := r.refresh(); err != nil {
r.logger.Error("error periodically refreshing ring", tag.Error(err))
}
case <-refreshTicker.C:
if err := r.refresh(); err != nil {
r.logger.Error("error periodically refreshing ring", tag.Error(err))
Expand Down
15 changes: 13 additions & 2 deletions service/worker/scanner/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ import (
"github.com/uber/cadence/common/service/dynamicconfig"
)

const (
// scannerStartUpDelay is how long startWorkflowWithRetry sleeps before its
// first attempt to start a workflow, to let the history / matching services
// warm up.
scannerStartUpDelay = time.Second * 4
)

type (
// Config defines the configuration for scanner
Config struct {
Expand Down Expand Up @@ -127,17 +132,23 @@ func (s *Scanner) Start() error {
func (s *Scanner) startWorkflowWithRetry(
options cclient.StartWorkflowOptions,
workflowType string,
) error {
) {

// let history / matching service warm up
time.Sleep(scannerStartUpDelay)

sdkClient := cclient.NewClient(s.context.GetSDKClient(), common.SystemLocalDomainName, &cclient.Options{})
policy := backoff.NewExponentialRetryPolicy(time.Second)
policy.SetMaximumInterval(time.Minute)
policy.SetExpirationInterval(backoff.NoInterval)
return backoff.Retry(func() error {
err := backoff.Retry(func() error {
return s.startWorkflow(sdkClient, options, workflowType)
}, policy, func(err error) bool {
return true
})
if err != nil {
s.context.GetLogger().Fatal("unable to start scanner", tag.Error(err))
}
}

func (s *Scanner) startWorkflow(
Expand Down

0 comments on commit d553f55

Please sign in to comment.