Skip to content

Commit

Permalink
Use new resource struct for matching / worker (cadence-workflow#2811)
Browse files Browse the repository at this point in the history
* Use new resource struct for matching initialization logic
* Use new resource struct for worker initialization logic
* Update worker scanner initialization logic
* Rewrite ringpop initialization logic to get rid of circular dependency
  • Loading branch information
wxing1292 authored Nov 14, 2019
1 parent eedf89b commit b24ff30
Show file tree
Hide file tree
Showing 31 changed files with 717 additions and 574 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,11 @@ fmt:
@echo "running goimports"
@goimports -local "github.com/uber/cadence" -w $(ALL_SRC)

bins_nothrift: go-generate fmt lint copyright cadence-cassandra-tool cadence-sql-tool cadence cadence-server
bins_nothrift: fmt lint copyright cadence-cassandra-tool cadence-sql-tool cadence cadence-server

bins: thriftc bins_nothrift

test: bins
test: go-generate bins
@rm -f test
@rm -f test.log
@for dir in $(TEST_DIRS); do \
Expand Down
2 changes: 1 addition & 1 deletion cmd/server/cadence.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
)

// validServices is the list of all valid cadence services
var validServices = []string{historyService, matchingService, frontendService, workerService}
var validServices = []string{frontendService, historyService, matchingService, workerService}

