Skip to content

Commit

Permalink
[FLINK-4892] Add Key-Group Ranges Test in HeapInternalTimerServiceTest
Browse files Browse the repository at this point in the history
This checks whether key groups are correctly checkpointed and wether we
can correctly restore reassigned key-group ranges.
  • Loading branch information
aljoscha committed Oct 26, 2016
1 parent e3b5d33 commit 9dc2635
Showing 1 changed file with 113 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,119 @@ public void testSnapshotAndRestore() throws Exception {
assertEquals(0, timerService.numEventTimeTimers());
}

/**
* This test checks whether timers are assigned to correct key groups
* and whether snapshot/restore respects key groups.
*/
@Test
public void testSnapshotAndRebalancingRestore() throws Exception {
@SuppressWarnings("unchecked")
Triggerable<Integer, String> mockTriggerable = mock(Triggerable.class);

TestKeyContext keyContext = new TestKeyContext();
TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
HeapInternalTimerService<Integer, String> timerService =
createTimerService(mockTriggerable, keyContext, processingTimeService, testKeyGroupRange, maxParallelism);

int midpoint = testKeyGroupRange.getStartKeyGroup() +
(testKeyGroupRange.getEndKeyGroup() - testKeyGroupRange.getStartKeyGroup()) / 2;

// get two sub key-ranges so that we can restore two ranges separately
KeyGroupRange subKeyGroupRange1 = new KeyGroupRange(testKeyGroupRange.getStartKeyGroup(), midpoint);
KeyGroupRange subKeyGroupRange2 = new KeyGroupRange(midpoint + 1, testKeyGroupRange.getEndKeyGroup());

// get two different keys, one per sub range
int key1 = getKeyInKeyGroupRange(subKeyGroupRange1, maxParallelism);
int key2 = getKeyInKeyGroupRange(subKeyGroupRange2, maxParallelism);

keyContext.setCurrentKey(key1);

timerService.registerProcessingTimeTimer("ciao", 10);
timerService.registerEventTimeTimer("hello", 10);

keyContext.setCurrentKey(key2);

timerService.registerEventTimeTimer("ciao", 10);
timerService.registerProcessingTimeTimer("hello", 10);

assertEquals(2, timerService.numProcessingTimeTimers());
assertEquals(1, timerService.numProcessingTimeTimers("hello"));
assertEquals(1, timerService.numProcessingTimeTimers("ciao"));
assertEquals(2, timerService.numEventTimeTimers());
assertEquals(1, timerService.numEventTimeTimers("hello"));
assertEquals(1, timerService.numEventTimeTimers("ciao"));

// one map per sub key-group range
Map<Integer, byte[]> snapshot1 = new HashMap<>();
Map<Integer, byte[]> snapshot2 = new HashMap<>();
for (Integer keyGroupIndex : testKeyGroupRange) {
ByteArrayOutputStream outStream = new ByteArrayOutputStream();
timerService.snapshotTimersForKeyGroup(new DataOutputViewStreamWrapper(outStream), keyGroupIndex);
outStream.close();
if (subKeyGroupRange1.contains(keyGroupIndex)) {
snapshot1.put(keyGroupIndex, outStream.toByteArray());
} else if (subKeyGroupRange2.contains(keyGroupIndex)) {
snapshot2.put(keyGroupIndex, outStream.toByteArray());
} else {
throw new IllegalStateException("Key-Group index doesn't belong to any sub range.");
}
}

// from now on we need everything twice. once per sub key-group range
@SuppressWarnings("unchecked")
Triggerable<Integer, String> mockTriggerable1 = mock(Triggerable.class);

@SuppressWarnings("unchecked")
Triggerable<Integer, String> mockTriggerable2 = mock(Triggerable.class);


TestKeyContext keyContext1 = new TestKeyContext();
TestKeyContext keyContext2 = new TestKeyContext();

TestProcessingTimeService processingTimeService1 = new TestProcessingTimeService();
TestProcessingTimeService processingTimeService2 = new TestProcessingTimeService();

HeapInternalTimerService<Integer, String> timerService1 = restoreTimerService(
snapshot1,
mockTriggerable1,
keyContext1,
processingTimeService1,
subKeyGroupRange1,
maxParallelism);

HeapInternalTimerService<Integer, String> timerService2 = restoreTimerService(
snapshot2,
mockTriggerable2,
keyContext2,
processingTimeService2,
subKeyGroupRange2,
maxParallelism);


processingTimeService1.setCurrentTime(10);
timerService1.advanceWatermark(10);

verify(mockTriggerable1, times(1)).onProcessingTime(anyInternalTimer());
verify(mockTriggerable1, times(1)).onProcessingTime(eq(new InternalTimer<>(10, key1, "ciao")));
verify(mockTriggerable1, never()).onProcessingTime(eq(new InternalTimer<>(10, key2, "hello")));
verify(mockTriggerable1, times(1)).onEventTime(anyInternalTimer());
verify(mockTriggerable1, times(1)).onEventTime(eq(new InternalTimer<>(10, key1, "hello")));
verify(mockTriggerable1, never()).onEventTime(eq(new InternalTimer<>(10, key2, "ciao")));

assertEquals(0, timerService1.numEventTimeTimers());

processingTimeService2.setCurrentTime(10);
timerService2.advanceWatermark(10);

verify(mockTriggerable2, times(1)).onProcessingTime(anyInternalTimer());
verify(mockTriggerable2, never()).onProcessingTime(eq(new InternalTimer<>(10, key1, "ciao")));
verify(mockTriggerable2, times(1)).onProcessingTime(eq(new InternalTimer<>(10, key2, "hello")));
verify(mockTriggerable2, times(1)).onEventTime(anyInternalTimer());
verify(mockTriggerable2, never()).onEventTime(eq(new InternalTimer<>(10, key1, "hello")));
verify(mockTriggerable2, times(1)).onEventTime(eq(new InternalTimer<>(10, key2, "ciao")));

assertEquals(0, timerService2.numEventTimeTimers());
}

private static class TestKeyContext implements KeyContext {

Expand Down

0 comments on commit 9dc2635

Please sign in to comment.