Skip to content

Commit

Permalink
Merge pull request PaddlePaddle#3062 from helinwang/grace
Browse files Browse the repository at this point in the history
gracefully shutdown master, pserver, fix gometalinter errors
  • Loading branch information
helinwang authored Jul 28, 2017
2 parents 35b6415 + 6fab04f commit 6a0b365
Show file tree
Hide file tree
Showing 7 changed files with 134 additions and 60 deletions.
28 changes: 24 additions & 4 deletions go/cmd/master/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"net"
"net/http"
"net/rpc"
"os"
"os/signal"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -68,6 +70,20 @@ func main() {
store = &master.InMemStore{}
}

shutdown := func() {
log.Infoln("shutting down gracefully")
err := store.Shutdown()
if err != nil {
log.Errorln(err)
}
}

// Guaranteed to run even panic happens.
defer shutdown()

c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)

s, err := master.NewService(store, *chunkPerTask, *taskTimeoutDur, *taskTimeoutMax)
if err != nil {
log.Fatal(err)
Expand All @@ -84,8 +100,12 @@ func main() {
log.Fatal(err)
}

err = http.Serve(l, nil)
if err != nil {
log.Fatal(err)
}
go func() {
err = http.Serve(l, nil)
if err != nil {
log.Fatal(err)
}
}()

<-c
}
31 changes: 26 additions & 5 deletions go/cmd/pserver/pserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"net"
"net/http"
"net/rpc"
"os"
"os/signal"
"strconv"
"time"

Expand All @@ -33,7 +35,8 @@ func main() {
index := flag.Int("index", -1, "index of this pserver, should be larger or equal than 0")
etcdEndpoint := flag.String("etcd-endpoint", "http://127.0.0.1:2379",
"comma separated endpoint string for pserver to connect to etcd")
etcdTimeout := flag.Duration("etcd-timeout", 5*time.Second, "timeout for etcd calls")
dialTimeout := flag.Duration("dial-timeout", 5*time.Second, "dial timeout")
etcdTTL := flag.Int("etcd-ttl", 5, "etcd time to live in seconds")
numPservers := flag.Int("num-pservers", 1, "total pserver count in a training job")
checkpointPath := flag.String("checkpoint-path", "/checkpoints/", "save checkpoint path")
checkpointInterval := flag.Duration("checkpoint-interval", 600*time.Second, "save checkpoint per interval seconds")
Expand All @@ -53,7 +56,7 @@ func main() {
if *index >= 0 {
idx = *index
} else {
e = pserver.NewEtcdClient(*etcdEndpoint, *numPservers, *etcdTimeout)
e = pserver.NewEtcdClient(*etcdEndpoint, *numPservers, *dialTimeout, *etcdTTL)
idx, err = e.Register(*port)
candy.Must(err)

Expand All @@ -67,6 +70,20 @@ func main() {
}
}

shutdown := func() {
log.Infoln("shutting down gracefully")
sErr := e.Shutdown()
if sErr != nil {
log.Errorln(sErr)
}
}

// Guaranteed to run even panic happens.
defer shutdown()

c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)

s, err := pserver.NewService(idx, *checkpointInterval, *checkpointPath, e, cp)
candy.Must(err)

Expand All @@ -77,7 +94,11 @@ func main() {
l, err := net.Listen("tcp", ":"+strconv.Itoa(*port))
candy.Must(err)

log.Infof("start pserver at port %d", *port)
err = http.Serve(l, nil)
candy.Must(err)
go func() {
log.Infof("start pserver at port %d", *port)
err = http.Serve(l, nil)
candy.Must(err)
}()

<-c
}
25 changes: 19 additions & 6 deletions go/master/etcd_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,12 @@ type EtcdClient struct {
statePath string
client *clientv3.Client
lock *concurrency.Mutex
sess *concurrency.Session
}

// NewEtcdClient creates a new EtcdClient.
func NewEtcdClient(endpoints []string, addr string, lockPath, addrPath, statePath string, ttlSec int) (*EtcdClient, error) {
log.Debugf("Connecting to etcd at %v", endpoints)
// TODO(helin): gracefully shutdown etcd store. Because etcd
// store holds a etcd lock, even though the lock will expire
// when the lease timeout, we need to implement graceful
// shutdown to release the lock.
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: dialTimeout,
Expand All @@ -67,12 +64,12 @@ func NewEtcdClient(endpoints []string, addr string, lockPath, addrPath, statePat
// one master running, but split-brain problem may cause
// multiple master servers running), and the cluster management
// software will kill one of them.
log.Debugf("Trying to acquire lock at %s.", lockPath)
log.Infof("Trying to acquire lock at %s.", lockPath)
err = lock.Lock(context.TODO())
if err != nil {
return nil, err
}
log.Debugf("Successfully acquired lock at %s.", lockPath)
log.Infof("Successfully acquired lock at %s.", lockPath)

put := clientv3.OpPut(addrPath, addr)
resp, err := cli.Txn(context.Background()).If(lock.IsOwner()).Then(put).Commit()
Expand All @@ -89,6 +86,7 @@ func NewEtcdClient(endpoints []string, addr string, lockPath, addrPath, statePat
statePath: statePath,
client: cli,
lock: lock,
sess: sess,
}

return e, nil
Expand Down Expand Up @@ -157,6 +155,21 @@ func (e *EtcdClient) Load() ([]byte, error) {
return state, nil
}

// Shutdown shuts down the etcd client gracefully.
func (e *EtcdClient) Shutdown() error {
err := e.sess.Close()
newErr := e.client.Close()
if newErr != nil {
if err == nil {
err = newErr
} else {
log.Errorln(newErr)
}
}

return err
}

// GetKey gets the value by the specify key.
func GetKey(c *clientv3.Client, key string, timeout time.Duration) (string, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
Expand Down
5 changes: 5 additions & 0 deletions go/master/inmem_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,8 @@ func (m *InMemStore) Load() ([]byte, error) {

return m.buf, nil
}

// Shutdown shuts down the in mem store.
func (m *InMemStore) Shutdown() error {
return nil
}
1 change: 1 addition & 0 deletions go/master/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ var ErrPassAfter = errors.New("pass number larger than master")
type Store interface {
Save([]byte) error
Load() ([]byte, error)
Shutdown() error
}

// Chunk is a chunk of data consisted of several data instances.
Expand Down
6 changes: 3 additions & 3 deletions go/pserver/client/c/cclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@ var curHandle C.paddle_pserver_client
func add(c *client.Client) C.paddle_pserver_client {
mu.Lock()
defer mu.Unlock()
client := curHandle
cli := curHandle
curHandle++
handleMap[client] = c
return client
handleMap[cli] = c
return cli
}

func get(client C.paddle_pserver_client) *client.Client {
Expand Down
Loading

0 comments on commit 6a0b365

Please sign in to comment.