Skip to content

Commit

Permalink
支持后台手动停止运行中的shell任务
Browse files Browse the repository at this point in the history
  • Loading branch information
ouqiang committed Jan 27, 2018
1 parent fd39434 commit f081d89
Show file tree
Hide file tree
Showing 12 changed files with 108 additions and 68 deletions.
4 changes: 2 additions & 2 deletions build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@

# set -x -u
# 构建应用, 生成压缩包 gocron.zip或gocron.tar.gz
# ./build.sh -p windows -a amd64
# ./build.sh -p windows -a amd64 -v 1.4
# 参数含义
# -p 指定平台(windows|linux|darwin)
# -a 指定体系架构(amd64|386), 默认amd64

# -v 版本号

TEMP_DIR=`date +%s`-temp-`echo $RANDOM`

Expand Down
3 changes: 2 additions & 1 deletion build_node.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@

# set -x -u
# 任务节点打包, 生成压缩包 gocron-node.zip或gocron-node.tar.gz
# ./build-node.sh -p windows -a amd64
# ./build-node.sh -p windows -a amd64 -v 1.4
# 参数含义
# -p 指定平台(windows|linux|darwin)
# -a 指定体系架构(amd64|386), 默认amd64
# -v 版本号


# 目标平台 windows,linux,darwin
Expand Down
2 changes: 1 addition & 1 deletion cmd/web.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func shutdown() {
serviceTask := new(service.Task)
// 停止所有任务调度
logger.Info("停止定时任务调度")
serviceTask.Stop()
serviceTask.WaitAndExit()
}

// 判断应用是否需要升级, 当存在版本号文件且版本小于app.VersionId时升级
Expand Down
4 changes: 2 additions & 2 deletions modules/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func InitEnv(versionString string) {
DataDir = AppDir + "/data"
AppConfig = ConfDir + "/app.ini"
VersionFile = ConfDir + "/.version"
createDirIfNeed(ConfDir, LogDir, DataDir)
createDirIfNotExists(ConfDir, LogDir, DataDir)
Installed = IsInstalled()
VersionId = ToNumberVersion(versionString)
}
Expand Down Expand Up @@ -108,7 +108,7 @@ func ToNumberVersion(versionString string) int {
}

