Skip to content

Commit

Permalink
[Java] Fix concurrency issues with fragment reporting in system tests…
Browse files Browse the repository at this point in the history
… and tidy up.
  • Loading branch information
mjpt777 committed Nov 15, 2016
1 parent d3d819e commit e4938db
Showing 1 changed file with 44 additions and 63 deletions.
107 changes: 44 additions & 63 deletions aeron-system-tests/src/test/java/io/aeron/PubAndSubTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.aeron.driver.MediaDriver;
import io.aeron.driver.ThreadingMode;
import org.agrona.CloseHelper;
import org.agrona.DirectBuffer;
import org.junit.After;
import org.junit.Assume;
Expand All @@ -40,6 +41,7 @@
import java.nio.channels.FileChannel;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.hamcrest.core.IsNot.not;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -94,30 +96,9 @@ private void launch(final String channel) throws Exception
@After
public void closeEverything() throws Exception
{
if (null != publication)
{
publication.close();
}

if (null != subscription)
{
subscription.close();
}

if (null != subscribingClient)
{
subscribingClient.close();
}

if (null != publishingClient)
{
publishingClient.close();
}

if (null != driver)
{
driver.close();
}
CloseHelper.quietClose(subscribingClient);
CloseHelper.quietClose(publishingClient);
CloseHelper.quietClose(driver);

context.deleteAeronDirectory();
}
Expand All @@ -137,12 +118,12 @@ public void shouldReceivePublishedMessage(final String channel) throws Exception

publishMessage();