// main entry point for the cadence server
func main() {
Expand Down
20 changes: 13 additions & 7 deletions cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,6 @@ func (s *server) startService() common.Daemon {
params.Logger = loggerimpl.NewLogger(s.cfg.Log.NewZapLogger())
params.PersistenceConfig = s.cfg.Persistence

params.MembershipFactory, err = s.cfg.Ringpop.NewFactory(params.Logger, params.Name)
if err != nil {
log.Fatalf("error creating ringpop factory: %v", err)
}

params.DynamicConfig, err = dynamicconfig.NewFileBasedClient(&s.cfg.DynamicConfigClient, params.Logger.WithTags(tag.Service(params.Name)), s.doneC)
if err != nil {
log.Printf("error creating file based dynamic config client, use no-op config client instead. error: %v", err)
Expand All @@ -124,6 +119,14 @@ func (s *server) startService() common.Daemon {
svcCfg := s.cfg.Services[s.name]
params.MetricScope = svcCfg.Metrics.NewScope(params.Logger)
params.RPCFactory = svcCfg.RPC.NewFactory(params.Name, params.Logger)
params.MembershipFactory, err = s.cfg.Ringpop.NewFactory(
params.RPCFactory.GetDispatcher(),
params.Name,
params.Logger,
)
if err != nil {
log.Fatalf("error creating ringpop factory: %v", err)
}
params.PProfInitializer = svcCfg.PProf.NewInitializer(params.Logger)

params.DCRedirectionPolicy = s.cfg.DCRedirectionPolicy
Expand Down Expand Up @@ -211,9 +214,12 @@ func (s *server) startService() common.Daemon {
case historyService:
daemon = history.NewService(&params)
case matchingService:
daemon = matching.NewService(&params)
daemon, err = matching.NewService(&params)
case workerService:
daemon = worker.NewService(&params)
daemon, err = worker.NewService(&params)
}
if err != nil {
params.Logger.Fatal("Fail to start "+s.name+" service ", tag.Error(err))
}

go execute(daemon, s.doneC)
Expand Down
5 changes: 3 additions & 2 deletions common/membership/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"errors"

"github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
)

// ErrUnknownService is thrown for a service that is not tracked by this instance
Expand All @@ -47,8 +48,8 @@ type (
// Monitor provides membership information for all cadence services.
// It can be used to query which member host of a service is responsible for serving a given key.
Monitor interface {
Start() error
Stop()
common.Daemon

WhoAmI() (*HostInfo, error)
Lookup(service string, key string) (*HostInfo, error)
GetResolver(service string) (ServiceResolver, error)
Expand Down
85 changes: 85 additions & 0 deletions common/membership/ringpop.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package membership

import (
"sync/atomic"

"github.com/uber/ringpop-go"
"github.com/uber/ringpop-go/swim"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
)

type (
// RingPop is a simple wrapper
RingPop struct {
status int32
*ringpop.Ringpop
bootParams *swim.BootstrapOptions
logger log.Logger
}
)

// NewRingPop create a new ring pop wrapper
func NewRingPop(
ringPop *ringpop.Ringpop,
bootParams *swim.BootstrapOptions,
logger log.Logger,
) *RingPop {
return &RingPop{
status: common.DaemonStatusInitialized,
Ringpop: ringPop,
bootParams: bootParams,
logger: logger,
}
}

// Start start ring pop
func (r *RingPop) Start() {
if !atomic.CompareAndSwapInt32(
&r.status,
common.DaemonStatusInitialized,
common.DaemonStatusStarted,
) {
return
}

_, err := r.Ringpop.Bootstrap(r.bootParams)
if err != nil {
r.logger.Fatal("unable to bootstrap ringpop", tag.Error(err))
}
}

// Stop stop ring pop
func (r *RingPop) Stop() {
if !atomic.CompareAndSwapInt32(
&r.status,
common.DaemonStatusStarted,
common.DaemonStatusStopped,
) {
return
}

r.Destroy()
}
85 changes: 50 additions & 35 deletions common/membership/rpMonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,76 +21,91 @@
package membership

import (
"sync"

ringpop "github.com/uber/ringpop-go"
"sync/atomic"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
)

type ringpopMonitor struct {
started bool
stopped bool
services []string
rp *ringpop.Ringpop
rings map[string]*ringpopServiceResolver
logger log.Logger
mutex sync.Mutex
status int32

serviceName string
services []string
rp *RingPop
rings map[string]*ringpopServiceResolver
logger log.Logger
}

var _ Monitor = (*ringpopMonitor)(nil)

// NewRingpopMonitor returns a ringpop-based membership monitor
func NewRingpopMonitor(services []string, rp *ringpop.Ringpop, logger log.Logger) Monitor {
func NewRingpopMonitor(
serviceName string,
services []string,
rp *RingPop,
logger log.Logger,
) Monitor {

rpo := &ringpopMonitor{
services: services,
rp: rp,
logger: logger,
rings: make(map[string]*ringpopServiceResolver),
status: common.DaemonStatusInitialized,
serviceName: serviceName,
services: services,
rp: rp,
logger: logger,
rings: make(map[string]*ringpopServiceResolver),
}
for _, service := range services {
rpo.rings[service] = newRingpopServiceResolver(service, rp, logger)
}
return rpo
}

func (rpo *ringpopMonitor) Start() error {
rpo.mutex.Lock()
defer rpo.mutex.Unlock()
func (rpo *ringpopMonitor) Start() {
if !atomic.CompareAndSwapInt32(
&rpo.status,
common.DaemonStatusInitialized,
common.DaemonStatusStarted,
) {
return
}

rpo.rp.Start()

if rpo.started {
return nil
labels, err := rpo.rp.Labels()
if err != nil {
rpo.logger.Fatal("unable to get ring pop labels", tag.Error(err))
}

if err = labels.Set(RoleKey, rpo.serviceName); err != nil {
rpo.logger.Fatal("unable to set ring pop labels", tag.Error(err))
}

for service, ring := range rpo.rings {
err := ring.Start()
if err != nil {
rpo.logger.Error("Failed to initialize ring.", tag.Service(service))
return err
rpo.logger.Fatal("unable to start ring pop monitor", tag.Service(service), tag.Error(err))
}
}

rpo.started = true
return nil
}

func (rpo *ringpopMonitor) Stop() {
rpo.mutex.Lock()
defer rpo.mutex.Unlock()

if rpo.stopped {
if !atomic.CompareAndSwapInt32(
&rpo.status,
common.DaemonStatusStarted,
common.DaemonStatusStopped,
) {
return
}

for _, ring := range rpo.rings {
ring.Stop()
for service, ring := range rpo.rings {
if err := ring.Stop(); err != nil {
rpo.logger.Error("unable to stop ring pop monitor", tag.Service(service), tag.Error(err))
}
}
rpo.stopped = true

if rpo.rp != nil {
rpo.rp.Destroy()
}
rpo.rp.Stop()
}

func (rpo *ringpopMonitor) WhoAmI() (*HostInfo, error) {
Expand Down
10 changes: 3 additions & 7 deletions common/membership/rpMonitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,14 @@ func (s *RpoSuite) TestRingpopMonitor() {
testService := NewTestRingpopCluster("rpm-test", 3, "127.0.0.1", "", "rpm-test")
s.NotNil(testService, "Failed to create test service")

services := []string{"rpm-test"}

logger := loggerimpl.NewNopLogger()
rpm := NewRingpopMonitor(services, testService.rings[0], logger)
err := rpm.Start()
s.Nil(err, "Failed to start ringpop monitor")

// Sleep to give time for the ring to stabilize
rpm := testService.rings[0]

time.Sleep(time.Second)

listenCh := make(chan *ChangedEvent, 5)
err = rpm.AddListener("rpm-test", "test-listener", listenCh)
err := rpm.AddListener("rpm-test", "test-listener", listenCh)
s.Nil(err, "AddListener failed")

host, err := rpm.Lookup("rpm-test", "key")
Expand Down
Loading

0 comments on commit b24ff30

Please sign in to comment.