Skip to content

Commit

Permalink
improve poller example code
Browse files Browse the repository at this point in the history
n(producer/publisher):1(consumer)
mark :
every event process by all consumer!
every event process by all consumer!
every event process by all consumer!
  • Loading branch information
altnti committed Feb 20, 2016
1 parent 2e47296 commit d882454
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 60 deletions.
60 changes: 37 additions & 23 deletions src/main/java/com/ac/disruptor/PollerExample.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,49 +10,63 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class PollCall implements Runnable{
class PollCall implements Runnable {
EventPoller<AddEvent> eventPoller;
public PollCall( EventPoller<AddEvent> eventPoller ){
final EventPoller.Handler<AddEvent> handler = new EventPoller.Handler<AddEvent>() {
public boolean onEvent(AddEvent event, long sequence, boolean endOfBatch) throws Exception {
QueueConfig.count.addAndGet(event.getSize());
return true;
}
};

public PollCall(EventPoller<AddEvent> eventPoller) {
this.eventPoller = eventPoller;
}

public void run() {
while (QueueConfig.publishNum.get() != 0) try {
EventPoller.PollState state = eventPoller.poll(new EventPoller.Handler<AddEvent>() {
public boolean onEvent(AddEvent event, long sequence, boolean endOfBatch) throws Exception {
QueueConfig.count.addAndGet( event.getSize() );
return true;
}
});
while (QueueConfig.count.get() / QueueConfig.CONSUMER_SIZE != QueueConfig.PUBLISHER_SIZE * QueueConfig.TOTAL_SIZE) {
try {
EventPoller.PollState state = eventPoller.poll( handler );

if ( state == EventPoller.PollState.IDLE ){
Thread.sleep(9);
if (state == EventPoller.PollState.IDLE) {
//Thread.sleep(9);
}
} catch (Exception e) {
e.printStackTrace();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}

public class PollerExample {
public void test() throws ExecutionException, InterruptedException {
public PollerExample() {
QueueConfig.reset();
}

public boolean test() throws ExecutionException, InterruptedException {
RingBuffer<AddEvent> ringBuffer = RingBuffer.createMultiProducer(new AddEventFactory(), QueueConfig.QUEUE_SIZE);
ExecutorService es = Executors.newFixedThreadPool(QueueConfig.publishNum.get() + QueueConfig.CONSUMER_SIZE);
//创建消费者
for ( int i=0; i<QueueConfig.CONSUMER_SIZE; ++i ){
ExecutorService executorService = Executors.newFixedThreadPool(QueueConfig.publishNum.get() + QueueConfig.CONSUMER_SIZE);
List<Future> list = new ArrayList<Future>();
/*
创建消费者
注:每个事件都会被所有的消费者消费
*/
for (int i = 0; i < QueueConfig.CONSUMER_SIZE; ++i) {
EventPoller<AddEvent> poller = ringBuffer.newPoller();
ringBuffer.addGatingSequences(poller.getSequence());
es.submit( new PollCall(poller) );
list.add(executorService.submit(new PollCall(poller)));
}
//创建生产者
List<Future> list = new ArrayList<Future>();
final int threadNum = QueueConfig.publishNum.get();
for ( int i=0; i<threadNum; ++i ){
list.add( es.submit(new publisher(ringBuffer)) );
for (int i = 0; i < threadNum; ++i) {
list.add(executorService.submit(new publisher(ringBuffer)));
}
//等待生产者线程退出
for ( Future f :list ){
//等待所有线程退出
for (Future f : list) {
f.get();
}

System.out.println("count:" + QueueConfig.count.get());
return QueueConfig.count.get() == QueueConfig.TOTAL_SIZE * QueueConfig.PUBLISHER_SIZE;
}
}
16 changes: 11 additions & 5 deletions src/main/java/com/ac/disruptor/QueueConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,17 @@
* Created by Administrator on 2016/2/20.
*/
class QueueConfig {
public static AtomicInteger publishNum = new AtomicInteger(5); //final == 0
public static final int STEP = 1;
public static final int TOTAL_SIZE = 1000000;
public static final int PUBLISHER_SIZE = 5;
public static final int CONSUMER_SIZE = 1;
public static final int QUEUE_SIZE = 1024;

public static AtomicInteger publishNum = new AtomicInteger(PUBLISHER_SIZE); //final == 0
public static AtomicInteger count = new AtomicInteger(0); //final == totalSize

public static final int STEP = 1;
public static final int TOTAL_SIZE = 1000000;
public static final int CONSUMER_SIZE = 3;
public static final int QUEUE_SIZE = 1024;
public static void reset(){
publishNum.set(PUBLISHER_SIZE);
count.set(0);
}
}
12 changes: 3 additions & 9 deletions src/main/java/com/ac/disruptor/publisher.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,10 @@ public void translateTo(AddEvent event, long sequence, Integer arg0) {
public void run() {
int count = QueueConfig.TOTAL_SIZE;

while ( --count > 0 ){
if ( ringBuffer.remainingCapacity() > 0 ) {
ringBuffer.publishEvent(TRANSLATOR, QueueConfig.STEP);
}
else {
QueueConfig.count.addAndGet(1);
}
while ( --count >= 0 ){
ringBuffer.publishEvent(TRANSLATOR, QueueConfig.STEP);
}

int num = QueueConfig.publishNum.decrementAndGet();
System.out.println("thread end, id: " + num);
System.out.println("thread end, id: " + QueueConfig.publishNum.decrementAndGet());
}
}
27 changes: 4 additions & 23 deletions src/test/java/QueueTest.java
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import com.ac.disruptor.PollerExample;
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
Expand All @@ -7,6 +8,8 @@
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.Assert.assertTrue;

class ConfigInfo {
public static AtomicInteger thredNum = new AtomicInteger(1000);
public static AtomicInteger queueSize = new AtomicInteger(0);
Expand Down Expand Up @@ -262,28 +265,6 @@ public boolean onEvent(LogEvent event, long sequence, boolean endOfBatch) throws

@org.junit.Test
public void testPoller() throws ExecutionException, InterruptedException {
RingBuffer<LogEvent> ringBuffer = RingBuffer.createMultiProducer(new LogEventFactory(), 1024);

//生产
ExecutorService es = Executors.newFixedThreadPool(ConfigInfo.thredNum.get()+3);
EventPoller<LogEvent> poller1 = ringBuffer.newPoller();
EventPoller<LogEvent> poller2 = ringBuffer.newPoller();
EventPoller<LogEvent> poller3 = ringBuffer.newPoller();
ringBuffer.addGatingSequences(poller1.getSequence());
ringBuffer.addGatingSequences(poller2.getSequence());
ringBuffer.addGatingSequences(poller3.getSequence());
es.submit(new PollCall(poller1));
es.submit(new PollCall(poller2));
es.submit( new PollCall(poller3) );
List<Future> list = new ArrayList<Future>();
int threadNum = ConfigInfo.thredNum.get();
for ( int i=0; i<threadNum; ++i ){
list.add( es.submit(new LogEventProducerWithTranslator(ringBuffer)) );
}
for ( Future f :list ){
f.get();
}

System.out.println( ConfigInfo.totalSize.get() );
assertTrue( new PollerExample().test() );
}
}

0 comments on commit d882454

Please sign in to comment.