Skip to content

Commit

Permalink
Core code refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
magestacks committed May 1, 2022
1 parent b281c15 commit 8d00d6e
Show file tree
Hide file tree
Showing 48 changed files with 74 additions and 252 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package cn.hippo4j.common.constant;

/**
* Change thread-pool constants.
*
* @author chen.ma
* @date 2022/5/1 17:25
*/
public class ChangeThreadPoolConstants {

public static final String CHANGE_THREAD_POOL_TEXT = "[{}] Changed thread pool. " +
"\n coreSize :: [{}]" +
"\n maxSize :: [{}]" +
"\n queueType :: [{}]" +
"\n capacity :: [{}]" +
"\n keepAliveTime :: [{}]" +
"\n executeTimeOut :: [{}]" +
"\n rejectedType :: [{}]" +
"\n allowCoreThreadTimeOut :: [{}]";

public static final String CHANGE_DELIMITER = "%s => %s";
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,5 +87,4 @@ default String getServerAddr() {
default Boolean getBanner() {
return null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,4 @@ public class ConfigEmptyException extends RuntimeException {
* action
*/
private String action;

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,4 @@ public class UtilAutoConfiguration {
public InetUtils hippo4JInetUtils(InetUtilsProperties properties) {
return new InetUtils(properties);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -77,5 +77,4 @@ public UndertowWebThreadPoolHandler undertowWebThreadPoolHandler() {
public WebThreadPoolHandlerChoose webThreadPoolServiceChoose() {
return new WebThreadPoolHandlerChoose();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -51,35 +51,30 @@ public BeforeCheckConfiguration.BeforeCheck dynamicThreadPoolBeforeCheckBean(@Au
"Web server failed to start. The dynamic thread pool username is empty.",
"Please check whether the [spring.dynamic.thread-pool.username] configuration is empty or an empty string.");
}

String password = properties.getPassword();
if (StringUtil.isBlank(password)) {
throw new ConfigEmptyException(
"Web server failed to start. The dynamic thread pool password is empty.",
"Please check whether the [spring.dynamic.thread-pool.password] configuration is empty or an empty string.");
}

String namespace = properties.getNamespace();
if (StringUtil.isBlank(namespace)) {
throw new ConfigEmptyException(
"Web server failed to start. The dynamic thread pool namespace is empty.",
"Please check whether the [spring.dynamic.thread-pool.namespace] configuration is empty or an empty string.");
}

String itemId = properties.getItemId();
if (StringUtil.isBlank(itemId)) {
throw new ConfigEmptyException(
"Web server failed to start. The dynamic thread pool item id is empty.",
"Please check whether the [spring.dynamic.thread-pool.item-id] configuration is empty or an empty string.");
}

String serverAddr = properties.getServerAddr();
if (StringUtil.isBlank(serverAddr)) {
throw new ConfigEmptyException(
"Web server failed to start. The dynamic thread pool server addr is empty.",
"Please check whether the [spring.dynamic.thread-pool.server-addr] configuration is empty or an empty string.");
}

String applicationName = environment.getProperty("spring.application.name");
if (StringUtil.isBlank(applicationName)) {
throw new ConfigEmptyException(
Expand All @@ -94,5 +89,4 @@ public BeforeCheckConfiguration.BeforeCheck dynamicThreadPoolBeforeCheckBean(@Au
public class BeforeCheck {

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,4 @@ public Marker dynamicThreadPoolMarkerBean() {
public class Marker {

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,9 @@ public DynamicThreadPoolExecutor(int corePoolSize,
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, waitForTasksToCompleteOnShutdown, awaitTerminationMillis, workQueue, threadPoolId, threadFactory, handler);
this.threadPoolId = threadPoolId;
this.executeTimeOut = executeTimeOut;

// Number of dynamic proxy denial policies.
RejectedExecutionHandler rejectedProxy = RejectedProxyUtil.createProxy(handler, threadPoolId, rejectCount);
setRejectedExecutionHandler(rejectedProxy);

// Redundant fields to avoid reflecting the acquired fields when sending change information.
redundancyHandler = handler;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,83 +35,29 @@
@Data
public class DynamicThreadPoolWrapper implements DisposableBean {

/**
* Tenant id
*/
private String tenantId;
private String tenantId, itemId, threadPoolId;

/**
* Item id
*/
private String itemId;
private boolean subscribeFlag, initFlag;

/**
* Thread pool id
*/
private String tpId;

/**
* Subscribe flag
*/
private boolean subscribeFlag;

/**
* Init flag
*/
private boolean initFlag;

/**
* executor
* {@link DynamicThreadPoolExecutor}
*/
private ThreadPoolExecutor executor;

/**
* 首选服务端线程池, 为空使用默认线程池 {@link CommonDynamicThreadPool#getInstance(String)}
*
* @param threadPoolId
*/
public DynamicThreadPoolWrapper(String threadPoolId) {
this(threadPoolId, CommonDynamicThreadPool.getInstance(threadPoolId));
}

/**
* 首选服务端线程池, 为空使用 threadPoolExecutor.
*
* @param threadPoolId
* @param threadPoolExecutor
*/
public DynamicThreadPoolWrapper(String threadPoolId, ThreadPoolExecutor threadPoolExecutor) {
this.tpId = threadPoolId;
this.threadPoolId = threadPoolId;
this.executor = threadPoolExecutor;
}

/**
* 提交任务.
*
* @param command
*/
public void execute(Runnable command) {
executor.execute(command);
}

/**
* 提交任务.
*
* @param task
* @return
*/
public Future<?> submit(Runnable task) {
return executor.submit(task);
}

/**
* 提交任务.
*
* @param task
* @param <T>
* @return
*/
public <T> Future<T> submit(Callable<T> task) {
return executor.submit(task);
}
Expand All @@ -122,5 +68,4 @@ public void destroy() throws Exception {
((AbstractDynamicExecutorSupport) executor).destroy();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,8 @@ public void checkPoolCapacityAlarm(String threadPoolId, ThreadPoolExecutor threa
if (hippoSendMessageService == null) {
return;
}

ThreadPoolNotifyAlarm threadPoolNotifyAlarm = GlobalNotifyAlarmManage.get(threadPoolId);
BlockingQueue blockIngQueue = threadPoolExecutor.getQueue();

int queueSize = blockIngQueue.size();
int capacity = queueSize + blockIngQueue.remainingCapacity();
int divide = CalculateUtil.divide(queueSize, capacity);
Expand All @@ -133,7 +131,6 @@ public void checkPoolActivityAlarm(String threadPoolId, ThreadPoolExecutor threa
int activeCount = threadPoolExecutor.getActiveCount();
int maximumPoolSize = threadPoolExecutor.getMaximumPoolSize();
int divide = CalculateUtil.divide(activeCount, maximumPoolSize);

ThreadPoolNotifyAlarm threadPoolNotifyAlarm = GlobalNotifyAlarmManage.get(threadPoolId);
boolean isSend = threadPoolNotifyAlarm.getIsAlarm()
&& divide > threadPoolNotifyAlarm.getActiveAlarm();
Expand All @@ -154,7 +151,6 @@ public void checkPoolRejectedAlarm(String threadPoolId) {
if (Objects.isNull(threadPoolNotifyAlarm) || !threadPoolNotifyAlarm.getIsAlarm()) {
return;
}

ThreadPoolExecutor threadPoolExecutor = GlobalThreadPoolManage.getExecutorService(threadPoolId).getExecutor();
checkPoolRejectedAlarm(threadPoolId, threadPoolExecutor);
}
Expand Down Expand Up @@ -186,19 +182,16 @@ public void asyncSendExecuteTimeOutAlarm(String threadPoolId, long executeTime,
if (Objects.isNull(threadPoolNotifyAlarm) || !threadPoolNotifyAlarm.getIsAlarm()) {
return;
}

if (threadPoolExecutor instanceof DynamicThreadPoolExecutor) {
try {
AlarmNotifyRequest alarmNotifyRequest = buildAlarmNotifyReq(threadPoolExecutor);
alarmNotifyRequest.setThreadPoolId(threadPoolId);
alarmNotifyRequest.setExecuteTime(executeTime);
alarmNotifyRequest.setExecuteTimeOut(executeTimeOut);

String executeTimeoutTrace = TraceContextUtil.getAndRemove();
if (StringUtil.isNotBlank(executeTimeoutTrace)) {
alarmNotifyRequest.setExecuteTimeoutTrace(executeTimeoutTrace);
}

Runnable task = () -> hippoSendMessageService.sendAlarmMessage(NotifyTypeEnum.TIMEOUT, alarmNotifyRequest);
EXECUTE_TIMEOUT_EXECUTOR.execute(task);
} catch (Throwable ex) {
Expand All @@ -217,7 +210,6 @@ public void sendPoolConfigChange(ChangeParameterNotifyRequest request) {
String appName = StrUtil.isBlank(itemId) ? applicationName : itemId;
request.setAppName(appName);
request.setIdentify(IdentifyUtil.getIdentify());

hippoSendMessageService.sendChangeMessage(request);
}

Expand All @@ -229,10 +221,8 @@ public void sendPoolConfigChange(ChangeParameterNotifyRequest request) {
*/
public AlarmNotifyRequest buildAlarmNotifyReq(ThreadPoolExecutor threadPoolExecutor) {
AlarmNotifyRequest request = new AlarmNotifyRequest();

String appName = StrUtil.isBlank(itemId) ? applicationName : itemId;
request.setAppName(appName);

// 核心线程数
int corePoolSize = threadPoolExecutor.getCorePoolSize();
// 最大线程数
Expand All @@ -245,7 +235,6 @@ public AlarmNotifyRequest buildAlarmNotifyReq(ThreadPoolExecutor threadPoolExecu
int largestPoolSize = threadPoolExecutor.getLargestPoolSize();
// 线程池中执行任务总数量 (有锁)
long completedTaskCount = threadPoolExecutor.getCompletedTaskCount();

request.setActive(active.toUpperCase());
request.setIdentify(IdentifyUtil.getIdentify());
request.setCorePoolSize(corePoolSize);
Expand All @@ -254,7 +243,6 @@ public AlarmNotifyRequest buildAlarmNotifyReq(ThreadPoolExecutor threadPoolExecu
request.setActiveCount(activeCount);
request.setLargestPoolSize(largestPoolSize);
request.setCompletedTaskCount(completedTaskCount);

BlockingQueue<Runnable> queue = threadPoolExecutor.getQueue();
// 队列元素个数
int queueSize = queue.size();
Expand All @@ -268,18 +256,14 @@ public AlarmNotifyRequest buildAlarmNotifyReq(ThreadPoolExecutor threadPoolExecu
request.setCapacity(queueCapacity);
request.setQueueSize(queueSize);
request.setRemainingCapacity(remainingCapacity);

RejectedExecutionHandler rejectedExecutionHandler = threadPoolExecutor instanceof DynamicThreadPoolExecutor
? ((DynamicThreadPoolExecutor) threadPoolExecutor).getRedundancyHandler()
: threadPoolExecutor.getRejectedExecutionHandler();
request.setRejectedExecutionHandlerName(rejectedExecutionHandler.getClass().getSimpleName());

long rejectCount = threadPoolExecutor instanceof DynamicThreadPoolExecutor
? ((DynamicThreadPoolExecutor) threadPoolExecutor).getRejectCountNum()
: -1L;
request.setRejectCountNum(rejectCount);

return request;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,4 @@ public static ThreadPoolNotifyAlarm get(String key) {
public static void put(String key, ThreadPoolNotifyAlarm val) {
NOTIFY_ALARM_MAP.put(key, val);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -113,5 +113,4 @@ public static List<String> listThreadPoolId() {
public static Integer getThreadPoolNum() {
return listThreadPoolId().size();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ public PoolRunStateInfo getPoolRunState(String threadPoolId, Executor executor)
String currentLoad = CalculateUtil.divide(activeCount, maximumPoolSize) + "";
// 峰值负载
String peakLoad = CalculateUtil.divide(largestPoolSize, maximumPoolSize) + "";

BlockingQueue<Runnable> queue = pool.getQueue();
// 队列元素个数
int queueSize = queue.size();
Expand All @@ -93,7 +92,6 @@ public PoolRunStateInfo getPoolRunState(String threadPoolId, Executor executor)
int remainingCapacity = queue.remainingCapacity();
// 队列容量
int queueCapacity = queueSize + remainingCapacity;

stateInfo.setCoreSize(corePoolSize);
stateInfo.setTpId(threadPoolId);
stateInfo.setPoolSize(poolSize);
Expand All @@ -107,13 +105,11 @@ public PoolRunStateInfo getPoolRunState(String threadPoolId, Executor executor)
stateInfo.setQueueRemainingCapacity(remainingCapacity);
stateInfo.setLargestPoolSize(largestPoolSize);
stateInfo.setCompletedTaskCount(completedTaskCount);

long rejectCount =
pool instanceof DynamicThreadPoolExecutor ? ((DynamicThreadPoolExecutor) pool).getRejectCountNum() : -1L;
stateInfo.setRejectCount(rejectCount);
stateInfo.setClientLastRefreshTime(DateUtil.formatDateTime(new Date()));
stateInfo.setTimestamp(System.currentTimeMillis());
return supplement(stateInfo);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -59,37 +59,28 @@ protected PoolRunStateInfo supplement(PoolRunStateInfo poolRunStateInfo) {
ByteConvertUtil.getPrintSize(runtimeInfo.getTotalMemory()),
" / 最大可用: ",
ByteConvertUtil.getPrintSize(runtimeInfo.getMaxMemory())).toString();

poolRunStateInfo.setCurrentLoad(poolRunStateInfo.getCurrentLoad() + "%");
poolRunStateInfo.setPeakLoad(poolRunStateInfo.getPeakLoad() + "%");

String ipAddress = hippo4JInetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
poolRunStateInfo.setHost(ipAddress);
poolRunStateInfo.setMemoryProportion(memoryProportion);
poolRunStateInfo.setFreeMemory(ByteConvertUtil.getPrintSize(runtimeInfo.getFreeMemory()));

String threadPoolId = poolRunStateInfo.getTpId();
DynamicThreadPoolWrapper executorService = GlobalThreadPoolManage.getExecutorService(threadPoolId);
ThreadPoolExecutor pool = executorService.getExecutor();

String rejectedName;
if (pool instanceof AbstractDynamicExecutorSupport) {
rejectedName = ((DynamicThreadPoolExecutor) pool).getRedundancyHandler().getClass().getSimpleName();
} else {
rejectedName = pool.getRejectedExecutionHandler().getClass().getSimpleName();
}
poolRunStateInfo.setRejectedName(rejectedName);

ManyPoolRunStateInfo manyPoolRunStateInfo = BeanUtil.toBean(poolRunStateInfo, ManyPoolRunStateInfo.class);
manyPoolRunStateInfo.setIdentify(CLIENT_IDENTIFICATION_VALUE);

String active = environment.getProperty("spring.profiles.active", "UNKNOWN");
manyPoolRunStateInfo.setActive(active.toUpperCase());

String threadPoolState = ThreadPoolStatusHandler.getThreadPoolState(pool);
manyPoolRunStateInfo.setState(threadPoolState);

return manyPoolRunStateInfo;
}

}
Loading

0 comments on commit 8d00d6e

Please sign in to comment.