Skip to content

Commit

Permalink
[Java] Await subscription connect in archiver setup.
Browse files Browse the repository at this point in the history
Test should work consistently now.
  • Loading branch information
nitsanw committed Feb 6, 2017
1 parent 306105d commit 759dcea
Showing 1 changed file with 37 additions and 23 deletions.
60 changes: 37 additions & 23 deletions aeron-archiver/src/test/java/io/aeron/archiver/SystemTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,29 +15,21 @@
*/
package io.aeron.archiver;

import io.aeron.Aeron;
import io.aeron.Publication;
import io.aeron.Subscription;
import io.aeron.*;
import io.aeron.archiver.messages.*;
import io.aeron.driver.MediaDriver;
import io.aeron.driver.ThreadingMode;
import io.aeron.driver.*;
import io.aeron.logbuffer.FragmentHandler;
import io.aeron.protocol.DataHeaderFlyweight;
import org.agrona.CloseHelper;
import org.agrona.concurrent.UnsafeBuffer;
import org.junit.*;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.io.*;
import java.nio.*;
import java.nio.channels.FileChannel;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.LockSupport;

import static org.mockito.Mockito.mock;

@Ignore
public class SystemTest
{
Expand All @@ -56,7 +48,7 @@ public class SystemTest
private MediaDriver driver;

private UnsafeBuffer buffer = new UnsafeBuffer(new byte[4096]);
private FragmentHandler pongHandler = mock(FragmentHandler.class);

private File archiveFolder;
private int streamInstanceId;
private String source;
Expand Down Expand Up @@ -168,20 +160,16 @@ public void archiveAndReplay() throws IOException, InterruptedException
final Subscription archiverNotifications = publishingClient.addSubscription(
archiverCtx.archiverNotificationsChannel(), archiverCtx.archiverNotificationsStreamId());

while (!archiverServiceRequest.isConnected())
{
LockSupport.parkNanos(1000);
}
awaitPublicationIsConnected(archiverServiceRequest, 1000);
awaitSubscriptionIsConnected(archiverNotifications, 1000);
println("Archive service connected");

requestArchive(archiverServiceRequest, PUBLISH_URI, PUBLISH_STREAM_ID);
println("Archive requested");

final Publication publication = publishingClient.addPublication(PUBLISH_URI, PUBLISH_STREAM_ID);
while (!publication.isConnected())
{
LockSupport.parkNanos(1000);
}
awaitPublicationIsConnected(publication, 1000);


// the archiver has subscribed to the pingPub, now we wait for the archive start message
poll(archiverNotifications,
Expand Down Expand Up @@ -393,9 +381,7 @@ private void trackArchiveProgress(
final long end = System.currentTimeMillis();
final long deltaTime = end - start;

start = end;
final long deltaBytes = delivered - startBytes;
startBytes = delivered;
final double mbps = ((deltaBytes * 1000.0) / deltaTime) / (1024.0 * 1024.0);
printf("Archive reported speed: %f MB/s \n", mbps);

Expand Down Expand Up @@ -450,6 +436,34 @@ private void offer(
}
}

private void awaitSubscriptionIsConnected(final Subscription subscription,
final long timeout)
{
final long limit = System.currentTimeMillis() + timeout;
while (subscription.imageCount() == 0)
{
LockSupport.parkNanos(1000);
if (limit < System.currentTimeMillis())
{
Assert.fail("awaitSubscriptionIsConnected has timed out");
}
}
}

private void awaitPublicationIsConnected(final Publication publication,
final long timeout)
{
final long limit = System.currentTimeMillis() + timeout;
while (!publication.isConnected())
{
LockSupport.parkNanos(1000);
if (limit < System.currentTimeMillis())
{
Assert.fail("awaitPublicationIsConnected has timed out");
}
}
}

private void printf(final String s, final Object... args)
{
if (DEBUG)
Expand Down

0 comments on commit 759dcea

Please sign in to comment.