Skip to content

Commit

Permalink
Clean up formatting and make use of more specific exceptions
Browse files Browse the repository at this point in the history
  • Loading branch information
johnpr01 committed May 4, 2015
1 parent 2f3f0ff commit 87f7d48
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 175 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package uk.co.real_logic.aeron.tools.perf_tools;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.PrintWriter;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -70,7 +71,7 @@ public AeronPing(final String[] args)
{
parseArgs(args);
}
catch (final Exception e)
catch (final ParseException e)
{
e.printStackTrace();
}
Expand All @@ -83,7 +84,7 @@ public AeronPing(final String[] args)
connectionLatch = new CountDownLatch(1);
buffer = new UnsafeBuffer(ByteBuffer.allocateDirect(msgLen));
timestamps = new long[2][numMsgs];
this.claim = claim;

if (claim)
{
bufferClaim = new BufferClaim();
Expand All @@ -96,7 +97,7 @@ public void connect()
{
connectionLatch.await();
}
catch (final Exception e)
catch (final InterruptedException e)
{
e.printStackTrace();
}
Expand Down Expand Up @@ -182,12 +183,12 @@ public void dumpStats()
}
stdDev = Math.sqrt(sum / tmp.length);

dumpPercentileData(min, max, .9);
dumpPercentileData(min, max, .99);
dumpPercentileData(min, max, .999);
dumpPercentileData(min, max, .9999);
dumpPercentileData(min, max, .99999);
dumpPercentileData(min, max, .999999);
dumpPercentileData(.9);
dumpPercentileData(.99);
dumpPercentileData(.999);
dumpPercentileData(.9999);
dumpPercentileData(.99999);
dumpPercentileData(.999999);

System.out.println("Num Messages: " + numMsgs);
System.out.println("Message Length: " + msgLen);
Expand Down Expand Up @@ -228,10 +229,8 @@ private void parseArgs(final String[] args) throws ParseException
}
}

private void dumpPercentileData(final double min, final double max, final double percentile)
private void dumpPercentileData(final double percentile)
{
final int width = 390;
final int height = 370;
final int num = (int)((numMsgs - 1) * percentile);
final double newMax = sorted[num];

Expand All @@ -248,10 +247,14 @@ private void dumpPercentileData(final double min, final double max, final double
}
out.close();
}
catch (final Exception e)
catch (final FileNotFoundException e)
{
e.printStackTrace();
}
catch (final SecurityException se)
{
se.printStackTrace();
}
}

public void dumpData()
Expand All @@ -266,23 +269,34 @@ public void dumpData()
}
out.close();
}
catch (final Exception e)
catch (final FileNotFoundException e)
{
e.printStackTrace();
}
catch (final SecurityException se)
{
se.printStackTrace();
}
}

public void onNewConnection(final String channel, final int streamId,
final int sessionId, final long position, final String sourceInfo)
public void onNewConnection(
final String channel,
final int streamId,
final int sessionId,
final long position,
final String sourceInfo)
{
if (channel.equals(pongChannel) && pongStreamId == streamId)
{
connectionLatch.countDown();
}
}

