Skip to content

Commit

Permalink
等待任务结束从轮询改为WaitGroup
Browse files Browse the repository at this point in the history
  • Loading branch information
ouqiang committed Jan 26, 2018
1 parent 0950fc6 commit 02e525a
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 57 deletions.
20 changes: 1 addition & 19 deletions cmd/web.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"github.com/ouqiang/gocron/models"
"github.com/ouqiang/gocron/modules/app"
"github.com/ouqiang/gocron/modules/logger"
"github.com/ouqiang/gocron/modules/rpc/grpcpool"
"github.com/ouqiang/gocron/modules/setting"
"github.com/ouqiang/gocron/routers"
"github.com/ouqiang/gocron/service"
Expand All @@ -13,7 +12,6 @@ import (
"os"
"os/signal"
"syscall"
"time"
)

// web服务器默认端口
Expand Down Expand Up @@ -152,23 +150,7 @@ func shutdown() {
serviceTask := new(service.Task)
// 停止所有任务调度
logger.Info("停止定时任务调度")
serviceTask.StopAll()

taskNumInRunning := service.TaskNum.Num()
logger.Infof("正在运行的任务有%d个", taskNumInRunning)
if taskNumInRunning > 0 {
logger.Info("等待所有任务执行完成后退出")
}
for {
if taskNumInRunning <= 0 {
break
}
time.Sleep(3 * time.Second)
taskNumInRunning = service.TaskNum.Num()
}

// 释放gRPC连接池
grpcpool.Pool.ReleaseAll()
serviceTask.Stop()
}

// 判断应用是否需要升级, 当存在版本号文件且版本小于app.VersionId时升级
Expand Down
10 changes: 7 additions & 3 deletions modules/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"io/ioutil"
"strconv"
"strings"
"fmt"
)

var (
Expand All @@ -35,7 +36,7 @@ func InitEnv(versionString string) {
DataDir = AppDir + "/data"
AppConfig = ConfDir + "/app.ini"
VersionFile = ConfDir + "/.version"
checkDirExists(ConfDir, LogDir, DataDir)
createDirIfNeed(ConfDir, LogDir, DataDir)
Installed = IsInstalled()
VersionId = ToNumberVersion(versionString)
}
Expand Down Expand Up @@ -107,10 +108,13 @@ func ToNumberVersion(versionString string) int {
}

// 检测目录是否存在
func checkDirExists(path ...string) {
func createDirIfNeed(path ...string) {
for _, value := range path {
if !utils.FileExist(value) {
logger.Fatal(value + "目录不存在或无权限访问")
err := os.Mkdir(value, 0755)
if err != nil {
logger.Fatal(fmt.Sprintf("创建目录失败:%s", err.Error()))
}
}
}
}
62 changes: 27 additions & 35 deletions service/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,61 +23,51 @@ var Cron *cron.Cron
var runInstance Instance

// 任务计数-正在运行中的任务
var TaskNum TaskCount
var taskCount TaskCount

// 任务计数
type TaskCount struct {
num int
sync.RWMutex
wg sync.WaitGroup
exit chan bool
}

func (c *TaskCount) Add() {
c.Lock()
defer c.Unlock()
c.num += 1
func (tc *TaskCount) Add() {
tc.wg.Add(1)
}

func (c *TaskCount) Done() {
c.Lock()
defer c.Unlock()
c.num -= 1
func (tc *TaskCount) Done() {
tc.wg.Done()
}

func (c *TaskCount) Num() int {
c.RLock()
defer c.RUnlock()
func (tc *TaskCount) Exit() {
tc.wg.Done()
<-tc.exit
}

return c.num
func (tc *TaskCount) Wait() {
tc.Add()
tc.wg.Wait()
close(tc.exit)
}

// 任务ID作为Key
type Instance struct {
Status map[int]bool
sync.RWMutex
m sync.Map
}

// 是否有任务处于运行中
func (i *Instance) has(key int) bool {
i.RLock()
defer i.RUnlock()
running, ok := i.Status[key]
if ok && running {
return true
}
_, ok := i.m.Load(key)

return false
return ok
}

func (i *Instance) add(key int) {
i.Lock()
defer i.Unlock()
i.Status[key] = true
i.m.Store(key, true)
}

func (i *Instance) done(key int) {
i.Lock()
defer i.Unlock()
delete(i.Status, key)
i.m.Delete(key)
}

type Task struct{}
Expand All @@ -92,8 +82,9 @@ type TaskResult struct {
func (task *Task) Initialize() {
Cron = cron.New()
Cron.Start()
runInstance = Instance{make(map[int]bool), sync.RWMutex{}}
TaskNum = TaskCount{0, sync.RWMutex{}}
runInstance = Instance{}
taskCount = TaskCount{sync.WaitGroup{}, make(chan bool)}
go taskCount.Wait()

taskModel := new(models.Task)
taskList, err := taskModel.ActiveList()
Expand Down Expand Up @@ -137,8 +128,9 @@ func (task *Task) Add(taskModel models.Task) {
}

// 停止所有任务
func (task *Task) StopAll() {
func (task *Task) Stop() {
Cron.Stop()
taskCount.Exit()
}

// 直接运行任务
Expand Down Expand Up @@ -251,8 +243,8 @@ func createJob(taskModel models.Task) cron.FuncJob {
return nil
}
taskFunc := func() {
TaskNum.Add()
defer TaskNum.Done()
taskCount.Add()
defer taskCount.Done()
taskLogId := beforeExecJob(taskModel)
if taskLogId <= 0 {
return
Expand Down

0 comments on commit 02e525a

Please sign in to comment.