Skip to content

Commit

Permalink
Use replace partial function with function in Java API
Browse files Browse the repository at this point in the history
  • Loading branch information
2m committed Aug 22, 2019
1 parent 9c2a413 commit 4b20217
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import akka.stream.alpakka.dynamodb.impl.scaladsl
import akka.stream.javadsl.{Flow, FlowWithContext, Keep}

import scala.concurrent.duration._
import scala.runtime.AbstractPartialFunction
import scala.util.Try
import scala.collection.JavaConverters._

Expand Down Expand Up @@ -48,7 +47,7 @@ object RetryFlow {
maxBackoff: java.time.Duration,
randomFactor: Double,
flow: Flow[Pair[In, State], Pair[Try[Out], State], Mat],
retryWith: AbstractPartialFunction[Pair[Try[Out], State], akka.japi.Option[util.Collection[Pair[In, State]]]]
retryWith: java.util.function.Function[Pair[Try[Out], State], akka.japi.Option[util.Collection[Pair[In, State]]]]
): Flow[akka.japi.Pair[In, State], akka.japi.Pair[Try[Out], State], Mat] = {
val retryFlow = scaladsl.RetryFlow
.withBackoffAndContext(parallelism,
Expand Down
94 changes: 36 additions & 58 deletions dynamodb/src/test/java/docs/javadsl/RetryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@

import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.japi.JavaPartialFunction;
import akka.japi.Option;
import akka.japi.Pair;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
Expand All @@ -24,7 +22,6 @@

import java.net.URI;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -104,42 +101,32 @@ public void retrySuccessfulRequests() throws Exception {
.toCompletableFuture()
.get(5, TimeUnit.SECONDS);

final JavaPartialFunction<
Pair<Try<BatchGetItemResponse>, NotUsed>,
Option<Collection<Pair<BatchGetItemRequest, NotUsed>>>>
retryMatcher =
new JavaPartialFunction<
Pair<Try<BatchGetItemResponse>, NotUsed>,
Option<Collection<Pair<BatchGetItemRequest, NotUsed>>>>() {
public Option<Collection<Pair<BatchGetItemRequest, NotUsed>>> apply(
Pair<Try<BatchGetItemResponse>, NotUsed> in, boolean isCheck) {
final Try<BatchGetItemResponse> response = in.first();
if (response.isSuccess()) {
final BatchGetItemResponse result = response.get();
if (result.unprocessedKeys().size() > 0) {
return some(
Collections.singleton(
Pair.create(
batchGetItemRequest(result.unprocessedKeys()),
NotUsed.getInstance())));
} else {
return none();
}
} else {
return none();
}
}
};

Flow<Pair<BatchGetItemRequest, NotUsed>, Pair<Try<BatchGetItemResponse>, NotUsed>, NotUsed>
final Flow<
Pair<BatchGetItemRequest, NotUsed>, Pair<Try<BatchGetItemResponse>, NotUsed>, NotUsed>
retryFlow =
RetryFlow.withBackoff(
8,
Duration.ofMillis(10),
Duration.ofSeconds(5),
0,
DynamoDb.tryFlow(client, DynamoDbOp.batchGetItem(), 1),
retryMatcher);
resp -> {
final Try<BatchGetItemResponse> response = resp.first();
if (response.isSuccess()) {
final BatchGetItemResponse result = response.get();
if (result.unprocessedKeys().size() > 0) {
return some(
Collections.singleton(
Pair.create(
batchGetItemRequest(result.unprocessedKeys()),
NotUsed.getInstance())));
} else {
return none();
}
} else {
return none();
}
});

final long responses =
Source.single(Pair.create(batchGetLargeItemRequest(1, 50), NotUsed.getInstance()))
Expand All @@ -154,32 +141,23 @@ public Option<Collection<Pair<BatchGetItemRequest, NotUsed>>> apply(
@Test
public void retryFailedRequests() throws Exception {
// #create-retry-flow
final JavaPartialFunction<
Pair<Try<GetItemResponse>, Integer>, Option<Collection<Pair<GetItemRequest, Integer>>>>
retryMatcher =
new JavaPartialFunction<
Pair<Try<GetItemResponse>, Integer>,
Option<Collection<Pair<GetItemRequest, Integer>>>>() {
public Option<Collection<Pair<GetItemRequest, Integer>>> apply(
Pair<Try<GetItemResponse>, Integer> in, boolean isCheck) {
final Try<GetItemResponse> response = in.first();
final Integer retries = in.second();
if (response.isFailure()) {
return some(Collections.singleton(Pair.create(getItemRequest(), retries + 1)));
} else {
return none();
}
}
};

Flow<Pair<GetItemRequest, Integer>, Pair<Try<GetItemResponse>, Integer>, NotUsed> retryFlow =
RetryFlow.withBackoff(
8,
Duration.ofMillis(10),
Duration.ofSeconds(5),
0,
DynamoDb.tryFlow(client, DynamoDbOp.getItem(), 1),
retryMatcher);
final Flow<Pair<GetItemRequest, Integer>, Pair<Try<GetItemResponse>, Integer>, NotUsed>
retryFlow =
RetryFlow.withBackoff(
8,
Duration.ofMillis(10),
Duration.ofSeconds(5),
0,
DynamoDb.tryFlow(client, DynamoDbOp.getItem(), 1),
resp -> {
final Try<GetItemResponse> response = resp.first();
final Integer retries = resp.second();
if (response.isFailure()) {
return some(Collections.singleton(Pair.create(getItemRequest(), retries + 1)));
} else {
return none();
}
});
// #create-retry-flow

// #use-retry-flow
Expand Down

0 comments on commit 4b20217

Please sign in to comment.