Skip to content

Commit

Permalink
add
Browse files Browse the repository at this point in the history
  • Loading branch information
x-jay committed Oct 30, 2014
1 parent 879578e commit 53d91a3
Show file tree
Hide file tree
Showing 16 changed files with 445 additions and 0 deletions.
6 changes: 6 additions & 0 deletions MyThreadPool/.classpath
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="src" path="src"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/>
<classpathentry kind="output" path="bin"/>
</classpath>
17 changes: 17 additions & 0 deletions MyThreadPool/.project
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>MyThreadPool</name>
<comment></comment>
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.eclipse.jdt.core.javabuilder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
<nature>org.eclipse.jdt.core.javanature</nature>
</natures>
</projectDescription>
12 changes: 12 additions & 0 deletions MyThreadPool/.settings/org.eclipse.jdt.core.prefs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#Thu Oct 30 11:22:51 CST 2014
eclipse.preferences.version=1
org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode=enabled
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.6
org.eclipse.jdt.core.compiler.codegen.unusedLocal=preserve
org.eclipse.jdt.core.compiler.compliance=1.6
org.eclipse.jdt.core.compiler.debug.lineNumber=generate
org.eclipse.jdt.core.compiler.debug.localVariable=generate
org.eclipse.jdt.core.compiler.debug.sourceFile=generate
org.eclipse.jdt.core.compiler.problem.assertIdentifier=error
org.eclipse.jdt.core.compiler.problem.enumIdentifier=error
org.eclipse.jdt.core.compiler.source=1.6
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file added MyThreadPool/bin/com/study/pool/ThreadPool.class
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file added MyThreadPool/bin/com/study/pool/test$1.class
Binary file not shown.
Binary file added MyThreadPool/bin/com/study/pool/test$2.class
Binary file not shown.
Binary file added MyThreadPool/bin/com/study/pool/test$3.class
Binary file not shown.
Binary file added MyThreadPool/bin/com/study/pool/test.class
Binary file not shown.
182 changes: 182 additions & 0 deletions MyThreadPool/src/com/study/pool/CustomSafeQueue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
package com.study.pool;

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

//一个固定长度的并且线程安全的队列
/*
* 特定:当该队列已满时.如果你再调用add添加方法.他会调用wait进行等待.一直到调用了remove方法.
* remove方法调用了notifyAll方法唤醒了add方法..同样元素为空时.他也会调用wait.一直到调用了add方法.
*
*/
public class CustomSafeQueue {
private Object[] queue;
private int capacity;
private int size;
private int head;
private int tail;

public CustomSafeQueue(int cap)
{
capacity = (cap > 0 ) ? cap : 1;
queue = new Object[capacity];
head = 0;
size = 0;
tail = 0;
}

public synchronized int getSize()
{
return size;
}

public synchronized boolean isFull()
{
return (size == capacity);
}

public synchronized void add(Object obj) throws InterruptedException
{
while(isFull())
{
this.wait();
}

queue[head] = obj;
head = (head+1) % capacity;
//这样赋值的作用是.如果值相同则等于零.不相同则还是head这个数;
size++;

this.notifyAll();
}

public boolean isEmpty()
{
return size<1;
}


public synchronized Object[] removeAll() throws InterruptedException
{

Object[] list = new Object[size];


for(int i=0;i<list.length;i++)
{
list[i] = remove();
}
return list;
}

public synchronized Object remove() throws InterruptedException
{
while(size==0)
{
this.wait();
}

Object obj = queue[tail];
queue[tail] = null;
tail = (tail + 1) % capacity;
size--;

this.notifyAll();

return obj;
}


public synchronized void printState()
{
StringBuffer sb = new StringBuffer();

sb.append("SimpleObjectFIFO:\n");
sb.append(" capacity=" + capacity + "\n");
sb.append(" size=" + size+ "\n");


if(isFull())
{
sb.append("- Full");
}
else if(size==0)
{
sb.append("- Empty");
}
sb.append("\n");

sb.append("head=" + head + "\n");
sb.append("tail=" + tail + "\n");

for(int i=0;i<queue.length;i++)
{
sb.append("queue[" + i + "]=" + queue[i] + "\n");
}

System.out.println(sb);
}



public static void main(String[] args) throws InterruptedException {
final CustomSafeQueue queue = new CustomSafeQueue(5);
final Random rand = new Random(50);
Runnable run1 = new Runnable()
{
public void run()
{
try {
while(!Thread.interrupted())
{
TimeUnit.MICROSECONDS.sleep(200);
int val = rand.nextInt(100);
System.out.println(Thread.currentThread().getName() + ",已加入最新的值:" + val);
queue.add(val);
}
System.out.println("run1没有被interrupt时已经运行完了");
} catch (InterruptedException e) {
System.out.println("run1 Interrupted");
}
}
};

Runnable run2 = new Runnable()
{
public void run()
{
try
{
while(!Thread.interrupted())
{
TimeUnit.MICROSECONDS.sleep(200);
System.out.println(Thread.currentThread().getName() + ",队列已将其移除:" + queue.remove());;
}
System.out.println("run2没有被interrupt时已经运行完了");
}
catch(InterruptedException e)
{
System.out.println("run2 Interrupted");
}
}
};

Thread addThread = new Thread(run1);
Thread delThread = new Thread(run2);
addThread.start();
delThread.start();

TimeUnit.MILLISECONDS.sleep(10);
addThread.interrupt();
delThread.interrupt();


// ExecutorService exec = Executors.newCachedThreadPool();
// exec.execute(run1);
// exec.execute(run2);
// TimeUnit.SECONDS.sleep(1);
// exec.shutdownNow();
}
}
176 changes: 176 additions & 0 deletions MyThreadPool/src/com/study/pool/ThreadPool.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
package com.study.pool;


