diff --git a/README.md b/README.md index ccd5d646c..7b694090e 100644 --- a/README.md +++ b/README.md @@ -57,7 +57,7 @@ disjob # 主项目① - 任务在执行时若抛出[PauseTaskException](disjob-core/src/main/java/cn/ponfee/disjob/core/exception/PauseTaskException.java),会暂停对应实例下的全部任务(包括分布在不同worker机器中的任务) - 支持广播任务,广播任务会派发给job-group下的所有worker执行 - 支持Job间的依赖,多个Job配置好依赖关系后便会按既定的依赖顺序依次执行 -- 支持DAG工作流,可把jobHandler配置为复杂的DAG表达式,如:A->B,C,(D->E)->D,F->G +- 支持DAG工作流,可把`jobHandler`配置为复杂的DAG表达式,如:A->B,C,(D->E)->D,F->G - 提供Web管理后台,通过界面进行作业配置,任务监控等 ## Comparison @@ -65,14 +65,14 @@ disjob # 主项目① | | **Quartz** | **Elastic-Job** | **Xxl-Job** | **Disjob** | | ---------------- | ---------- |-----------------|----------------------|----------------------------------------| | **触发类型** | Cron | Cron | Cron、固定频率、父子依赖 | Cron、指定时间、固定频率、固定延时、父子依赖 | -| **任务编排** | 不支持 | 不支持 | 不支持 | DAG表达式 | -| **任务分片** | 不支持 | 静态分片 | 广播任务 | 广播任务、动态分片 | -| **停止与恢复** | 不支持 | 不支持 | 终止运行中的任务 | 暂停执行中的任务、恢复已暂停的任务 | -| **保存执行快照** | 不支持 | 不支持 | 不支持 | 支持 | -| **失败重试** | 不支持 | 失效转移 | 支持 | 支持 | -| **后台管理** | 不支持 | 支持 | 支持 | 支持 | -| **监控告警** | 不支持 | 邮件 | 邮件 | 暂不支持 | -| **查看执行日志** | 不支持 | 支持 | 支持 | 暂不支持 | +| **任务编排** | 无 | 无 | 无 | DAG表达式 | +| **任务分片** | 无 | 静态分片 | 广播任务 | 广播任务、动态分片 | +| **停止与恢复** | 无 | 无 | 终止运行中的任务 | 暂停执行中的任务、恢复执行已暂停的任务 | +| **保存执行快照** | 无 | 无 | 无 | 有 | +| **失败重试** | 无 | 失效转移 | 有 | 有 | +| **后台管理** | 无 | 有 | 有 | 有 | +| **监控告警** | 无 | 邮件 | 邮件 | 暂无 | +| **查看执行日志** | 无 | 有 | 有 | 暂无 | ## [Download From Maven Central](https://central.sonatype.com/namespace/cn.ponfee) diff --git a/disjob-admin/ruoyi-admin/src/test/java/cn/ponfee/disjob/admin/CopyrightVerifyTest.java b/disjob-admin/ruoyi-admin/src/test/java/cn/ponfee/disjob/admin/CopyrightVerifyTest.java index 53b1d1854..295db19b2 100644 --- a/disjob-admin/ruoyi-admin/src/test/java/cn/ponfee/disjob/admin/CopyrightVerifyTest.java +++ b/disjob-admin/ruoyi-admin/src/test/java/cn/ponfee/disjob/admin/CopyrightVerifyTest.java @@ -36,12 +36,12 @@ public class CopyrightVerifyTest { private static final String COPYRIGHT_KEYWORD = " Copyright (c) 2017-2023 Ponfee "; private static final String BASE_DIR = MavenProjects.getProjectBaseDir(); - private static final String COPYRIGHT = ThrowingSupplier.get(() -> IOUtils.resourceToString("copy-right.txt", UTF_8, CopyrightVerifyTest.class.getClassLoader())); + private static final String COPYRIGHT = ThrowingSupplier.doChecked(() -> IOUtils.resourceToString("copy-right.txt", UTF_8, CopyrightVerifyTest.class.getClassLoader())); @Test public void upsertCopyright() { handleFile(file -> { - String text = ThrowingSupplier.get(() -> IOUtils.toString(file.toURI(), UTF_8)); + String text = ThrowingSupplier.doChecked(() -> IOUtils.toString(file.toURI(), UTF_8)); if (!isOwnerCode(text)) { return; } @@ -72,7 +72,7 @@ public void upsertCopyright() { @Test public void checkCopyright() { handleFile(file -> { - String text = ThrowingSupplier.get(() -> IOUtils.toString(file.toURI(), UTF_8)); + String text = ThrowingSupplier.doChecked(() -> IOUtils.toString(file.toURI(), UTF_8)); if (StringUtils.countMatches(text, " @author ") == 0) { System.out.println(file.getName()); } else if (isOwnerCode(text)) { @@ -92,7 +92,7 @@ public void checkCopyright() { @Test public void testNoCopyright() { handleFile(file -> { - String text = ThrowingSupplier.get(() -> IOUtils.toString(file.toURI(), UTF_8)); + String text = ThrowingSupplier.doChecked(() -> IOUtils.toString(file.toURI(), UTF_8)); if (!text.contains(COPYRIGHT_KEYWORD)) { System.out.println(file.getName()); } diff --git a/disjob-common/src/main/java/cn/ponfee/disjob/common/base/ConsistentHash.java b/disjob-common/src/main/java/cn/ponfee/disjob/common/base/ConsistentHash.java index 6aa1724b5..ee6b7dc70 100644 --- a/disjob-common/src/main/java/cn/ponfee/disjob/common/base/ConsistentHash.java +++ b/disjob-common/src/main/java/cn/ponfee/disjob/common/base/ConsistentHash.java @@ -9,12 +9,14 @@ package cn.ponfee.disjob.common.base; import cn.ponfee.disjob.common.util.CRC16; +import com.google.common.hash.Hashing; import org.apache.commons.codec.digest.DigestUtils; -import java.nio.charset.StandardCharsets; import java.util.*; import java.util.function.Function; +import static java.nio.charset.StandardCharsets.UTF_8; + /** * Consistent hashing algorithm. * @@ -66,7 +68,11 @@ public interface HashFunction { return h; }; - HashFunction CRC_16 = key -> CRC16.digest(key.getBytes(StandardCharsets.UTF_8)); + HashFunction SIP_HASH = key -> Hashing.sipHash24().hashBytes(key.getBytes(UTF_8)).asInt(); + + HashFunction MURMUR3_32 = key -> Hashing.murmur3_32_fixed().hashBytes(key.getBytes(UTF_8)).asInt(); + + HashFunction CRC_16 = key -> CRC16.digest(key.getBytes(UTF_8)); } /** diff --git a/disjob-common/src/main/java/cn/ponfee/disjob/common/concurrent/AsyncDelayedExecutor.java b/disjob-common/src/main/java/cn/ponfee/disjob/common/concurrent/AsyncDelayedExecutor.java index 7f49dc8c1..59cdec85f 100644 --- a/disjob-common/src/main/java/cn/ponfee/disjob/common/concurrent/AsyncDelayedExecutor.java +++ b/disjob-common/src/main/java/cn/ponfee/disjob/common/concurrent/AsyncDelayedExecutor.java @@ -118,9 +118,9 @@ public void run() { if (delayed != null) { E data = delayed.getData(); if (asyncExecutor != null) { - asyncExecutor.submit(ThrowingRunnable.caught(() -> processor.accept(data))); + asyncExecutor.submit(ThrowingRunnable.toCaught(() -> processor.accept(data))); } else { - ThrowingRunnable.execute(() -> processor.accept(data)); + ThrowingRunnable.doCaught(() -> processor.accept(data)); } } } diff --git a/disjob-common/src/main/java/cn/ponfee/disjob/common/concurrent/LoopThread.java b/disjob-common/src/main/java/cn/ponfee/disjob/common/concurrent/LoopThread.java index 9f45d9379..3436ed062 100644 --- a/disjob-common/src/main/java/cn/ponfee/disjob/common/concurrent/LoopThread.java +++ b/disjob-common/src/main/java/cn/ponfee/disjob/common/concurrent/LoopThread.java @@ -62,7 +62,7 @@ public static LoopThread createStarted(String name, boolean daemon, int priority public void run() { LOG.info("Loop process thread begin."); if (delayMs > 0) { - ThrowingRunnable.run(() -> Thread.sleep(delayMs)); + ThrowingRunnable.doChecked(() -> Thread.sleep(delayMs)); } while (state.get() == RUNNING) { try { @@ -90,7 +90,7 @@ public synchronized void start() { public boolean terminate() { if (state.compareAndSet(RUNNING, TERMINATED)) { - ThrowingRunnable.execute(super::interrupt); + ThrowingRunnable.doCaught(super::interrupt); return true; } else { return false; diff --git a/disjob-common/src/main/java/cn/ponfee/disjob/common/concurrent/ThreadPoolExecutors.java b/disjob-common/src/main/java/cn/ponfee/disjob/common/concurrent/ThreadPoolExecutors.java index cdd57f9a7..57646ec7a 100644 --- a/disjob-common/src/main/java/cn/ponfee/disjob/common/concurrent/ThreadPoolExecutors.java +++ b/disjob-common/src/main/java/cn/ponfee/disjob/common/concurrent/ThreadPoolExecutors.java @@ -206,7 +206,7 @@ public static boolean shutdown(ExecutorService executorService) { return true; } catch (Throwable t) { LOG.error("Shutdown ExecutorService occur error.", t); - ThrowingRunnable.execute(executorService::shutdownNow); + ThrowingRunnable.doCaught(executorService::shutdownNow); Threads.interruptIfNecessary(t); return false; } @@ -232,7 +232,7 @@ public static boolean shutdown(ExecutorService executorService, int awaitSeconds } catch (Throwable t) { LOG.error("Shutdown ExecutorService occur error.", t); if (!hasCallShutdownNow) { - ThrowingRunnable.execute(executorService::shutdownNow); + ThrowingRunnable.doCaught(executorService::shutdownNow); } Threads.interruptIfNecessary(t); } diff --git a/disjob-common/src/main/java/cn/ponfee/disjob/common/exception/Throwables.java b/disjob-common/src/main/java/cn/ponfee/disjob/common/exception/Throwables.java index f1bd9ba87..aee7a3ee9 100644 --- a/disjob-common/src/main/java/cn/ponfee/disjob/common/exception/Throwables.java +++ b/disjob-common/src/main/java/cn/ponfee/disjob/common/exception/Throwables.java @@ -85,7 +85,7 @@ default ThrowingCallable toCallable(R result) { }; } - static void run(ThrowingRunnable runnable) { + static void doChecked(ThrowingRunnable runnable) { try { runnable.run(); } catch (Throwable t) { @@ -93,11 +93,11 @@ static void run(ThrowingRunnable runnable) { } } - static void execute(ThrowingRunnable runnable) { - execute(runnable, EMPTY_MESSAGE); + static void doCaught(ThrowingRunnable runnable) { + doCaught(runnable, EMPTY_MESSAGE); } - static void execute(ThrowingRunnable runnable, Supplier message) { + static void doCaught(ThrowingRunnable runnable, Supplier message) { try { runnable.run(); } catch (Throwable t) { @@ -106,7 +106,7 @@ static void execute(ThrowingRunnable runnable, Supplier message) { } } - static Runnable checked(ThrowingRunnable runnable) { + static Runnable toChecked(ThrowingRunnable runnable) { return () -> { try { runnable.run(); @@ -116,11 +116,11 @@ static Runnable checked(ThrowingRunnable runnable) { }; } - static Runnable caught(ThrowingRunnable runnable) { - return caught(runnable, EMPTY_MESSAGE); + static Runnable toCaught(ThrowingRunnable runnable) { + return toCaught(runnable, EMPTY_MESSAGE); } - static Runnable caught(ThrowingRunnable runnable, Supplier message) { + static Runnable toCaught(ThrowingRunnable runnable, Supplier message) { return () -> { try { runnable.run(); @@ -146,7 +146,7 @@ default ThrowingRunnable toRunnable() { return this::get; } - static R get(ThrowingSupplier supplier) { + static R doChecked(ThrowingSupplier supplier) { try { return supplier.get(); } catch (Throwable t) { @@ -154,11 +154,11 @@ static R get(ThrowingSupplier supplier) { } } - static R execute(ThrowingSupplier supplier) { - return execute(supplier, null, EMPTY_MESSAGE); + static R doCaught(ThrowingSupplier supplier) { + return doCaught(supplier, null, EMPTY_MESSAGE); } - static R execute(ThrowingSupplier supplier, R defaultValue, Supplier message) { + static R doCaught(ThrowingSupplier supplier, R defaultValue, Supplier message) { try { return supplier.get(); } catch (Throwable t) { @@ -168,7 +168,7 @@ static R execute(ThrowingSupplier supplier, R defaultValue, Supplier Supplier checked(ThrowingSupplier supplier) { + static Supplier toChecked(ThrowingSupplier supplier) { return () -> { try { return supplier.get(); @@ -178,11 +178,11 @@ static Supplier checked(ThrowingSupplier supplier) { }; } - static Supplier caught(ThrowingSupplier supplier) { - return caught(supplier, null, EMPTY_MESSAGE); + static Supplier toCaught(ThrowingSupplier supplier) { + return toCaught(supplier, null, EMPTY_MESSAGE); } - static Supplier caught(ThrowingSupplier supplier, R defaultValue, Supplier message) { + static Supplier toCaught(ThrowingSupplier supplier, R defaultValue, Supplier message) { return () -> { try { return supplier.get(); @@ -209,7 +209,7 @@ default ThrowingRunnable toRunnable() { return this::call; } - static R call(ThrowingCallable callable) { + static R doChecked(ThrowingCallable callable) { try { return callable.call(); } catch (Throwable t) { @@ -217,11 +217,11 @@ static R call(ThrowingCallable callable) { } } - static R execute(ThrowingCallable callable) { - return execute(callable, null, EMPTY_MESSAGE); + static R doCaught(ThrowingCallable callable) { + return doCaught(callable, null, EMPTY_MESSAGE); } - static R execute(ThrowingCallable callable, R defaultValue, Supplier message) { + static R doCaught(ThrowingCallable callable, R defaultValue, Supplier message) { try { return callable.call(); } catch (Throwable t) { @@ -231,7 +231,7 @@ static R execute(ThrowingCallable callable, R defaultValue, Supplier Callable checked(ThrowingCallable callable) { + static Callable toChecked(ThrowingCallable callable) { return () -> { try { return callable.call(); @@ -241,11 +241,11 @@ static Callable checked(ThrowingCallable callable) { }; } - static Callable caught(ThrowingCallable supplier) { - return caught(supplier, null, EMPTY_MESSAGE); + static Callable toCaught(ThrowingCallable supplier) { + return toCaught(supplier, null, EMPTY_MESSAGE); } - static Callable caught(ThrowingCallable supplier, R defaultValue, Supplier message) { + static Callable toCaught(ThrowingCallable supplier, R defaultValue, Supplier message) { return () -> { try { return supplier.call(); @@ -275,7 +275,7 @@ default ThrowingFunction toFunction(R result) { }; } - static void accept(ThrowingConsumer consumer, E arg) { + static void doChecked(ThrowingConsumer consumer, E arg) { try { consumer.accept(arg); } catch (Throwable t) { @@ -283,11 +283,11 @@ static void accept(ThrowingConsumer consumer, E arg) { } } - static void execute(ThrowingConsumer consumer, E arg) { - execute(consumer, arg, EMPTY_MESSAGE); + static void doCaught(ThrowingConsumer consumer, E arg) { + doCaught(consumer, arg, EMPTY_MESSAGE); } - static void execute(ThrowingConsumer consumer, E arg, Supplier message) { + static void doCaught(ThrowingConsumer consumer, E arg, Supplier message) { try { consumer.accept(arg); } catch (Throwable t) { @@ -296,7 +296,7 @@ static void execute(ThrowingConsumer consumer, E arg, Supplier } } - static Consumer checked(ThrowingConsumer consumer) { + static Consumer toChecked(ThrowingConsumer consumer) { return e -> { try { consumer.accept(e); @@ -306,11 +306,11 @@ static Consumer checked(ThrowingConsumer consumer) { }; } - static Consumer caught(ThrowingConsumer consumer) { - return caught(consumer, EMPTY_MESSAGE); + static Consumer toCaught(ThrowingConsumer consumer) { + return toCaught(consumer, EMPTY_MESSAGE); } - static Consumer caught(ThrowingConsumer consumer, Supplier message) { + static Consumer toCaught(ThrowingConsumer consumer, Supplier message) { return arg -> { try { consumer.accept(arg); @@ -337,7 +337,7 @@ default ThrowingConsumer toConsumer() { return this::apply; } - static R apply(ThrowingFunction function, E arg) { + static R doChecked(ThrowingFunction function, E arg) { try { return function.apply(arg); } catch (Throwable t) { @@ -345,11 +345,11 @@ static R apply(ThrowingFunction function, E arg) { } } - static R execute(ThrowingFunction function, E arg) { - return execute(function, arg, null, EMPTY_MESSAGE); + static R doCaught(ThrowingFunction function, E arg) { + return doCaught(function, arg, null, EMPTY_MESSAGE); } - static R execute(ThrowingFunction function, E arg, R defaultValue, Supplier message) { + static R doCaught(ThrowingFunction function, E arg, R defaultValue, Supplier message) { try { return function.apply(arg); } catch (Throwable t) { @@ -359,7 +359,7 @@ static R execute(ThrowingFunction function, E arg, R defaultValu } } - static Function checked(ThrowingFunction function) { + static Function toChecked(ThrowingFunction function) { return e -> { try { return function.apply(e); @@ -369,11 +369,11 @@ static Function checked(ThrowingFunction function) { }; } - static Function caught(ThrowingFunction function) { - return caught(function, null, EMPTY_MESSAGE); + static Function toCaught(ThrowingFunction function) { + return toCaught(function, null, EMPTY_MESSAGE); } - static Function caught(ThrowingFunction function, R defaultValue, Supplier message) { + static Function toCaught(ThrowingFunction function, R defaultValue, Supplier message) { return arg -> { try { return function.apply(arg); diff --git a/disjob-common/src/main/java/cn/ponfee/disjob/common/lock/DoInDatabaseLocked.java b/disjob-common/src/main/java/cn/ponfee/disjob/common/lock/DoInDatabaseLocked.java index d8b5845f7..e1cb9c8cb 100644 --- a/disjob-common/src/main/java/cn/ponfee/disjob/common/lock/DoInDatabaseLocked.java +++ b/disjob-common/src/main/java/cn/ponfee/disjob/common/lock/DoInDatabaseLocked.java @@ -12,6 +12,7 @@ import cn.ponfee.disjob.common.concurrent.Threads; import cn.ponfee.disjob.common.spring.JdbcTemplateWrapper; import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.support.JdbcUtils; import org.springframework.util.Assert; import java.sql.PreparedStatement; @@ -72,8 +73,9 @@ public T action(Callable caller) { preparedStatement.setString(1, lockName); // 关闭一个Statement对象同时也会使得该对象创建的所有ResultSet对象被关闭,即:可以不显示关闭ResultSet // ResultSet所持有的资源不会立刻被释放,直到GC执行,因此明确地关闭ResultSet是一个更好的做法 - ResultSet resultSet = preparedStatement.executeQuery(); - Assert.state(resultSet.next(), () -> "Lock Not found '" + lockName + "'."); + ResultSet rs = preparedStatement.executeQuery(); + Assert.state(rs.next() && rs.getInt(1) == 1, () -> "Lock Not found '" + lockName + "'."); + JdbcUtils.closeResultSet(rs); return caller.call(); }); } diff --git a/disjob-common/src/main/java/cn/ponfee/disjob/common/spring/JdbcTemplateWrapper.java b/disjob-common/src/main/java/cn/ponfee/disjob/common/spring/JdbcTemplateWrapper.java index cb83809c7..83fa441f2 100644 --- a/disjob-common/src/main/java/cn/ponfee/disjob/common/spring/JdbcTemplateWrapper.java +++ b/disjob-common/src/main/java/cn/ponfee/disjob/common/spring/JdbcTemplateWrapper.java @@ -115,7 +115,9 @@ public T executeInTransaction(ThrowingFunction) conn -> { DatabaseMetaData meta = conn.getMetaData(); ResultSet rs = meta.getTables(null, null, tableName, null); - boolean exists = rs.next(); + boolean exists = rs.next() && tableName.equalsIgnoreCase(rs.getString(3)); JdbcUtils.closeResultSet(rs); return exists; }); diff --git a/disjob-common/src/main/java/cn/ponfee/disjob/common/util/WaitForProcess.java b/disjob-common/src/main/java/cn/ponfee/disjob/common/util/WaitForProcess.java index e8842a4d3..a9045cac3 100644 --- a/disjob-common/src/main/java/cn/ponfee/disjob/common/util/WaitForProcess.java +++ b/disjob-common/src/main/java/cn/ponfee/disjob/common/util/WaitForProcess.java @@ -29,9 +29,9 @@ public static boolean process(int round, long[] sleepMillis, boolean caught, Boo long sleepTime = sleepMillis[Math.min(i, lastIndex)]; if (sleepTime > 0) { if (caught) { - ThrowingRunnable.execute(() -> Thread.sleep(sleepTime)); + ThrowingRunnable.doCaught(() -> Thread.sleep(sleepTime)); } else { - ThrowingRunnable.run(() -> Thread.sleep(sleepTime)); + ThrowingRunnable.doChecked(() -> Thread.sleep(sleepTime)); } } if (processor.getAsBoolean()) { diff --git a/disjob-common/src/test/java/cn/ponfee/disjob/common/util/CopyrightTest.java b/disjob-common/src/test/java/cn/ponfee/disjob/common/util/CopyrightTest.java index bb9f8a195..a47b0555a 100644 --- a/disjob-common/src/test/java/cn/ponfee/disjob/common/util/CopyrightTest.java +++ b/disjob-common/src/test/java/cn/ponfee/disjob/common/util/CopyrightTest.java @@ -35,14 +35,14 @@ public class CopyrightTest { private static final String COPYRIGHT_KEYWORD = " Copyright (c) 2017-2023 Ponfee "; private static final String BASE_DIR = MavenProjects.getProjectBaseDir(); - private static final String COPYRIGHT = ThrowingSupplier.get( + private static final String COPYRIGHT = ThrowingSupplier.doChecked( () -> IOUtils.resourceToString("copy-right.txt", UTF_8, CopyrightTest.class.getClassLoader()) ); @Test public void upsertCopyright() { handleFile(file -> { - String text = ThrowingSupplier.get(() -> IOUtils.toString(file.toURI(), UTF_8)); + String text = ThrowingSupplier.doChecked(() -> IOUtils.toString(file.toURI(), UTF_8)); if (!isOwnerCode(text)) { return; } @@ -73,7 +73,7 @@ public void upsertCopyright() { @Test public void checkCopyright() { handleFile(file -> { - String text = ThrowingSupplier.get(() -> IOUtils.toString(file.toURI(), UTF_8)); + String text = ThrowingSupplier.doChecked(() -> IOUtils.toString(file.toURI(), UTF_8)); if (StringUtils.countMatches(text, " @author ") == 0) { System.out.println(file.getName()); } else if (isOwnerCode(text)) { @@ -93,7 +93,7 @@ public void checkCopyright() { @Test public void testNoCopyright() { handleFile(file -> { - String text = ThrowingSupplier.get(() -> IOUtils.toString(file.toURI(), UTF_8)); + String text = ThrowingSupplier.doChecked(() -> IOUtils.toString(file.toURI(), UTF_8)); if (!text.contains(COPYRIGHT_KEYWORD)) { System.out.println(file.getName()); } diff --git a/disjob-core/src/main/java/cn/ponfee/disjob/core/util/JobUtils.java b/disjob-core/src/main/java/cn/ponfee/disjob/core/util/JobUtils.java index a86005b4b..5b214c213 100644 --- a/disjob-core/src/main/java/cn/ponfee/disjob/core/util/JobUtils.java +++ b/disjob-core/src/main/java/cn/ponfee/disjob/core/util/JobUtils.java @@ -19,6 +19,7 @@ import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.InputStream; import java.nio.charset.Charset; @@ -30,6 +31,8 @@ */ public class JobUtils { + private static final Logger LOG = LoggerFactory.getLogger(JobUtils.class); + public static ExecuteResult completeProcess(Process process, Charset charset, ExecutingTask executingTask, Logger log) { try (InputStream is = process.getInputStream(); InputStream es = process.getErrorStream()) { // 一次性获取全部执行结果信息:不是在控制台实时展示执行信息,所以此处不用通过异步线程去获取命令的实时执行信息 @@ -58,33 +61,37 @@ public static ExecuteResult completeProcess(Process process, Charset charset, Ex public static String getLocalHost(String specifiedHost) { String host = specifiedHost; - if (StringUtils.isNotEmpty(host)) { - return validateHost(host, "specified"); + if (isValidHost(host, "specified")) { + return host; } host = System.getProperty(JobConstants.DISJOB_BOUND_SERVER_HOST); - if (StringUtils.isNotEmpty(host)) { - return validateHost(host, "jvm"); + if (isValidHost(host, "jvm")) { + return host; } host = System.getenv(JobConstants.DISJOB_BOUND_SERVER_HOST); - if (StringUtils.isNotEmpty(host)) { - return validateHost(host, "os"); + if (isValidHost(host, "os")) { + return host; } host = NetUtils.getLocalHost(); - if (StringUtils.isNotEmpty(host)) { - return validateHost(host, "network"); + if (isValidHost(host, "network")) { + return host; } - throw new IllegalStateException("Not found available server host"); + throw new AssertionError("Not found available server host."); } - private static String validateHost(String host, String from) { + private static boolean isValidHost(String host, String from) { + if (StringUtils.isBlank(host)) { + return false; + } if (NetUtils.isValidLocalHost(host)) { - return host; + return true; } - throw new AssertionError("Invalid bound server host configured " + from + ": " + host); + LOG.warn("Invalid bound server host configured {}: {}", from, host); + return false; } } diff --git a/disjob-id/src/main/java/cn/ponfee/disjob/id/snowflake/db/DbDistributedSnowflake.java b/disjob-id/src/main/java/cn/ponfee/disjob/id/snowflake/db/DbDistributedSnowflake.java index 5e71ece41..63ba71e38 100644 --- a/disjob-id/src/main/java/cn/ponfee/disjob/id/snowflake/db/DbDistributedSnowflake.java +++ b/disjob-id/src/main/java/cn/ponfee/disjob/id/snowflake/db/DbDistributedSnowflake.java @@ -124,7 +124,7 @@ public long generateId() { @Override public void close() { if (heartbeatThread.terminate()) { - ThrowingSupplier.execute(() -> jdbcTemplateWrapper.delete(DEREGISTER_WORKER_SQL, bizTag, serverTag)); + ThrowingSupplier.doCaught(() -> jdbcTemplateWrapper.delete(DEREGISTER_WORKER_SQL, bizTag, serverTag)); } } diff --git a/disjob-id/src/main/java/cn/ponfee/disjob/id/snowflake/zk/ZkDistributedSnowflake.java b/disjob-id/src/main/java/cn/ponfee/disjob/id/snowflake/zk/ZkDistributedSnowflake.java index 11db8b019..22e650c70 100644 --- a/disjob-id/src/main/java/cn/ponfee/disjob/id/snowflake/zk/ZkDistributedSnowflake.java +++ b/disjob-id/src/main/java/cn/ponfee/disjob/id/snowflake/zk/ZkDistributedSnowflake.java @@ -236,7 +236,7 @@ private int registerWorkerId(int workerIdBitLength) throws Exception { .forPath(serverTagParentPath) .stream() .map(e -> serverTagParentPath + SEP + e) - .map(ThrowingFunction.checked(this::getData)) + .map(ThrowingFunction.toChecked(this::getData)) .filter(Objects::nonNull) .map(Bytes::toInt) .collect(Collectors.toSet()); @@ -262,7 +262,7 @@ private int registerWorkerId(int workerIdBitLength) throws Exception { return usableWorkerId; } catch (Throwable t) { if (isCreatedWorkerIdPath) { - ThrowingRunnable.execute(() -> deletePath(workerIdPath)); + ThrowingRunnable.doCaught(() -> deletePath(workerIdPath)); } LOG.warn("Registry snowflake zk worker '{}' failed: {}", workerIdPath, t.getMessage()); Threads.interruptIfNecessary(t); @@ -384,7 +384,7 @@ public void stateChanged(CuratorFramework client, ConnectionState state) { lastSessionId = sessionId; } - ThrowingRunnable.execute(() -> RetryTemplate.execute(zkDistributedSnowflake::onReconnected, 3, 1000)); + ThrowingRunnable.doCaught(() -> RetryTemplate.execute(zkDistributedSnowflake::onReconnected, 3, 1000)); } } } diff --git a/disjob-registry/disjob-registry-consul/src/main/java/cn/ponfee/disjob/registry/consul/ConsulServerRegistry.java b/disjob-registry/disjob-registry-consul/src/main/java/cn/ponfee/disjob/registry/consul/ConsulServerRegistry.java index 5faa7a98e..49a6e1e12 100644 --- a/disjob-registry/disjob-registry-consul/src/main/java/cn/ponfee/disjob/registry/consul/ConsulServerRegistry.java +++ b/disjob-registry/disjob-registry-consul/src/main/java/cn/ponfee/disjob/registry/consul/ConsulServerRegistry.java @@ -125,7 +125,7 @@ public void close() { consulTtlCheckThread.terminate(); registered.forEach(this::deregister); registered.clear(); - ThrowingRunnable.execute(() -> Threads.stopThread(consulSubscriberThread, 100)); + ThrowingRunnable.doCaught(() -> Threads.stopThread(consulSubscriberThread, 100)); super.close(); } diff --git a/disjob-registry/disjob-registry-database/src/main/java/cn/ponfee/disjob/registry/database/DatabaseServerRegistry.java b/disjob-registry/disjob-registry-database/src/main/java/cn/ponfee/disjob/registry/database/DatabaseServerRegistry.java index 797e4631d..227e29da2 100644 --- a/disjob-registry/disjob-registry-database/src/main/java/cn/ponfee/disjob/registry/database/DatabaseServerRegistry.java +++ b/disjob-registry/disjob-registry-database/src/main/java/cn/ponfee/disjob/registry/database/DatabaseServerRegistry.java @@ -175,7 +175,7 @@ public final void register(R server) { public final void deregister(R server) { registered.remove(server); Object[] args = new Object[]{namespace, registerRoleName, server.serialize()}; - ThrowingSupplier.execute(() -> jdbcTemplateWrapper.delete(DEREGISTER_SQL, args)); + ThrowingSupplier.doCaught(() -> jdbcTemplateWrapper.delete(DEREGISTER_SQL, args)); log.info("Server deregister: {} | {}", registryRole.name(), server); } diff --git a/disjob-registry/disjob-registry-database/src/main/java/cn/ponfee/disjob/registry/database/configuration/DatabaseServerRegistryAutoConfiguration.java b/disjob-registry/disjob-registry-database/src/main/java/cn/ponfee/disjob/registry/database/configuration/DatabaseServerRegistryAutoConfiguration.java index e828dcecc..255d0ed6f 100644 --- a/disjob-registry/disjob-registry-database/src/main/java/cn/ponfee/disjob/registry/database/configuration/DatabaseServerRegistryAutoConfiguration.java +++ b/disjob-registry/disjob-registry-database/src/main/java/cn/ponfee/disjob/registry/database/configuration/DatabaseServerRegistryAutoConfiguration.java @@ -105,7 +105,7 @@ private static class DatabaseRegistryDataSourceDestroy implements DisposableBean public void destroy() { DataSource dataSource = wrapper.jdbcTemplate().getDataSource(); if (dataSource != null) { - ThrowingRunnable.execute(() -> Destroyable.destroy(dataSource), () -> "Database registry datasource destroy error."); + ThrowingRunnable.doCaught(() -> Destroyable.destroy(dataSource), () -> "Database registry datasource destroy error."); LOG.info("Database registry datasource destroy finished."); } } diff --git a/disjob-registry/disjob-registry-etcd/src/main/java/cn/ponfee/disjob/registry/etcd/EtcdClient.java b/disjob-registry/disjob-registry-etcd/src/main/java/cn/ponfee/disjob/registry/etcd/EtcdClient.java index b30f256f6..a39a5cfa1 100644 --- a/disjob-registry/disjob-registry-etcd/src/main/java/cn/ponfee/disjob/registry/etcd/EtcdClient.java +++ b/disjob-registry/disjob-registry-etcd/src/main/java/cn/ponfee/disjob/registry/etcd/EtcdClient.java @@ -286,7 +286,7 @@ public void close() { @Override public void accept(WatchResponse response) { - ThrowingRunnable.execute(latch::await); + ThrowingRunnable.doCaught(latch::await); List events = response.getEvents(); if (events.stream().noneMatch(e -> CHANGED_EVENT_TYPES.contains(e.getEventType()))) { diff --git a/disjob-registry/disjob-registry-etcd/src/main/java/cn/ponfee/disjob/registry/etcd/EtcdServerRegistry.java b/disjob-registry/disjob-registry-etcd/src/main/java/cn/ponfee/disjob/registry/etcd/EtcdServerRegistry.java index 429e1c0d1..f92043f38 100644 --- a/disjob-registry/disjob-registry-etcd/src/main/java/cn/ponfee/disjob/registry/etcd/EtcdServerRegistry.java +++ b/disjob-registry/disjob-registry-etcd/src/main/java/cn/ponfee/disjob/registry/etcd/EtcdServerRegistry.java @@ -141,10 +141,10 @@ public void close() { keepAliveCheckThread.terminate(); registered.forEach(this::deregister); registered.clear(); - ThrowingRunnable.execute(keepAlive::close); - ThrowingRunnable.execute(() -> client.revokeLease(leaseId)); - ThrowingRunnable.execute(client::close); - ThrowingRunnable.execute(super::close); + ThrowingRunnable.doCaught(keepAlive::close); + ThrowingRunnable.doCaught(() -> client.revokeLease(leaseId)); + ThrowingRunnable.doCaught(client::close); + ThrowingRunnable.doCaught(super::close); } // ------------------------------------------------------------------private method @@ -180,9 +180,9 @@ private void keepAliveRecover() { synchronized (keepAliveLock) { try { if (this.keepAlive != null) { - ThrowingRunnable.execute(this.keepAlive::close); + ThrowingRunnable.doCaught(this.keepAlive::close); this.keepAlive = null; - ThrowingRunnable.execute(() -> client.revokeLease(leaseId)); + ThrowingRunnable.doCaught(() -> client.revokeLease(leaseId)); } createLeaseIdAndKeepAlive(); } catch (Throwable t) { diff --git a/disjob-registry/disjob-registry-nacos/src/main/java/cn/ponfee/disjob/registry/nacos/NacosServerRegistry.java b/disjob-registry/disjob-registry-nacos/src/main/java/cn/ponfee/disjob/registry/nacos/NacosServerRegistry.java index 0f25fad52..64962b423 100644 --- a/disjob-registry/disjob-registry-nacos/src/main/java/cn/ponfee/disjob/registry/nacos/NacosServerRegistry.java +++ b/disjob-registry/disjob-registry-nacos/src/main/java/cn/ponfee/disjob/registry/nacos/NacosServerRegistry.java @@ -57,7 +57,7 @@ protected NacosServerRegistry(NacosRegistryProperties config) { try { this.namingService = NacosFactory.createNamingService(config.toProperties()); this.eventListener = event -> { - ThrowingRunnable.execute(latch::await); + ThrowingRunnable.doCaught(latch::await); if (event instanceof NamingEvent) { doRefreshDiscoveryServers(((NamingEvent) event).getInstances()); } @@ -117,8 +117,8 @@ public void close() { registered.forEach(this::deregister); registered.clear(); - ThrowingRunnable.execute(() -> namingService.unsubscribe(discoveryRootPath, groupName, eventListener)); - ThrowingRunnable.execute(namingService::shutDown); + ThrowingRunnable.doCaught(() -> namingService.unsubscribe(discoveryRootPath, groupName, eventListener)); + ThrowingRunnable.doCaught(namingService::shutDown); super.close(); } diff --git a/disjob-registry/disjob-registry-redis/src/main/java/cn/ponfee/disjob/registry/redis/RedisServerRegistry.java b/disjob-registry/disjob-registry-redis/src/main/java/cn/ponfee/disjob/registry/redis/RedisServerRegistry.java index 7fab74eb9..3ddc78417 100644 --- a/disjob-registry/disjob-registry-redis/src/main/java/cn/ponfee/disjob/registry/redis/RedisServerRegistry.java +++ b/disjob-registry/disjob-registry-redis/src/main/java/cn/ponfee/disjob/registry/redis/RedisServerRegistry.java @@ -139,7 +139,7 @@ protected RedisServerRegistry(StringRedisTemplate stringRedisTemplate, container.setConnectionFactory(Objects.requireNonNull(stringRedisTemplate.getConnectionFactory())); container.setTaskExecutor(redisSubscribeExecutor); // validate “handleMessage” method is valid - String listenerMethod = ThrowingSupplier.get(() -> RedisServerRegistry.class.getMethod("handleMessage", String.class, String.class).getName()); + String listenerMethod = ThrowingSupplier.doChecked(() -> RedisServerRegistry.class.getMethod("handleMessage", String.class, String.class).getName()); MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(this, listenerMethod); listenerAdapter.afterPropertiesSet(); container.addMessageListener(listenerAdapter, new ChannelTopic(discoveryRootPath + separator + CHANNEL)); @@ -172,15 +172,15 @@ public final void register(R server) { doRegisterServers(Collections.singleton(server)); registered.add(server); - ThrowingRunnable.execute(() -> publish(server, EventType.REGISTER)); + ThrowingRunnable.doCaught(() -> publish(server, EventType.REGISTER)); log.info("Server registered: {} | {}", registryRole.name(), server); } @Override public final void deregister(R server) { registered.remove(server); - ThrowingSupplier.execute(() -> stringRedisTemplate.opsForZSet().remove(registryRootPath, server.serialize())); - ThrowingRunnable.execute(() -> publish(server, EventType.DEREGISTER)); + ThrowingSupplier.doCaught(() -> stringRedisTemplate.opsForZSet().remove(registryRootPath, server.serialize())); + ThrowingRunnable.doCaught(() -> publish(server, EventType.DEREGISTER)); log.info("Server deregister: {} | {}", registryRole.name(), server); } @@ -196,7 +196,7 @@ public void close() { registerHeartbeatThread.terminate(); registered.forEach(this::deregister); - ThrowingRunnable.execute(redisMessageListenerContainer::stop); + ThrowingRunnable.doCaught(redisMessageListenerContainer::stop); discoverHeartbeatThread.terminate(); ThreadPoolExecutors.shutdown(redisSubscribeExecutor, 2); registered.clear(); diff --git a/disjob-registry/disjob-registry-zookeeper/src/main/java/cn/ponfee/disjob/registry/zookeeper/CuratorFrameworkClient.java b/disjob-registry/disjob-registry-zookeeper/src/main/java/cn/ponfee/disjob/registry/zookeeper/CuratorFrameworkClient.java index dd7bfbe7d..708d5c998 100644 --- a/disjob-registry/disjob-registry-zookeeper/src/main/java/cn/ponfee/disjob/registry/zookeeper/CuratorFrameworkClient.java +++ b/disjob-registry/disjob-registry-zookeeper/src/main/java/cn/ponfee/disjob/registry/zookeeper/CuratorFrameworkClient.java @@ -233,7 +233,7 @@ public void unwatch() { @Override public void process(WatchedEvent event) throws Exception { - ThrowingRunnable.execute(latch::await); + ThrowingRunnable.doCaught(latch::await); LOG.info("Watched event type: {}", event.getType()); final Consumer> action = processor; diff --git a/disjob-registry/disjob-registry-zookeeper/src/main/java/cn/ponfee/disjob/registry/zookeeper/ZookeeperServerRegistry.java b/disjob-registry/disjob-registry-zookeeper/src/main/java/cn/ponfee/disjob/registry/zookeeper/ZookeeperServerRegistry.java index a888fa8c3..bf3db7eb5 100644 --- a/disjob-registry/disjob-registry-zookeeper/src/main/java/cn/ponfee/disjob/registry/zookeeper/ZookeeperServerRegistry.java +++ b/disjob-registry/disjob-registry-zookeeper/src/main/java/cn/ponfee/disjob/registry/zookeeper/ZookeeperServerRegistry.java @@ -109,7 +109,7 @@ public void close() { } registered.forEach(this::deregister); - ThrowingRunnable.execute(client::close); + ThrowingRunnable.doCaught(client::close); registered.clear(); super.close(); } diff --git a/disjob-registry/disjob-registry-zookeeper/src/test/java/cn/ponfee/disjob/registry/zookeeper/EmbeddedZookeeperServer.java b/disjob-registry/disjob-registry-zookeeper/src/test/java/cn/ponfee/disjob/registry/zookeeper/EmbeddedZookeeperServer.java index 6e193d287..4eaf498b7 100644 --- a/disjob-registry/disjob-registry-zookeeper/src/test/java/cn/ponfee/disjob/registry/zookeeper/EmbeddedZookeeperServer.java +++ b/disjob-registry/disjob-registry-zookeeper/src/test/java/cn/ponfee/disjob/registry/zookeeper/EmbeddedZookeeperServer.java @@ -26,7 +26,7 @@ public static void main(String[] args) throws Exception { TestingServer testingServer = new TestingServer(2181, createTempDir()); System.out.println("Embedded zookeeper server started!"); - Runtime.getRuntime().addShutdownHook(new Thread(ThrowingRunnable.checked(testingServer::stop))); + Runtime.getRuntime().addShutdownHook(new Thread(ThrowingRunnable.toCaught(testingServer::stop))); } private static File createTempDir() { diff --git a/disjob-samples/disjob-samples-frameless-worker/src/main/java/cn/ponfee/disjob/samples/worker/WorkerFramelessMain.java b/disjob-samples/disjob-samples-frameless-worker/src/main/java/cn/ponfee/disjob/samples/worker/WorkerFramelessMain.java index c4b39a83a..b96e21c12 100644 --- a/disjob-samples/disjob-samples-frameless-worker/src/main/java/cn/ponfee/disjob/samples/worker/WorkerFramelessMain.java +++ b/disjob-samples/disjob-samples-frameless-worker/src/main/java/cn/ponfee/disjob/samples/worker/WorkerFramelessMain.java @@ -132,8 +132,8 @@ public static void main(String[] args) throws Exception { try { Runtime.getRuntime().addShutdownHook(new Thread(() -> { - ThrowingRunnable.execute(workerStartup::close); - ThrowingRunnable.execute(vertxWebServer::close); + ThrowingRunnable.doCaught(workerStartup::close); + ThrowingRunnable.doCaught(vertxWebServer::close); })); vertxWebServer.deploy(); diff --git a/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/SupervisorStartup.java b/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/SupervisorStartup.java index d605ef488..ee7fabc3d 100644 --- a/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/SupervisorStartup.java +++ b/disjob-supervisor/src/main/java/cn/ponfee/disjob/supervisor/SupervisorStartup.java @@ -104,14 +104,14 @@ public void stop() { LOG.warn("Supervisor startup already Stopped."); return; } - ThrowingRunnable.execute(supervisorRegistry::close); - ThrowingRunnable.execute(triggeringJobScanner::toStop); - ThrowingRunnable.execute(runningInstanceScanner::toStop); - ThrowingRunnable.execute(waitingInstanceScanner::toStop); - ThrowingRunnable.execute(taskDispatcher::close); - ThrowingRunnable.execute(triggeringJobScanner::close); - ThrowingRunnable.execute(runningInstanceScanner::close); - ThrowingRunnable.execute(waitingInstanceScanner::close); + ThrowingRunnable.doCaught(supervisorRegistry::close); + ThrowingRunnable.doCaught(triggeringJobScanner::toStop); + ThrowingRunnable.doCaught(runningInstanceScanner::toStop); + ThrowingRunnable.doCaught(waitingInstanceScanner::toStop); + ThrowingRunnable.doCaught(taskDispatcher::close); + ThrowingRunnable.doCaught(triggeringJobScanner::close); + ThrowingRunnable.doCaught(runningInstanceScanner::close); + ThrowingRunnable.doCaught(waitingInstanceScanner::close); } // ----------------------------------------------------------------------------------------builder diff --git a/disjob-supervisor/src/test/java/cn/ponfee/disjob/supervisor/SpringBootTestApplication.java b/disjob-supervisor/src/test/java/cn/ponfee/disjob/supervisor/SpringBootTestApplication.java index 2624ec7ae..1cb34ab00 100644 --- a/disjob-supervisor/src/test/java/cn/ponfee/disjob/supervisor/SpringBootTestApplication.java +++ b/disjob-supervisor/src/test/java/cn/ponfee/disjob/supervisor/SpringBootTestApplication.java @@ -30,7 +30,7 @@ public class SpringBootTestApplication { .redisMasterPort(26379) .redisSlavePort(26380) .start(); - ThrowingRunnable.execute(() -> Thread.sleep(5000)); + ThrowingRunnable.doCaught(() -> Thread.sleep(5000)); } public static void main(String[] args) { diff --git a/disjob-test/src/main/java/cn/ponfee/disjob/test/EmbeddedMysqlAndRedisServer.java b/disjob-test/src/main/java/cn/ponfee/disjob/test/EmbeddedMysqlAndRedisServer.java index 3ef2615a1..8596cc6a0 100644 --- a/disjob-test/src/main/java/cn/ponfee/disjob/test/EmbeddedMysqlAndRedisServer.java +++ b/disjob-test/src/main/java/cn/ponfee/disjob/test/EmbeddedMysqlAndRedisServer.java @@ -35,7 +35,7 @@ public static void main(String[] args) { private EmbeddedMysqlAndRedisServer(int mysqlPort, int redisMasterPort, int redisSlavePort) { System.out.println("/*============================================================*\\"); - this.mariaDBServer = ThrowingSupplier.get(() -> EmbeddedMysqlServerMariaDB.start(mysqlPort)); + this.mariaDBServer = ThrowingSupplier.doChecked(() -> EmbeddedMysqlServerMariaDB.start(mysqlPort)); System.out.println("\\*============================================================*/"); System.out.println("\n\n\n\n\n\n"); @@ -48,13 +48,13 @@ private EmbeddedMysqlAndRedisServer(int mysqlPort, int redisMasterPort, int redi } public synchronized void stop() { - ThrowingRunnable.execute(() -> Thread.sleep(10000)); + ThrowingRunnable.doCaught(() -> Thread.sleep(10000)); if (mariaDBServer != null) { - ThrowingRunnable.execute(mariaDBServer::stop); + ThrowingRunnable.doCaught(mariaDBServer::stop); mariaDBServer = null; } if (redisServer != null) { - ThrowingRunnable.execute(redisServer::stop); + ThrowingRunnable.doCaught(redisServer::stop); redisServer = null; } } diff --git a/disjob-test/src/main/java/cn/ponfee/disjob/test/db/EmbeddedMysqlServerMariaDB.java b/disjob-test/src/main/java/cn/ponfee/disjob/test/db/EmbeddedMysqlServerMariaDB.java index 06ee323bc..417ce11ab 100644 --- a/disjob-test/src/main/java/cn/ponfee/disjob/test/db/EmbeddedMysqlServerMariaDB.java +++ b/disjob-test/src/main/java/cn/ponfee/disjob/test/db/EmbeddedMysqlServerMariaDB.java @@ -54,7 +54,7 @@ public class EmbeddedMysqlServerMariaDB { public static void main(String[] args) throws Exception { DB db = start(3306); - Runtime.getRuntime().addShutdownHook(new Thread(ThrowingRunnable.checked(db::stop))); + Runtime.getRuntime().addShutdownHook(new Thread(ThrowingRunnable.toCaught(db::stop))); } public static DB start(int port) throws Exception { diff --git a/disjob-worker/src/main/java/cn/ponfee/disjob/worker/WorkerStartup.java b/disjob-worker/src/main/java/cn/ponfee/disjob/worker/WorkerStartup.java index 030c60c5c..f4cb6831f 100644 --- a/disjob-worker/src/main/java/cn/ponfee/disjob/worker/WorkerStartup.java +++ b/disjob-worker/src/main/java/cn/ponfee/disjob/worker/WorkerStartup.java @@ -100,10 +100,10 @@ public void stop() { LOG.warn("Worker startup already stopped."); return; } - ThrowingRunnable.execute(workerRegistry::close); - ThrowingRunnable.execute(taskReceiver::close); - ThrowingRunnable.execute(timingWheelRotator::close); - ThrowingRunnable.execute(workerThreadPool::close); + ThrowingRunnable.doCaught(workerRegistry::close); + ThrowingRunnable.doCaught(taskReceiver::close); + ThrowingRunnable.doCaught(timingWheelRotator::close); + ThrowingRunnable.doCaught(workerThreadPool::close); } // ----------------------------------------------------------------------------------------builder diff --git a/disjob-worker/src/main/java/cn/ponfee/disjob/worker/base/TimingWheelRotator.java b/disjob-worker/src/main/java/cn/ponfee/disjob/worker/base/TimingWheelRotator.java index 218641ad1..af3a67842 100644 --- a/disjob-worker/src/main/java/cn/ponfee/disjob/worker/base/TimingWheelRotator.java +++ b/disjob-worker/src/main/java/cn/ponfee/disjob/worker/base/TimingWheelRotator.java @@ -116,7 +116,7 @@ private void process(List tasks) { .map(e -> new TaskWorkerParam(e.getTaskId(), e.getWorker().serialize())) .collect(Collectors.toList()); // 更新task的worker信息 - ThrowingRunnable.execute(() -> supervisorCoreRpcClient.updateTaskWorker(list), () -> "Update task worker error: " + Jsons.toJson(list)); + ThrowingRunnable.doCaught(() -> supervisorCoreRpcClient.updateTaskWorker(list), () -> "Update task worker error: " + Jsons.toJson(list)); // 触发执行 subs.forEach(e -> { diff --git a/disjob-worker/src/main/java/cn/ponfee/disjob/worker/base/WorkerThreadPool.java b/disjob-worker/src/main/java/cn/ponfee/disjob/worker/base/WorkerThreadPool.java index a2dd3da39..901c3d4af 100644 --- a/disjob-worker/src/main/java/cn/ponfee/disjob/worker/base/WorkerThreadPool.java +++ b/disjob-worker/src/main/java/cn/ponfee/disjob/worker/base/WorkerThreadPool.java @@ -67,9 +67,9 @@ public class WorkerThreadPool extends Thread implements Closeable { * This jdk thread pool for asynchronous to stop(pause or cancel) task */ private final ThreadPoolExecutor stopTaskExecutor = ThreadPoolExecutors.builder() - .corePoolSize(2) + .corePoolSize(1) .maximumPoolSize(10) - .workQueue(new LinkedBlockingQueue<>(20)) + .workQueue(new LinkedBlockingQueue<>(5)) .keepAliveTimeSeconds(300) .rejectedHandler(ThreadPoolExecutors.CALLER_RUNS) .threadFactory(NamedThreadFactory.builder().prefix("stop_task_operation").priority(Thread.MAX_PRIORITY).build()) @@ -198,27 +198,27 @@ public void close() { // 1、prepare close // 1.1、change executing pool thread state - ThrowingRunnable.execute(activePool::stopPool); + ThrowingRunnable.doCaught(activePool::stopPool); // 1.2、change idle pool thread state - idlePool.forEach(e -> ThrowingRunnable.execute(e::toStop)); + idlePool.forEach(e -> ThrowingRunnable.doCaught(e::toStop)); // 2、do close // 2.1、stop this boss thread - ThrowingRunnable.execute(() -> Threads.stopThread(this, 200)); + ThrowingRunnable.doCaught(() -> Threads.stopThread(this, 200)); // 2.2、stop idle pool thread - idlePool.forEach(e -> ThrowingRunnable.execute(() -> stopWorkerThread(e, true))); - ThrowingRunnable.execute(idlePool::clear); + idlePool.forEach(e -> ThrowingRunnable.doCaught(() -> stopWorkerThread(e, true))); + ThrowingRunnable.doCaught(idlePool::clear); // 2.3、stop executing pool thread - ThrowingRunnable.execute(activePool::closePool); + ThrowingRunnable.doCaught(activePool::closePool); // 2.4、shutdown jdk thread pool ThreadPoolExecutors.shutdown(stopTaskExecutor, 1); // 2.5、clear task execution param queue - ThrowingRunnable.execute(taskQueue::clear); + ThrowingRunnable.doCaught(taskQueue::clear); workerThreadCounter.set(0); LOG.info("Close worker thread pool end."); @@ -806,7 +806,7 @@ private void runTask(ExecuteTaskParam param) { if (param.getRouteStrategy() != RouteStrategy.BROADCAST) { // reset task worker final List list = Collections.singletonList(new TaskWorkerParam(param.getTaskId(), "")); - ThrowingRunnable.execute(() -> client.updateTaskWorker(list), () -> "Reset task worker occur error: " + param); + ThrowingRunnable.doCaught(() -> client.updateTaskWorker(list), () -> "Reset task worker occur error: " + param); } Threads.interruptIfNecessary(t); // discard task