Skip to content

Commit

Permalink
repackage name ProcessUtils
Browse files Browse the repository at this point in the history
  • Loading branch information
ponfee committed Oct 31, 2023
1 parent 3e9946d commit 557f313
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 68 deletions.
7 changes: 7 additions & 0 deletions disjob-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,13 @@
<artifactId>groovy-jsr223</artifactId>
</dependency>

<!-- OS process operation -->
<dependency>
<groupId>net.java.dev.jna</groupId>
<artifactId>jna-platform</artifactId>
<version>${jna-platform.version}</version>
</dependency>

<!-- spring dependencies -->
<dependency>
<groupId>org.springframework</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,9 @@
** \/ \/ \/ **
\* */

package cn.ponfee.disjob.core.util;
package cn.ponfee.disjob.common.util;

import cn.ponfee.disjob.common.concurrent.Threads;
import cn.ponfee.disjob.common.exception.Throwables;
import cn.ponfee.disjob.common.util.Fields;
import cn.ponfee.disjob.common.util.Files;
import cn.ponfee.disjob.core.base.JobCodeMsg;
import cn.ponfee.disjob.core.handle.ExecuteResult;
import cn.ponfee.disjob.core.handle.execution.ExecutingTask;
import com.sun.jna.Pointer;
import com.sun.jna.platform.win32.Kernel32;
import com.sun.jna.platform.win32.WinNT;
Expand All @@ -30,7 +24,6 @@
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Consumer;
Expand All @@ -56,32 +49,6 @@ public static void destroy(Process process) {
}
}

public static ExecuteResult complete(Process process, Charset charset, ExecutingTask executingTask, Logger log) {
try (InputStream is = process.getInputStream(); InputStream es = process.getErrorStream()) {
// 一次性获取全部执行结果信息:不是在控制台实时展示执行信息,所以此处不用通过异步线程去获取命令的实时执行信息
String verbose = IOUtils.toString(is, charset);
String error = IOUtils.toString(es, charset);
int code = process.waitFor();
if (code == 0) {
return ExecuteResult.success(verbose);
} else {
return ExecuteResult.failure(JobCodeMsg.JOB_EXECUTE_FAILED.getCode(), code + ": " + error);
}
} catch (Throwable t) {
log.error("Process execute error: " + executingTask, t);
Threads.interruptIfNecessary(t);
return ExecuteResult.failure(JobCodeMsg.JOB_EXECUTE_ERROR.getCode(), Throwables.getRootCauseMessage(t));
} finally {
if (process != null) {
try {
process.destroy();
} catch (Throwable t) {
log.error("Destroy process error: " + executingTask, t);
}
}
}
}