//模拟线程池
public class ThreadPool {
private CustomSafeQueue idleWorkers;
private ThreadPoolWorker[] workerList;

public ThreadPool(int numberOfThreads)
{
numberOfThreads = Math.max(1, numberOfThreads);

idleWorkers = new CustomSafeQueue(numberOfThreads);
workerList = new ThreadPoolWorker[numberOfThreads];

for(int i=0;i<workerList.length;i++)
{
workerList[i] = new ThreadPoolWorker(idleWorkers);
}
}

public void execute(Runnable target) throws InterruptedException
{
ThreadPoolWorker worker = (ThreadPoolWorker)idleWorkers.remove();
worker.process(target);
}

public void stopRequestIdleWorkers()
{
try
{
Object[] idle = idleWorkers.removeAll();
for(int i=0;i<idle.length;i++)
{
((ThreadPoolWorker)idle[i]).stopRequest();
}
}
catch(InterruptedException e)
{
Thread.currentThread().interrupt();
}
}

public void stopRequestAllWorkers()
{
stopRequestIdleWorkers();

try
{
Thread.sleep(250);
}
catch(InterruptedException e)
{

}

for(int i=0;i<workerList.length;i++)
{
if(workerList[i].isAlive())
{
workerList[i].stopRequest();
}
}
}

}

class ThreadPoolWorker
{
private static int nextWorkerID = 0;
private int workerID;
private CustomSafeQueue idleWorkers;
//将当前处理任务的线程加入到该队列..通常会有这种情况.handoffBox队列将任务加入了.但是有些任务可能需要花更长时间才能执行完成.但后面的任务已经执行完了.
//然而队列是数组形式.通过顺序来存放的.所以为了防止这种情况.每次要加入一个任务.就必须将该线程加入到idleWorkers队列.如果idleWorkers队列已经满了.则表示
//还有任务没有执行完.所以add方法会调用wait进行等待.所以就解决了这种问题.

private CustomSafeQueue handoffBox;
private Thread internalThread;
private volatile boolean noStopRequested;


public ThreadPoolWorker(CustomSafeQueue idleWorkers)
{
this.idleWorkers = idleWorkers;

workerID = getNextWorkerID();
handoffBox = new CustomSafeQueue(1);


noStopRequested = true;

Runnable run = new Runnable()
{
public void run()
{
try
{
runWork();
}
catch(Exception e)
{
e.printStackTrace();
}
}
};

internalThread = new Thread(run);
internalThread.start();
}

public static synchronized int getNextWorkerID()
{
int id = nextWorkerID;
nextWorkerID++;
return id;
}

public void process(Runnable target) throws InterruptedException
{
handoffBox.add(target);
}

private void runWork() throws InterruptedException
{
try
{
while(noStopRequested)
{
System.out.println("workerID=" + workerID + ", ready for work");

idleWorkers.add(this);

Runnable run = (Runnable)handoffBox.remove();
//每当有任务来就会接收,否则就阻塞

System.out.println("workerID=" + workerID + ",starting execution of new Runnable:" + run);
runIt(run);
}
}
catch(InterruptedException e)
{
// e.printStackTrace();
//在自己有意调用线程的结束时.InterruptedException这个错误不应该被显示.因为这个错误只要你关闭线程都会产生.
Thread.currentThread().interrupt();
}
}

private void runIt(Runnable run)
{
try
{
run.run();
}
catch(Exception e)
{
System.out.println("Uncaught exception fell through from run()");
e.printStackTrace();
}
finally
{
Thread.interrupted();
}
}

public void stopRequest()
{
System.out.println("workerID=" + workerID + ",stopRequest() received.");
noStopRequested = false;
internalThread.interrupt();
}

public boolean isAlive()
{
return internalThread.isAlive();
}
}
Loading

0 comments on commit 53d91a3

Please sign in to comment.