Skip to content

Commit

Permalink
优化动态线程池创建以及监控代码.
Browse files Browse the repository at this point in the history
  • Loading branch information
magestacks committed Dec 6, 2021
1 parent 5b5eaba commit 5f75eb5
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import cn.hippo4j.starter.alarm.ThreadPoolAlarm;
import cn.hippo4j.starter.alarm.ThreadPoolAlarmManage;
import cn.hippo4j.starter.event.EventExecutor;
import cn.hippo4j.starter.event.MonitorEventExecutor;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import org.springframework.core.task.TaskDecorator;
Expand Down Expand Up @@ -312,7 +312,7 @@ private void interruptIdleWorkers() {

final void reject(Runnable command) {
rejectCount.incrementAndGet();
EventExecutor.publishEvent(
MonitorEventExecutor.publishEvent(
() -> ThreadPoolAlarmManage.checkPoolRejectAlarm(this)
);
handler.rejectedExecution(command, this);
Expand Down Expand Up @@ -367,7 +367,7 @@ private boolean addWorker(Runnable firstTask, boolean core) {
}
}

EventExecutor.publishEvent(
MonitorEventExecutor.publishEvent(
() -> ThreadPoolAlarmManage.checkPoolLivenessAlarm(core, this)
);

Expand Down Expand Up @@ -548,7 +548,7 @@ public void execute(@NonNull Runnable command) {
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
EventExecutor.publishEvent(
MonitorEventExecutor.publishEvent(
() -> ThreadPoolAlarmManage.checkPoolCapacityAlarm(this)
);
int recheck = ctl.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,14 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
private final ThreadPoolOperation threadPoolOperation;

private final ExecutorService executorService = ThreadPoolBuilder.builder()
.poolThreadSize(2, 4)
.keepAliveTime(0L, TimeUnit.MILLISECONDS)
.workQueue(QueueTypeEnum.ARRAY_BLOCKING_QUEUE, 1)
.threadFactory("dynamic-threadPool-config")
.corePoolSize(2)
.maxPoolNum(4)
.keepAliveTime(2000)
.timeUnit(TimeUnit.MILLISECONDS)
.workQueue(QueueTypeEnum.ARRAY_BLOCKING_QUEUE)
.capacity(1)
.allowCoreThreadTimeOut(true)
.threadFactory("dynamic-threadPool-init-config")
.rejected(new ThreadPoolExecutor.DiscardOldestPolicy())
.build();

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package cn.hippo4j.starter.event;

import cn.hippo4j.common.function.NoArgsConsumer;
import cn.hippo4j.starter.toolkit.thread.QueueTypeEnum;
import cn.hippo4j.starter.toolkit.thread.ThreadPoolBuilder;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;

import static cn.hippo4j.common.constant.Constants.AVAILABLE_PROCESSORS;

/**
* 动态线程池监控事件执行器.
*
* @author chen.ma
* @date 2021/11/8 23:44
*/
@Slf4j
public class MonitorEventExecutor {

private static final ExecutorService EVENT_EXECUTOR = ThreadPoolBuilder.builder()
.threadFactory("monitor-event-executor")
.corePoolSize(AVAILABLE_PROCESSORS)
.maxPoolNum(AVAILABLE_PROCESSORS)
.workQueue(QueueTypeEnum.LINKED_BLOCKING_QUEUE)
.capacity(4096)
.rejected(new ThreadPoolExecutor.AbortPolicy())
.build();

/**
* 发布事件.
*
* @param consumer
*/
public static void publishEvent(NoArgsConsumer consumer) {
try {
EVENT_EXECUTOR.execute(consumer::accept);
} catch (RejectedExecutionException ex) {
log.error("Monitoring thread pool run events exceeded load.");
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,20 @@ public static ThreadPoolExecutor buildPool() {
*/
public static ThreadPoolExecutor buildPool(ThreadPoolInitParam initParam) {
Assert.notNull(initParam);
ThreadPoolExecutor executorService =
new ThreadPoolExecutorTemplate(initParam.getCorePoolNum(),
initParam.getMaxPoolNum(),
initParam.getKeepAliveTime(),
initParam.getTimeUnit(),
initParam.getWorkQueue(),
initParam.getThreadFactory(),
initParam.rejectedExecutionHandler);
ThreadPoolExecutor executorService;
try {
executorService = new ThreadPoolExecutorTemplate(initParam.getCorePoolNum(),
initParam.getMaxPoolNum(),
initParam.getKeepAliveTime(),
initParam.getTimeUnit(),
initParam.getWorkQueue(),
initParam.getThreadFactory(),
initParam.rejectedExecutionHandler);
} catch (IllegalArgumentException ex) {
throw new IllegalArgumentException("Error creating thread pool parameter.", ex);
}

executorService.allowCoreThreadTimeOut(initParam.allowCoreThreadTimeOut);
return executorService;
}

Expand All @@ -77,15 +83,21 @@ public static ThreadPoolExecutor buildFastPool() {
*/
public static ThreadPoolExecutor buildFastPool(ThreadPoolInitParam initParam) {
TaskQueue<Runnable> taskQueue = new TaskQueue(initParam.getCapacity());
FastThreadPoolExecutor fastThreadPoolExecutor =
new FastThreadPoolExecutor(initParam.getCorePoolNum(),
initParam.getMaxPoolNum(),
initParam.getKeepAliveTime(),
initParam.getTimeUnit(),
taskQueue,
initParam.getThreadFactory(),
initParam.rejectedExecutionHandler);
FastThreadPoolExecutor fastThreadPoolExecutor;
try {
fastThreadPoolExecutor = new FastThreadPoolExecutor(initParam.getCorePoolNum(),
initParam.getMaxPoolNum(),
initParam.getKeepAliveTime(),
initParam.getTimeUnit(),
taskQueue,
initParam.getThreadFactory(),
initParam.rejectedExecutionHandler);
} catch (IllegalArgumentException ex) {
throw new IllegalArgumentException("Error creating thread pool parameter.", ex);
}

taskQueue.setExecutor(fastThreadPoolExecutor);
fastThreadPoolExecutor.allowCoreThreadTimeOut(initParam.allowCoreThreadTimeOut);
return fastThreadPoolExecutor;
}

Expand All @@ -97,21 +109,28 @@ public static ThreadPoolExecutor buildFastPool(ThreadPoolInitParam initParam) {
*/
public static DynamicThreadPoolExecutor buildDynamicPool(ThreadPoolInitParam initParam) {
Assert.notNull(initParam);
DynamicThreadPoolExecutor executorService =
new DynamicThreadPoolExecutor(initParam.getCorePoolNum(),
initParam.getMaxPoolNum(),
initParam.getKeepAliveTime(),
initParam.getTimeUnit(),
initParam.getWaitForTasksToCompleteOnShutdown(),
initParam.getAwaitTerminationMillis(),
initParam.getWorkQueue(),
initParam.getThreadPoolId(),
initParam.getThreadFactory(),
initParam.getThreadPoolAlarm(),
initParam.getRejectedExecutionHandler());

executorService.setTaskDecorator(initParam.getTaskDecorator());
return executorService;
DynamicThreadPoolExecutor dynamicThreadPoolExecutor;
try {
dynamicThreadPoolExecutor = new DynamicThreadPoolExecutor(
initParam.getCorePoolNum(),
initParam.getMaxPoolNum(),
initParam.getKeepAliveTime(),
initParam.getTimeUnit(),
initParam.getWaitForTasksToCompleteOnShutdown(),
initParam.getAwaitTerminationMillis(),
initParam.getWorkQueue(),
initParam.getThreadPoolId(),
initParam.getThreadFactory(),
initParam.getThreadPoolAlarm(),
initParam.getRejectedExecutionHandler()
);
} catch (IllegalArgumentException ex) {
throw new IllegalArgumentException(String.format("Error creating thread pool parameter. threadPool id :: %s", initParam.getThreadPoolId()), ex);
}

dynamicThreadPoolExecutor.setTaskDecorator(initParam.getTaskDecorator());
dynamicThreadPoolExecutor.allowCoreThreadTimeOut(initParam.allowCoreThreadTimeOut);
return dynamicThreadPoolExecutor;
}

@Data
Expand Down Expand Up @@ -183,6 +202,11 @@ public static class ThreadPoolInitParam {
*/
private Boolean waitForTasksToCompleteOnShutdown;

/**
* 允许核心线程超时
*/
private Boolean allowCoreThreadTimeOut = false;

public ThreadPoolInitParam(String threadNamePrefix, boolean isDaemon) {
this.threadPoolId = threadNamePrefix;
this.threadFactory = ThreadFactoryBuilder.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
*/
private Boolean waitForTasksToCompleteOnShutdown = true;

/**
* 允许核心线程超时
*/
private Boolean allowCoreThreadTimeOut = false;

/**
* 计算公式:CPU 核数 / (1 - 阻塞系数 0.8)
*
Expand Down Expand Up @@ -234,6 +239,11 @@ public ThreadPoolBuilder dynamicSupport(boolean waitForTasksToCompleteOnShutdown
return this;
}

public ThreadPoolBuilder allowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) {
this.allowCoreThreadTimeOut = allowCoreThreadTimeOut;
return this;
}

/**
* 构建
*
Expand Down Expand Up @@ -303,6 +313,7 @@ private static AbstractBuildThreadPoolTemplate.ThreadPoolInitParam buildInitPara
.setCapacity(builder.capacity)
.setRejectedExecutionHandler(builder.rejectedExecutionHandler)
.setTimeUnit(builder.timeUnit)
.setAllowCoreThreadTimeOut(builder.allowCoreThreadTimeOut)
.setTaskDecorator(builder.taskDecorator);

if (builder.isDynamicPool) {
Expand Down

0 comments on commit 5f75eb5

Please sign in to comment.