public static int progress(Process process, Charset charset, Logger log) {
return progress(process, charset, log::info, log::error);
}
Expand Down Expand Up @@ -145,31 +112,27 @@ public static Long getProcessId(Process process) {
public static void killProcess(Long pid, Charset charset) {
try {
if (SystemUtils.IS_OS_WINDOWS) {
List<String> killCommand = Arrays.asList("taskkill", "/PID", String.valueOf(pid), "/F", "/T");
Process process = new ProcessBuilder(killCommand).start();
try (InputStream input = process.getInputStream()) {
String verbose = IOUtils.toString(input, charset);
LOG.info("Stop windows process verbose: {}", verbose);
}
destroy(process);
Process killProcess = new ProcessBuilder("taskkill", "/PID", String.valueOf(pid), "/F", "/T").start();
killProcess.waitFor();
LOG.info("Stop windows process verbose: {} | {}", pid, processVerbose(killProcess, charset));
destroy(killProcess);
} else if (SystemUtils.IS_OS_UNIX) {
// find child process id
List<String> killCommand = Arrays.asList("/bin/sh", "-c", String.format("ps axo pid,ppid| awk '{ if($2==%d) print$1}'", pid));
Process process = new ProcessBuilder(killCommand).start();
try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream(), charset))) {
// 1、find child process id
String findChildPidCommand = String.format("ps axo pid,ppid | awk '{ if($2==%d) print$1}'", pid);
Process findChildPidProcess = new ProcessBuilder("/bin/sh", "-c", findChildPidCommand).start();
findChildPidProcess.waitFor();
try (InputStream inputStream = findChildPidProcess.getInputStream()) {
// stop all child process
String childPid;
while ((childPid = reader.readLine()) != null) {
if (StringUtils.isNotBlank(childPid)) {
killProcess(Long.valueOf(childPid.trim()), charset);
}
}
List<String> childPidList = IOUtils.readLines(inputStream, charset);
childPidList.stream().filter(StringUtils::isNotBlank).forEach(e -> killProcess(Long.parseLong(e.trim()), charset));
}
// kill current process id
ProcessBuilder killProcessBuilder = new ProcessBuilder("kill", "-9", String.valueOf(pid));
killProcessBuilder.start().waitFor();
LOG.info("Stop unix process id: {}", pid);
destroy(process);
destroy(findChildPidProcess);

// 2、kill current process id
Process killProcess = new ProcessBuilder("kill", "-9", String.valueOf(pid)).start();
killProcess.waitFor();
LOG.info("Stop unix process verbose: {} | {}", pid, processVerbose(killProcess, charset));
destroy(killProcess);
} else {
LOG.error("Stop process id unknown os name: {}, {}", SystemUtils.OS_NAME, pid);
}
Expand All @@ -193,4 +156,10 @@ private static void read(InputStream input, Charset charset, Consumer<String> co
}
}

