Skip to content

Commit

Permalink
rename Throwables method name
Browse files Browse the repository at this point in the history
  • Loading branch information
ponfee committed Nov 16, 2023
1 parent dc5b515 commit 972960f
Show file tree
Hide file tree
Showing 32 changed files with 156 additions and 139 deletions.
18 changes: 9 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,22 +57,22 @@ disjob # 主项目①
- 任务在执行时若抛出[PauseTaskException](disjob-core/src/main/java/cn/ponfee/disjob/core/exception/PauseTaskException.java),会暂停对应实例下的全部任务(包括分布在不同worker机器中的任务)
- 支持广播任务,广播任务会派发给job-group下的所有worker执行
- 支持Job间的依赖,多个Job配置好依赖关系后便会按既定的依赖顺序依次执行
- 支持DAG工作流,可把jobHandler配置为复杂的DAG表达式,如:A->B,C,(D->E)->D,F->G
- 支持DAG工作流,可把`jobHandler`配置为复杂的DAG表达式,如:A->B,C,(D->E)->D,F->G
- 提供Web管理后台,通过界面进行作业配置,任务监控等

## Comparison

| | **Quartz** | **Elastic-Job** | **Xxl-Job** | **Disjob** |
| ---------------- | ---------- |-----------------|----------------------|----------------------------------------|
| **触发类型** | Cron | Cron | Cron、固定频率、父子依赖 | Cron、指定时间、固定频率、固定延时、父子依赖 |
| **任务编排** | 不支持 | 不支持 | 不支持 | DAG表达式 |
| **任务分片** | 不支持 | 静态分片 | 广播任务 | 广播任务、动态分片 |
| **停止与恢复** | 不支持 | 不支持 | 终止运行中的任务 | 暂停执行中的任务、恢复已暂停的任务 |
| **保存执行快照** | 不支持 | 不支持 | 不支持 | 支持 |
| **失败重试** | 不支持 | 失效转移 | 支持 | 支持 |
| **后台管理** | 不支持 | 支持 | 支持 | 支持 |
| **监控告警** | 不支持 | 邮件 | 邮件 | 暂不支持 |
| **查看执行日志** | 不支持 | 支持 | 支持 | 暂不支持 |
| **任务编排** | | | | DAG表达式 |
| **任务分片** | | 静态分片 | 广播任务 | 广播任务、动态分片 |
| **停止与恢复** | | | 终止运行中的任务 | 暂停执行中的任务、恢复执行已暂停的任务 |
| **保存执行快照** | | | | |
| **失败重试** | | 失效转移 | | |
| **后台管理** | | | | |
| **监控告警** | | 邮件 | 邮件 | 暂无 |
| **查看执行日志** | | | | 暂无 |

## [Download From Maven Central](https://central.sonatype.com/namespace/cn.ponfee)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ public class CopyrightVerifyTest {
private static final String COPYRIGHT_KEYWORD = " Copyright (c) 2017-2023 Ponfee ";

private static final String BASE_DIR = MavenProjects.getProjectBaseDir();
private static final String COPYRIGHT = ThrowingSupplier.get(() -> IOUtils.resourceToString("copy-right.txt", UTF_8, CopyrightVerifyTest.class.getClassLoader()));
private static final String COPYRIGHT = ThrowingSupplier.doChecked(() -> IOUtils.resourceToString("copy-right.txt", UTF_8, CopyrightVerifyTest.class.getClassLoader()));

