Skip to content

Commit

Permalink
Fix Amqp pub-sub tests to not depend on broker timings
Browse files Browse the repository at this point in the history
  • Loading branch information
2m committed Nov 25, 2016
1 parent 89b5e5d commit 0cd8972
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import akka.stream.javadsl.*;
import akka.testkit.JavaTestKit;
import akka.util.ByteString;
import scala.concurrent.duration.Duration;

import java.util.*;
import java.util.concurrent.*;
Expand All @@ -28,8 +29,8 @@
*/
public class AmqpConnectorsTest {

static ActorSystem system;
static Materializer materializer;
private static ActorSystem system;
private static Materializer materializer;

@BeforeClass
public static void setup() {
Expand Down Expand Up @@ -70,7 +71,7 @@ public void publishAndConsume() throws Exception {

//#run-sink
final List<String> input = Arrays.asList("one", "two", "three", "four", "five");
Source.from(input).map(s -> ByteString.fromString(s)).runWith(amqpSink, materializer);
Source.from(input).map(ByteString::fromString).runWith(amqpSink, materializer);
//#run-sink

//#run-source
Expand All @@ -96,8 +97,6 @@ public void publishFanoutAndConsume() throws Exception {
);
//#create-exchange-sink

final List<String> input = Arrays.asList("one", "two", "three", "four", "five");

//#create-exchange-source
final Integer fanoutSize = 4;
final Integer bufferSize = 1;
Expand All @@ -118,28 +117,24 @@ public void publishFanoutAndConsume() throws Exception {
}
//#create-exchange-source

final CompletableFuture<Done> materialized = new CompletableFuture<>();
final CompletionStage<List<Pair<Integer, String>>> result = mergedSources
.take(input.size() * fanoutSize)
.mapMaterializedValue(matVal -> {
materialized.complete(Done.getInstance());
return matVal;
})
.runWith(Sink.seq(), materializer);

// There is a race here if we don`t make sure the sources has declared their subscription queues and bindings
// before we start writing to the exchange
materialized.get(3, TimeUnit.SECONDS);
Thread.sleep(200);
final CompletableFuture<Done> completion = new CompletableFuture<>();
mergedSources
.runWith(Sink.fold(new HashSet<Integer>(), (seen, branchElem) -> {
if (seen.size() == fanoutSize) {
completion.complete(Done.getInstance());
}
seen.add(branchElem.first());
return seen;
}), materializer);

Source.from(input).map(s -> ByteString.fromString(s)).runWith(amqpSink, materializer);
system.scheduler().scheduleOnce(
Duration.create(5, TimeUnit.SECONDS),
() -> completion.completeExceptionally(new Error("Did not get at least one element from every fanout branch")),
system.dispatcher());

final Set<Pair<Integer, String>> expectedResult = input
.stream()
.flatMap(str -> IntStream.range(0, fanoutSize).boxed().map(i -> Pair.create(i, str)))
.collect(Collectors.toSet());
Source.repeat("stuff").map(ByteString::fromString).runWith(amqpSink, materializer);

assertEquals(expectedResult, result.toCompletableFuture().get(10, TimeUnit.SECONDS).stream().collect(Collectors.toSet()));
assertEquals(Done.getInstance(), completion.get(10, TimeUnit.SECONDS));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ class AmqpConnectorsSpec extends AmqpSpec {

subscriber.cancel()
publisher.sendComplete()

}

"not ack messages unless they get consumed" in {
Expand Down Expand Up @@ -189,7 +188,6 @@ class AmqpConnectorsSpec extends AmqpSpec {
subscriber2.expectNext().bytes.utf8String shouldEqual "five"

subscriber2.cancel()

}

"pub-sub from one source with multiple sinks" in {
Expand All @@ -208,8 +206,6 @@ class AmqpConnectorsSpec extends AmqpSpec {
)
//#create-exchange-sink

val input = Vector("one", "two", "three", "four", "five")

//#create-exchange-source
val fanoutSize = 4

Expand All @@ -227,24 +223,20 @@ class AmqpConnectorsSpec extends AmqpSpec {
}
//#create-exchange-source

val materialized = Promise[Done]()
val futureResult = mergedSources
.take(input.size * fanoutSize)
.mapMaterializedValue { n =>
materialized.success(Done)
n
}
.runWith(Sink.seq)
val completion = Promise[Done]
mergedSources.runWith(Sink.fold(Set.empty[Int]) {
case (seen, (branch, element)) =>
if (seen.size == fanoutSize) completion.trySuccess(Done)
seen + branch
})

// There is a race here if we don`t make sure the sources has declared their subscription queues and bindings
// before we start writing to the exchange
materialized.future.futureValue
Thread.sleep(200)
import system.dispatcher
system.scheduler.scheduleOnce(5.seconds)(
completion.tryFailure(new Error("Did not get at least one element from every fanout branch")))

Source(input).map(s => ByteString(s)).runWith(amqpSink)
Source.repeat("stuff").map(s => ByteString(s)).runWith(amqpSink)

val expectedOutput = input.flatMap(string => (0 until fanoutSize).map(n => (n, string))).toSet
futureResult.futureValue.toSet shouldEqual expectedOutput
completion.future.futureValue shouldBe Done
}
}
}

0 comments on commit 0cd8972

Please sign in to comment.