Skip to content

Commit

Permalink
Remove ringpop use from integration tests (cadence-workflow#1638)
Browse files Browse the repository at this point in the history
* Create MembershipMonitorFactory interface
* Create ringpop implementation of membership monitor factory - move ringpop init and use behind it
* Create simple static membership monitor for integration tests
* Make dynamic config configurable for frontend RPS in the integration tests
  • Loading branch information
shreyassrivatsan authored Apr 9, 2019
1 parent fe12d70 commit fc188c9
Show file tree
Hide file tree
Showing 9 changed files with 318 additions and 124 deletions.
2 changes: 1 addition & 1 deletion cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (s *server) startService() common.Daemon {
params.Logger = s.cfg.Log.NewBarkLogger()
params.PersistenceConfig = s.cfg.Persistence

params.RingpopFactory, err = s.cfg.Ringpop.NewFactory()
params.MembershipFactory, err = s.cfg.Ringpop.NewFactory(params.Logger, params.Name)
if err != nil {
log.Fatalf("error creating ringpop factory: %v", err)
}
Expand Down
4 changes: 4 additions & 0 deletions common/membership/rpMonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ func (rpo *ringpopMonitor) Stop() {
ring.Stop()
}
rpo.stopped = true

if rpo.rp != nil {
rpo.rp.Destroy()
}
}

func (rpo *ringpopMonitor) WhoAmI() (*HostInfo, error) {
Expand Down
50 changes: 43 additions & 7 deletions common/service/config/ringpop.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ import (
"strings"
"time"

"github.com/uber-common/bark"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/membership"
"github.com/uber/ringpop-go"
"github.com/uber/ringpop-go/discovery"
"github.com/uber/ringpop-go/discovery/jsonfile"
Expand All @@ -52,15 +55,25 @@ const (
defaultMaxJoinDuration = 10 * time.Second
)

// CadenceServices indicate the list of cadence services
var CadenceServices = []string{
common.FrontendServiceName,
common.HistoryServiceName,
common.MatchingServiceName,
common.WorkerServiceName,
}

// RingpopFactory implements the RingpopFactory interface
type RingpopFactory struct {
config *Ringpop
config *Ringpop
logger bark.Logger
serviceName string
}

// NewFactory builds a ringpop factory conforming
// to the underlying configuration
func (rpConfig *Ringpop) NewFactory() (*RingpopFactory, error) {
return newRingpopFactory(rpConfig)
func (rpConfig *Ringpop) NewFactory(logger bark.Logger, serviceName string) (*RingpopFactory, error) {
return newRingpopFactory(rpConfig, logger, serviceName)
}

func (rpConfig *Ringpop) validate() error {
Expand Down Expand Up @@ -115,18 +128,41 @@ func validateBootstrapMode(rpConfig *Ringpop) error {
return nil
}

func newRingpopFactory(rpConfig *Ringpop) (*RingpopFactory, error) {
func newRingpopFactory(rpConfig *Ringpop, logger bark.Logger, serviceName string) (*RingpopFactory, error) {
if err := rpConfig.validate(); err != nil {
return nil, err
}
if rpConfig.MaxJoinDuration == 0 {
rpConfig.MaxJoinDuration = defaultMaxJoinDuration
}
return &RingpopFactory{config: rpConfig}, nil
return &RingpopFactory{config: rpConfig, logger: logger, serviceName: serviceName}, nil
}

// Create is the implementation for MembershipMonitorFactory.Create
func (factory *RingpopFactory) Create(dispatcher *yarpc.Dispatcher) (membership.Monitor, error) {
// use actual listen port (in case service is bound to :0 or 0.0.0.0:0)
rp, err := factory.createRingpop(dispatcher)
if err != nil {
return nil, fmt.Errorf("ringpop creation failed: %v", err)
}

labels, err := rp.Labels()
if err != nil {
return nil, fmt.Errorf("ringpop get node labels failed: %v", err)
}

if err = labels.Set(membership.RoleKey, factory.serviceName); err != nil {
return nil, fmt.Errorf("ringpop setting role label failed: %v", err)
}

membershipMonitor := membership.NewRingpopMonitor(CadenceServices, rp, factory.logger)
if err = membershipMonitor.Start(); err != nil {
return nil, err
}
return membershipMonitor, nil
}

// CreateRingpop is the implementation for RingpopFactory.CreateRingpop
func (factory *RingpopFactory) CreateRingpop(dispatcher *yarpc.Dispatcher) (*ringpop.Ringpop, error) {
func (factory *RingpopFactory) createRingpop(dispatcher *yarpc.Dispatcher) (*ringpop.Ringpop, error) {
var ch *tcg.Channel
var err error
if ch, err = factory.getChannel(dispatcher); err != nil {
Expand Down
12 changes: 7 additions & 5 deletions common/service/config/ringpop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ package config

import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/uber-common/bark"
"github.com/uber/ringpop-go/discovery/statichosts"
"gopkg.in/yaml.v2"
"testing"
"time"
)

type RingpopSuite struct {
Expand All @@ -54,7 +56,7 @@ func (s *RingpopSuite) TestHostsMode() {
s.Equal(time.Second*30, cfg.MaxJoinDuration)
cfg.validate()
s.Nil(err)
f, err := cfg.NewFactory()
f, err := cfg.NewFactory(bark.NewNopLogger(), "test")
s.Nil(err)
s.NotNil(f)
}
Expand All @@ -69,7 +71,7 @@ func (s *RingpopSuite) TestFileMode() {
s.Equal(time.Second*30, cfg.MaxJoinDuration)
err = cfg.validate()
s.Nil(err)
f, err := cfg.NewFactory()
f, err := cfg.NewFactory(bark.NewNopLogger(), "test")
s.Nil(err)
s.NotNil(f)
}
Expand All @@ -83,7 +85,7 @@ func (s *RingpopSuite) TestCustomMode() {
s.NotNil(cfg.validate())
cfg.DiscoveryProvider = statichosts.New("127.0.0.1")
s.Nil(cfg.validate())
f, err := cfg.NewFactory()
f, err := cfg.NewFactory(bark.NewNopLogger(), "test")
s.Nil(err)
s.NotNil(f)
}
Expand Down
35 changes: 9 additions & 26 deletions common/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import (
"github.com/uber-common/bark"
"github.com/uber-go/tally"
es "github.com/uber/cadence/common/elasticsearch"
"github.com/uber/ringpop-go"
"go.uber.org/yarpc"
)

Expand All @@ -60,7 +59,7 @@ type (
Logger bark.Logger
ThrottledLogger bark.Logger
MetricScope tally.Scope
RingpopFactory RingpopFactory
MembershipFactory MembershipMonitorFactory
RPCFactory common.RPCFactory
PProfInitializer common.PProfInitializer
PersistenceConfig config.Persistence
Expand All @@ -76,10 +75,10 @@ type (
DCRedirectionPolicy config.DCRedirectionPolicy
}

// RingpopFactory provides a bootstrapped ringpop
RingpopFactory interface {
// CreateRingpop vends a bootstrapped ringpop object
CreateRingpop(d *yarpc.Dispatcher) (*ringpop.Ringpop, error)
// MembershipMonitorFactory provides a bootstrapped membership monitor
MembershipMonitorFactory interface {
// Create vends a bootstrapped membership monitor
Create(d *yarpc.Dispatcher) (membership.Monitor, error)
}

// Service contains the objects specific to this service
Expand All @@ -89,8 +88,7 @@ type (
hostName string
hostInfo *membership.HostInfo
dispatcher *yarpc.Dispatcher
rp *ringpop.Ringpop
rpFactory RingpopFactory
membershipFactory MembershipMonitorFactory
membershipMonitor membership.Monitor
rpcFactory common.RPCFactory
pprofInitializer common.PProfInitializer
Expand All @@ -117,7 +115,7 @@ func New(params *BootstrapParams) Service {
logger: params.Logger,
throttledLogger: params.ThrottledLogger,
rpcFactory: params.RPCFactory,
rpFactory: params.RingpopFactory,
membershipFactory: params.MembershipFactory,
pprofInitializer: params.PProfInitializer,
metricsScope: params.MetricScope,
numberOfHistoryShards: params.PersistenceConfig.NumHistoryShards,
Expand Down Expand Up @@ -172,22 +170,11 @@ func (h *serviceImpl) Start() {
h.logger.WithFields(bark.Fields{logging.TagErr: err}).Fatal("Failed to start yarpc dispatcher")
}

// use actual listen port (in case service is bound to :0 or 0.0.0.0:0)
h.rp, err = h.rpFactory.CreateRingpop(h.dispatcher)
h.membershipMonitor, err = h.membershipFactory.Create(h.dispatcher)
if err != nil {
h.logger.WithFields(bark.Fields{logging.TagErr: err}).Fatal("Ringpop creation failed")
h.logger.WithFields(bark.Fields{logging.TagErr: err}).Fatal("Membership monitor creation failed")
}

labels, err := h.rp.Labels()
if err != nil {
h.logger.WithFields(bark.Fields{logging.TagErr: err}).Fatal("Ringpop get node labels failed")
}
err = labels.Set(membership.RoleKey, h.sName)
if err != nil {
h.logger.WithFields(bark.Fields{logging.TagErr: err}).Fatal("Ringpop setting role label failed")
}

h.membershipMonitor = membership.NewRingpopMonitor(cadenceServices, h.rp, h.logger)
err = h.membershipMonitor.Start()
if err != nil {
h.logger.WithFields(bark.Fields{logging.TagErr: err}).Fatal("starting membership monitor failed")
Expand Down Expand Up @@ -225,10 +212,6 @@ func (h *serviceImpl) Stop() {
h.membershipMonitor.Stop()
}

if h.rp != nil {
h.rp.Destroy()
}

if h.dispatcher != nil {
h.dispatcher.Stop()
}
Expand Down
84 changes: 84 additions & 0 deletions host/dynamicconfig.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package host

import (
"time"

"github.com/uber/cadence/common/service/dynamicconfig"
)

var (
// Override value for integer keys for dynamic config
intKeys = map[dynamicconfig.Key]int{
dynamicconfig.FrontendRPS: 1500,
}
)

type dynamicClient struct {
client dynamicconfig.Client
}

func (d *dynamicClient) GetValue(name dynamicconfig.Key, defaultValue interface{}) (interface{}, error) {
return d.client.GetValue(name, defaultValue)
}

func (d *dynamicClient) GetValueWithFilters(
name dynamicconfig.Key, filters map[dynamicconfig.Filter]interface{}, defaultValue interface{},
) (interface{}, error) {
return d.client.GetValueWithFilters(name, filters, defaultValue)
}

func (d *dynamicClient) GetIntValue(name dynamicconfig.Key, filters map[dynamicconfig.Filter]interface{}, defaultValue int) (int, error) {
if val, ok := intKeys[name]; ok {
return val, nil
}
return d.client.GetIntValue(name, filters, defaultValue)
}

func (d *dynamicClient) GetFloatValue(name dynamicconfig.Key, filters map[dynamicconfig.Filter]interface{}, defaultValue float64) (float64, error) {
return d.client.GetFloatValue(name, filters, defaultValue)
}

func (d *dynamicClient) GetBoolValue(name dynamicconfig.Key, filters map[dynamicconfig.Filter]interface{}, defaultValue bool) (bool, error) {
return d.client.GetBoolValue(name, filters, defaultValue)
}

func (d *dynamicClient) GetStringValue(name dynamicconfig.Key, filters map[dynamicconfig.Filter]interface{}, defaultValue string) (string, error) {
return d.client.GetStringValue(name, filters, defaultValue)
}

func (d *dynamicClient) GetMapValue(
name dynamicconfig.Key, filters map[dynamicconfig.Filter]interface{}, defaultValue map[string]interface{},
) (map[string]interface{}, error) {
return d.client.GetMapValue(name, filters, defaultValue)
}

func (d *dynamicClient) GetDurationValue(
name dynamicconfig.Key, filters map[dynamicconfig.Filter]interface{}, defaultValue time.Duration,
) (time.Duration, error) {
return d.client.GetDurationValue(name, filters, defaultValue)
}

// newIntegrationConfigClient - returns a dynamic config client for integration testing
func newIntegrationConfigClient(client dynamicconfig.Client) dynamicconfig.Client {
return &dynamicClient{client}
}
Loading

0 comments on commit fc188c9

Please sign in to comment.