Skip to content

Commit

Permalink
PERFORMANCE: Queue RW benchmark for in memory queue
Browse files Browse the repository at this point in the history
  • Loading branch information
original-brownbear committed Jun 10, 2017
1 parent 4926f5b commit 941446a
Showing 1 changed file with 59 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,17 @@
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.logstash.Event;
import org.logstash.Timestamp;
import org.logstash.ackedqueue.Batch;
import org.logstash.ackedqueue.Queue;
import org.logstash.ackedqueue.Queueable;
import org.logstash.ackedqueue.Settings;
import org.logstash.ackedqueue.SettingsImpl;
import org.logstash.ackedqueue.io.ByteBufferPageIO;
import org.logstash.ackedqueue.io.CheckpointIOFactory;
import org.logstash.ackedqueue.io.FileCheckpointIO;
import org.logstash.ackedqueue.io.MemoryCheckpointIO;
import org.logstash.ackedqueue.io.MmapPageIO;
import org.logstash.ackedqueue.io.PageIOFactory;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
Expand Down Expand Up @@ -52,29 +55,38 @@ public class QueueRWBenchmark {

private static final Event EVENT = new Event();

private Queue queue;
private ArrayBlockingQueue<Event> queueArrayBlocking;

private Queue queuePersisted;

private Queue queueMemory;

private String path;

private ExecutorService exec;

@Setup
public void setUp() throws IOException, CloneNotSupportedException {
final Settings settings = settings();
final Settings settingsPersisted = settings(true);
EVENT.setField("Foo", "Bar");
EVENT.setField("Foo1", "Bar1");
EVENT.setField("Foo2", "Bar2");
EVENT.setField("Foo3", "Bar3");
EVENT.setField("Foo4", "Bar4");
path = settings.getDirPath();
queue = new Queue(settings);
queue.open();
path = settingsPersisted.getDirPath();
queuePersisted = new Queue(settingsPersisted);
queueArrayBlocking = new ArrayBlockingQueue<>(ACK_INTERVAL);
queueMemory = new Queue(settings(false));
queuePersisted.open();
queueMemory.open();
exec = Executors.newSingleThreadExecutor();
}

@TearDown
public void tearDown() throws IOException {
queue.close();
queuePersisted.close();
queueMemory.close();
queueArrayBlocking.clear();
FileUtils.deleteDirectory(new File(path));
exec.shutdownNow();
}
Expand All @@ -85,16 +97,36 @@ public final void readFromPersistedQueue(final Blackhole blackhole) throws Excep
final Future<?> future = exec.submit(() -> {
for (int i = 0; i < EVENTS_PER_INVOCATION; ++i) {
try {
final Event evnt = EVENT.clone();
evnt.setTimestamp(Timestamp.now());
this.queue.write(evnt);
} catch (final IOException | CloneNotSupportedException ex) {
this.queuePersisted.write(EVENT);
} catch (final IOException ex) {
throw new IllegalStateException(ex);
}
}
});
for (int i = 0; i < EVENTS_PER_INVOCATION / BATCH_SIZE; ++i) {
try (Batch batch = queue.readBatch(BATCH_SIZE)) {
try (Batch batch = queuePersisted.readBatch(BATCH_SIZE)) {
for (final Queueable elem : batch.getElements()) {
blackhole.consume(elem);
}
}
}
future.get();
}

@Benchmark
@OperationsPerInvocation(EVENTS_PER_INVOCATION)
public final void readFromMemoryQueue(final Blackhole blackhole) throws Exception {
final Future<?> future = exec.submit(() -> {
for (int i = 0; i < EVENTS_PER_INVOCATION; ++i) {
try {
this.queueMemory.write(EVENT);
} catch (final IOException ex) {
throw new IllegalStateException(ex);
}
}
});
for (int i = 0; i < EVENTS_PER_INVOCATION / BATCH_SIZE; ++i) {
try (Batch batch = queueMemory.readBatch(BATCH_SIZE)) {
for (final Queueable elem : batch.getElements()) {
blackhole.consume(elem);
}
Expand All @@ -106,20 +138,17 @@ public final void readFromPersistedQueue(final Blackhole blackhole) throws Excep
@Benchmark
@OperationsPerInvocation(EVENTS_PER_INVOCATION)
public final void readFromArrayBlockingQueue(final Blackhole blackhole) throws Exception {
final ArrayBlockingQueue<Event> arrayBlockingQueue = new ArrayBlockingQueue<>(ACK_INTERVAL);
final Future<?> future = exec.submit(() -> {
for (int i = 0; i < EVENTS_PER_INVOCATION; ++i) {
try {
final Event evnt = EVENT.clone();
evnt.setTimestamp(Timestamp.now());
arrayBlockingQueue.put(evnt);
} catch (final CloneNotSupportedException | InterruptedException ex) {
queueArrayBlocking.put(EVENT);
} catch (final InterruptedException ex) {
throw new IllegalStateException(ex);
}
}
});
for (int i = 0; i < EVENTS_PER_INVOCATION; ++i) {
blackhole.consume(arrayBlockingQueue.take());
blackhole.consume(queueArrayBlocking.take());
}
future.get();
}
Expand All @@ -132,14 +161,23 @@ public static void main(final String... args) throws RunnerException {
new Runner(opt).run();
}

private static Settings settings() {
private static Settings settings(final boolean persisted) {
final PageIOFactory pageIOFactory;
final CheckpointIOFactory checkpointIOFactory;
if (persisted) {
pageIOFactory = MmapPageIO::new;
checkpointIOFactory = FileCheckpointIO::new;
} else {
pageIOFactory = ByteBufferPageIO::new;
checkpointIOFactory = MemoryCheckpointIO::new;
}
return SettingsImpl.fileSettingsBuilder(Files.createTempDir().getPath())
.capacity(256 * 1024 * 1024)
.queueMaxBytes(Long.MAX_VALUE)
.elementIOFactory(MmapPageIO::new)
.elementIOFactory(pageIOFactory)
.checkpointMaxWrites(ACK_INTERVAL)
.checkpointMaxAcks(ACK_INTERVAL)
.checkpointIOFactory(FileCheckpointIO::new)
.checkpointIOFactory(checkpointIOFactory)
.elementClass(Event.class).build();
}
}

0 comments on commit 941446a

Please sign in to comment.