Skip to content

Commit

Permalink
新增心跳异常清理逻辑
Browse files Browse the repository at this point in the history
  • Loading branch information
qmdx committed Jul 17, 2019
1 parent d678cb9 commit 27119c4
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@

import com.baomidou.jobs.starter.model.JobsRegistry;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Delete;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;

/**
* 注册任务信息 Mapper
Expand All @@ -15,9 +13,4 @@
@Mapper
public interface JobsRegistryMapper extends BaseMapper<JobsRegistry> {

/**
* 删除超时数据
*/
@Delete("DELETE FROM jobs_registry WHERE update_time < DATE_ADD(NOW(),INTERVAL -#{timeout} SECOND)")
int deleteTimeOut(@Param("timeout") int timeout);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ public class JobsRegistryServiceImpl implements IJobsRegistryService {

@Override
public int removeTimeOut(int timeout) {
return jobRegistryMapper.deleteTimeOut(timeout);
return jobRegistryMapper.delete(Wrappers.<JobsRegistry>lambdaQuery().le(
JobsRegistry::getUpdateTime, JobsClock.currentTimeMillis() - timeout
));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ public interface JobsConstant {
* 心跳时长
*/
int BEAT_TIMEOUT = 30;
/**
* 清理时长,需比心跳稍大
*/
int CLEAN_TIMEOUT = 50;
String COMMA = ",";
/**
* API URI
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ public class JobsHeartbeat implements Runnable {
* 上锁时长
*/
private long wait = 0;
/**
* 心跳时长
*/
private long beat = 0;

@Override
public void run() {
Expand All @@ -32,7 +36,7 @@ public void run() {
wait = 0;
long nowTime = System.currentTimeMillis();
// 1、预读10s内调度任务
List<JobsInfo> scheduleList = JobsHelper.getJobInfoService().scheduleJobQuery(nowTime + 10000);
List<JobsInfo> scheduleList = JobsHelper.getJobsInfoService().scheduleJobQuery(nowTime + 10000);
if (scheduleList != null && scheduleList.size() > 0) {
// 2、推送时间轮
for (JobsInfo jobsInfo : scheduleList) {
Expand All @@ -57,7 +61,7 @@ public void run() {
JobsHelper.getJobsDisruptorTemplate().publish(jobsInfo, waitSecond);
}
// 更新任务状态
JobsHelper.getJobInfoService().updateById(tempJobsInfo);
JobsHelper.getJobsInfoService().updateById(tempJobsInfo);
}

}
Expand All @@ -75,6 +79,12 @@ public void run() {
} finally {
// 释放锁,上锁时长超过 90 秒强制解锁
jobsLockService.unlock(JobsConstant.DEFAULT_LOCK_KEY, wait > 90);
// 清理异常注册节点
++beat;
if (beat > JobsConstant.BEAT_TIMEOUT) {
JobsHelper.getJobsRegistryService().cleanTimeout();
beat = 0;
}
}
log.debug("Jobs, JobsHeartbeat end");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,36 +28,36 @@ public void afterPropertiesSet() throws Exception {
}

@Resource
private IJobsInfoService _jobInfoService;
private IJobsInfoService _jobsInfoService;
@Resource
private IJobsLogService _jobLogService;
private IJobsLogService _jobsLogService;
@Resource
private IJobsRegistryService _jobRegistryService;
private IJobsRegistryService _jobsRegistryService;
@Resource
private IJobsAlarmHandler _jobsAlarmHandler;
@Resource
private JobsProperties _jobProperties;
private JobsProperties _jobsProperties;
@Resource
private IJobsAdminService _jobsAdminService;
@Resource
private IJobsLockService _jobsLockService;
@Resource
private JobsDisruptorTemplate _jobsDisruptorTemplate;

public static JobsProperties getJobProperties() {
return JOB_HELPER._jobProperties;
public static JobsProperties getJobsProperties() {
return JOB_HELPER._jobsProperties;
}

public static IJobsInfoService getJobInfoService() {
return JOB_HELPER._jobInfoService;
public static IJobsInfoService getJobsInfoService() {
return JOB_HELPER._jobsInfoService;
}

public static IJobsLogService getJobLogService() {
return JOB_HELPER._jobLogService;
public static IJobsLogService getJobsLogService() {
return JOB_HELPER._jobsLogService;
}

public static IJobsRegistryService getJobRegistryService() {
return JOB_HELPER._jobRegistryService;
public static IJobsRegistryService getJobsRegistryService() {
return JOB_HELPER._jobsRegistryService;
}

public static IJobsAlarmHandler getJobsAlarmHandler() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
package com.baomidou.jobs.starter.service;

import com.baomidou.jobs.starter.JobsConstant;

import java.util.List;

public interface IJobsRegistryService {

/**
* 清理超时节点
*/
default int cleanTimeout() {
return removeTimeOut(JobsConstant.CLEAN_TIMEOUT);
}

/**
* 删除超时数据
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.baomidou.jobs.starter.starter;

import com.baomidou.jobs.starter.JobsConstant;
import com.baomidou.jobs.starter.JobsHelper;
import com.baomidou.jobs.starter.executor.IJobsExecutor;
import com.baomidou.jobs.starter.JobsHeartbeat;
Expand Down Expand Up @@ -51,6 +52,8 @@ public void afterPropertiesSet() throws Exception {
executor = new ScheduledThreadPoolExecutor(5);
executor.scheduleAtFixedRate(new JobsHeartbeat(), 1, 1, TimeUnit.SECONDS);

// 启动清理异常注册
JobsHelper.getJobsRegistryService().cleanTimeout();
log.debug("init jobs admin success.");
}

Expand Down Expand Up @@ -78,7 +81,7 @@ private void initRpcProvider() {
Serializer.SerializeEnum.HESSIAN.getSerializer(),
null,
0,
JobsHelper.getJobProperties().getAdminAccessToken(),
JobsHelper.getJobsProperties().getAdminAccessToken(),
null,
null);

Expand Down Expand Up @@ -126,7 +129,7 @@ public static IJobsExecutor getJobsExecutor(String address) throws Exception {
null,
5000,
address,
JobsHelper.getJobProperties().getAppAccessToken(),
JobsHelper.getJobsProperties().getAppAccessToken(),
null,
null).getObject();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class JobsTrigger {
*/
public static boolean trigger(Long jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorParam) {
// load data
JobsInfo jobsInfo = JobsHelper.getJobInfoService().getById(jobId);
JobsInfo jobsInfo = JobsHelper.getJobsInfoService().getById(jobId);
if (jobsInfo == null) {
log.warn("Trigger fail, jobId invalid,jobId={}", jobId);
return false;
Expand All @@ -65,7 +65,7 @@ private static boolean processTrigger(JobsInfo jobsInfo, int finalFailRetryCount
JobsLog jobLog = new JobsLog();
jobLog.setJobId(jobsInfo.getId());
jobLog.setCreateTime(JobsClock.currentTimeMillis());
JobsHelper.getJobLogService().save(jobLog);
JobsHelper.getJobsLogService().save(jobLog);
log.debug("Jobs trigger start, jobId:{}", jobLog.getId());

// 2、init trigger-param
Expand All @@ -80,7 +80,7 @@ private static boolean processTrigger(JobsInfo jobsInfo, int finalFailRetryCount
// 3、init address
String routeAddressResultMsg = "";
String address = null;
List<String> registryList = JobsHelper.getJobRegistryService().listAddress(jobsInfo.getApp());
List<String> registryList = JobsHelper.getJobsRegistryService().listAddress(jobsInfo.getApp());
if (null != registryList) {
JobsResponse<String> routeAddressResult = executorRouteStrategyEnum.getRouter()
.route(triggerParam, registryList);
Expand Down Expand Up @@ -125,7 +125,7 @@ private static boolean processTrigger(JobsInfo jobsInfo, int finalFailRetryCount
jobLog.setExecutorFailRetryCount(finalFailRetryCount);
jobLog.setTriggerCode(triggerResult.getCode());
jobLog.setTriggerMsg(triggerMsgSb.toString());
JobsHelper.getJobLogService().updateById(jobLog);
JobsHelper.getJobsLogService().updateById(jobLog);

log.debug("Jobs trigger end, jobId:{}", jobLog.getId());
return true;
Expand Down

0 comments on commit 27119c4

Please sign in to comment.