Skip to content

Commit

Permalink
feat: 添加并行调用实例
Browse files Browse the repository at this point in the history
  • Loading branch information
liuyueyi committed Jul 8, 2023
1 parent 6098d1c commit 245c3a1
Show file tree
Hide file tree
Showing 2 changed files with 163 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
package com.github.paicoding.forum.core.async;

import cn.hutool.core.thread.ExecutorBuilder;
import cn.hutool.core.util.ArrayUtil;
import com.github.paicoding.forum.core.util.EnvUtil;
import com.google.common.util.concurrent.SimpleTimeLimiter;
import lombok.extern.slf4j.Slf4j;

import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -14,13 +23,15 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/**
* 异步工具类
*
* @author YiHui
* @date 2023/6/12
*/
@Slf4j
public class AsyncUtil {
private static final ThreadFactory THREAD_FACTORY = new ThreadFactory() {
private final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
Expand All @@ -44,12 +55,7 @@ public Thread newThread(Runnable r) {
}

public static void initExecutorService(int core, int max) {
executorService = new ExecutorBuilder().setCorePoolSize(core).setMaxPoolSize(max)
.setKeepAliveTime(0).setKeepAliveTime(0, TimeUnit.SECONDS)
.setWorkQueue(new SynchronousQueue<Runnable>())
.setHandler(new ThreadPoolExecutor.AbortPolicy())
.setThreadFactory(THREAD_FACTORY)
.buildFinalizable();
executorService = new ExecutorBuilder().setCorePoolSize(core).setMaxPoolSize(max).setKeepAliveTime(0).setKeepAliveTime(0, TimeUnit.SECONDS).setWorkQueue(new SynchronousQueue<Runnable>()).setHandler(new ThreadPoolExecutor.CallerRunsPolicy()).setThreadFactory(THREAD_FACTORY).buildFinalizable();
simpleTimeLimiter = SimpleTimeLimiter.create(executorService);
}

Expand Down Expand Up @@ -102,4 +108,144 @@ public static boolean sleep(long millis) {

return true;
}


public static class CompletableFutureBridge {
private List<CompletableFuture> list;
private Map<String, Long> cost;
private String taskName;

public CompletableFutureBridge() {
this("CompletableFutureExecute");
}

public CompletableFutureBridge(String task) {
this.taskName = task;
list = new ArrayList<>();
cost = new ConcurrentHashMap<>();
cost.put(task, System.currentTimeMillis());
}

/**
* 异步执行,带返回结果
*
* @param supplier
* @return
*/
public CompletableFutureBridge supplyAsync(Supplier supplier) {
return supplyAsync(supplier, executorService);
}

public CompletableFutureBridge supplyAsync(Supplier supplier, ExecutorService executor) {
return supplyAsyncWithTimeRecord(supplier, supplier.toString(), executor);
}

public CompletableFutureBridge supplyAsyncWithTimeRecord(Supplier supplier, String name) {
return supplyAsyncWithTimeRecord(supplier, name, executorService);
}

public CompletableFutureBridge supplyAsyncWithTimeRecord(Supplier supplier, String name, ExecutorService executor) {
list.add(CompletableFuture.supplyAsync(supplyWithTime(supplier, name), executor));
return this;
}


/**
* 异步并发执行,无返回结果
*
* @param run
* @return
*/
public CompletableFutureBridge runAsync(Runnable run) {
// return runAsync(run, executorService);
list.add(CompletableFuture.runAsync(runWithTime(run, run.toString()), executorService));
return this;
}

public CompletableFutureBridge runAsync(Runnable run, ExecutorService executor) {
return runAsyncWithTimeRecord(run, run.toString(), executor);
}


public CompletableFutureBridge runAsyncWithTimeRecord(Runnable run, String name) {
return runAsyncWithTimeRecord(run, name, executorService);
}

public CompletableFutureBridge runAsyncWithTimeRecord(Runnable run, String name, ExecutorService executor) {
// list.add(CompletableFuture.runAsync(run, executor));
list.add(CompletableFuture.runAsync(runWithTime(run, name), executor));
return this;
}

private Runnable runWithTime(Runnable run, String name) {
return () -> {
startRecord(name);
try {
run.run();
} finally {
endRecord(name);
}
};
}

private Supplier supplyWithTime(Supplier call, String name) {
return () -> {
startRecord(name);
try {
return call.get();
} finally {
endRecord(name);
}
};
}

public CompletableFutureBridge allExecuted() {
CompletableFuture.allOf(ArrayUtil.toArray(list, CompletableFuture.class)).join();
endRecord(this.taskName);
return this;
}

private void startRecord(String name) {
cost.put(name, System.currentTimeMillis());
}

private void endRecord(String name) {
long now = System.currentTimeMillis();
cost.put(name, now - cost.getOrDefault(name, now));
}

public void prettyPrint() {
StringBuilder sb = new StringBuilder();
sb.append('\n');
long totalCost = cost.remove(taskName);
sb.append("StopWatch '").append(taskName).append("': running time = ").append(totalCost).append(" ms");
sb.append('\n');
if (cost.size() <= 1) {
sb.append("No task info kept");
} else {
sb.append("---------------------------------------------\n");
sb.append("ms % Task name\n");
sb.append("---------------------------------------------\n");
NumberFormat pf = NumberFormat.getPercentInstance();
pf.setMinimumIntegerDigits(2);
pf.setMinimumFractionDigits(2);
pf.setGroupingUsed(false);
for (Map.Entry<String, Long> entry : cost.entrySet()) {
sb.append(entry.getValue()).append("\t\t");
sb.append(pf.format(entry.getValue() / (double) totalCost)).append("\t\t");
sb.append(entry.getKey()).append("\n");
}
}
if (!EnvUtil.isPro()) {
log.info("\n---------------------\n{}\n--------------------\n", sb);
}
}
}

public static CompletableFutureBridge concurrentExecutor(String... name) {
if (name.length > 0) {
return new CompletableFutureBridge(name[0]);
}
return new CompletableFutureBridge();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.github.paicoding.forum.api.model.vo.banner.dto.ConfigDTO;
import com.github.paicoding.forum.api.model.vo.recommend.CarouseDTO;
import com.github.paicoding.forum.api.model.vo.user.dto.UserStatisticInfoDTO;
import com.github.paicoding.forum.core.async.AsyncUtil;
import com.github.paicoding.forum.core.common.CommonConstants;
import com.github.paicoding.forum.service.article.service.ArticleReadService;
import com.github.paicoding.forum.service.article.service.CategoryService;
Expand Down Expand Up @@ -51,13 +52,17 @@ public class IndexRecommendHelper {
public IndexVo buildIndexVo(String activeTab) {
IndexVo vo = new IndexVo();
CategoryDTO category = categories(activeTab, vo);
vo.setArticles(articleList(category.getCategoryId()));
vo.setTopArticles(topArticleList(category));
vo.setHomeCarouselList(homeCarouselList());
vo.setSideBarItems(sidebarService.queryHomeSidebarList());
vo.setCurrentCategory(category.getCategory());
vo.setCategoryId(category.getCategoryId());
vo.setUser(loginInfo());
vo.setCurrentCategory(category.getCategory());
// 并行调度实例,提高响应性能
AsyncUtil.concurrentExecutor("首页响应")
.runAsyncWithTimeRecord(() -> vo.setArticles(articleList(category.getCategoryId())), "文章列表")
.runAsyncWithTimeRecord(() -> vo.setTopArticles(topArticleList(category)), "置顶文章")
.runAsyncWithTimeRecord(() -> vo.setHomeCarouselList(homeCarouselList()), "轮播图")
.runAsyncWithTimeRecord(() -> vo.setSideBarItems(sidebarService.queryHomeSidebarList()), "侧边栏")
.runAsyncWithTimeRecord(() -> vo.setUser(loginInfo()), "用户信息")
.allExecuted()
.prettyPrint();
return vo;
}

Expand Down

0 comments on commit 245c3a1

Please sign in to comment.