Skip to content

Commit

Permalink
Pass blacklist in config instead of parameter
Browse files Browse the repository at this point in the history
  • Loading branch information
Leonardo Hahn authored and felipejfc committed Nov 13, 2019
1 parent d89a986 commit 16450d9
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 33 deletions.
1 change: 0 additions & 1 deletion app.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,6 @@ func startDefaultSD() {
app.config,
app.server,
app.dieChan,
nil,
)
if err != nil {
logger.Log.Fatalf("error starting cluster service discovery component: %s", err.Error())
Expand Down
10 changes: 5 additions & 5 deletions app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func setup() {
initApp()
Configure(true, "testtype", Cluster, map[string]string{}, viper.New())

etcdSD, err := cluster.NewEtcdServiceDiscovery(app.config, app.server, app.dieChan, nil)
etcdSD, err := cluster.NewEtcdServiceDiscovery(app.config, app.server, app.dieChan)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -202,7 +202,7 @@ func TestGetMetricsReporters(t *testing.T) {
assert.Equal(t, app.metricsReporters, GetMetricsReporters())
}
func TestGetServerByID(t *testing.T) {
r, err := cluster.NewEtcdServiceDiscovery(app.config, app.server, app.dieChan, nil)
r, err := cluster.NewEtcdServiceDiscovery(app.config, app.server, app.dieChan)
assert.NoError(t, err)
assert.NotNil(t, r)
SetServiceDiscoveryClient(r)
Expand All @@ -212,7 +212,7 @@ func TestGetServerByID(t *testing.T) {
}

func TestGetServersByType(t *testing.T) {
r, err := cluster.NewEtcdServiceDiscovery(app.config, app.server, app.dieChan, nil)
r, err := cluster.NewEtcdServiceDiscovery(app.config, app.server, app.dieChan)
assert.NoError(t, err)
assert.NotNil(t, r)
SetServiceDiscoveryClient(r)
Expand Down Expand Up @@ -251,7 +251,7 @@ func TestSetRPCClient(t *testing.T) {
func TestSetServiceDiscovery(t *testing.T) {
initApp()
Configure(true, "testtype", Cluster, map[string]string{}, viper.New())
r, err := cluster.NewEtcdServiceDiscovery(app.config, app.server, app.dieChan, nil)
r, err := cluster.NewEtcdServiceDiscovery(app.config, app.server, app.dieChan)
assert.NoError(t, err)
assert.NotNil(t, r)
SetServiceDiscoveryClient(r)
Expand Down Expand Up @@ -418,7 +418,7 @@ func TestStartAndListenCluster(t *testing.T) {
initApp()
Configure(true, "testtype", Cluster, map[string]string{}, cfg)

etcdSD, err := cluster.NewEtcdServiceDiscovery(app.config, app.server, app.dieChan, nil, cli)
etcdSD, err := cluster.NewEtcdServiceDiscovery(app.config, app.server, app.dieChan, cli)
assert.NoError(t, err)
SetServiceDiscoveryClient(etcdSD)

Expand Down
25 changes: 14 additions & 11 deletions cluster/etcd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,24 +72,22 @@ func NewEtcdServiceDiscovery(
config *config.Config,
server *Server,
appDieChan chan bool,
serverTypesBlacklist []string,
cli ...*clientv3.Client,
) (ServiceDiscovery, error) {
var client *clientv3.Client
if len(cli) > 0 {
client = cli[0]
}
sd := &etcdServiceDiscovery{
config: config,
running: false,
server: server,
serverMapByType: make(map[string]map[string]*Server),
listeners: make([]SDListener, 0),
stopChan: make(chan bool),
stopLeaseChan: make(chan bool),
appDieChan: appDieChan,
cli: client,
serverTypesBlacklist: serverTypesBlacklist,
config: config,
running: false,
server: server,
serverMapByType: make(map[string]map[string]*Server),
listeners: make([]SDListener, 0),
stopChan: make(chan bool),
stopLeaseChan: make(chan bool),
appDieChan: appDieChan,
cli: client,
}

sd.configure()
Expand All @@ -111,6 +109,11 @@ func (sd *etcdServiceDiscovery) configure() {
sd.grantLeaseMaxRetries = sd.config.GetInt("pitaya.cluster.sd.etcd.grantlease.maxretries")
sd.grantLeaseInterval = sd.config.GetDuration("pitaya.cluster.sd.etcd.grantlease.retryinterval")
sd.shutdownDelay = sd.config.GetDuration("pitaya.cluster.sd.etcd.shutdown.delay")
sd.serverTypesBlacklist = sd.config.GetStringSlice("pitaya.cluster.sd.etcd.servertypeblacklist")

if len(sd.serverTypesBlacklist) > 0 {
logger.Log.Info("using server types blacklist: %s", sd.serverTypesBlacklist)
}
}

func (sd *etcdServiceDiscovery) watchLeaseChan(c <-chan *clientv3.LeaseKeepAliveResponse) {
Expand Down
33 changes: 17 additions & 16 deletions cluster/etcd_service_discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,10 @@ func getConfig(conf ...*viper.Viper) *config.Config {
return config
}

func getEtcdSD(t *testing.T, config *config.Config, server *Server, blacklist []string, cli *clientv3.Client) *etcdServiceDiscovery {
func getEtcdSD(t *testing.T, config *config.Config, server *Server, cli *clientv3.Client) *etcdServiceDiscovery {
t.Helper()
appDieChan := make(chan bool)
e, err := NewEtcdServiceDiscovery(config, server, appDieChan, blacklist, cli)
e, err := NewEtcdServiceDiscovery(config, server, appDieChan, cli)
assert.NoError(t, err)
return e.(*etcdServiceDiscovery)
}
Expand All @@ -115,7 +115,7 @@ func TestNewEtcdServiceDiscovery(t *testing.T) {
config := getConfig()
c, cli := helpers.GetTestEtcd(t)
defer c.Terminate(t)
e := getEtcdSD(t, config, table.server, nil, cli)
e := getEtcdSD(t, config, table.server, cli)
assert.NotNil(t, e)
})
}
Expand All @@ -128,7 +128,7 @@ func TestEtcdSDBootstrapLease(t *testing.T) {
config := getConfig()
c, cli := helpers.GetTestEtcd(t)
defer c.Terminate(t)
e := getEtcdSD(t, config, table.server, nil, cli)
e := getEtcdSD(t, config, table.server, cli)
err := e.grantLease()
assert.NoError(t, err)
assert.NotEmpty(t, e.leaseID)
Expand All @@ -143,7 +143,7 @@ func TestEtcdSDBootstrapLeaseError(t *testing.T) {
config := getConfig()
c, cli := helpers.GetTestEtcd(t)
c.Terminate(t)
e := getEtcdSD(t, config, table.server, nil, cli)
e := getEtcdSD(t, config, table.server, cli)
err := e.grantLease()
assert.Error(t, err)
})
Expand All @@ -157,7 +157,7 @@ func TestEtcdSDBootstrapServer(t *testing.T) {
config := getConfig()
c, cli := helpers.GetTestEtcd(t)
defer c.Terminate(t)
e := getEtcdSD(t, config, table.server, nil, cli)
e := getEtcdSD(t, config, table.server, cli)
e.grantLease()
err := e.bootstrapServer(table.server)
assert.NoError(t, err)
Expand All @@ -182,7 +182,7 @@ func TestEtcdSDDeleteServer(t *testing.T) {
config := getConfig()
c, cli := helpers.GetTestEtcd(t)
defer c.Terminate(t)
e := getEtcdSD(t, config, table.server, nil, cli)
e := getEtcdSD(t, config, table.server, cli)
e.grantLease()
err := e.bootstrapServer(table.server)
assert.NoError(t, err)
Expand Down Expand Up @@ -221,7 +221,7 @@ func TestEtcdSDDeleteLocalInvalidServers(t *testing.T) {
t.Run(table.server.ID, func(t *testing.T) {
config := getConfig()
_, cli := helpers.GetTestEtcd(t)
e := getEtcdSD(t, config, table.server, nil, cli)
e := getEtcdSD(t, config, table.server, cli)
invalidServer := &Server{
ID: "invalid",
Type: "bla",
Expand All @@ -242,7 +242,7 @@ func TestEtcdSDGetServer(t *testing.T) {
config := getConfig()
c, cli := helpers.GetTestEtcd(t)
defer c.Terminate(t)
e := getEtcdSD(t, config, table.server, nil, cli)
e := getEtcdSD(t, config, table.server, cli)
e.grantLease()
e.bootstrapServer(table.server)
sv, err := e.GetServer(table.server.ID)
Expand All @@ -258,7 +258,7 @@ func TestEtcdSDGetServers(t *testing.T) {
config := getConfig()
c, cli := helpers.GetTestEtcd(t)
defer c.Terminate(t)
e := getEtcdSD(t, config, &Server{}, nil, cli)
e := getEtcdSD(t, config, &Server{}, cli)
e.grantLease()
for _, server := range table.servers {
e.bootstrapServer(server)
Expand All @@ -277,7 +277,7 @@ func TestEtcdSDInit(t *testing.T) {
config := getConfig(conf)
c, cli := helpers.GetTestEtcd(t)
defer c.Terminate(t)
e := getEtcdSD(t, config, table.server, nil, cli)
e := getEtcdSD(t, config, table.server, cli)
e.Init()
// should set running
assert.True(t, e.running)
Expand All @@ -304,7 +304,7 @@ func TestEtcdBeforeShutdown(t *testing.T) {
config := getConfig()
c, cli := helpers.GetTestEtcd(t)
defer c.Terminate(t)
e := getEtcdSD(t, config, table.server, nil, cli)
e := getEtcdSD(t, config, table.server, cli)
e.Init()
assert.True(t, e.running)
e.BeforeShutdown()
Expand All @@ -322,7 +322,7 @@ func TestEtcdShutdown(t *testing.T) {
config := getConfig()
c, cli := helpers.GetTestEtcd(t)
defer c.Terminate(t)
e := getEtcdSD(t, config, table.server, nil, cli)
e := getEtcdSD(t, config, table.server, cli)
e.Init()
assert.True(t, e.running)
e.Shutdown()
Expand All @@ -340,7 +340,7 @@ func TestEtcdWatchChangesAddNewServers(t *testing.T) {
config := getConfig(conf)
c, cli := helpers.GetTestEtcd(t)
defer c.Terminate(t)
e := getEtcdSD(t, config, table.server, nil, cli)
e := getEtcdSD(t, config, table.server, cli)
e.running = true
e.bootstrapServer(table.server)
e.watchEtcdChanges()
Expand Down Expand Up @@ -374,7 +374,7 @@ func TestEtcdWatchChangesDeleteServers(t *testing.T) {
config := getConfig(conf)
c, cli := helpers.GetTestEtcd(t)
defer c.Terminate(t)
e := getEtcdSD(t, config, table.server, nil, cli)
e := getEtcdSD(t, config, table.server, cli)
e.running = true
e.bootstrapServer(table.server)
e.watchEtcdChanges()
Expand Down Expand Up @@ -411,10 +411,11 @@ func TestEtcdWatchChangesWithBlacklist(t *testing.T) {
t.Run(table.name, func(t *testing.T) {
conf := viper.New()
conf.Set("pitaya.cluster.sd.etcd.syncservers.interval", "10ms")
conf.Set("pitaya.cluster.sd.etcd.serverTypeBlacklist", table.serverTypeBlacklist)
config := getConfig(conf)
c, cli := helpers.GetTestEtcd(t)
defer c.Terminate(t)
e := getEtcdSD(t, config, table.server, table.serverTypeBlacklist, cli)
e := getEtcdSD(t, config, table.server, cli)
e.running = true
_ = e.bootstrapServer(table.server)
e.watchEtcdChanges()
Expand Down

0 comments on commit 16450d9

Please sign in to comment.