private void pongHandler(final DirectBuffer buffer, final int offset, final int length,
final Header header)
private void pongHandler(
final DirectBuffer buffer,
final int offset,
final int length,
final Header header)
{
if (buffer.getByte(offset + 0) == (byte)'p')
{
Expand Down Expand Up @@ -317,32 +331,23 @@ private void sendPingAndReceivePongClaim()
{
if (pub.tryClaim(msgLen, bufferClaim) > 0)
{
try
final MutableDirectBuffer buffer = bufferClaim.buffer();
final int offset = bufferClaim.offset();
if (!warmedUp)
{
final MutableDirectBuffer buffer = bufferClaim.buffer();
final int offset = bufferClaim.offset();
if (!warmedUp)
{
buffer.putByte(offset + 0, (byte) 'w');
}
else
{
buffer.putByte(offset + 0, (byte) 'p');
buffer.putInt(offset + 1, msgCount);
timestamps[0][msgCount++] = System.nanoTime();
}
buffer.putByte(offset + 0, (byte) 'w');
}
catch (final Exception e)
else
{
e.printStackTrace();
buffer.putByte(offset + 0, (byte) 'p');
buffer.putInt(offset + 1, msgCount);
timestamps[0][msgCount++] = System.nanoTime();
}
finally

bufferClaim.commit();
while (sub.poll(1) <= 0)
{
bufferClaim.commit();
while (sub.poll(1) <= 0)
{

}
}
}
else
Expand All @@ -351,13 +356,6 @@ private void sendPingAndReceivePongClaim()
}
}

private double round(final double unrounded, final int precision, final int roundingMode)
{
final BigDecimal bd = new BigDecimal(unrounded);
final BigDecimal rounded = bd.setScale(precision, roundingMode);
return rounded.doubleValue();
}

public static void main(final String[] args)
{
final AeronPing ping = new AeronPing(args);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public AeronPong(final String[] args)
{
parseArgs(args);
}
catch (final Exception e)
catch (final ParseException e)
{
e.printStackTrace();
}
Expand All @@ -71,7 +71,7 @@ public AeronPong(final String[] args)
aeron = Aeron.connect(ctx);
pongPub = aeron.addPublication(pongChannel, pongStreamId);
pingSub = aeron.addSubscription(pingChannel, pingStreamId, dataHandler);
this.claim = claim;

if (claim)
{
bufferClaim = new BufferClaim();
Expand Down Expand Up @@ -142,19 +142,10 @@ private void pingHandlerClaim(final DirectBuffer buffer, final int offset, final
}
if (pongPub.tryClaim(length, bufferClaim) >= 0)
{
try
{
final MutableDirectBuffer newBuffer = bufferClaim.buffer();
newBuffer.putBytes(bufferClaim.offset(), buffer, offset, length);
}
catch (final Exception e)
{
e.printStackTrace();
}
finally
{
bufferClaim.commit();
}
final MutableDirectBuffer newBuffer = bufferClaim.buffer();
newBuffer.putBytes(bufferClaim.offset(), buffer, offset, length);

bufferClaim.commit();
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,6 @@
import uk.co.real_logic.agrona.concurrent.UnsafeBuffer;
import uk.co.real_logic.agrona.console.ContinueBarrier;

/**
* Created by philip on 4/22/15.
*/
public class AeronThroughput
{
private static final int STREAM_ID = 10;
Expand All @@ -54,17 +51,12 @@ public class AeronThroughput
private static long iterations = 0;

public static void printRate(
final double messagesPerSec,
final double bytesPerSec,
final long totalMessages,
final long totalBytes)
final double messagesPerSec,
final double bytesPerSec,
final long totalMessages,
final long totalBytes)
{
// System.out.println(
// String.format(
// "%s msgs/sec, %s/sec, totals %s messages %s",
// humanReadableCount((long)messagesPerSec, true),
// humanReadableByteCount((long)bytesPerSec, false),
// humanReadableCount(totalMessages, true), humanReadableByteCount(totalBytes, false)));

}

public static DataHandler rateReporterHandler(final RateReporter reporter)
Expand All @@ -80,7 +72,9 @@ public static Consumer<Subscription> subscriberLoop(final int limit, final Atomi
}

public static Consumer<Subscription> subscriberLoop(
final int limit, final AtomicBoolean running, final IdleStrategy idleStrategy)
final int limit,
final AtomicBoolean running,
final IdleStrategy idleStrategy)
{
return
(subscription) ->
Expand All @@ -100,18 +94,6 @@ public static Consumer<Subscription> subscriberLoop(
};
}

private static String humanReadableByteCount(final long bytes, final boolean si)
{
final int unit = si ? 1000 : 1024;
if (bytes < unit)
{
return bytes + "B";
}
final int exp = (int)(Math.log(bytes) / Math.log(unit));
final String pre = (si ? "kMGTPE" : "KMGTPE").charAt(exp - 1) + (si ? "" : "i");
return String.format("%.2f%sB", bytes / Math.pow(unit, exp), pre);
}

private static String humanReadableCount(final long val, final boolean si)
{
final int unit = si ? 1000 : 1024;
Expand Down Expand Up @@ -153,34 +135,23 @@ public static void main(final String[] args) throws Exception
executor.execute(reporter);
executor.execute(() -> AeronThroughput.subscriberLoop(FRAGMENT_COUNT_LIMIT, running).accept(subscription));

final ContinueBarrier barrier = new ContinueBarrier("Execute again?");

//System.out.format(
// "\nStreaming %,d messages of size %d bytes to %s on stream Id %d\n",
// NUMBER_OF_MESSAGES, MESSAGE_LENGTH, CHANNEL, STREAM_ID);

long backPressureCount = 0;

final long start = System.currentTimeMillis();
for (long i = 0; i < NUMBER_OF_MESSAGES; i++)
{
ATOMIC_BUFFER.putLong(0, i);

while (publication.offer(ATOMIC_BUFFER, 0, ATOMIC_BUFFER.capacity()) < 0)
{
backPressureCount++;
OFFER_IDLE_STRATEGY.idle(0);
}
}
final long stop = System.currentTimeMillis();
System.out.println("Average throughput for " +
MESSAGE_LENGTH + " byte messages was: " +
humanReadableCount(NUMBER_OF_MESSAGES / ((stop - start) / 1000), false) + " msgs/sec");
//System.out.println("Done streaming. backPressureRatio=" + ((double)backPressureCount / NUMBER_OF_MESSAGES));

if (0 < LINGER_TIMEOUT_MS)
{
//System.out.println("Lingering for " + LINGER_TIMEOUT_MS + " milliseconds...");
Thread.sleep(LINGER_TIMEOUT_MS);
}

Expand Down
Loading

0 comments on commit 87f7d48

Please sign in to comment.