private static String processVerbose(Process process, Charset charset) throws IOException {
try (InputStream input = process.getInputStream()) {
return IOUtils.toString(input, charset);
}
}

}
6 changes: 0 additions & 6 deletions disjob-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,6 @@
<artifactId>spring-boot-starter-data-redis</artifactId>
<optional>true</optional>
</dependency>
<!-- OS process operation -->
<dependency>
<groupId>net.java.dev.jna</groupId>
<artifactId>jna-platform</artifactId>
<version>${jna-platform.version}</version>
</dependency>
<dependency>
<groupId>org.springdoc</groupId>
<artifactId>springdoc-openapi-ui</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
import cn.ponfee.disjob.core.handle.JobHandler;
import cn.ponfee.disjob.core.handle.Savepoint;
import cn.ponfee.disjob.core.handle.execution.ExecutingTask;
import cn.ponfee.disjob.core.util.ProcessUtils;
import cn.ponfee.disjob.core.util.JobUtils;
import cn.ponfee.disjob.common.util.ProcessUtils;
import lombok.Data;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -49,16 +50,28 @@
public class CommandJobHandler extends JobHandler {
private final static Logger LOG = LoggerFactory.getLogger(CommandJobHandler.class);

private Charset charset;
private Long pid;

@Override
protected void onStop() {
if (pid != null) {
ProcessUtils.killProcess(pid, charset);
}
}

@Override
public ExecuteResult execute(ExecutingTask executingTask, Savepoint savepoint) throws Exception {
String taskParam = executingTask.getTaskParam();
Assert.hasText(taskParam, "Command param cannot be empty.");
CommandParam commandParam = Jsons.JSON5.readValue(taskParam, CommandParam.class);
Assert.notEmpty(commandParam.cmdarray, "Command array cannot be empty.");
Charset charset = Files.charset(commandParam.charset);
this.charset = Files.charset(commandParam.charset);

Process process = Runtime.getRuntime().exec(commandParam.cmdarray, commandParam.envp);
return ProcessUtils.complete(process, charset, executingTask, LOG);
this.pid = ProcessUtils.getProcessId(process);
LOG.info("Command process id: {} | {}", executingTask.getTaskId(), pid);
return JobUtils.completeProcess(process, charset, executingTask, LOG);
}

@Data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
import cn.ponfee.disjob.core.handle.JobHandler;
import cn.ponfee.disjob.core.handle.Savepoint;
import cn.ponfee.disjob.core.handle.execution.ExecutingTask;
import cn.ponfee.disjob.core.util.ProcessUtils;
import cn.ponfee.disjob.core.util.JobUtils;
import cn.ponfee.disjob.common.util.ProcessUtils;
import lombok.Data;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -54,19 +55,31 @@ public class ScriptJobHandler extends JobHandler {
private static final String[] DOWNLOAD_PROTOCOL = {"http://", "https://", "ftp://"};
private static final String WORKER_SCRIPT_DIR = SystemUtils.USER_HOME + "/disjob/worker/scripts/";

private Charset charset;
private Long pid;

@Override
protected void onStop() {
if (pid != null) {
ProcessUtils.killProcess(pid, charset);
}
}

@Override
public ExecuteResult execute(ExecutingTask executingTask, Savepoint savepoint) throws Exception {
ScriptParam scriptParam = Jsons.JSON5.readValue(executingTask.getTaskParam(), ScriptParam.class);
Assert.notNull(scriptParam, () -> "Invalid script param: " + scriptParam);
Assert.notNull(scriptParam.type, () -> "Script type cannot be null: " + scriptParam);
scriptParam.type.check();
Charset charset = Files.charset(scriptParam.charset);
this.charset = Files.charset(scriptParam.charset);

String scriptFileName = scriptParam.type.buildFileName(executingTask.getTaskId());
String scriptPath = prepareScriptFile(scriptParam.script, scriptFileName, charset);

Process process = scriptParam.type.exec(scriptPath, scriptParam.envp);
return ProcessUtils.complete(process, charset, executingTask, LOG);
this.pid = ProcessUtils.getProcessId(process);
LOG.info("Script process id: {} | {}", executingTask.getTaskId(), pid);
return JobUtils.completeProcess(process, charset, executingTask, LOG);
}

public enum ScriptType {
Expand Down
36 changes: 36 additions & 0 deletions disjob-core/src/main/java/cn/ponfee/disjob/core/util/JobUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,19 @@

package cn.ponfee.disjob.core.util;

import cn.ponfee.disjob.common.concurrent.Threads;
import cn.ponfee.disjob.common.exception.Throwables;
import cn.ponfee.disjob.common.util.NetUtils;
import cn.ponfee.disjob.core.base.JobCodeMsg;
import cn.ponfee.disjob.core.base.JobConstants;
import cn.ponfee.disjob.core.handle.ExecuteResult;
import cn.ponfee.disjob.core.handle.execution.ExecutingTask;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;

import java.io.InputStream;
import java.nio.charset.Charset;

/**
* Job utility
Expand All @@ -19,6 +29,32 @@
*/
public class JobUtils {

public static ExecuteResult completeProcess(Process process, Charset charset, ExecutingTask executingTask, Logger log) {
try (InputStream is = process.getInputStream(); InputStream es = process.getErrorStream()) {
// 一次性获取全部执行结果信息:不是在控制台实时展示执行信息,所以此处不用通过异步线程去获取命令的实时执行信息
String verbose = IOUtils.toString(is, charset);
String error = IOUtils.toString(es, charset);
int code = process.waitFor();
if (code == 0) {
return ExecuteResult.success(verbose);
} else {
return ExecuteResult.failure(JobCodeMsg.JOB_EXECUTE_FAILED.getCode(), code + ": " + error);
}
} catch (Throwable t) {
log.error("Process execute error: " + executingTask, t);
Threads.interruptIfNecessary(t);
return ExecuteResult.failure(JobCodeMsg.JOB_EXECUTE_ERROR.getCode(), Throwables.getRootCauseMessage(t));
} finally {
if (process != null) {
try {
process.destroy();
} catch (Throwable t) {
log.error("Destroy process error: " + executingTask, t);
}
}
}
}

public static String getLocalHost(String specifiedHost) {
String host = specifiedHost;
if (StringUtils.isNotEmpty(host)) {
Expand Down

0 comments on commit 557f313

Please sign in to comment.