Skip to content

Commit

Permalink
[FLINK-5528] [tests] Reduce query retry delay in QueryableStateITCase
Browse files Browse the repository at this point in the history
Using 100ms instead of the 1s previously used does not impose too much
additional query load and reduces the test suite's duration from 16-20s to
13-15s on my machine with the current set of unit tests. Further reductions
in the retry delay do not yield more improvements so far.
  • Loading branch information
Nico Kruber authored and uce committed Jan 20, 2017
1 parent f266e82 commit 8d64263
Showing 1 changed file with 7 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
public class QueryableStateITCase extends TestLogger {

private final static FiniteDuration TEST_TIMEOUT = new FiniteDuration(100, TimeUnit.SECONDS);
private final static FiniteDuration QUERY_RETRY_DELAY = new FiniteDuration(100, TimeUnit.MILLISECONDS);

private final static ActorSystem TEST_ACTOR_SYSTEM = AkkaUtils.createDefaultActorSystem();

Expand Down Expand Up @@ -200,8 +201,6 @@ public Integer getKey(Tuple2<Integer, Long> value) throws Exception {

final AtomicLongArray counts = new AtomicLongArray(numKeys);

final FiniteDuration retryDelay = new FiniteDuration(1, TimeUnit.SECONDS);

boolean allNonZero = false;
while (!allNonZero && deadline.hasTimeLeft()) {
allNonZero = true;
Expand Down Expand Up @@ -230,7 +229,7 @@ public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
queryName,
key,
serializedKey,
retryDelay);
QUERY_RETRY_DELAY);

serializedResult.onSuccess(new OnSuccess<byte[]>() {
@Override
Expand Down Expand Up @@ -347,14 +346,13 @@ public Integer getKey(Tuple2<Integer, Long> value) throws Exception {

boolean success = false;
while (!success && deadline.hasTimeLeft()) {
final FiniteDuration retryDelay = new FiniteDuration(1, TimeUnit.SECONDS);
Future<byte[]> serializedResultFuture = getKvStateWithRetries(
client,
jobId,
queryName,
key,
serializedKey,
retryDelay);
QUERY_RETRY_DELAY);

byte[] serializedResult = Await.result(serializedResultFuture, deadline.timeLeft());

Expand Down Expand Up @@ -451,14 +449,13 @@ public ExecutionGraph apply(ExecutionGraphFound found) {
// Now start another task manager
cluster.addTaskManager();

final FiniteDuration retryDelay = new FiniteDuration(1, TimeUnit.SECONDS);
Future<byte[]> serializedResultFuture = getKvStateWithRetries(
client,
jobId,
queryName,
key,
serializedKey,
retryDelay);
QUERY_RETRY_DELAY);

byte[] serializedResult = Await.result(serializedResultFuture, deadline.timeLeft());

Expand Down Expand Up @@ -719,7 +716,7 @@ private void executeValueQuery(final Deadline deadline,
final QueryableStateClient client, final JobID jobId,
final QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState,
final long expected) throws Exception {
FiniteDuration retryDelay = new FiniteDuration(1, TimeUnit.SECONDS);

for (int key = 0; key < NUM_SLOTS; key++) {
final byte[] serializedKey = KvStateRequestSerializer.serializeKeyAndNamespace(
key,
Expand All @@ -734,7 +731,7 @@ private void executeValueQuery(final Deadline deadline,
queryableState.getQueryableStateName(),
key,
serializedKey,
retryDelay);
QUERY_RETRY_DELAY);

byte[] serializedValue = Await.result(future, deadline.timeLeft());

Expand Down Expand Up @@ -872,7 +869,6 @@ public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
// Now query
String expected = Integer.toString(numElements * (numElements + 1) / 2);

FiniteDuration retryDelay = new FiniteDuration(1, TimeUnit.SECONDS);
for (int key = 0; key < NUM_SLOTS; key++) {
final byte[] serializedKey = KvStateRequestSerializer.serializeKeyAndNamespace(
key,
Expand All @@ -887,7 +883,7 @@ public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
queryableState.getQueryableStateName(),
key,
serializedKey,
retryDelay);
QUERY_RETRY_DELAY);

byte[] serializedValue = Await.result(future, deadline.timeLeft());

Expand Down

0 comments on commit 8d64263

Please sign in to comment.