Skip to content

Commit

Permalink
akka#17717 handle OnComplete in ActorSubscriber doc samples
Browse files Browse the repository at this point in the history
  • Loading branch information
2m authored and johanandren committed Oct 17, 2016
1 parent 0e60020 commit 1e85e17
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 20 deletions.
51 changes: 32 additions & 19 deletions akka-docs/rst/java/code/docs/stream/ActorSubscriberDocTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import scala.concurrent.duration.Duration;

import java.util.*;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;

Expand All @@ -48,7 +50,7 @@ public static void tearDown() {
system = null;
mat = null;
}

//#worker-pool
public static class WorkerPoolProtocol {

Expand All @@ -70,7 +72,7 @@ public static Msg msg(int id, ActorRef replyTo) {
return new Msg(id, replyTo);
}


public static class Work {
public final int id;
public Work(int id) { this.id = id; }
Expand All @@ -83,8 +85,8 @@ public String toString() {
public static Work work(int id) {
return new Work(id);
}


public static class Reply {
public final int id;
public Reply(int id) { this.id = id; }
Expand All @@ -97,8 +99,8 @@ public String toString() {
public static Reply reply(int id) {
return new Reply(id);
}


public static class Done {
public final int id;
public Done(int id) { this.id = id; }
Expand Down Expand Up @@ -136,16 +138,16 @@ public static Done done(int id) {
}

}

public static class WorkerPool extends AbstractActorSubscriber {

public static Props props() { return Props.create(WorkerPool.class); }

final int MAX_QUEUE_SIZE = 10;
final Map<Integer, ActorRef> queue = new HashMap<>();

final Router router;

@Override
public RequestStrategy requestStrategy() {
return new MaxInFlightRequestStrategy(MAX_QUEUE_SIZE) {
Expand All @@ -161,7 +163,7 @@ public WorkerPool() {
for (int i = 0; i < 3; i++)
routees.add(new ActorRefRoutee(context().actorOf(Props.create(Worker.class))));
router = new Router(new RoundRobinRoutingLogic(), routees);

receive(ReceiveBuilder.
match(ActorSubscriberMessage.OnNext.class, on -> on.element() instanceof WorkerPoolProtocol.Msg,
onNext -> {
Expand All @@ -170,18 +172,26 @@ public WorkerPool() {

if (queue.size() > MAX_QUEUE_SIZE)
throw new RuntimeException("queued too many: " + queue.size());

router.route(WorkerPoolProtocol.work(msg.id), self());
}).
match(ActorSubscriberMessage.onCompleteInstance().getClass(), complete -> {
if (queue.isEmpty()) {
context().stop(self());
}
}).
match(WorkerPoolProtocol.Reply.class, reply -> {
int id = reply.id;
queue.get(id).tell(WorkerPoolProtocol.done(id), self());
queue.remove(id);
if (canceled() && queue.isEmpty()) {
context().stop(self());
}
}).
build());
}
}

static class Worker extends AbstractActor {
public Worker() {
receive(ReceiveBuilder.
Expand All @@ -192,11 +202,11 @@ public Worker() {
}
}
//#worker-pool

@Test
public void demonstrateActorPublisherUsage() {
new JavaTestKit(system) {

{
final ActorRef replyTo = getTestActor();

Expand All @@ -206,12 +216,14 @@ public void demonstrateActorPublisherUsage() {
for (int i = 0; i < N; i++) {
data.add(i);
}
Source.from(data)

final ActorRef worker = Source.from(data)
.map(i -> WorkerPoolProtocol.msg(i, replyTo))
.runWith(Sink.<WorkerPoolProtocol.Msg>actorSubscriber(WorkerPool.props()), mat);
//#actor-subscriber-usage

watch(worker);

List<Object> got = Arrays.asList(receiveN(N));
Collections.sort(got, new Comparator<Object>() {
@Override
Expand All @@ -226,9 +238,10 @@ public int compare(Object o1, Object o2) {
assertEquals(String.format("Expected %d, but got %s", i, got.get(i)), WorkerPoolProtocol.done(i), got.get(i));
}
assertEquals(String.format("Expected 117 messages but got %d", i), i, 117);
expectTerminated(Duration.create(10, TimeUnit.SECONDS), worker);
}
};
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import akka.stream.actor.MaxInFlightRequestStrategy
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import akka.testkit.AkkaSpec
import scala.concurrent.duration._

object ActorSubscriberDocSpec {
//#worker-pool
Expand Down Expand Up @@ -54,6 +55,13 @@ object ActorSubscriberDocSpec {
case Reply(id) =>
queue(id) ! Done(id)
queue -= id
if (canceled && queue.isEmpty) {
context.stop(self)
}
case OnComplete =>
if (queue.isEmpty) {
context.stop(self)
}
}
}

Expand All @@ -79,11 +87,13 @@ class ActorSubscriberDocSpec extends AkkaSpec {

//#actor-subscriber-usage
val N = 117
Source(1 to N).map(WorkerPool.Msg(_, replyTo))
val worker = Source(1 to N).map(WorkerPool.Msg(_, replyTo))
.runWith(Sink.actorSubscriber(WorkerPool.props))
//#actor-subscriber-usage

watch(worker)
receiveN(N).toSet should be((1 to N).map(WorkerPool.Done).toSet)
expectTerminated(worker, 10.seconds)
}

}

0 comments on commit 1e85e17

Please sign in to comment.