final int fragmentsRead[] = new int[1];
final AtomicInteger fragmentsRead = new AtomicInteger();
SystemTestHelper.executeUntil(
() -> fragmentsRead[0] > 0,
() -> fragmentsRead.get() > 0,
(i) ->
{
fragmentsRead[0] += subscription.poll(fragmentHandler, 10);
fragmentsRead.getAndAdd(subscription.poll(fragmentHandler, 10));
Thread.yield();
},
Integer.MAX_VALUE,
Expand All @@ -163,12 +144,12 @@ public void shouldReceivePublishedMessageViaPollFile(final String channel) throw

publishMessage();

final int fragmentsRead[] = new int[1];
final AtomicInteger fragmentsRead = new AtomicInteger();
SystemTestHelper.executeUntil(
() -> fragmentsRead[0] > 0,
() -> fragmentsRead.get() > 0,
(i) ->
{
fragmentsRead[0] += subscription.rawPoll(rawBlockHandler, Integer.MAX_VALUE);
fragmentsRead.addAndGet((int)subscription.rawPoll(rawBlockHandler, Integer.MAX_VALUE));
Thread.yield();
},
Integer.MAX_VALUE,
Expand Down Expand Up @@ -221,12 +202,12 @@ public void shouldContinueAfterBufferRollover(final String channel) throws Excep
Thread.yield();
}

final int fragmentsRead[] = new int[1];
final AtomicInteger fragmentsRead = new AtomicInteger();
SystemTestHelper.executeUntil(
() -> fragmentsRead[0] > 0,
() -> fragmentsRead.get() > 0,
(j) ->
{
fragmentsRead[0] += subscription.poll(fragmentHandler, 10);
fragmentsRead.addAndGet(subscription.poll(fragmentHandler, 10));
Thread.yield();
},
Integer.MAX_VALUE,
Expand Down Expand Up @@ -263,12 +244,12 @@ public void shouldContinueAfterRolloverWithMinimalPaddingHeader(final String cha
Thread.yield();
}

final int fragmentsRead[] = new int[1];
final AtomicInteger fragmentsRead = new AtomicInteger();
SystemTestHelper.executeUntil(
() -> fragmentsRead[0] > 0,
() -> fragmentsRead.get() > 0,
(j) ->
{
fragmentsRead[0] += subscription.poll(fragmentHandler, 10);
fragmentsRead.getAndAdd(subscription.poll(fragmentHandler, 10));
Thread.yield();
},
Integer.MAX_VALUE,
Expand Down Expand Up @@ -419,12 +400,12 @@ public void shouldReceivePublishedMessageBatchedWithDataLoss(final String channe
}
}

final int fragmentsRead[] = new int[1];
final AtomicInteger fragmentsRead = new AtomicInteger();
SystemTestHelper.executeUntil(
() -> fragmentsRead[0] >= numMessagesPerBatch,
() -> fragmentsRead.get() >= numMessagesPerBatch,
(j) ->
{
fragmentsRead[0] += subscription.poll(fragmentHandler, 10);
fragmentsRead.addAndGet(subscription.poll(fragmentHandler, 10));
Thread.yield();
},
Integer.MAX_VALUE,
Expand Down Expand Up @@ -463,12 +444,12 @@ public void shouldContinueAfterBufferRolloverBatched(final String channel) throw
}
}

final int fragmentsRead[] = new int[1];
final AtomicInteger fragmentsRead = new AtomicInteger();
SystemTestHelper.executeUntil(
() -> fragmentsRead[0] >= numMessagesPerBatch,
() -> fragmentsRead.get() >= numMessagesPerBatch,
(j) ->
{
fragmentsRead[0] += subscription.poll(fragmentHandler, 10);
fragmentsRead.addAndGet(subscription.poll(fragmentHandler, 10));
Thread.yield();
},
Integer.MAX_VALUE,
Expand All @@ -480,12 +461,12 @@ public void shouldContinueAfterBufferRolloverBatched(final String channel) throw
Thread.yield();
}

final int fragmentsRead[] = new int[1];
final AtomicInteger fragmentsRead = new AtomicInteger();
SystemTestHelper.executeUntil(
() -> fragmentsRead[0] > 0,
() -> fragmentsRead.get() > 0,
(j) ->
{
fragmentsRead[0] += subscription.poll(fragmentHandler, 10);
fragmentsRead.addAndGet(subscription.poll(fragmentHandler, 10));
Thread.yield();
},
Integer.MAX_VALUE,
Expand Down Expand Up @@ -523,12 +504,12 @@ public void shouldContinueAfterBufferRolloverWithPadding(final String channel) t
Thread.yield();
}

final int fragmentsRead[] = new int[1];
final AtomicInteger fragmentsRead = new AtomicInteger();
SystemTestHelper.executeUntil(
() -> fragmentsRead[0] > 0,
() -> fragmentsRead.get() > 0,
(j) ->
{
fragmentsRead[0] += subscription.poll(fragmentHandler, 10);
fragmentsRead.addAndGet(subscription.poll(fragmentHandler, 10));
Thread.yield();
},
Integer.MAX_VALUE,
Expand Down Expand Up @@ -572,12 +553,12 @@ public void shouldContinueAfterBufferRolloverWithPaddingBatched(final String cha
}
}

final int fragmentsRead[] = new int[1];
final AtomicInteger fragmentsRead = new AtomicInteger();
SystemTestHelper.executeUntil(
() -> fragmentsRead[0] >= numMessagesPerBatch,
() -> fragmentsRead.get() >= numMessagesPerBatch,
(j) ->
{
fragmentsRead[0] += subscription.poll(fragmentHandler, 10);
fragmentsRead.addAndGet(subscription.poll(fragmentHandler, 10));
Thread.yield();
},
Integer.MAX_VALUE,
Expand Down Expand Up @@ -630,13 +611,13 @@ public void shouldReceiveOnlyAfterSendingUpToFlowControlLimit(final String chann
messagesSent++;
}

final int fragmentsRead[] = new int[1];
final AtomicInteger fragmentsRead = new AtomicInteger();
final int messagesToReceive = messagesSent;
SystemTestHelper.executeUntil(
() -> fragmentsRead[0] >= messagesToReceive,
() -> fragmentsRead.get() >= messagesToReceive,
(j) ->
{
fragmentsRead[0] += subscription.poll(fragmentHandler, 10);
fragmentsRead.addAndGet(subscription.poll(fragmentHandler, 10));
Thread.yield();
},
Integer.MAX_VALUE,
Expand Down Expand Up @@ -680,12 +661,12 @@ public void shouldReceivePublishedMessageOneForOneWithReSubscription(final Strin
Thread.yield();
}

final int fragmentsRead[] = new int[1];
final AtomicInteger fragmentsRead = new AtomicInteger();
SystemTestHelper.executeUntil(
() -> fragmentsRead[0] > 0,
() -> fragmentsRead.get() > 0,
(j) ->
{
fragmentsRead[0] += subscription.poll(fragmentHandler, 10);
fragmentsRead.addAndGet(subscription.poll(fragmentHandler, 10));
Thread.yield();
},
Integer.MAX_VALUE,
Expand All @@ -705,12 +686,12 @@ public void shouldReceivePublishedMessageOneForOneWithReSubscription(final Strin
Thread.yield();
}

final int fragmentsRead[] = new int[1];
final AtomicInteger fragmentsRead = new AtomicInteger();
SystemTestHelper.executeUntil(
() -> fragmentsRead[0] > 0,
() -> fragmentsRead.get() > 0,
(j) ->
{
fragmentsRead[0] += subscription.poll(fragmentHandler, 10);
fragmentsRead.addAndGet(subscription.poll(fragmentHandler, 10));
Thread.yield();
},
Integer.MAX_VALUE,
Expand Down Expand Up @@ -751,12 +732,12 @@ public void shouldFragmentExactMessageLengthsCorrectly(final String channel) thr
}
}

final int fragmentsRead[] = new int[1];
final AtomicInteger fragmentsRead = new AtomicInteger();
SystemTestHelper.executeUntil(
() -> fragmentsRead[0] > numFramesToExpect,
() -> fragmentsRead.get() > numFramesToExpect,
(j) ->
{
fragmentsRead[0] += subscription.poll(fragmentHandler, 10);
fragmentsRead.getAndAdd(subscription.poll(fragmentHandler, 10));
Thread.yield();
},
Integer.MAX_VALUE,
Expand Down

0 comments on commit e4938db

Please sign in to comment.