Skip to content

Commit

Permalink
提交线程池解析
Browse files Browse the repository at this point in the history
  • Loading branch information
qiurunze committed Mar 18, 2020
1 parent 85f8af2 commit 860c4b6
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 1 deletion.
30 changes: 29 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,35 @@
- [多线程wait notify notifyall join sleep yield作用与方法详细解读](/docs/thread-base-006.md)
- [多线程可能会带来什么问题](/docs/thread-base-007.md)
- [多种单例模式](/docs/thread-base-008.md)


#### :couple: 线程池

- 线程池
- [JDK 线程池高度解析](/docs/thread-base-9.md)<br>
- [线程池是什么](/docs/threadpool0001.md)
- [多次创建线程的劣势](/docs/threadpool0001.md)<br>
- [什么时候使用线程池](/docs/threadpool0001.md)<br>
- [线程池的优势](/docs/threadpool0001.md)
- [Executor框架](/docs/threadpool0001.md)
- [如何使用钩子函数来进行线程池操作](/docs/threadpool0001.md)
- [线程池的重点属性](/docs/threadpool0001.md)
- [多线程重要属性](/docs/threadpool0001.md)
- [线程池的具体实现](/docs/threadpool0001.md)
- [线程池的创建](/docs/threadpool0001.md)
- [线程池参数解释](/docs/threadpool0001.md)
- [线程池监控](/docs/threadpool0001.md)
- [线程池的源码分析](/docs/threadpool0001.md)
- [execute方法](/docs/threadpool0001.md)
- [addWorker方法](/docs/threadpool0001.md)
- [Worker类](/docs/threadpool0001.md)
- [runWorker方法](/docs/threadpool0001.md)
- [getTask方法](/docs/threadpool0001.md)
- [processWorkerExit方法](/docs/threadpool0001.md)
- [小结](/docs/threadpool0001.md)
- [processWorkerExit方法](/docs/threadpool0001.md)
- [手写线程池代码](/docs/threadpool0001.md)


### 多线程进阶更新

| ID | Problem | Article |
Expand Down
17 changes: 17 additions & 0 deletions docs/threadpool0001.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,25 @@
#### 线程池的具体实现

ThreadPoolExecutor 默认线程池

ScheduledThreadPoolExecutor 定时线程池

newFixedThreadPool
return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());

newSingleThreadExecutor
new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));

newCachedThreadPool
new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());

newWorkStealingPool
new ForkJoinPool(Runtime.getRuntime().availableProcessors(),ForkJoinPool.defaultForkJoinWorkerThreadFactory,null, true);

#### 如何使用钩子函数来进行线程池操作

com.executor.PauseableThreadPool 如何在使用线程池前后添加钩子函数来进行数据监控

### 线程池的具体创建

public ThreadPoolExecutor(int corePoolSize,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package com.executor;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
* 描述: 演示每个任务执行前后放钩子函数
*/
public class PauseableThreadPool extends ThreadPoolExecutor {

private final ReentrantLock lock = new ReentrantLock();
private Condition unpaused = lock.newCondition();
private boolean isPaused;


public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}

public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}

public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}

public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory,
handler);
}

@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
lock.lock();
try {
while (isPaused) {
unpaused.await();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

private void pause() {
lock.lock();
try {
isPaused = true;
} finally {
lock.unlock();
}
}

public void resume() {
lock.lock();
try {
isPaused = false;
unpaused.signalAll();
} finally {
lock.unlock();
}
}

public static void main(String[] args) throws InterruptedException {
PauseableThreadPool pauseableThreadPool = new PauseableThreadPool(10, 20, 10l,
TimeUnit.SECONDS, new LinkedBlockingQueue<>());
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println("我被执行");
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
for (int i = 0; i < 10000; i++) {
pauseableThreadPool.execute(runnable);
}
Thread.sleep(1500);
pauseableThreadPool.pause();
System.out.println("线程池被暂停了");
Thread.sleep(1500);
pauseableThreadPool.resume();
System.out.println("线程池被恢复了");

}
}

0 comments on commit 860c4b6

Please sign in to comment.