Skip to content

Commit

Permalink
[FLINK-23041][streaming] Calculation of timeout for switching from AC…
Browse files Browse the repository at this point in the history
… to UC is based on the global checkpoint start time rather than the local first barrier received time
  • Loading branch information
akalash authored and dawidwys committed Jun 29, 2021
1 parent 6e62a41 commit 2af2740
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ public class SingleCheckpointBarrierHandler extends CheckpointBarrierHandler {
private CompletableFuture<Void> allBarriersReceivedFuture = new CompletableFuture<>();

private BarrierHandlerState currentState;
private long firstBarrierArrivalTime;
private Cancellable currentAlignmentTimer;
private final boolean alternating;

Expand Down Expand Up @@ -271,6 +270,13 @@ public void processBarrierAnnouncement(
}

private void registerAlignmentTimer(CheckpointBarrier announcedBarrier) {
long alignedCheckpointTimeout =
announcedBarrier.getCheckpointOptions().getAlignmentTimeout();
long timePassedSinceCheckpointStart =
getClock().absoluteTimeMillis() - announcedBarrier.getTimestamp();

long timerDelay = Math.max(alignedCheckpointTimeout - timePassedSinceCheckpointStart, 0);

this.currentAlignmentTimer =
registerTimer.apply(
() -> {
Expand All @@ -290,8 +296,7 @@ private void registerAlignmentTimer(CheckpointBarrier announcedBarrier) {
currentAlignmentTimer = null;
return null;
},
Duration.ofMillis(
announcedBarrier.getCheckpointOptions().getAlignmentTimeout()));
Duration.ofMillis(timerDelay));
}

private void checkNewCheckpoint(CheckpointBarrier barrier) throws IOException {
Expand All @@ -306,7 +311,6 @@ private void checkNewCheckpoint(CheckpointBarrier barrier) throws IOException {
currentCheckpointId = barrierId;
numBarriersReceived = 0;
allBarriersReceivedFuture = new CompletableFuture<>();
firstBarrierArrivalTime = getClock().relativeTimeNanos();

if (alternating && barrier.getCheckpointOptions().isTimeoutable()) {
registerAlignmentTimer(barrier);
Expand Down Expand Up @@ -432,8 +436,8 @@ public void triggerGlobalCheckpoint(CheckpointBarrier checkpointBarrier)
public boolean isTimedOut(CheckpointBarrier barrier) {
return barrier.getCheckpointOptions().isTimeoutable()
&& barrier.getId() <= currentCheckpointId
&& barrier.getCheckpointOptions().getAlignmentTimeout() * 1_000_000
< (getClock().relativeTimeNanos() - firstBarrierArrivalTime);
&& barrier.getCheckpointOptions().getAlignmentTimeout()
< (getClock().absoluteTimeMillis() - barrier.getTimestamp());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,30 @@ numChannels, getTestBarrierHandlerFactory(target))
assertEquals(1, target.getTriggeredCheckpointCounter());
}

/**
 * Checks that a timeoutable barrier whose alignment timeout has already elapsed (relative to the
 * barrier's own timestamp) before it is read triggers the checkpoint right away, without waiting
 * for the barrier from the second channel.
 */
@Test
public void testTimeoutAlignmentBeforeFirstBarrier() throws Exception {
    // given: A gate over two test channels with a mailbox executor.
    final ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
    final int channelCount = 2;
    final CheckpointedInputGate inputGate =
            new TestCheckpointedInputGateBuilder(
                            channelCount, getTestBarrierHandlerFactory(handler))
                    .withTestChannels()
                    .withMailboxExecutor()
                    .build();

    // when: The barrier is stamped with the current clock time and the clock then moves past
    // the aligned checkpoint timeout before the first barrier is received.
    final long timeoutMillis = 100;
    final Buffer barrierBuffer = withTimeout(1, timeoutMillis);
    clock.advanceTime(timeoutMillis + 1, TimeUnit.MILLISECONDS);

    final TestInputChannel firstChannel = (TestInputChannel) inputGate.getChannel(0);
    firstChannel.read(barrierBuffer.retainBuffer());

    // then: The checkpoint is triggered as soon as that first barrier is processed.
    assertBarrier(inputGate);
    assertEquals(1, handler.getTriggeredCheckpointCounter());
}

@Test
public void testTimeoutAlignmentWhenLocalBarrierFirst() throws Exception {
// given: Gate with remote and local channels.
Expand Down Expand Up @@ -736,6 +760,48 @@ numChannels, getTestBarrierHandlerFactory(target))
}
}

/**
 * Verifies the switch to an unaligned checkpoint when the alignment timeout has already elapsed
 * by the time the first announcement is polled from the gate, while the pending clock callables
 * (the active timeout) are executed only afterwards.
 *
 * <p>The order of the steps below is significant: buffers are enqueued first, the clock is
 * advanced without running callables, the first announcement is polled, and only then are the
 * callables executed.
 */
@Test
public void testActiveTimeoutBeforeFirstAnnouncementPassiveTimeout() throws Exception {
// given: A data buffer followed by the same timeoutable barrier on each of two remote channels.
int numChannels = 2;
ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
try (CheckpointedInputGate gate =
new TestCheckpointedInputGateBuilder(
numChannels, getTestBarrierHandlerFactory(target))
.withRemoteChannels()
.withMailboxExecutor()
.build()) {
long alignmentCheckpointTimeout = 10;
Buffer checkpointBarrier = withTimeout(alignmentCheckpointTimeout);

getChannel(gate, 0).onBuffer(dataBuffer(), 0, 0);
getChannel(gate, 0).onBuffer(checkpointBarrier.retainBuffer(), 1, 0);
getChannel(gate, 1).onBuffer(dataBuffer(), 0, 0);
getChannel(gate, 1).onBuffer(checkpointBarrier.retainBuffer(), 1, 0);

// Enqueueing the buffers alone must not trigger any checkpoint.
assertEquals(0, target.getTriggeredCheckpointCounter());

// when: Processing of the first announcement is delayed by more than the alignment
// checkpoint timeout.
clock.advanceTimeWithoutRunningCallables(
alignmentCheckpointTimeout + 1, TimeUnit.MILLISECONDS);

assertAnnouncement(gate);
// Simulate the active timeout firing after the passive one by running the pending
// callables only now.
clock.executeCallables();

// then: Barriers should be reprioritized and the UC should be triggered.
assertAnnouncement(gate);
assertBarrier(gate);
assertBarrier(gate);
assertEquals(1, target.getTriggeredCheckpointCounter());
assertThat(target.getTriggeredCheckpointOptions(), contains(unaligned(getDefault())));
// Followed by the data buffers that the barriers have overtaken.
assertData(gate);
assertData(gate);
}
}

@Test
public void testActiveTimeoutAfterBarrierPassiveTimeout() throws Exception {
int numChannels = 2;
Expand Down Expand Up @@ -1112,15 +1178,15 @@ private void send(Buffer buffer, InputChannel channel, CheckpointedInputGate che
while (checkpointedGate.pollNext().isPresent()) {}
}

private Buffer withTimeout(long alignmentTimeout) throws IOException {
return withTimeout(1, alignmentTimeout);
private Buffer withTimeout(long alignedCheckpointTimeout) throws IOException {
return withTimeout(1, alignedCheckpointTimeout);
}

private Buffer withTimeout(int checkpointId, long alignmentTimeout) throws IOException {
private Buffer withTimeout(int checkpointId, long alignedCheckpointTimeout) throws IOException {
return barrier(
checkpointId,
clock.relativeTimeMillis(),
alignedWithTimeout(getDefault(), alignmentTimeout));
alignedWithTimeout(getDefault(), alignedCheckpointTimeout));
}

private Buffer barrier(long barrierId, long barrierTimestamp, CheckpointOptions options)
Expand Down

0 comments on commit 2af2740

Please sign in to comment.