@Test
public void upsertCopyright() {
handleFile(file -> {
String text = ThrowingSupplier.get(() -> IOUtils.toString(file.toURI(), UTF_8));
String text = ThrowingSupplier.doChecked(() -> IOUtils.toString(file.toURI(), UTF_8));
if (!isOwnerCode(text)) {
return;
}
Expand Down Expand Up @@ -72,7 +72,7 @@ public void upsertCopyright() {
@Test
public void checkCopyright() {
handleFile(file -> {
String text = ThrowingSupplier.get(() -> IOUtils.toString(file.toURI(), UTF_8));
String text = ThrowingSupplier.doChecked(() -> IOUtils.toString(file.toURI(), UTF_8));
if (StringUtils.countMatches(text, " @author ") == 0) {
System.out.println(file.getName());
} else if (isOwnerCode(text)) {
Expand All @@ -92,7 +92,7 @@ public void checkCopyright() {
@Test
public void testNoCopyright() {
handleFile(file -> {
String text = ThrowingSupplier.get(() -> IOUtils.toString(file.toURI(), UTF_8));
String text = ThrowingSupplier.doChecked(() -> IOUtils.toString(file.toURI(), UTF_8));
if (!text.contains(COPYRIGHT_KEYWORD)) {
System.out.println(file.getName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@
package cn.ponfee.disjob.common.base;

import cn.ponfee.disjob.common.util.CRC16;
import com.google.common.hash.Hashing;
import org.apache.commons.codec.digest.DigestUtils;

import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.function.Function;

import static java.nio.charset.StandardCharsets.UTF_8;

/**
* Consistent hashing algorithm.
*
Expand Down Expand Up @@ -66,7 +68,11 @@ public interface HashFunction {
return h;
};

HashFunction CRC_16 = key -> CRC16.digest(key.getBytes(StandardCharsets.UTF_8));
HashFunction SIP_HASH = key -> Hashing.sipHash24().hashBytes(key.getBytes(UTF_8)).asInt();

HashFunction MURMUR3_32 = key -> Hashing.murmur3_32_fixed().hashBytes(key.getBytes(UTF_8)).asInt();

HashFunction CRC_16 = key -> CRC16.digest(key.getBytes(UTF_8));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,9 @@ public void run() {
if (delayed != null) {
E data = delayed.getData();
if (asyncExecutor != null) {
asyncExecutor.submit(ThrowingRunnable.caught(() -> processor.accept(data)));
asyncExecutor.submit(ThrowingRunnable.toCaught(() -> processor.accept(data)));
} else {
ThrowingRunnable.execute(() -> processor.accept(data));
ThrowingRunnable.doCaught(() -> processor.accept(data));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public static LoopThread createStarted(String name, boolean daemon, int priority
public void run() {
LOG.info("Loop process thread begin.");
if (delayMs > 0) {
ThrowingRunnable.run(() -> Thread.sleep(delayMs));
ThrowingRunnable.doChecked(() -> Thread.sleep(delayMs));
}
while (state.get() == RUNNING) {
try {
Expand Down Expand Up @@ -90,7 +90,7 @@ public synchronized void start() {

public boolean terminate() {
if (state.compareAndSet(RUNNING, TERMINATED)) {
ThrowingRunnable.execute(super::interrupt);
ThrowingRunnable.doCaught(super::interrupt);
return true;
} else {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ public static boolean shutdown(ExecutorService executorService) {
return true;
} catch (Throwable t) {
LOG.error("Shutdown ExecutorService occur error.", t);
ThrowingRunnable.execute(executorService::shutdownNow);
ThrowingRunnable.doCaught(executorService::shutdownNow);
Threads.interruptIfNecessary(t);
return false;
}
Expand All @@ -232,7 +232,7 @@ public static boolean shutdown(ExecutorService executorService, int awaitSeconds
} catch (Throwable t) {
LOG.error("Shutdown ExecutorService occur error.", t);
if (!hasCallShutdownNow) {
ThrowingRunnable.execute(executorService::shutdownNow);
ThrowingRunnable.doCaught(executorService::shutdownNow);
}
Threads.interruptIfNecessary(t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,19 +85,19 @@ default <R> ThrowingCallable<R, Throwable> toCallable(R result) {
};
}

static void run(ThrowingRunnable<?> runnable) {
static void doChecked(ThrowingRunnable<?> runnable) {
try {
runnable.run();
} catch (Throwable t) {
ExceptionUtils.rethrow(t);
}
}

static void execute(ThrowingRunnable<?> runnable) {
execute(runnable, EMPTY_MESSAGE);
static void doCaught(ThrowingRunnable<?> runnable) {
doCaught(runnable, EMPTY_MESSAGE);
}

static void execute(ThrowingRunnable<?> runnable, Supplier<String> message) {
static void doCaught(ThrowingRunnable<?> runnable, Supplier<String> message) {
try {
runnable.run();
} catch (Throwable t) {
Expand All @@ -106,7 +106,7 @@ static void execute(ThrowingRunnable<?> runnable, Supplier<String> message) {
}
}

static Runnable checked(ThrowingRunnable<?> runnable) {
static Runnable toChecked(ThrowingRunnable<?> runnable) {
return () -> {
try {
runnable.run();
Expand All @@ -116,11 +116,11 @@ static Runnable checked(ThrowingRunnable<?> runnable) {
};
}

static Runnable caught(ThrowingRunnable<?> runnable) {
return caught(runnable, EMPTY_MESSAGE);
static Runnable toCaught(ThrowingRunnable<?> runnable) {
return toCaught(runnable, EMPTY_MESSAGE);
}

static Runnable caught(ThrowingRunnable<?> runnable, Supplier<String> message) {
static Runnable toCaught(ThrowingRunnable<?> runnable, Supplier<String> message) {
return () -> {
try {
runnable.run();
Expand All @@ -146,19 +146,19 @@ default ThrowingRunnable<Throwable> toRunnable() {
return this::get;
}

static <R> R get(ThrowingSupplier<R, ?> supplier) {
static <R> R doChecked(ThrowingSupplier<R, ?> supplier) {
try {
return supplier.get();
} catch (Throwable t) {
return ExceptionUtils.rethrow(t);
}
}

static <R> R execute(ThrowingSupplier<R, ?> supplier) {
return execute(supplier, null, EMPTY_MESSAGE);
static <R> R doCaught(ThrowingSupplier<R, ?> supplier) {
return doCaught(supplier, null, EMPTY_MESSAGE);
}

static <R> R execute(ThrowingSupplier<R, ?> supplier, R defaultValue, Supplier<String> message) {
static <R> R doCaught(ThrowingSupplier<R, ?> supplier, R defaultValue, Supplier<String> message) {
try {
return supplier.get();
} catch (Throwable t) {
Expand All @@ -168,7 +168,7 @@ static <R> R execute(ThrowingSupplier<R, ?> supplier, R defaultValue, Supplier<S
}
}

static <R> Supplier<R> checked(ThrowingSupplier<R, ?> supplier) {
static <R> Supplier<R> toChecked(ThrowingSupplier<R, ?> supplier) {
return () -> {
try {
return supplier.get();
Expand All @@ -178,11 +178,11 @@ static <R> Supplier<R> checked(ThrowingSupplier<R, ?> supplier) {
};
}

static <R> Supplier<R> caught(ThrowingSupplier<R, ?> supplier) {
return caught(supplier, null, EMPTY_MESSAGE);
static <R> Supplier<R> toCaught(ThrowingSupplier<R, ?> supplier) {
return toCaught(supplier, null, EMPTY_MESSAGE);
}

static <R> Supplier<R> caught(ThrowingSupplier<R, ?> supplier, R defaultValue, Supplier<String> message) {
static <R> Supplier<R> toCaught(ThrowingSupplier<R, ?> supplier, R defaultValue, Supplier<String> message) {
return () -> {
try {
return supplier.get();
Expand All @@ -209,19 +209,19 @@ default ThrowingRunnable<Throwable> toRunnable() {
return this::call;
}

static <R> R call(ThrowingCallable<R, ?> callable) {
static <R> R doChecked(ThrowingCallable<R, ?> callable) {
try {
return callable.call();
} catch (Throwable t) {
return ExceptionUtils.rethrow(t);
}
}

static <R> R execute(ThrowingCallable<R, ?> callable) {
return execute(callable, null, EMPTY_MESSAGE);
static <R> R doCaught(ThrowingCallable<R, ?> callable) {
return doCaught(callable, null, EMPTY_MESSAGE);
}

static <R> R execute(ThrowingCallable<R, ?> callable, R defaultValue, Supplier<String> message) {
static <R> R doCaught(ThrowingCallable<R, ?> callable, R defaultValue, Supplier<String> message) {
try {
return callable.call();
} catch (Throwable t) {
Expand All @@ -231,7 +231,7 @@ static <R> R execute(ThrowingCallable<R, ?> callable, R defaultValue, Supplier<S
}
}

static <R> Callable<R> checked(ThrowingCallable<R, ?> callable) {
static <R> Callable<R> toChecked(ThrowingCallable<R, ?> callable) {
return () -> {
try {
return callable.call();
Expand All @@ -241,11 +241,11 @@ static <R> Callable<R> checked(ThrowingCallable<R, ?> callable) {
};
}

static <R> Callable<R> caught(ThrowingCallable<R, ?> supplier) {
return caught(supplier, null, EMPTY_MESSAGE);
static <R> Callable<R> toCaught(ThrowingCallable<R, ?> supplier) {
return toCaught(supplier, null, EMPTY_MESSAGE);
}

static <R> Callable<R> caught(ThrowingCallable<R, ?> supplier, R defaultValue, Supplier<String> message) {
static <R> Callable<R> toCaught(ThrowingCallable<R, ?> supplier, R defaultValue, Supplier<String> message) {
return () -> {
try {
return supplier.call();
Expand Down Expand Up @@ -275,19 +275,19 @@ default <R> ThrowingFunction<E, R, Throwable> toFunction(R result) {
};
}

static <E> void accept(ThrowingConsumer<E, ?> consumer, E arg) {
static <E> void doChecked(ThrowingConsumer<E, ?> consumer, E arg) {
try {
consumer.accept(arg);
} catch (Throwable t) {
ExceptionUtils.rethrow(t);
}
}

static <E> void execute(ThrowingConsumer<E, ?> consumer, E arg) {
execute(consumer, arg, EMPTY_MESSAGE);
static <E> void doCaught(ThrowingConsumer<E, ?> consumer, E arg) {
doCaught(consumer, arg, EMPTY_MESSAGE);
}

static <E> void execute(ThrowingConsumer<E, ?> consumer, E arg, Supplier<String> message) {
static <E> void doCaught(ThrowingConsumer<E, ?> consumer, E arg, Supplier<String> message) {
try {
consumer.accept(arg);
} catch (Throwable t) {
Expand All @@ -296,7 +296,7 @@ static <E> void execute(ThrowingConsumer<E, ?> consumer, E arg, Supplier<String>
}
}

static <E> Consumer<E> checked(ThrowingConsumer<E, ?> consumer) {
static <E> Consumer<E> toChecked(ThrowingConsumer<E, ?> consumer) {
return e -> {
try {
consumer.accept(e);
Expand All @@ -306,11 +306,11 @@ static <E> Consumer<E> checked(ThrowingConsumer<E, ?> consumer) {
};
}

static <E> Consumer<E> caught(ThrowingConsumer<E, ?> consumer) {
return caught(consumer, EMPTY_MESSAGE);
static <E> Consumer<E> toCaught(ThrowingConsumer<E, ?> consumer) {
return toCaught(consumer, EMPTY_MESSAGE);
}

static <E> Consumer<E> caught(ThrowingConsumer<E, ?> consumer, Supplier<String> message) {
static <E> Consumer<E> toCaught(ThrowingConsumer<E, ?> consumer, Supplier<String> message) {
return arg -> {
try {
consumer.accept(arg);
Expand All @@ -337,19 +337,19 @@ default ThrowingConsumer<E, Throwable> toConsumer() {
return this::apply;
}

static <E, R> R apply(ThrowingFunction<E, R, ?> function, E arg) {
static <E, R> R doChecked(ThrowingFunction<E, R, ?> function, E arg) {
try {
return function.apply(arg);
} catch (Throwable t) {
return ExceptionUtils.rethrow(t);
}
}

static <E, R> R execute(ThrowingFunction<E, R, ?> function, E arg) {
return execute(function, arg, null, EMPTY_MESSAGE);
static <E, R> R doCaught(ThrowingFunction<E, R, ?> function, E arg) {
return doCaught(function, arg, null, EMPTY_MESSAGE);
}

static <E, R> R execute(ThrowingFunction<E, R, ?> function, E arg, R defaultValue, Supplier<String> message) {
static <E, R> R doCaught(ThrowingFunction<E, R, ?> function, E arg, R defaultValue, Supplier<String> message) {
try {
return function.apply(arg);
} catch (Throwable t) {
Expand All @@ -359,7 +359,7 @@ static <E, R> R execute(ThrowingFunction<E, R, ?> function, E arg, R defaultValu
}
}

static <E, R> Function<E, R> checked(ThrowingFunction<E, R, ?> function) {
static <E, R> Function<E, R> toChecked(ThrowingFunction<E, R, ?> function) {
return e -> {
try {
return function.apply(e);
Expand All @@ -369,11 +369,11 @@ static <E, R> Function<E, R> checked(ThrowingFunction<E, R, ?> function) {
};
}

static <E, R> Function<E, R> caught(ThrowingFunction<E, R, ?> function) {
return caught(function, null, EMPTY_MESSAGE);
static <E, R> Function<E, R> toCaught(ThrowingFunction<E, R, ?> function) {
return toCaught(function, null, EMPTY_MESSAGE);
}

static <E, R> Function<E, R> caught(ThrowingFunction<E, R, ?> function, R defaultValue, Supplier<String> message) {
static <E, R> Function<E, R> toCaught(ThrowingFunction<E, R, ?> function, R defaultValue, Supplier<String> message) {
return arg -> {
try {
return function.apply(arg);
Expand Down
Loading

0 comments on commit 972960f

Please sign in to comment.