Skip to content

Commit

Permalink
任务重试检测修改
Browse files Browse the repository at this point in the history
  • Loading branch information
scxwhite committed Jun 28, 2019
1 parent 7324098 commit 99d2a3f
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@ public class LogConstant {

public static final String LOST_JOB_LOG = "漏跑任务,自动恢复执行\n";

public static final String CHECK_QUEUE_LOG = "已经在Schedule队列中,无法再次运行\n";

public static final String CHECK_MANUAL_QUEUE_LOG = "已经在Manual队列中,无法再次运行\n";
public static final String CHECK_QUEUE_LOG = "已经在调度队列中,无法再次运行,请稍后再试\n";

public static final String CANCEL_JOB_LOG = "已经在队列中,无法再次运行\n";

Expand Down
19 changes: 4 additions & 15 deletions hera-core/src/main/java/com/dfire/core/netty/master/Master.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import javax.mail.MessagingException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -898,20 +897,13 @@ public boolean checkJobExists(HeraJobHistoryVo heraJobHistory, boolean checkOnly
for (JobElement jobElement : masterContext.getScheduleQueue()) {
if (ActionUtil.jobEquals(jobElement.getJobId(), actionId)) {
exists = true;
if (!checkOnly) {
heraJobHistory.getLog().append(LogConstant.CHECK_QUEUE_LOG);
}
TaskLog.warn("调度队列已存在该任务,添加失败 {}", actionId);
}
}
// check所有的worker中是否有此任务的id在执行,如果有,不进入队列等待
for (MasterWorkHolder workHolder : masterContext.getWorkMap().values()) {
if (workHolder.getRunning().contains(jobId)) {
exists = true;

if (!checkOnly) {
heraJobHistory.getLog().append(LogConstant.CHECK_QUEUE_LOG + "执行worker ip " + workHolder.getChannel().getLocalAddress());
}
TaskLog.warn("该任务正在执行,添加失败 {}", actionId);
}
}
Expand All @@ -920,27 +912,24 @@ public boolean checkJobExists(HeraJobHistoryVo heraJobHistory, boolean checkOnly
for (JobElement jobElement : masterContext.getManualQueue()) {
if (ActionUtil.jobEquals(jobElement.getJobId(), actionId)) {
exists = true;
if (!checkOnly) {
heraJobHistory.getLog().append(LogConstant.CHECK_MANUAL_QUEUE_LOG);
}
TaskLog.warn("手动任务队列已存在该任务,添加失败 {}", actionId);
}
}

for (MasterWorkHolder workHolder : masterContext.getWorkMap().values()) {
if (workHolder.getManningRunning().contains(jobId)) {
exists = true;
if (!checkOnly) {
heraJobHistory.getLog().append(LogConstant.CHECK_MANUAL_QUEUE_LOG + "执行worker ip " + workHolder.getChannel().getLocalAddress());
}
TaskLog.warn("该任务正在执行,添加失败 {}", actionId);
}
}
}
if (exists && !checkOnly) {
heraJobHistory.getLog().append(LogConstant.CHECK_QUEUE_LOG);
heraJobHistory.setStartTime(new Date());
heraJobHistory.setEndTime(new Date());
heraJobHistory.setStatusEnum(StatusEnum.FAILED);
heraJobHistory.setIllustrate("任务已在调度队列");
//由于设置为失败会被告警 所以暂时设置为wait状态
heraJobHistory.setStatusEnum(StatusEnum.WAIT);
masterContext.getHeraJobHistoryService().update(BeanConvertUtils.convert(heraJobHistory));
}
return exists;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ private void runScheduleJobContext(MasterWorkHolder selectWork, String actionId,
} catch (InterruptedException e) {
ErrorLog.error("sleep interrupted", e);
}

}
HeraJobHistoryVo heraJobHistoryVo;
HeraJobHistory heraJobHistory;
Expand All @@ -225,11 +226,16 @@ private void runScheduleJobContext(MasterWorkHolder selectWork, String actionId,
.operator(heraAction.getOwner())
.hostGroupId(heraAction.getHostGroupId())
.build();

heraJobHistoryVo = BeanConvertUtils.convert(heraJobHistory);
if (master.checkJobExists(heraJobHistoryVo, true)) {
TaskLog.info("--------------------------{}正在执行,取消重试--------------------------", heraAction.getJobId());
return;
}
masterContext.getHeraJobHistoryService().insert(heraJobHistory);
heraAction.setHistoryId(heraJobHistory.getId());
heraAction.setStatus(StatusEnum.RUNNING.toString());
masterContext.getHeraJobActionService().update(heraAction);
heraJobHistoryVo = BeanConvertUtils.convert(heraJobHistory);
heraJobHistoryVo.getLog().append(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 第" + (runCount - 1) + "次重试运行\n");
triggerType = heraJobHistoryVo.getTriggerType();
}
Expand Down Expand Up @@ -285,12 +291,10 @@ private void runScheduleJobContext(MasterWorkHolder selectWork, String actionId,
heraAction.setStatisticEndTime(new Date());
masterContext.getHeraJobActionService().update(heraAction);
if (runCount < (retryCount + 1) && !success && !isCancelJob) {
if (master.checkJobExists(heraJobHistoryVo, true)) {
DebugLog.info("--------------------------任务在队列中,取消重试--------------------------");
} else {
DebugLog.info("--------------------------失败任务,准备重试--------------------------");
runScheduleJobContext(selectWork, actionId, runCount, retryCount, retryWaitTime);
}

DebugLog.info("--------------------------失败任务,准备重试--------------------------");
runScheduleJobContext(selectWork, actionId, runCount, retryCount, retryWaitTime);

}
}

Expand Down

0 comments on commit 99d2a3f

Please sign in to comment.