
Commit

v5.3.2 Code optimization
zhaoyachao committed Mar 31, 2024
1 parent 7ceac66 commit e8b56bf
Showing 18 changed files with 182 additions and 38 deletions.
3 changes: 3 additions & 0 deletions README.md
@@ -780,6 +780,9 @@

+ v5.3.2 [zdh_web] Code optimization across all modules
+ v5.3.2 [zdh_web] Add the ZDH_RUN_MODE environment variable
+ v5.3.2 [zdh_web] Fix a bug where the marketing module's scheduled-execution completion status was not updated
+ v5.3.2 [zdh_web] Marketing module: add automatic small-flow sync
+ v5.3.2 [zdh_web] Replace Apache BeanUtils with MapStruct


+ v5.1.1 [zdh_web] Support Hadoop, Hive, HBase big-data permissions (user authentication, data permissions) [incomplete]
14 changes: 14 additions & 0 deletions pom.xml
@@ -23,6 +23,7 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<shiro.version>1.7.1</shiro.version>
<mapstruct.version>1.5.5.Final</mapstruct.version>
<lombok.version>1.16.20</lombok.version>
<mybatis.spring.version>1.3.1</mybatis.spring.version>
<pagehelper.version>3.7.5</pagehelper.version>
@@ -100,6 +101,19 @@
</repositories>

<dependencies>
<!-- https://mvnrepository.com/artifact/org.mapstruct/mapstruct -->
<dependency>
<groupId>org.mapstruct</groupId>
<artifactId>mapstruct</artifactId>
<version>${mapstruct.version}</version>
</dependency>

<dependency>
<groupId>org.mapstruct</groupId>
<artifactId>mapstruct-processor</artifactId>
<version>${mapstruct.version}</version>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
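
The hunks below call com.zyc.zdh.util.MapStructMapper, but that interface itself is not among the files shown on this page. A minimal sketch of what such a mapper presumably looks like, assuming the default MapStruct component model; the method names are taken from the call sites in this diff, while the entity import packages and everything else are assumptions:

package com.zyc.zdh.util;

import com.zyc.zdh.entity.ServerTaskInfo;
import com.zyc.zdh.entity.ServerTaskInstance;
import com.zyc.zdh.entity.StrategyGroupInfo;
import com.zyc.zdh.entity.StrategyGroupInstance;
import org.mapstruct.Mapper;
import org.mapstruct.factory.Mappers;

// Hypothetical sketch of com.zyc.zdh.util.MapStructMapper; the real interface is not shown in this commit.
@Mapper
public interface MapStructMapper {

    // mapstruct-processor generates an implementation of this interface at compile time;
    // Mappers.getMapper loads that generated class.
    MapStructMapper INSTANCE = Mappers.getMapper(MapStructMapper.class);

    // Properties are matched by name and copied with plain getter/setter calls.
    ServerTaskInstance serverTaskInfoToServerTaskInstance(ServerTaskInfo serverTaskInfo);

    StrategyGroupInstance strategyGroupInfoToStrategyGroupInstance(StrategyGroupInfo strategyGroupInfo);

    // ... plus the other *To* methods used elsewhere in this commit ...
}

With mapstruct-processor on the compile classpath as declared above, javac's annotation processing typically emits a MapStructMapperImpl class under target/generated-sources/annotations at build time, so no reflection is involved at runtime.
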
11 changes: 4 additions & 7 deletions src/main/java/com/zyc/zdh/controller/NodeController.java
@@ -10,7 +10,7 @@
import com.zyc.zdh.job.SetUpJob;
import com.zyc.zdh.job.SnowflakeIdWorker;
import com.zyc.zdh.util.DateUtil;
import org.apache.commons.beanutils.BeanUtils;
import com.zyc.zdh.util.MapStructMapper;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -192,7 +192,8 @@ public ReturnInfo server_setup(String id,String build_branch){
ServerTaskInstance sti=new ServerTaskInstance();
String id2=SnowflakeIdWorker.getInstance().nextId()+"";
try {
BeanUtils.copyProperties(sti, serverTaskInfo);
//BeanUtils.copyProperties(sti, serverTaskInfo);
sti = MapStructMapper.INSTANCE.serverTaskInfoToServerTaskInstance(serverTaskInfo);
sti.setVersion_type("branch");
sti.setVersion(DateUtil.formatNodash(new Date()));
sti.setId(id2);
@@ -217,11 +218,7 @@ public void run() {
}
}).start();

} catch (IllegalAccessException e) {
TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
logger.error("类:"+Thread.currentThread().getStackTrace()[1].getClassName()+" 函数:"+Thread.currentThread().getStackTrace()[1].getMethodName()+ " 异常: {}", e);
return ReturnInfo.build(RETURN_CODE.FAIL.getCode(),"一键部署失败", e);
} catch (InvocationTargetException e) {
} catch (Exception e) {
TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
logger.error("类:"+Thread.currentThread().getStackTrace()[1].getClassName()+" 函数:"+Thread.currentThread().getStackTrace()[1].getMethodName()+ " 异常: {}", e);
return ReturnInfo.build(RETURN_CODE.FAIL.getCode(),"一键部署失败", e);
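
The collapsed catch block above follows from the API change: org.apache.commons.beanutils.BeanUtils.copyProperties(dest, orig) copies properties reflectively at runtime and declares the checked IllegalAccessException and InvocationTargetException, whereas a MapStruct mapping method is ordinary generated code with no checked exceptions, so a single catch (Exception e) covers the remaining failure modes. Note also the direction change: BeanUtils copied into the existing sti instance, while the mapper returns a freshly built ServerTaskInstance that is assigned back to sti.
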
(Changed file; path not shown.)
@@ -11,7 +11,7 @@
import com.zyc.zdh.service.ZdhPermissionService;
import com.zyc.zdh.util.Const;
import com.zyc.zdh.util.DBUtil;
import org.apache.commons.beanutils.BeanUtils;
import com.zyc.zdh.util.MapStructMapper;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -296,7 +296,8 @@ public ReturnInfo<List<String>> etl_task_batch_create(EtlTaskBatchInfo etlTaskBa
//Generate single-source ETL tasks
for (String table : tab) {
EtlTaskInfo eti = new EtlTaskInfo();
BeanUtils.copyProperties(eti, etbi);
//BeanUtils.copyProperties(eti, etbi);
eti = MapStructMapper.INSTANCE.etlTaskBatchInfoToEtlTaskInfo(etbi);
eti.setId(SnowflakeIdWorker.getInstance().nextId() + "");
eti.setEtl_context(etbi.getEtl_pre_context() + "_" + table);
if (!StringUtils.isEmpty(etbi.getEtl_suffix_context())) {
(Changed file; path not shown.)
@@ -17,8 +17,8 @@
import com.zyc.zdh.shiro.RedisUtil;
import com.zyc.zdh.util.Const;
import com.zyc.zdh.util.DateUtil;
import com.zyc.zdh.util.MapStructMapper;
import com.zyc.zdh.util.SFTPUtil;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.ibatis.session.RowBounds;
import org.slf4j.Logger;
@@ -284,7 +284,8 @@ public ReturnInfo strategy_group_task_execute(StrategyGroupInfo strategyGroupInf
sgi.setCur_time(strategyGroupInfo.getStart_time());

StrategyGroupInstance strategyGroupInstance = new StrategyGroupInstance();
BeanUtils.copyProperties(strategyGroupInstance, sgi);
//BeanUtils.copyProperties(strategyGroupInstance, sgi);
strategyGroupInstance = MapStructMapper.INSTANCE.strategyGroupInfoToStrategyGroupInstance(sgi);

strategyGroupInstance.setId(SnowflakeIdWorker.getInstance().nextId() + "");

@@ -295,7 +296,7 @@ public ReturnInfo strategy_group_task_execute(StrategyGroupInf
strategyGroupInstance.setCreate_time(new Timestamp(System.currentTimeMillis()));
strategyGroupInstance.setUpdate_time(new Timestamp(System.currentTimeMillis()));
strategyGroupInstance.setMisfire("0");
if(strategyGroupInstance.getGroup_type().equalsIgnoreCase("offline")){
if(strategyGroupInstance.getGroup_type().equalsIgnoreCase(Const.STRATEGY_GROUP_TYPE_OFFLINE)){
strategyGroupInstance.setSmall_flow_rate("1,100");
}

@@ -356,6 +357,26 @@ public ReturnInfo<PageResult<List<StrategyGroupInstance>>> strategy_group_instan

strategyGroupInstances = strategyGroupInstanceMapper.selectByExampleAndRowBounds(example, rowBounds);

//Check each instance for whether its small flow has been synced
Map<String, String> tmp = new HashMap<>();
String small_flow_key = "small_flow_rate_"+group_id;
Object small_flow_value = redisUtil.get(small_flow_key);
if(small_flow_value != null){
tmp = JSON.parseObject(small_flow_value.toString(), Map.class);
}
for(StrategyGroupInstance sgi: strategyGroupInstances){
if(!StringUtils.isEmpty(sgi.getGroup_type()) && sgi.getGroup_type().equalsIgnoreCase(Const.STRATEGY_GROUP_TYPE_ONLINE)){
if(tmp.containsKey(sgi.getId())){
sgi.setSmall_flow_status(Const.ON);
}else{
sgi.setSmall_flow_status(Const.OFF);
}
}
if(sgi.getGroup_type().equalsIgnoreCase(Const.STRATEGY_GROUP_TYPE_OFFLINE)){
sgi.setSmall_flow_status(Const.ON);
}
}

PageResult<List<StrategyGroupInstance>> pageResult=new PageResult<>();
pageResult.setTotal(total);
pageResult.setRows(strategyGroupInstances);
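
The small_flow_status shown above is derived from a JSON map that this commit keeps in Redis under the key "small_flow_rate_" + group_id, mapping each group-instance id to its small_flow_rate; the writing side of the same convention appears further down in JobDigitalMarket. A hypothetical helper (not part of the commit) that restates that convention in one place, assuming the RedisUtil get/set signatures used in the diff and the com.alibaba.fastjson.JSON class the surrounding code appears to use:

import com.alibaba.fastjson.JSON;
import com.zyc.zdh.shiro.RedisUtil;

import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of this commit: it only restates the Redis convention used by
// the read side above and the write side in JobDigitalMarket below.
// key   = "small_flow_rate_" + strategy_group_id
// value = JSON object mapping group-instance id -> small_flow_rate
public class SmallFlowStore {

    private final RedisUtil redisUtil;

    public SmallFlowStore(RedisUtil redisUtil) {
        this.redisUtil = redisUtil;
    }

    private String key(String groupId) {
        return "small_flow_rate_" + groupId;
    }

    // Read the current instance-id -> rate map, or an empty map when the key is absent.
    @SuppressWarnings("unchecked")
    public Map<String, String> load(String groupId) {
        Object value = redisUtil.get(key(groupId));
        if (value == null) {
            return new HashMap<>();
        }
        return JSON.parseObject(value.toString(), Map.class);
    }

    // Merge one instance's rate into the map without dropping existing entries.
    public void put(String groupId, String instanceId, String rate) {
        Map<String, String> map = load(groupId);
        map.put(instanceId, rate);
        redisUtil.set(key(groupId), JSON.toJSONString(map));
    }
}
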
15 changes: 15 additions & 0 deletions src/main/java/com/zyc/zdh/entity/StrategyGroupInstance.java
@@ -2,6 +2,7 @@

import javax.persistence.Id;
import javax.persistence.Table;
import javax.persistence.Transient;
import java.sql.Timestamp;
import java.util.Date;

@@ -139,6 +140,12 @@ public class StrategyGroupInstance {
*/
private String small_flow_rate;

/**
* Small-flow status
*/
@Transient
private String small_flow_status;

/**
* @return id
*/
@@ -588,4 +595,12 @@ public String getSmall_flow_rate() {
public void setSmall_flow_rate(String small_flow_rate) {
this.small_flow_rate = small_flow_rate;
}

public String getSmall_flow_status() {
return small_flow_status;
}

public void setSmall_flow_status(String small_flow_status) {
this.small_flow_status = small_flow_status;
}
}
4 changes: 2 additions & 2 deletions src/main/java/com/zyc/zdh/job/CheckDepJob.java
@@ -147,7 +147,7 @@ public static void run_sub_task(String scheduleId) {
if(StringUtils.isEmpty(scheduleId) && !StringUtils.isEmpty(tli.getSchedule_id())){
if(!redisUtil.exists("schedule_"+tli.getSchedule_id())){
logger.error("run sub task: {}, not found schedule: {}", tli.getId(), tli.getSchedule_id());
if((new Date().getTime() - tli.getUpdate_time().getTime()) >= (10 * 60 *1000)){
if((System.currentTimeMillis() - tli.getUpdate_time().getTime()) >= (10 * 60 *1000)){
logger.info("run sub task: {}, not found schedule: {}, update status error", tli.getId(), tli.getSchedule_id());
taskLogInstanceMapper.updateStatusById(JobStatus.ERROR.getValue(),DateUtil.getCurrentTime(), tli.getId());
}
@@ -226,7 +226,7 @@ public static void run_sub_task(String scheduleId) {
if(StringUtils.isEmpty(scheduleId) && !StringUtils.isEmpty(tl.getSchedule_id())){
if(!redisUtil.exists("schedule_"+tl.getSchedule_id())){
logger.error("run sub task: {}, not found schedule: {}", tl.getId(), tl.getSchedule_id());
if((new Date().getTime() - tl.getUpdate_time().getTime()) >= (10 * 60 *1000)){
if((System.currentTimeMillis() - tl.getUpdate_time().getTime()) >= (10 * 60 *1000)){
logger.info("run sub task: {}, not found schedule: {}, update status error", tl.getId(), tl.getSchedule_id());
taskLogInstanceMapper.updateStatusById(JobStatus.ERROR.getValue(),DateUtil.getCurrentTime(), tl.getId());
}
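
Both hunks above apply the same ten-minute staleness rule to update_time; the only change is reading the clock via System.currentTimeMillis() instead of allocating a Date. Purely as an illustration (not part of the commit), the shared check could be written once, assuming update_time is a java.sql.Timestamp as in the other entities in this diff:

import java.sql.Timestamp;

// Illustration only: the ten-minute staleness test that appears twice in CheckDepJob.
public final class StalenessCheck {

    private static final long TEN_MINUTES_MS = 10 * 60 * 1000L;

    private StalenessCheck() {
    }

    // True when updateTime is at least ten minutes in the past.
    public static boolean isStale(Timestamp updateTime) {
        return System.currentTimeMillis() - updateTime.getTime() >= TEN_MINUTES_MS;
    }
}
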
8 changes: 4 additions & 4 deletions src/main/java/com/zyc/zdh/job/CheckStrategyDepJob.java
@@ -117,15 +117,15 @@ public static void run_sub_task() {
StrategyInstanceMapper sim=(StrategyInstanceMapper) SpringContext.getBean("strategyInstanceMapper");

//Fetch all executable sub-tasks of the real-time (online) type
List<StrategyInstance> strategyInstanceOnLineList=sim.selectThreadByStatus1(new String[] {JobStatus.CREATE.getValue(),JobStatus.CHECK_DEP.getValue()}, "online");
List<StrategyInstance> strategyInstanceOnLineList=sim.selectThreadByStatus1(new String[] {JobStatus.CREATE.getValue(),JobStatus.CHECK_DEP.getValue()}, Const.STRATEGY_GROUP_TYPE_ONLINE);
for(StrategyInstance tl :strategyInstanceOnLineList){
tl.setStatus(JobStatus.ETL.getValue());
tl.setUpdate_time(new Timestamp(System.currentTimeMillis()));
JobDigitalMarket.updateTaskLog(tl,sim);
}

//Fetch all executable, non-dependency sub-tasks of the offline type
List<StrategyInstance> strategyInstanceList=sim.selectThreadByStatus1(new String[] {JobStatus.CREATE.getValue(),JobStatus.CHECK_DEP.getValue()}, "offline");
List<StrategyInstance> strategyInstanceList=sim.selectThreadByStatus1(new String[] {JobStatus.CREATE.getValue(),JobStatus.CHECK_DEP.getValue()}, Const.STRATEGY_GROUP_TYPE_OFFLINE);
for(StrategyInstance tl :strategyInstanceList){
try{
//If the status is skip, skip the current strategy instance; in theory skip should never occur, since skipping a strategy is implemented via is_disenable=true
@@ -316,8 +316,8 @@ public static void create_group_final_status(){
for(StrategyGroupInstance sgi:sgis){

//Strategy group instances end automatically when they expire
if(sgi.getGroup_type().equalsIgnoreCase("online") && sgi.getStatus().equalsIgnoreCase(JobStatus.SUB_TASK_DISPATCH.getValue())){
if(new Date().getTime() > sgi.getEnd_time().getTime()){
if(sgi.getGroup_type().equalsIgnoreCase(Const.STRATEGY_GROUP_TYPE_ONLINE) && sgi.getStatus().equalsIgnoreCase(JobStatus.SUB_TASK_DISPATCH.getValue())){
if(System.currentTimeMillis() > sgi.getEnd_time().getTime()){
sgim.updateStatusById3(JobStatus.FINISH.getValue(), DateUtil.getCurrentTime(), sgi.getId());
sim.updateStatusKillByGroupInstanceId(sgi.getId(), JobStatus.FINISH.getValue());
}
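
The literals "online"/"offline" give way to constants on com.zyc.zdh.util.Const, whose definitions are not among the shown files. Presumably they look roughly like the sketch below; the group-type values are inferred from the literals they replace, and the ON/OFF values are guesses:

// Presumed shape of the constants referenced in this commit (com.zyc.zdh.util.Const).
// The real class is not shown here; group-type values are inferred from the replaced
// literals, and ON/OFF are guesses for the small_flow_status flag.
public class Const {
    public static final String STRATEGY_GROUP_TYPE_ONLINE = "online";
    public static final String STRATEGY_GROUP_TYPE_OFFLINE = "offline";
    public static final String ON = "on";
    public static final String OFF = "off";
}
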
16 changes: 8 additions & 8 deletions src/main/java/com/zyc/zdh/job/JobCommon2.java
@@ -16,7 +16,6 @@
import com.zyc.zdh.service.ZdhLogsService;
import com.zyc.zdh.shiro.RedisUtil;
import com.zyc.zdh.util.*;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringEscapeUtils;
@@ -37,7 +36,6 @@

import java.io.*;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
@@ -3267,12 +3265,13 @@ public void run() {
tgli.setId(SnowflakeIdWorker.getInstance().nextId() + "");
try {
//Copy quartzjobinfo into tli, completing the copy of basic task info
BeanUtils.copyProperties(tgli, quartzJobInfo);
} catch (IllegalAccessException e) {
logger.error("类:" + Thread.currentThread().getStackTrace()[1].getClassName() + " 函数:" + Thread.currentThread().getStackTrace()[1].getMethodName() + " 异常: {}" , e);
} catch (InvocationTargetException e) {
//BeanUtils.copyProperties(tgli, quartzJobInfo);
tgli = MapStructMapper.INSTANCE.quartzJobInfoToTaskGroupLogInstance(quartzJobInfo);
} catch (Exception e) {
logger.error("类:" + Thread.currentThread().getStackTrace()[1].getClassName() + " 函数:" + Thread.currentThread().getStackTrace()[1].getMethodName() + " 异常: {}" , e);
}
} //catch (Exception e) {
// logger.error("类:" + Thread.currentThread().getStackTrace()[1].getClassName() + " 函数:" + Thread.currentThread().getStackTrace()[1].getMethodName() + " 异常: {}" , e);
// }
//When a logic-send error is caught and an automatic retry (retry_job) occurs, do not regenerate the instance id; reuse the old instance id
String last_task_id = "";
if (is_retry == 0) {
@@ -3737,7 +3736,8 @@ public static List<TaskLogInstance> sub_task_log_instance(TaskGroupLogInstance t
JSONArray lines = JSON.parseObject(tgli.getJsmind_data()).getJSONArray("line");
for (Object job : tasks) {
TaskLogInstance taskLogInstance = new TaskLogInstance();
BeanUtils.copyProperties(taskLogInstance, tgli);
//BeanUtils.copyProperties(taskLogInstance, tgli);
taskLogInstance = MapStructMapper.INSTANCE.taskGroupLogInstanceToTaskLogInstance(tgli);

String etl_task_id = ((JSONObject) job).getString("etl_task_id");//concrete task id
String pageSourceId = ((JSONObject) job).getString("divId");//div identifier generated by the front end
16 changes: 11 additions & 5 deletions src/main/java/com/zyc/zdh/job/JobDigitalMarket.java
@@ -12,7 +12,6 @@
import com.zyc.zdh.service.ZdhLogsService;
import com.zyc.zdh.shiro.RedisUtil;
import com.zyc.zdh.util.*;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -267,7 +266,8 @@ public void run() {
StrategyGroupInstance sgi = new StrategyGroupInstance();
try {
//Copy quartzjobinfo into tli, completing the copy of basic task info
BeanUtils.copyProperties(sgi, strategyGroupInfo);
//BeanUtils.copyProperties(sgi, strategyGroupInfo);
sgi = MapStructMapper.INSTANCE.strategyGroupInfoToStrategyGroupInstance(strategyGroupInfo);
sgi.setId(SnowflakeIdWorker.getInstance().nextId() + "");
sgi.setStrategy_group_id(strategyGroupInfo.getId());
sgi.setCreate_time(new Timestamp(System.currentTimeMillis()));
@@ -313,11 +313,16 @@ public void run() {
sub_strategy_instance(sgi, sub_tasks);
sgim.updateStatus2Create(new String[]{sgi.getId()});

//Online-strategy small flow takes effect
//Online-strategy small flow takes effect. Handle with care: historical small-flow config is not cleaned up here; cleaning it requires manually syncing small flow on the platform
if(!StringUtils.isEmpty(sgi.getSmall_flow_rate()) && sgi.getGroup_type().equalsIgnoreCase(JobGroupType.ONLINE.getValue())){
Map<String, String> tmp = new HashMap<>();
String small_flow_key = "small_flow_rate_"+sgi.getStrategy_group_id();
Object small_flow_value = redisUtil.get(small_flow_key);
if(small_flow_value != null){
tmp = JSON.parseObject(small_flow_value.toString(), Map.class);
}
tmp.put(sgi.getId(), sgi.getSmall_flow_rate());
redisUtil.set("small_flow_rate_"+sgi.getStrategy_group_id(), JSON.toJSONString(tmp));
redisUtil.set(small_flow_key, JSON.toJSONString(tmp));
}
debugInfo(sgi);

@@ -391,7 +396,8 @@ public static List<StrategyInstance> sub_strategy_instance(StrategyGroupInstance
JSONArray lines = JSON.parseObject(sgi.getJsmind_data()).getJSONArray("line");
for (Object job : tasks) {
StrategyInstance si = new StrategyInstance();
BeanUtils.copyProperties(si, sgi);
//BeanUtils.copyProperties(si, sgi);
si = MapStructMapper.INSTANCE.strategyGroupInstanceToStrategyInstance(sgi);

String pageSourceId = ((JSONObject) job).getString("divId");//div identifier generated by the front end
String more_task = ((JSONObject) job).getString("more_task");
3 changes: 3 additions & 0 deletions src/main/java/com/zyc/zdh/job/MyJobBean.java
@@ -89,6 +89,9 @@ protected void executeInternal(JobExecutionContext context)
StrategyGroupInfo strategyGroupInfo=new StrategyGroupInfo();
strategyGroupInfo = sgm.selectByPrimaryKey(taskId);
strategyGroupInfo.setQuartz_time(new Timestamp(currentTime.getTime()));
if(context.getTrigger().getNextFireTime()==null){
strategyGroupInfo.setStatus(JobStatus.FINISH.getValue());
}
JobDigitalMarket.chooseJobBean(strategyGroupInfo, 0, null, null);
}else if(!StringUtils.isEmpty(taskType) && taskType.equalsIgnoreCase("beaconfire")){
//Scheduling task for the beaconfire alert module. The alert module plans to use pure in-memory storage; it is acceptable for this data to forgo consistency (reason: alerts are time-sensitive, and an alert that has already been missed has lost its value)
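
The added check in MyJobBean relies on Quartz semantics: Trigger.getNextFireTime() returns null once a trigger will not fire again, so a strategy group whose schedule has just fired for the last time is marked FINISH instead of being left in a running state. This matches the v5.3.2 changelog entry about the marketing module's completion status not being updated.
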
3 changes: 1 addition & 2 deletions src/main/java/com/zyc/zdh/job/ShellJob.java
@@ -6,7 +6,6 @@
import com.zyc.zdh.util.Const;

import java.io.*;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;
@@ -68,7 +67,7 @@ public static Boolean shellCommand(TaskLogInstance tli) {
if (tli.getIs_script() != null && tli.getIs_script().equals("true")) {
logger.info("[" + jobType + "] JOB ,以脚本方式执行");
insertLog(tli, "info", "[" + jobType + "] JOB ,以脚本方式执行");
String fileName = new Date().getTime() + "";
String fileName = System.currentTimeMillis() + "";
if (system.toLowerCase().startsWith("win")) {
fileName = fileName + ".bat";
} else {
(Remaining changed files not loaded.)
