Skip to content

Commit

Permalink
1,no worker condition , master will while ture wait for worker startu…
Browse files Browse the repository at this point in the history
…p 2,worker response task status sync wait for result (apache#2420)

* dispatch task fail will set task status failed

* 1,no worker condition , master will while ture wait for worker startup
2,worker response task status sync wait for result

Co-authored-by: qiaozhanwei <[email protected]>
  • Loading branch information
qiaozhanwei and qiaozhanwei authored Apr 15, 2020
1 parent 0bad7c2 commit 96835eb
Show file tree
Hide file tree
Showing 5 changed files with 176 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.dolphinscheduler.common.Constants.*;

/**
* TaskUpdateQueue consumer
*/
Expand All @@ -68,7 +70,7 @@ public class TaskPriorityQueueConsumer extends Thread{
* taskUpdateQueue
*/
@Autowired
private TaskPriorityQueue taskUpdateQueue;
private TaskPriorityQueue taskPriorityQueue;

/**
* processService
Expand All @@ -93,7 +95,7 @@ public void run() {
while (Stopper.isRunning()){
try {
// if not task , blocking here
String taskPriorityInfo = taskUpdateQueue.take();
String taskPriorityInfo = taskPriorityQueue.take();

TaskPriority taskPriority = TaskPriority.of(taskPriorityInfo);

Expand All @@ -114,13 +116,22 @@ public void run() {
private Boolean dispatch(int taskInstanceId){
TaskExecutionContext context = getTaskExecutionContext(taskInstanceId);
ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER, context.getWorkerGroup());
try {
return dispatcher.dispatch(executionContext);
} catch (ExecuteException e) {
logger.error("execute exception", e);
return false;
}
Boolean result = false;
while (Stopper.isRunning()){
try {
result = dispatcher.dispatch(executionContext);
} catch (ExecuteException e) {
logger.error("dispatch error",e);
try {
Thread.sleep(SLEEP_TIME_MILLIS);
} catch (InterruptedException e1) {}
}

if (result){
break;
}
}
return result;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
Expand All @@ -31,9 +33,12 @@
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.dolphinscheduler.common.Constants.*;

/**
* task ack processor
*/
Expand All @@ -51,9 +56,16 @@ public class TaskAckProcessor implements NettyRequestProcessor {
*/
private final TaskInstanceCacheManager taskInstanceCacheManager;


/**
* processService
*/
private ProcessService processService;

public TaskAckProcessor(){
this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class);
this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
this.processService = SpringApplicationContext.getBean(ProcessService.class);
}

/**
Expand All @@ -71,8 +83,10 @@ public void process(Channel channel, Command command) {

String workerAddress = ChannelUtils.toAddress(channel).getAddress();

ExecutionStatus ackStatus = ExecutionStatus.of(taskAckCommand.getStatus());

// TaskResponseEvent
TaskResponseEvent taskResponseEvent = TaskResponseEvent.newAck(ExecutionStatus.of(taskAckCommand.getStatus()),
TaskResponseEvent taskResponseEvent = TaskResponseEvent.newAck(ackStatus,
taskAckCommand.getStartTime(),
workerAddress,
taskAckCommand.getExecutePath(),
Expand All @@ -81,6 +95,18 @@ public void process(Channel channel, Command command) {

taskResponseService.addResponse(taskResponseEvent);

while (Stopper.isRunning()){
TaskInstance taskInstance = processService.findTaskInstanceById(taskAckCommand.getTaskInstanceId());

if (taskInstance != null && ackStatus.typeIsRunning()){
break;
}

try {
Thread.sleep(SLEEP_TIME_MILLIS);
} catch (InterruptedException e) {}
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
Expand All @@ -30,9 +32,12 @@
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.dolphinscheduler.common.Constants.*;

/**
* task response processor
*/
Expand All @@ -50,9 +55,15 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
*/
private final TaskInstanceCacheManager taskInstanceCacheManager;

/**
* processService
*/
private ProcessService processService;

public TaskResponseProcessor(){
this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class);
this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
this.processService = SpringApplicationContext.getBean(ProcessService.class);
}

/**
Expand All @@ -71,6 +82,8 @@ public void process(Channel channel, Command command) {

taskInstanceCacheManager.cacheTaskInstance(responseCommand);

ExecutionStatus responseStatus = ExecutionStatus.of(responseCommand.getStatus());

// TaskResponseEvent
TaskResponseEvent taskResponseEvent = TaskResponseEvent.newResult(ExecutionStatus.of(responseCommand.getStatus()),
responseCommand.getEndTime(),
Expand All @@ -79,6 +92,18 @@ public void process(Channel channel, Command command) {
responseCommand.getTaskInstanceId());

taskResponseService.addResponse(taskResponseEvent);

while (Stopper.isRunning()){
TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId());

if (taskInstance != null && responseStatus.typeIsFinished()){
break;
}

try {
Thread.sleep(SLEEP_TIME_MILLIS);
} catch (InterruptedException e) {}
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
package org.apache.dolphinscheduler.server.worker.processor;


import com.alibaba.fastjson.JSONObject;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
Expand Down Expand Up @@ -93,8 +95,17 @@ private NettyRemoteChannel getRemoteChannel(int taskInstanceId){
}
logger.warn("original master : {} is not reachable, random select master", nettyRemoteChannel.getHost());
Set<String> masterNodes = zookeeperRegistryCenter.getMasterNodesDirectly();
if(CollectionUtils.isEmpty(masterNodes)){
throw new IllegalStateException("no available master node exception");
while (Stopper.isRunning()) {
if (CollectionUtils.isEmpty(masterNodes)) {
logger.error("no available master node");
try {
Thread.sleep(1000);
}catch (Exception e){

}
}else {
break;
}
}
for(String masterNode : masterNodes){
newChannel = nettyRemotingClient.getChannel(Host.of(masterNode));
Expand Down
Loading

0 comments on commit 96835eb

Please sign in to comment.