diff --git a/bin/node/server.go b/bin/node/server.go
index 921fa31..90f1a1c 100644
--- a/bin/node/server.go
+++ b/bin/node/server.go
@@ -28,7 +28,7 @@ func main() {
 		return
 	}
 
-	n, err := node.NewNode(conf.Config.Etcd)
+	n, err := node.NewNode(conf.Config)
 	if err != nil {
 		log.Error(err.Error())
 		return
@@ -41,7 +41,7 @@ func main() {
 
 	go n.Run()
 
-	log.Noticef("cronsun node[%s] pid[%s] service started, Ctrl+C or send kill sign to exit", n.Key, n.PID)
+	log.Noticef("cronsun %s service started, Ctrl+C or send kill sign to exit", n.String())
 	// 注册退出事件
 	event.On(event.EXIT, n.Stop)
 	// 监听退出信号
diff --git a/conf/conf.go b/conf/conf.go
index 02b2e1e..25e80cd 100644
--- a/conf/conf.go
+++ b/conf/conf.go
@@ -37,6 +37,7 @@ type Conf struct {
 
 	Proc string // proc 路径
 	Cmd  string // cmd 路径
+	Sep  string // separator used when joining etcd key segments
 
 	Ttl int64 // 节点超时时间,单位秒
 
diff --git a/conf/files/base.json.sample b/conf/files/base.json.sample
index 2a24cf9..6d46dbb 100644
--- a/conf/files/base.json.sample
+++ b/conf/files/base.json.sample
@@ -1,7 +1,8 @@
 {
     "Proc": "/cronsun/proc",
     "Cmd": "/cronsun/cmd",
-    "Timeout": 10,
+    "Sep": "/",
+    "Ttl": 10,
     "Log": "@extend:log.json",
     "Etcd": "@extend:etcd.json"
 }
diff --git a/node/node.go b/node/node.go
index 2ddd3c3..979abe6 100644
--- a/node/node.go
+++ b/node/node.go
@@ -10,33 +10,39 @@ import (
 
 
 	client "github.com/coreos/etcd/clientv3"
+	"sunteng/commons/log"
 	"sunteng/commons/util"
 	"sunteng/cronsun/conf"
+	"syscall"
 )
 
 const (
 	ReqTimeout = 2 * time.Second
-
-	Spliter = "/"
 )
 
 // Node 执行 cron 命令服务的结构体
 type Node struct {
 	*client.Client
 
+	ttl    time.Duration
+	prefix string
+
 	Key string
 	PID string
 
 	lID client.LeaseID // lease id
+	lch <-chan *client.LeaseKeepAliveResponse
+
+	done chan struct{}
 }
 
-func NewNode(cfg client.Config) (n *Node, err error) {
+func NewNode(cfg *conf.Conf) (n *Node, err error) {
 	ip, err := util.GetLocalIP()
 	if err != nil {
 		return
 	}
 
-	cli, err := client.New(cfg)
+	cli, err := client.New(cfg.Etcd)
 	if err != nil {
 		return
 	}
@@ -44,12 +50,21 @@ func NewNode(cfg client.Config) (n *Node, err error) {
 	n = &Node{
 		Client: cli,
 
-		Key: conf.Config.Proc + Spliter + ip.String(),
+		ttl:    time.Duration(cfg.Ttl) * time.Second,
+		prefix: cfg.Proc + cfg.Sep,
+
+		Key: cfg.Proc + cfg.Sep + ip.String(),
 		PID: strconv.Itoa(os.Getpid()),
+
+		done: make(chan struct{}),
 	}
 	return
 }
 
+func (n *Node) String() string {
+	return "node[" + n.Key[len(n.prefix):] + "] pid[" + n.PID + "]"
+}
+
 // 注册到 /cronsun/proc/xx
 func (n *Node) Register() (err error) {
 	pid, err := n.Exist()
@@ -58,7 +73,7 @@ func (n *Node) Register() (err error) {
 	}
 
 	if pid != -1 {
-		return fmt.Errorf("node[%s] pid[%d] exist", n.Key, pid)
+		return fmt.Errorf("node[%s] pid[%d] exist", n.Key[len(n.prefix):], pid)
 	}
 
 	resp, err := n.Client.Grant(context.TODO(), conf.Config.Ttl)
@@ -69,11 +84,13 @@ func (n *Node) Register() (err error) {
 	if _, err = n.Client.Put(context.TODO(), n.Key, n.PID, client.WithLease(resp.ID)); err != nil {
 		return
 	}
-	if _, err = n.Client.KeepAlive(context.TODO(), resp.ID); err != nil {
+
+	ch, err := n.Client.KeepAlive(context.TODO(), resp.ID)
+	if err != nil {
 		return
 	}
 
-	n.lID = resp.ID
+	n.lID, n.lch = resp.ID, ch
 	return
 }
 
@@ -95,14 +112,16 @@ func (n *Node) Exist() (pid int, err error) {
-		if _, err = n.Client.Delete(ctx, n.Key, client.WithFromKey()); err != nil {
+		if _, err = n.Client.Delete(ctx, n.Key); err != nil {
 			return
 		}
+		return -1, nil
 	}
 
 	p, err := os.FindProcess(pid)
 	if err != nil {
-		return
+		return -1, nil
 	}
 
-	if p != nil {
+	// TODO: only Linux/Unix systems are considered for now (signal 0 probes whether the process is alive)
+	if p != nil && p.Signal(syscall.Signal(0)) == nil {
 		return
 	}
 
@@ -111,11 +130,37 @@ func (n *Node) Exist() (pid int, err error) {
 
 // 启动服务
 func (n *Node) Run() {
+	go n.keepAlive()
+}
+
+// keepAlive re-registers the node once its lease keep-alive channel closes (e.g. after a network drop).
+func (n *Node) keepAlive() {
+	for {
+		for range n.lch {
+		}
+		select {
+		case <-n.done:
+			return
+		default:
+		}
+		time.Sleep(n.ttl + time.Second)
+
+		log.Noticef("%s has dropped, try to reconnect...", n.String())
+		if err := n.Register(); err != nil {
+			log.Warn(err.Error())
+		} else {
+			log.Noticef("%s reconnected", n.String())
+		}
+	}
 }
 
-// 启动服务
+// Stop signals keepAlive to exit, deletes this node's key and closes the etcd client.
 func (n *Node) Stop(i interface{}) {
-	n.Client.Revoke(context.TODO(), n.lID)
+	close(n.done)
+	// Bound the delete with a short timeout so shutdown cannot hang when the network is down.
+	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
+	n.Client.Delete(ctx, n.Key)
+	cancel()
 
 	n.Client.Close()
 }