// 检测目录是否存在
func createDirIfNeed(path ...string) {
func createDirIfNotExists(path ...string) {
for _, value := range path {
if !utils.FileExist(value) {
err := os.Mkdir(value, 0755)
Expand Down
24 changes: 23 additions & 1 deletion modules/rpc/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,30 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"time"
"sync"
)

var (
taskMap sync.Map
)

var (
errUnavailable = errors.New("无法连接远程服务器")
)

func generateTaskUniqueKey(ip string, port int, id int64) string {
return fmt.Sprintf("%s:%d:%d", ip, port, id)
}

func Stop(ip string, port int , id int64) {
key := generateTaskUniqueKey(ip, port, id)
cancel, ok := taskMap.Load(key)
if !ok {
return
}
cancel.(context.CancelFunc)()
}

func Exec(ip string, port int, taskReq *pb.TaskRequest) (string, error) {
defer func() {
if err := recover(); err != nil {
Expand All @@ -39,7 +57,9 @@ func Exec(ip string, port int, taskReq *pb.TaskRequest) (string, error) {
}
timeout := time.Duration(taskReq.Timeout) * time.Second
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
taskUniqueKey := generateTaskUniqueKey(ip, port, taskReq.Id)
taskMap.Store(taskUniqueKey, cancel)
defer taskMap.Delete(taskUniqueKey)
resp, err := c.Run(ctx, taskReq)
if err != nil {
return parseGRPCError(err, conn, &isConnClosed)
Expand All @@ -60,6 +80,8 @@ func parseGRPCError(err error, conn *grpc.ClientConn, connClosed *bool) (string,
return "", errUnavailable
case codes.DeadlineExceeded:
return "", errors.New("执行超时, 强制结束")
case codes.Canceled:
return "", errors.New("手动停止")
}
return "", err
}
33 changes: 21 additions & 12 deletions modules/rpc/proto/task.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions modules/rpc/proto/task.proto
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ service Task {
message TaskRequest {
string command = 2; // 命令
int32 timeout = 3; // 任务执行超时时间
int64 id = 4; // 执行任务唯一ID
}

message TaskResponse {
Expand Down
1 change: 1 addition & 0 deletions routers/routers.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func Register(m *macaron.Macaron) {
m.Get("", task.Index)
m.Get("/log", tasklog.Index)
m.Post("/log/clear", tasklog.Clear)
m.Post("/log/stop", tasklog.Stop)
m.Post("/remove/:id", task.Remove)
m.Post("/enable/:id", task.Enable)
m.Post("/disable/:id", task.Disable)
Expand Down
26 changes: 26 additions & 0 deletions routers/tasklog/task_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/ouqiang/gocron/routers/base"
"gopkg.in/macaron.v1"
"html/template"
"github.com/ouqiang/gocron/service"
)

func Index(ctx *macaron.Context) {
Expand Down Expand Up @@ -48,6 +49,31 @@ func Clear(ctx *macaron.Context) string {
return json.Success(utils.SuccessContent, nil)
}

// 停止运行中的任务
func Stop(ctx *macaron.Context) string {
id := ctx.QueryInt64("id")
taskId := ctx.QueryInt("task_id")
taskModel := new(models.Task)
task, err := taskModel.Detail(taskId)
json := utils.JsonResponse{}
if err != nil {
return json.CommonFailure("获取任务信息失败#" + err.Error(), err)
}
if task.Protocol != models.TaskRPC {
return json.CommonFailure("仅支持SHELL任务手动停止")
}
if len(task.Hosts) == 0 {
return json.CommonFailure("任务节点列表为空")
}
serviceTask := new(service.Task)
for _, host := range task.Hosts {
serviceTask.Stop(host.Name, host.Port, id)

}

return json.Success("已执行停止操作, 请等待任务退出", nil);
}

// 删除N个月前的日志
func Remove(ctx *macaron.Context) string {
month := ctx.ParamsInt(":id")
Expand Down
25 changes: 16 additions & 9 deletions service/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"strings"
"sync"
"time"
"net/http"
)

// 定时任务调度管理器
Expand Down Expand Up @@ -127,8 +128,13 @@ func (task *Task) Add(taskModel models.Task) {
}
}

// 停止所有任务
func (task *Task) Stop() {
// 停止运行中的任务
func (task *Task) Stop(ip string, port int, id int64) {
rpcClient.Stop(ip, port, id)
}

// 等待所有任务结束后退出
func (task *Task) WaitAndExit() {
Cron.Stop()
taskCount.Exit()
}
Expand All @@ -139,7 +145,7 @@ func (task *Task) Run(taskModel models.Task) {
}

type Handler interface {
Run(taskModel models.Task) (string, error)
Run(taskModel models.Task, taskUniqueId int64) (string, error)
}

// HTTP任务
Expand All @@ -148,13 +154,13 @@ type HTTPHandler struct{}
// http任务执行时间不超过300秒
const HttpExecTimeout = 300

func (h *HTTPHandler) Run(taskModel models.Task) (result string, err error) {
func (h *HTTPHandler) Run(taskModel models.Task, taskUniqueId int64) (result string, err error) {
if taskModel.Timeout <= 0 || taskModel.Timeout > HttpExecTimeout {
taskModel.Timeout = HttpExecTimeout
}
resp := httpclient.Get(taskModel.Command, taskModel.Timeout)
// 返回状态码非200,均为失败
if resp.StatusCode != 200 {
if resp.StatusCode != http.StatusOK {
return resp.Body, errors.New(fmt.Sprintf("HTTP状态码非200-->%d", resp.StatusCode))
}

Expand All @@ -164,10 +170,11 @@ func (h *HTTPHandler) Run(taskModel models.Task) (result string, err error) {
// RPC调用执行任务
type RPCHandler struct{}

func (h *RPCHandler) Run(taskModel models.Task) (result string, err error) {
func (h *RPCHandler) Run(taskModel models.Task, taskUniqueId int64) (result string, err error) {
taskRequest := new(pb.TaskRequest)
taskRequest.Timeout = int32(taskModel.Timeout)
taskRequest.Command = taskModel.Command
taskRequest.Id = taskUniqueId
var resultChan chan TaskResult = make(chan TaskResult, len(taskModel.Hosts))
for _, taskHost := range taskModel.Hosts {
go func(th models.TaskHostDetail) {
Expand Down Expand Up @@ -250,7 +257,7 @@ func createJob(taskModel models.Task) cron.FuncJob {
return
}
logger.Infof("开始执行任务#%s#命令-%s", taskModel.Name, taskModel.Command)
taskResult := execJob(handler, taskModel)
taskResult := execJob(handler, taskModel, taskLogId)
logger.Infof("任务完成#%s#命令-%s", taskModel.Name, taskModel.Command)
afterExecJob(taskModel, taskResult, taskLogId)
}
Expand Down Expand Up @@ -371,7 +378,7 @@ func SendNotification(taskModel models.Task, taskResult TaskResult) {
}

// 执行具体任务
func execJob(handler Handler, taskModel models.Task) TaskResult {
func execJob(handler Handler, taskModel models.Task, taskUniqueId int64) TaskResult {
defer func() {
if err := recover(); err != nil {
logger.Error("panic#service/task.go:execJob#", err)
Expand All @@ -389,7 +396,7 @@ func execJob(handler Handler, taskModel models.Task) TaskResult {
var output string
var err error
for i < execTimes {
output, err = handler.Run(taskModel)
output, err = handler.Run(taskModel, taskUniqueId)
if err == nil {
return TaskResult{Result: output, Err: err, RetryTimes: i}
}
Expand Down
13 changes: 13 additions & 0 deletions templates/task/log.html
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ <h3 class="ui dividing header">
>查看结果
</button>
{{{end}}}


{{{if and (eq .Status 1) (eq .Protocol 2) }}}
<button class="ui small blue button" onclick="stopTask({{{.Id}}}, {{{.TaskId}}})">停止任务</button>
{{{end}}}
</td>
</tr>
{{{end}}}
Expand Down Expand Up @@ -148,6 +153,14 @@ <h3 class="ui dividing header">
}).modal('refresh').modal('show');
}

function stopTask(id, taskId) {
util.confirm("确定要停止任务吗", function () {
util.post("/task/log/stop/", {id: id, task_id:taskId}, function () {
location.reload();
});
});
}

function clearLog() {
util.confirm("确定要删除所有日志吗?", function() {
util.post("/task/log/clear",{}, function() {
Expand Down
40 changes: 0 additions & 40 deletions upload_package_to_qiniu.sh

This file was deleted.

0 comments on commit f081d89

Please sign in to comment.