Skip to content

Commit

Permalink
node: 断网重连
Browse files Browse the repository at this point in the history
  • Loading branch information
miraclesu committed May 9, 2017
1 parent 5970541 commit 9639300
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 14 deletions.
4 changes: 2 additions & 2 deletions bin/node/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func main() {
return
}

n, err := node.NewNode(conf.Config.Etcd)
n, err := node.NewNode(conf.Config)
if err != nil {
log.Error(err.Error())
return
Expand All @@ -41,7 +41,7 @@ func main() {

go n.Run()

log.Noticef("cronsun node[%s] pid[%s] service started, Ctrl+C or send kill sign to exit", n.Key, n.PID)
log.Noticef("cronsun %s service started, Ctrl+C or send kill sign to exit", n.String())
// 注册退出事件
event.On(event.EXIT, n.Stop)
// 监听退出信号
Expand Down
1 change: 1 addition & 0 deletions conf/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type Conf struct {

Proc string // proc 路径
Cmd string // cmd 路径
Sep string // etcd key 的连接符

Ttl int64 // 节点超时时间,单位秒

Expand Down
3 changes: 2 additions & 1 deletion conf/files/base.json.sample
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
{
"Proc": "/cronsun/proc",
"Cmd": "/cronsun/cmd",
"Timeout": 10,
"Sep": "/",
"Ttl": 10,
"Log": "@extend:log.json",
"Etcd": "@extend:etcd.json"
}
67 changes: 56 additions & 11 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,46 +10,61 @@ import (

client "github.com/coreos/etcd/clientv3"

"sunteng/commons/log"
"sunteng/commons/util"
"sunteng/cronsun/conf"
"syscall"
)

const (
ReqTimeout = 2 * time.Second

Spliter = "/"
)

// Node 执行 cron 命令服务的结构体
type Node struct {
*client.Client

ttl time.Duration
prefix string

Key string
PID string

lID client.LeaseID // lease id
lch <-chan *client.LeaseKeepAliveResponse

done chan struct{}
}

func NewNode(cfg client.Config) (n *Node, err error) {
func NewNode(cfg *conf.Conf) (n *Node, err error) {
ip, err := util.GetLocalIP()
if err != nil {
return
}

cli, err := client.New(cfg)
cli, err := client.New(cfg.Etcd)
if err != nil {
return
}

n = &Node{
Client: cli,

Key: conf.Config.Proc + Spliter + ip.String(),
ttl: time.Duration(cfg.Ttl) * time.Second,
prefix: cfg.Proc + cfg.Sep,

Key: cfg.Proc + cfg.Sep + ip.String(),
PID: strconv.Itoa(os.Getpid()),

done: make(chan struct{}),
}
return
}

func (n *Node) String() string {
return "node[" + n.Key[len(n.prefix):] + "] pid[" + n.PID + "]"
}

// 注册到 /cronsun/proc/xx
func (n *Node) Register() (err error) {
pid, err := n.Exist()
Expand All @@ -58,7 +73,7 @@ func (n *Node) Register() (err error) {
}

if pid != -1 {
return fmt.Errorf("node[%s] pid[%d] exist", n.Key, pid)
return fmt.Errorf("node[%s] pid[%d] exist", n.Key[len(n.prefix):], pid)
}

resp, err := n.Client.Grant(context.TODO(), conf.Config.Ttl)
Expand All @@ -69,11 +84,13 @@ func (n *Node) Register() (err error) {
if _, err = n.Client.Put(context.TODO(), n.Key, n.PID, client.WithLease(resp.ID)); err != nil {
return
}
if _, err = n.Client.KeepAlive(context.TODO(), resp.ID); err != nil {

ch, err := n.Client.KeepAlive(context.TODO(), resp.ID)
if err != nil {
return
}
n.lID = resp.ID

n.lID, n.lch = resp.ID, ch
return
}

Expand All @@ -95,14 +112,16 @@ func (n *Node) Exist() (pid int, err error) {
if _, err = n.Client.Delete(ctx, n.Key, client.WithFromKey()); err != nil {
return
}
return -1, nil
}

p, err := os.FindProcess(pid)
if err != nil {
return
return -1, nil
}

if p != nil {
// TODO: 暂时不考虑 linux/unix 以外的系统
if p != nil && p.Signal(syscall.Signal(0)) == nil {
return
}

Expand All @@ -111,11 +130,37 @@ func (n *Node) Exist() (pid int, err error) {

// 启动服务
func (n *Node) Run() {
go n.keepAlive()
}

// 断网掉线重新注册
func (n *Node) keepAlive() {
for {
for _ = range n.lch {
}

select {
case <-n.done:
return
default:
}
time.Sleep(time.Duration(n.ttl+1) * time.Second)

log.Noticef("%s has dropped, try to reconnect...", n.String())
if err := n.Register(); err != nil {
log.Warn(err.Error())
} else {
log.Noticef("%s reconnected", n.String())
}
}
}

// 启动服务
func (n *Node) Stop(i interface{}) {
n.Client.Revoke(context.TODO(), n.lID)
close(n.done)
// 防止断网时卡住
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
n.Client.Delete(ctx, n.Key, client.WithFromKey())
cancel()
n.Client.Close()
}

0 comments on commit 9639300

Please sign in to comment.