Skip to content

Commit

Permalink
Task execution timeout time assignment
Browse files Browse the repository at this point in the history
  • Loading branch information
magestacks committed Nov 5, 2022
1 parent 328bb4a commit a9cd485
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,20 +43,20 @@ public static String getAndRemoveTimeoutTrace() {
}

/**
* Put execute timeout trace.
* Put timeout trace.
*
* @param trace trace
*/
public static void putExecuteTimeoutTrace(String trace) {
public static void putTimeoutTrace(String trace) {
MDC.put(EXECUTE_TIMEOUT_TRACE, trace);
}

/**
* Set execute timeout trace key.
* Set timeout trace key.
*
* @param key trace key
*/
public static void setExecuteTimeoutTraceKey(String key) {
public static void setTimeoutTraceKey(String key) {
EXECUTE_TIMEOUT_TRACE_KEY = key;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public Runnable decorate(Runnable runnable) {
String executeTimeoutTrace = MDC.get(EXECUTE_TIMEOUT_TRACE);
Runnable taskRun = () -> {
if (StringUtil.isNotBlank(executeTimeoutTrace)) {
ExecutorTraceContextUtil.putExecuteTimeoutTrace(executeTimeoutTrace);
ExecutorTraceContextUtil.putTimeoutTrace(executeTimeoutTrace);
}
runnable.run();
// There is no need to clean up here, and it will be cleaned up after the thread task is executed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package cn.hippo4j.example.core.inittest;

import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
Expand All @@ -28,6 +29,8 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import static cn.hippo4j.common.constant.Constants.EXECUTE_TIMEOUT_TRACE;

/**
* Run state handler test.
*/
Expand All @@ -44,7 +47,6 @@ public class RunStateHandlerTest {
/*
* @Resource private ThreadPoolTaskExecutor testSpringThreadPoolTaskExecutor;
*/

private final ThreadPoolExecutor runStateHandlerTestExecutor = new ThreadPoolExecutor(
4,
4,
Expand Down Expand Up @@ -78,7 +80,7 @@ private void runTask(Executor executor) {
/**
* When the execution of the thread pool task times out, the Trace flag is put into the MDC, and it is printed out when an alarm occurs.
*/
// MDC.put(EXECUTE_TIMEOUT_TRACE, "39948722194639841.251.16612352194691531");
MDC.put(EXECUTE_TIMEOUT_TRACE, "39948722194639841.251.16612352194691531");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.DynamicThreadPoolWrapper;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.ThreadPoolBuilder;
import cn.hippo4j.core.executor.support.adpter.DynamicThreadPoolAdapterChoose;
import cn.hippo4j.core.toolkit.DynamicThreadPoolAnnotationUtil;
import cn.hippo4j.message.service.GlobalNotifyAlarmManage;
Expand Down Expand Up @@ -162,29 +161,6 @@ private ExecutorProperties buildDefaultExecutorProperties(String threadPoolId, T
return executorProperties;
}

/**
* Build new dynamic thread-pool.
*
* @param executorProperties executor properties
* @return thread-pool executor
*/
private ThreadPoolExecutor buildNewDynamicThreadPool(ExecutorProperties executorProperties) {
String threadNamePrefix = executorProperties.getThreadNamePrefix();
ExecutorProperties newExecutorProperties = buildExecutorProperties(executorProperties);
ThreadPoolExecutor newDynamicPoolExecutor = ThreadPoolBuilder.builder()
.threadPoolId(executorProperties.getThreadPoolId())
.threadFactory(StringUtil.isNotBlank(threadNamePrefix) ? threadNamePrefix : executorProperties.getThreadPoolId())
.poolThreadSize(newExecutorProperties.getCorePoolSize(), newExecutorProperties.getMaximumPoolSize())
.workQueue(BlockingQueueTypeEnum.createBlockingQueue(newExecutorProperties.getBlockingQueue(), newExecutorProperties.getQueueCapacity()))
.executeTimeOut(newExecutorProperties.getExecuteTimeOut())
.keepAliveTime(newExecutorProperties.getKeepAliveTime(), TimeUnit.SECONDS)
.rejected(RejectedPolicyTypeEnum.createPolicy(newExecutorProperties.getRejectedHandler()))
.allowCoreThreadTimeOut(newExecutorProperties.getAllowCoreThreadTimeOut())
.dynamicPool()
.build();
return newDynamicPoolExecutor;
}

/**
* Thread-pool param replace.
*
Expand All @@ -199,6 +175,10 @@ private void threadPoolParamReplace(ThreadPoolExecutor executor, ExecutorPropert
executor.setKeepAliveTime(executorProperties.getKeepAliveTime(), TimeUnit.SECONDS);
executor.allowCoreThreadTimeOut(executorProperties.getAllowCoreThreadTimeOut());
executor.setRejectedExecutionHandler(RejectedPolicyTypeEnum.createPolicy(executorProperties.getRejectedHandler()));
if (executor instanceof DynamicThreadPoolExecutor) {
Optional.ofNullable(executorProperties.getExecuteTimeOut())
.ifPresent(executeTimeOut -> ((DynamicThreadPoolExecutor) executor).setExecuteTimeOut(executeTimeOut));
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -180,6 +181,10 @@ private void threadPoolParamReplace(ThreadPoolExecutor executor, ThreadPoolParam
executor.setKeepAliveTime(threadPoolParameterInfo.getKeepAliveTime(), TimeUnit.SECONDS);
executor.allowCoreThreadTimeOut(EnableEnum.getBool(threadPoolParameterInfo.getAllowCoreThreadTimeOut()));
executor.setRejectedExecutionHandler(RejectedPolicyTypeEnum.createPolicy(threadPoolParameterInfo.getRejectedType()));
if (executor instanceof DynamicThreadPoolExecutor) {
Optional.ofNullable(threadPoolParameterInfo.getExecuteTimeOut())
.ifPresent(executeTimeOut -> ((DynamicThreadPoolExecutor) executor).setExecuteTimeOut(executeTimeOut));
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public Object postProcessAfterInitialization(Object bean, String beanName) throw

private void registerThreadPoolPluginSupportIfNecessary(Object bean, Class<?> beanType) {
if (ThreadPoolPluginSupport.class.isAssignableFrom(beanType)) {
ThreadPoolPluginSupport support = (ThreadPoolPluginSupport)bean;
ThreadPoolPluginSupport support = (ThreadPoolPluginSupport) bean;
if (registerThreadPoolPluginSupport(support) && log.isDebugEnabled()) {
log.info("register ThreadPoolPluginSupport [{}]", support.getThreadPoolId());
}
Expand All @@ -110,7 +110,7 @@ private void registerThreadPoolPluginSupportIfNecessary(Object bean, Class<?> be

private void registerThreadPoolPluginIfNecessary(Object bean, Class<?> beanType) {
if (ThreadPoolPlugin.class.isAssignableFrom(beanType)) {
ThreadPoolPlugin plugin = (ThreadPoolPlugin)bean;
ThreadPoolPlugin plugin = (ThreadPoolPlugin) bean;
if (enableThreadPoolPlugin(plugin) && log.isDebugEnabled()) {
log.info("register ThreadPoolPlugin [{}]", plugin.getId());
}
Expand All @@ -119,7 +119,7 @@ private void registerThreadPoolPluginIfNecessary(Object bean, Class<?> beanType)

private void registerThreadPoolPluginRegistrarIfNecessary(Object bean, Class<?> beanType) {
if (ThreadPoolPluginRegistrar.class.isAssignableFrom(beanType)) {
ThreadPoolPluginRegistrar registrar = (ThreadPoolPluginRegistrar)bean;
ThreadPoolPluginRegistrar registrar = (ThreadPoolPluginRegistrar) bean;
if (enableThreadPoolPluginRegistrar(registrar) && log.isDebugEnabled()) {
log.info("register ThreadPoolPluginRegistrar [{}]", registrar.getId());
}
Expand Down

0 comments on commit a9cd485

Please sign in to comment.