Skip to content

Commit

Permalink
add
Browse files Browse the repository at this point in the history
  • Loading branch information
x-jay committed Oct 30, 2014
1 parent c95fd9c commit 6b88b8c
Show file tree
Hide file tree
Showing 14 changed files with 440 additions and 0 deletions.
Binary file not shown.
Binary file not shown.
Binary file added MySchedule/bin/com/study/schdule/CustomTask.class
Binary file not shown.
Binary file added MySchedule/bin/com/study/schdule/Worker.class
Binary file not shown.
Binary file added MySchedule/bin/com/study/schdule/test$1.class
Binary file not shown.
Binary file added MySchedule/bin/com/study/schdule/test$2.class
Binary file not shown.
Binary file added MySchedule/bin/com/study/schdule/test$3.class
Binary file not shown.
Binary file added MySchedule/bin/com/study/schdule/test$4.class
Binary file not shown.
Binary file added MySchedule/bin/com/study/schdule/test.class
Binary file not shown.
119 changes: 119 additions & 0 deletions MySchedule/src/com/study/schdule/CustomPriorityQueue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package com.study.schdule;

import java.util.PriorityQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

//该队列是一个优先级队列.并且也是一个阻塞队列
public class CustomPriorityQueue extends PriorityQueue<CustomTask>{
public ReentrantLock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
private boolean isOk = false;

@Override
public boolean add(CustomTask e) {
final ReentrantLock lock = this.lock;
lock.lock();
try
{
CustomTask ct = this.peek();
// int val = this.size();
boolean res = super.offer(e);
//判断是否有线程进入了永久休眠
//调用方法进行唤醒所有.以免线程进入永久休眠(等待)
//(如果没有进入永久休眠也不会有影响.类似于notifyAll)
// if(ct==null || e.compareTo(ct)<0)

if(ct==null && isOk)
{
condition.signalAll();
}
return res;
}
finally
{
lock.unlock();
}
}

public CustomTask take() throws InterruptedException
{
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
//和lock不同的是当阻塞时可被interrupt的锁.

// lock.lock();
try
{
for(;;)
{
CustomTask ct = this.poll();
//如果线程为空.则进入永久等待.
if(ct==null)
{
// Thread.yield();
isOk=true;

condition.await();
}
else
{
// long delay = ct.getDelay(TimeUnit.NANOSECONDS);
// if(delay>0)
// {
//// long tl = condition.awaitNanos(ct.time);
// condition.await(1000, TimeUnit.MILLISECONDS);
// }
// else
// {
// CustomTask task = this.peek();
// return task;
// }

if(ct.time>0)
{
condition.await(ct.time, ct.getUnit());
}
CustomTask task = ct;

// assert task!=null;
// if(this.size()!=0)
// {
// System.out.println("sdfdsf");
// condition.signalAll();
// }
return task;
}
}
}
catch(InterruptedException e)
{
Thread.currentThread().interrupt();
}
finally
{
lock.unlock();
}
return null;
}


//获取头部元素.但是不会删除.
@Override
public synchronized CustomTask peek() {
return super.peek();
}

//获取头部元素.会删除元素
@Override
public synchronized CustomTask poll() {
return super.poll();
}


//添加元素
@Override
public boolean offer(CustomTask e) {
return super.offer(e);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package com.study.schdule;

import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class CustomScheduledThreadPoolExecutor {
public static CustomPriorityQueue queue = new CustomPriorityQueue();
//任务队列

private Set<Worker> workers = new HashSet<Worker>();
//线程队列

private final int maxThreadCount;
//最大线程数量

public CustomScheduledThreadPoolExecutor(int maxThreadCount)
{
this.maxThreadCount = maxThreadCount;
}

//获取当前已创建的线程数量
public int getWorkerCount()
{
return workers.size();
}

//获取线程池最大数量
public int getMaxThreadCount()
{
return this.maxThreadCount;
}

//按指定的时间执行
public void startForDate(Runnable task,Date date)
{
long target = date.getTime();
long now = new Date().getTime();
if(now>target)
{
throw new RuntimeException("目标时间小于当前时间");
}
long initial = target - now;
schedule(task,initial);
}

//立即执行.没有延迟也没有定时
public void schedule(Runnable task)
{
CustomTask ct = new CustomTask(task);
//对Runnable进行包装.包装成自己一个CustomTask对象


delayedExecute(ct,queue);
//执行主方法
}


//延迟执行.但是只执行一次
public void schedule(Runnable task,long initial)
{
CustomTask ct = new CustomTask(task,initial);
delayedExecute(ct,queue);
}


//延迟执行并且重复执行
public void scheduleAtFixedRate(Runnable task,long initial,long proied,TimeUnit unit)
{
CustomTask ct = new CustomTask(task,initial,proied,unit);
delayedExecute(ct,queue);
}

public void scheduleAtFixedRate(Runnable task,long initial,long proied,int repeatCount,TimeUnit unit)
{
CustomTask ct = new CustomTask(task,initial,proied,repeatCount,unit);
delayedExecute(ct,queue);
}

public void delayedExecute(CustomTask ct,CustomPriorityQueue queue)
{
final ReentrantLock lock = queue.lock;
lock.lock();

//如果当前线程数量小于线程池最大数量
if(this.getWorkerCount()<maxThreadCount)
{
Thread t = newThread(); //创建一个新线程
if(t!=null)
{
t.start();
}
}

//将任务加入队列中.
queue.add(ct);
lock.unlock();
}


//创建新的线程
public Thread newThread()
{
Worker w = new Worker(queue);
Thread t = new Thread(w);
if(t!=null)
{
w.thread = t;

workers.add(w);
//加入线程队列
}
return t;
}


//关闭线程池中的所有线程
public void shutdownNow()
{
for(Iterator<Worker> it = workers.iterator();it.hasNext();)
{
it.next().thread.interrupt();
}
}
}
77 changes: 77 additions & 0 deletions MySchedule/src/com/study/schdule/CustomTask.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package com.study.schdule;

import java.util.concurrent.TimeUnit;

public class CustomTask implements Comparable<CustomTask>,Runnable{
private long proied;
//重复的时间间隔

public long time;
//延迟运行的时间

private Runnable task;
//运行任务

private TimeUnit unit;
//时间单位

public int repeatCount;
//重复次数

public Runnable getTask() {
return task;
}

public TimeUnit getUnit() {
return unit;
}

public CustomTask(Runnable task)
{
this.time = 0;
this.proied = 0;
this.unit = TimeUnit.MILLISECONDS;
this.task = task;
this.repeatCount = 0;
}

public CustomTask(Runnable task,long initial)
{
this(task);
this.time = initial;
this.unit = TimeUnit.MILLISECONDS;
this.proied = 0;
this.repeatCount = 0;
}

public CustomTask(Runnable task,long initial,long proied,TimeUnit unit)
{
this(task,initial);
this.proied = proied;
this.unit = unit;
this.repeatCount = 0;
}

public CustomTask(Runnable task,long initial,long proied,int repeatCount,TimeUnit unit)
{
this(task,initial,proied,unit);
this.repeatCount = repeatCount;
}


@Override
public int compareTo(CustomTask ct) {
return proied>ct.proied?1:(proied<ct.proied?-1:0);
}

@Override
public void run() {
task.run();

if(this.proied!=0 && this.repeatCount-->1)
{
time = proied;
CustomScheduledThreadPoolExecutor.queue.offer(this);
}
}
}
49 changes: 49 additions & 0 deletions MySchedule/src/com/study/schdule/Worker.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.study.schdule;

import java.util.PriorityQueue;

public class Worker implements Runnable{
//一个工作线程对象.
public Thread thread;
private CustomPriorityQueue queue;
public Worker(CustomPriorityQueue queue)
{
this.queue = queue;
}

public void run()
{
Runnable task = null;
while((task=getTask())!=null)
{
runTask(task);
task = null;
}
}

public void runTask(Runnable run)
{
run.run();
}

public Runnable getTask()
{
// if(!queue.isEmpty())
// {
// CustomTask task = queue.remove();
// return task.getTask();
// }
// return null;

CustomTask ct;
try {
ct = queue.take();
return ct;
} catch (InterruptedException e) {

}
return null;
}


}
Loading

0 comments on commit 6b88b8c

Please sign in to comment.