Skip to content

Commit

Permalink
Bugfix/sd sigint (closes topfreegames#11) (topfreegames#34)
Browse files Browse the repository at this point in the history
* Added a timeout for etcd's revoke function

* Added graceful shutdown in case etcd goes down

* Update docs to include new configuration values and removed extra line in function
  • Loading branch information
gabrielerzinger authored and andrehp committed Aug 9, 2018
1 parent c05b0ec commit 537a13a
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 30 deletions.
14 changes: 3 additions & 11 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@

[[constraint]]
name = "github.com/coreos/etcd"
version = "3.3.2"
version = "3.3.9"

[[constraint]]
name = "github.com/nats-io/go-nats"
Expand Down
1 change: 1 addition & 0 deletions app.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ func startDefaultSD() {
app.serviceDiscovery, err = cluster.NewEtcdServiceDiscovery(
app.config,
app.server,
app.dieChan,
)
if err != nil {
logger.Log.Fatalf("error starting cluster service discovery component: %s", err.Error())
Expand Down
10 changes: 5 additions & 5 deletions app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func setup() {
initApp()
Configure(true, "testtype", Cluster, map[string]string{}, viper.New())

etcdSD, _ := cluster.NewEtcdServiceDiscovery(app.config, app.server)
etcdSD, _ := cluster.NewEtcdServiceDiscovery(app.config, app.server, app.dieChan)
typeOfetcdSD = reflect.TypeOf(etcdSD)

natsRPCServer, _ := cluster.NewNatsRPCServer(app.config, app.server, nil, app.dieChan)
Expand Down Expand Up @@ -183,7 +183,7 @@ func TestGetMetricsReporters(t *testing.T) {
assert.Equal(t, app.metricsReporters, GetMetricsReporters())
}
func TestGetServerByID(t *testing.T) {
r, err := cluster.NewEtcdServiceDiscovery(app.config, app.server)
r, err := cluster.NewEtcdServiceDiscovery(app.config, app.server, app.dieChan)
assert.NoError(t, err)
assert.NotNil(t, r)
SetServiceDiscoveryClient(r)
Expand All @@ -193,7 +193,7 @@ func TestGetServerByID(t *testing.T) {
}

func TestGetServersByType(t *testing.T) {
r, err := cluster.NewEtcdServiceDiscovery(app.config, app.server)
r, err := cluster.NewEtcdServiceDiscovery(app.config, app.server, app.dieChan)
assert.NoError(t, err)
assert.NotNil(t, r)
SetServiceDiscoveryClient(r)
Expand Down Expand Up @@ -232,7 +232,7 @@ func TestSetRPCClient(t *testing.T) {
func TestSetServiceDiscovery(t *testing.T) {
initApp()
Configure(true, "testtype", Cluster, map[string]string{}, viper.New())
r, err := cluster.NewEtcdServiceDiscovery(app.config, app.server)
r, err := cluster.NewEtcdServiceDiscovery(app.config, app.server, app.dieChan)
assert.NoError(t, err)
assert.NotNil(t, r)
SetServiceDiscoveryClient(r)
Expand Down Expand Up @@ -399,7 +399,7 @@ func TestStartAndListenCluster(t *testing.T) {
initApp()
Configure(true, "testtype", Cluster, map[string]string{}, cfg)

etcdSD, err := cluster.NewEtcdServiceDiscovery(app.config, app.server, cli)
etcdSD, err := cluster.NewEtcdServiceDiscovery(app.config, app.server, app.dieChan, cli)
assert.NoError(t, err)
SetServiceDiscoveryClient(etcdSD)

Expand Down
57 changes: 45 additions & 12 deletions cluster/etcd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,16 @@ type etcdServiceDiscovery struct {
stopChan chan bool
lastSyncTime time.Time
listeners []SDListener
revokeTimeout time.Duration
bootstrapTimeout time.Duration
appDieChan chan bool
}

// NewEtcdServiceDiscovery ctor
func NewEtcdServiceDiscovery(
config *config.Config,
server *Server,
appDieChan chan bool,
cli ...*clientv3.Client,
) (ServiceDiscovery, error) {
var client *clientv3.Client
Expand All @@ -72,6 +76,7 @@ func NewEtcdServiceDiscovery(
server: server,
listeners: make([]SDListener, 0),
stopChan: make(chan bool),
appDieChan: appDieChan,
cli: client,
}

Expand All @@ -87,6 +92,8 @@ func (sd *etcdServiceDiscovery) configure() {
sd.heartbeatTTL = sd.config.GetDuration("pitaya.cluster.sd.etcd.heartbeat.ttl")
sd.logHeartbeat = sd.config.GetBool("pitaya.cluster.sd.etcd.heartbeat.log")
sd.syncServersInterval = sd.config.GetDuration("pitaya.cluster.sd.etcd.syncservers.interval")
sd.revokeTimeout = sd.config.GetDuration("pitaya.cluster.sd.etcd.revoke.timeout")
sd.bootstrapTimeout = sd.config.GetDuration("pitaya.cluster.sd.etcd.bootstrap.timeout")
}

func (sd *etcdServiceDiscovery) watchLeaseChan(c <-chan *clientv3.LeaseKeepAliveResponse) {
Expand Down Expand Up @@ -232,16 +239,28 @@ func (sd *etcdServiceDiscovery) GetServersByType(serverType string) (map[string]
}

func (sd *etcdServiceDiscovery) bootstrap() error {
err := sd.bootstrapLease()
if err != nil {
return err
}

err = sd.bootstrapServer(sd.server)
if err != nil {
c := make(chan error)
defer close(c)
go func() {
logger.Log.Infof("waiting for etcd connection")
err := sd.bootstrapLease()
if err != nil {
c <- err
return
}
err = sd.bootstrapServer(sd.server)
c <- err
}()
select {
case err := <-c:
return err
case <-time.After(sd.bootstrapTimeout):
logger.Log.Warn("timed out waiting for etcd connection")
if sd.appDieChan != nil {
sd.appDieChan <- true
}
return nil
}
return nil
}

// GetServer returns a server given its id
Expand Down Expand Up @@ -369,12 +388,26 @@ func (sd *etcdServiceDiscovery) SyncServers() error {
func (sd *etcdServiceDiscovery) Shutdown() error {
	// Clear the running flag and close stopChan before touching etcd.
	sd.running = false
	close(sd.stopChan)

	// Hand the lease revocation to revoke and propagate its result unchanged.
	revokeErr := sd.revoke()
	return revokeErr
}

_, err := sd.cli.Revoke(context.TODO(), sd.leaseID)
if err != nil {
return err
// revoke asks etcd to revoke this server's lease, waiting at most
// sd.revokeTimeout so that a shutdown cannot hang when etcd is unavailable.
//
// It returns the error reported by the etcd client's Revoke call, or nil when
// the call has not finished before the timeout (a warning is logged instead).
func (sd *etcdServiceDiscovery) revoke() error {
	// The channel has room for one value and is never closed. If the timeout
	// branch wins, the goroutine below can still deliver its result and exit
	// once Revoke returns. With an unbuffered channel that was closed on
	// return, that late send would panic (send on closed channel).
	c := make(chan error, 1)
	go func() {
		logger.Log.Debug("waiting for etcd revoke")
		_, err := sd.cli.Revoke(context.TODO(), sd.leaseID)
		c <- err
		logger.Log.Debug("finished waiting for etcd revoke")
	}()
	select {
	case err := <-c:
		return err // completed normally
	case <-time.After(sd.revokeTimeout):
		logger.Log.Warn("timed out waiting for etcd revoke")
		return nil // timed out
	}
}

func (sd *etcdServiceDiscovery) addServer(sv *Server) {
Expand Down
3 changes: 2 additions & 1 deletion cluster/etcd_service_discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ func getConfig(conf ...*viper.Viper) *config.Config {

func getEtcdSD(t *testing.T, config *config.Config, server *Server, cli *clientv3.Client) *etcdServiceDiscovery {
t.Helper()
e, err := NewEtcdServiceDiscovery(config, server, cli)
appDieChan := make(chan bool)
e, err := NewEtcdServiceDiscovery(config, server, appDieChan, cli)
assert.NoError(t, err)
return e.(*etcdServiceDiscovery)
}
Expand Down
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ func (c *Config) fillDefaultValues() {
"pitaya.cluster.sd.etcd.endpoints": "localhost:2379",
"pitaya.cluster.sd.etcd.prefix": "pitaya/",
"pitaya.cluster.sd.etcd.heartbeat.ttl": "60s",
"pitaya.cluster.sd.etcd.bootstrap.timeout": "60s",
"pitaya.cluster.sd.etcd.revoke.timeout": "5s",
"pitaya.cluster.sd.etcd.heartbeat.log": false,
"pitaya.cluster.sd.etcd.syncservers.interval": "120s",
"pitaya.modules.bindingstorage.etcd.endpoints": "localhost:2379",
Expand Down
8 changes: 8 additions & 0 deletions docs/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ These configuration values configure service discovery for the default etcd serv
- 60s
- time.Duration
- Heartbeat interval for the etcd lease
* - pitaya.cluster.sd.etcd.bootstrap.timeout
- 60s
- time.Duration
- Connection timeout when waiting for etcd to become available
* - pitaya.cluster.sd.etcd.revoke.timeout
- 5s
- time.Duration
- Timeout for etcd's revoke function
* - pitaya.cluster.sd.etcd.heartbeat.log
- false
- bool
Expand Down

0 comments on commit 537a13a

Please sign in to comment.