Skip to content

Commit

Permalink
模版方法抽象获取线程池运行状态.
Browse files Browse the repository at this point in the history
  • Loading branch information
magestacks committed Dec 7, 2021
1 parent 8d38bd8 commit f0f4409
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Random;
import java.util.concurrent.ThreadPoolExecutor;
Expand All @@ -21,7 +22,7 @@ public class RunStateHandlerTest {
@Resource
private ThreadPoolExecutor dynamicThreadPoolExecutor;

// @PostConstruct
@PostConstruct
@SuppressWarnings("all")
public void runStateHandlerTest() {
log.info("Test thread pool runtime state interface, The rejection policy will be triggered after 30s...");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package cn.hippo4j.starter.handler;

import cn.hippo4j.common.model.PoolRunStateInfo;
import cn.hippo4j.starter.core.DynamicThreadPoolExecutor;
import cn.hippo4j.starter.core.GlobalThreadPoolManage;
import cn.hippo4j.starter.toolkit.CalculateUtil;
import cn.hippo4j.starter.wrapper.DynamicThreadPoolWrapper;
import cn.hutool.core.date.DateUtil;

import java.util.Date;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;

/**
* Abstract threadPool runtime info.
*
* @author chen.ma
* @date 2021/12/7 19:44
*/
public abstract class AbstractThreadPoolRuntime {

/**
* Supplement.
*
* @param basePoolRunStateInfo
* @return
*/
protected abstract PoolRunStateInfo supplement(PoolRunStateInfo basePoolRunStateInfo);

/**
* Get pool run state.
*
* @param threadPoolId
* @return
*/
public PoolRunStateInfo getPoolRunState(String threadPoolId) {
DynamicThreadPoolWrapper executorService = GlobalThreadPoolManage.getExecutorService(threadPoolId);
ThreadPoolExecutor pool = executorService.getExecutor();

// 核心线程数
int corePoolSize = pool.getCorePoolSize();
// 最大线程数
int maximumPoolSize = pool.getMaximumPoolSize();
// 线程池当前线程数 (有锁)
int poolSize = pool.getPoolSize();
// 活跃线程数 (有锁)
int activeCount = pool.getActiveCount();
// 同时进入池中的最大线程数 (有锁)
int largestPoolSize = pool.getLargestPoolSize();
// 线程池中执行任务总数量 (有锁)
long completedTaskCount = pool.getCompletedTaskCount();
// 当前负载
String currentLoad = CalculateUtil.divide(activeCount, maximumPoolSize) + "%";
// 峰值负载
String peakLoad = CalculateUtil.divide(largestPoolSize, maximumPoolSize) + "%";

BlockingQueue<Runnable> queue = pool.getQueue();
// 队列元素个数
int queueSize = queue.size();
// 队列类型
String queueType = queue.getClass().getSimpleName();
// 队列剩余容量
int remainingCapacity = queue.remainingCapacity();
// 队列容量
int queueCapacity = queueSize + remainingCapacity;

PoolRunStateInfo stateInfo = new PoolRunStateInfo();
stateInfo.setCoreSize(corePoolSize);
stateInfo.setTpId(threadPoolId);
stateInfo.setPoolSize(poolSize);
stateInfo.setMaximumSize(maximumPoolSize);
stateInfo.setActiveSize(activeCount);
stateInfo.setCurrentLoad(currentLoad);
stateInfo.setQueueType(queueType);
stateInfo.setPeakLoad(peakLoad);
stateInfo.setQueueSize(queueSize);
stateInfo.setQueueCapacity(queueCapacity);
stateInfo.setQueueRemainingCapacity(remainingCapacity);
stateInfo.setLargestPoolSize(largestPoolSize);
stateInfo.setCompletedTaskCount(completedTaskCount);

int rejectCount = pool instanceof DynamicThreadPoolExecutor
? ((DynamicThreadPoolExecutor) pool).getRejectCount()
: -1;
stateInfo.setRejectCount(rejectCount);
stateInfo.setClientLastRefreshTime(DateUtil.formatDateTime(new Date()));

return supplement(stateInfo);
}

}
Original file line number Diff line number Diff line change
@@ -1,21 +1,13 @@
package cn.hippo4j.starter.handler;

import cn.hippo4j.common.model.PoolRunStateInfo;
import cn.hippo4j.starter.core.DynamicThreadPoolExecutor;
import cn.hippo4j.starter.core.GlobalThreadPoolManage;
import cn.hippo4j.starter.toolkit.ByteConvertUtil;
import cn.hippo4j.starter.toolkit.CalculateUtil;
import cn.hippo4j.starter.wrapper.DynamicThreadPoolWrapper;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.system.RuntimeInfo;
import lombok.extern.slf4j.Slf4j;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Date;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;

/**
* Thread pool run state service.
Expand All @@ -24,7 +16,7 @@
* @date 2021/7/12 21:25
*/
@Slf4j
public class ThreadPoolRunStateHandler {
public class ThreadPoolRunStateHandler extends AbstractThreadPoolRuntime {

private static InetAddress INET_ADDRESS;

Expand All @@ -36,37 +28,8 @@ public class ThreadPoolRunStateHandler {
}
}

public static PoolRunStateInfo getPoolRunState(String tpId) {
DynamicThreadPoolWrapper executorService = GlobalThreadPoolManage.getExecutorService(tpId);
ThreadPoolExecutor pool = executorService.getExecutor();

// 核心线程数
int corePoolSize = pool.getCorePoolSize();
// 最大线程数
int maximumPoolSize = pool.getMaximumPoolSize();
// 线程池当前线程数 (有锁)
int poolSize = pool.getPoolSize();
// 活跃线程数 (有锁)
int activeCount = pool.getActiveCount();
// 同时进入池中的最大线程数 (有锁)
int largestPoolSize = pool.getLargestPoolSize();
// 线程池中执行任务总数量 (有锁)
long completedTaskCount = pool.getCompletedTaskCount();
// 当前负载
String currentLoad = CalculateUtil.divide(activeCount, maximumPoolSize) + "%";
// 峰值负载
String peakLoad = CalculateUtil.divide(largestPoolSize, maximumPoolSize) + "%";

BlockingQueue<Runnable> queue = pool.getQueue();
// 队列类型
String queueType = queue.getClass().getSimpleName();
// 队列元素个数
int queueSize = queue.size();
// 队列剩余容量
int remainingCapacity = queue.remainingCapacity();
// 队列容量
int queueCapacity = queueSize + remainingCapacity;

@Override
protected PoolRunStateInfo supplement(PoolRunStateInfo poolRunStateInfo) {
// 内存占比: 使用内存 / 最大内存
RuntimeInfo runtimeInfo = new RuntimeInfo();
String memoryProportion = StrUtil.builder(
Expand All @@ -76,31 +39,11 @@ public static PoolRunStateInfo getPoolRunState(String tpId) {
ByteConvertUtil.getPrintSize(runtimeInfo.getMaxMemory())
).toString();

PoolRunStateInfo stateInfo = new PoolRunStateInfo();
stateInfo.setCoreSize(corePoolSize);
stateInfo.setMaximumSize(maximumPoolSize);
stateInfo.setPoolSize(poolSize);
stateInfo.setActiveSize(activeCount);
stateInfo.setCurrentLoad(currentLoad);
stateInfo.setPeakLoad(peakLoad);
stateInfo.setQueueType(queueType);
stateInfo.setQueueSize(queueSize);
stateInfo.setQueueRemainingCapacity(remainingCapacity);
stateInfo.setQueueCapacity(queueCapacity);
stateInfo.setLargestPoolSize(largestPoolSize);
stateInfo.setCompletedTaskCount(completedTaskCount);
stateInfo.setHost(INET_ADDRESS.getHostAddress());
stateInfo.setTpId(tpId);
stateInfo.setMemoryProportion(memoryProportion);
stateInfo.setFreeMemory(ByteConvertUtil.getPrintSize(runtimeInfo.getFreeMemory()));

int rejectCount = pool instanceof DynamicThreadPoolExecutor
? ((DynamicThreadPoolExecutor) pool).getRejectCount()
: -1;
stateInfo.setRejectCount(rejectCount);
stateInfo.setClientLastRefreshTime(DateUtil.formatDateTime(new Date()));
poolRunStateInfo.setHost(INET_ADDRESS.getHostAddress());
poolRunStateInfo.setMemoryProportion(memoryProportion);
poolRunStateInfo.setFreeMemory(ByteConvertUtil.getPrintSize(runtimeInfo.getFreeMemory()));

return stateInfo;
return poolRunStateInfo;
}

}

0 comments on commit f0f4409

Please sign in to comment.