Skip to content

Commit

Permalink
Merge pull request apache#1948 from Yeleights/dev-process
Browse files Browse the repository at this point in the history
Add process null check.
  • Loading branch information
Jave-Chen authored Feb 15, 2020
2 parents c1ee133 + 87cb33b commit 90df148
Show file tree
Hide file tree
Showing 6 changed files with 243 additions and 122 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.process.ProcessDag;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessData;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.utils.DagHelper;

import java.util.ArrayList;
import java.util.List;

/**
Expand All @@ -48,41 +47,8 @@ public static DAG<String, TaskNode, TaskNodeRelation> processInstance2DAG(Proces

List<TaskNode> taskNodeList = processData.getTasks();

List<TaskNodeRelation> taskNodeRelations = new ArrayList<>();
ProcessDag processDag = DagHelper.getProcessDag(taskNodeList);

//Traversing node information and building relationships
for (TaskNode taskNode : taskNodeList) {
String preTasks = taskNode.getPreTasks();
List<String> preTasksList = JSONUtils.toList(preTasks, String.class);

//if previous tasks not empty
if (preTasksList != null) {
for (String depNode : preTasksList) {
taskNodeRelations.add(new TaskNodeRelation(depNode, taskNode.getName()));
}
}
}

ProcessDag processDag = new ProcessDag();
processDag.setEdges(taskNodeRelations);
processDag.setNodes(taskNodeList);


// generate detail Dag, to be executed
DAG<String, TaskNode, TaskNodeRelation> dag = new DAG<>();

if (CollectionUtils.isNotEmpty(processDag.getNodes())) {
for (TaskNode node : processDag.getNodes()) {
dag.addNode(node.getName(), node);
}
}

if (CollectionUtils.isNotEmpty(processDag.getEdges())) {
for (TaskNodeRelation edge : processDag.getEdges()) {
dag.addEdge(edge.getStartNode(), edge.getEndNode());
}
}

return dag;
return DagHelper.buildDagGraph(processDag);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.mapper.*;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -465,7 +466,7 @@ public Map<String, Object> releaseProcessDefinition(User loginUser, String proje
);

for(Schedule schedule:scheduleList){
logger.info("set schedule offline, schedule id: {}, process definition id: {}", project.getId(), schedule.getId(), id);
logger.info("set schedule offline, project id: {}, schedule id: {}, process definition id: {}", project.getId(), schedule.getId(), id);
// set status
schedule.setReleaseState(ReleaseState.OFFLINE);
scheduleMapper.updateById(schedule);
Expand Down Expand Up @@ -948,11 +949,16 @@ public Map<String, Object> getTaskNodeListByDefinitionId(Integer defineId) throw
return result;
}


String processDefinitionJson = processDefinition.getProcessDefinitionJson();

ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);

//process data check
if (null == processData) {
logger.error("process data is null");
putMsg(result,Status.DATA_IS_NOT_VALID, processDefinitionJson);
return result;
}

List<TaskNode> taskNodeList = (processData.getTasks() == null) ? new ArrayList<>() : processData.getTasks();

result.put(Constants.DATA_LIST, taskNodeList);
Expand All @@ -974,14 +980,13 @@ public Map<String, Object> getTaskNodeListByDefinitionIdList(String defineIdList

Map<Integer, List<TaskNode>> taskNodeMap = new HashMap<>();
String[] idList = defineIdList.split(",");
List<String> definitionIdList = Arrays.asList(idList);
List<Integer> idIntList = new ArrayList<>();
for(String definitionId : definitionIdList) {
for(String definitionId : idList) {
idIntList.add(Integer.parseInt(definitionId));
}
Integer[] idArray = idIntList.toArray(new Integer[idIntList.size()]);
List<ProcessDefinition> processDefinitionList = processDefineMapper.queryDefinitionListByIdList(idArray);
if (processDefinitionList == null || processDefinitionList.size() ==0) {
if (CollectionUtils.isEmpty(processDefinitionList)) {
logger.info("process definition not exists");
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, defineIdList);
return result;
Expand Down Expand Up @@ -1031,9 +1036,10 @@ public Map<String, Object> viewTree(Integer processId, Integer limit) throws Exc
Map<String, Object> result = new HashMap<>();

ProcessDefinition processDefinition = processDefineMapper.selectById(processId);
if (processDefinition == null) {
if (null == processDefinition) {
logger.info("process define not exists");
throw new RuntimeException("process define not exists");
putMsg(result,Status.PROCESS_DEFINE_NOT_EXIST, processDefinition);
return result;
}
DAG<String, TaskNode, TaskNodeRelation> dag = genDagGraph(processDefinition);
/**
Expand Down Expand Up @@ -1121,10 +1127,10 @@ public Map<String, Object> viewTree(Integer processId, Integer limit) throws Exc
pTreeViewDto.getChildren().add(treeViewDto);
}
postNodeList = dag.getSubsequentNodes(nodeName);
if (postNodeList != null && postNodeList.size() > 0) {
if (CollectionUtils.isNotEmpty(postNodeList)) {
for (String nextNodeName : postNodeList) {
List<TreeViewDto> treeViewDtoList = waitingRunningNodeMap.get(nextNodeName);
if (treeViewDtoList != null && treeViewDtoList.size() > 0) {
if (CollectionUtils.isNotEmpty(treeViewDtoList)) {
treeViewDtoList.add(treeViewDto);
waitingRunningNodeMap.put(nextNodeName, treeViewDtoList);
} else {
Expand All @@ -1136,7 +1142,6 @@ public Map<String, Object> viewTree(Integer processId, Integer limit) throws Exc
}
runningNodeMap.remove(nodeName);
}

if (waitingRunningNodeMap == null || waitingRunningNodeMap.size() == 0) {
break;
} else {
Expand All @@ -1161,75 +1166,29 @@ public Map<String, Object> viewTree(Integer processId, Integer limit) throws Exc
private DAG<String, TaskNode, TaskNodeRelation> genDagGraph(ProcessDefinition processDefinition) throws Exception {

String processDefinitionJson = processDefinition.getProcessDefinitionJson();

ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);

List<TaskNode> taskNodeList = processData.getTasks();
//check process data
if (null != processData) {
List<TaskNode> taskNodeList = processData.getTasks();
processDefinition.setGlobalParamList(processData.getGlobalParams());
ProcessDag processDag = DagHelper.getProcessDag(taskNodeList);

processDefinition.setGlobalParamList(processData.getGlobalParams());


List<TaskNodeRelation> taskNodeRelations = new ArrayList<>();

// Traverse node information and build relationships
for (TaskNode taskNode : taskNodeList) {
String preTasks = taskNode.getPreTasks();
List<String> preTasksList = JSONUtils.toList(preTasks, String.class);

// If the dependency is not empty
if (preTasksList != null) {
for (String depNode : preTasksList) {
taskNodeRelations.add(new TaskNodeRelation(depNode, taskNode.getName()));
}
}
// Generate concrete Dag to be executed
return DagHelper.buildDagGraph(processDag);
}

ProcessDag processDag = new ProcessDag();
processDag.setEdges(taskNodeRelations);
processDag.setNodes(taskNodeList);


// Generate concrete Dag to be executed
return genDagGraph(processDag);


return new DAG<>();
}

/**
* Generate the DAG of process
*
* @return DAG
*/
private DAG<String, TaskNode, TaskNodeRelation> genDagGraph(ProcessDag processDag) {
DAG<String, TaskNode, TaskNodeRelation> dag = new DAG<>();

/**
* Add the ndoes
*/
if (CollectionUtils.isNotEmpty(processDag.getNodes())) {
for (TaskNode node : processDag.getNodes()) {
dag.addNode(node.getName(), node);
}
}

/**
* Add the edges
*/
if (CollectionUtils.isNotEmpty(processDag.getEdges())) {
for (TaskNodeRelation edge : processDag.getEdges()) {
dag.addEdge(edge.getStartNode(), edge.getEndNode());
}
}

return dag;
}


/**
* whether the graph has a ring
*
* @param taskNodeResponseList
* @return
* @param taskNodeResponseList task node response list
* @return if graph has cycle flag
*/
private boolean graphHasCycle(List<TaskNode> taskNodeResponseList) {
DAG<String, TaskNode, String> graph = new DAG<>();
Expand Down
Loading

0 comments on commit 90df148

Please sign in to comment.