From 2af2740bd5251b7cc39a56f9b73ace1f1cfbb871 Mon Sep 17 00:00:00 2001 From: Anton Kalashnikov Date: Tue, 22 Jun 2021 20:16:33 +0300 Subject: [PATCH] [FLINK-23041][streaming] Calculation of timeout for switching from AC to UC is based on the global checkpoint start time rather than the local first barrier received time --- .../SingleCheckpointBarrierHandler.java | 16 ++-- .../AlternatingCheckpointsTest.java | 74 ++++++++++++++++++- 2 files changed, 80 insertions(+), 10 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java index 83c324c1c6b69..fcca333860d31 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java @@ -82,7 +82,6 @@ public class SingleCheckpointBarrierHandler extends CheckpointBarrierHandler { private CompletableFuture allBarriersReceivedFuture = new CompletableFuture<>(); private BarrierHandlerState currentState; - private long firstBarrierArrivalTime; private Cancellable currentAlignmentTimer; private final boolean alternating; @@ -271,6 +270,13 @@ public void processBarrierAnnouncement( } private void registerAlignmentTimer(CheckpointBarrier announcedBarrier) { + long alignedCheckpointTimeout = + announcedBarrier.getCheckpointOptions().getAlignmentTimeout(); + long timePassedSinceCheckpointStart = + getClock().absoluteTimeMillis() - announcedBarrier.getTimestamp(); + + long timerDelay = Math.max(alignedCheckpointTimeout - timePassedSinceCheckpointStart, 0); + this.currentAlignmentTimer = registerTimer.apply( () -> { @@ -290,8 +296,7 @@ private void registerAlignmentTimer(CheckpointBarrier announcedBarrier) { currentAlignmentTimer = null; return null; }, - Duration.ofMillis( - announcedBarrier.getCheckpointOptions().getAlignmentTimeout())); + Duration.ofMillis(timerDelay)); } private void checkNewCheckpoint(CheckpointBarrier barrier) throws IOException { @@ -306,7 +311,6 @@ private void checkNewCheckpoint(CheckpointBarrier barrier) throws IOException { currentCheckpointId = barrierId; numBarriersReceived = 0; allBarriersReceivedFuture = new CompletableFuture<>(); - firstBarrierArrivalTime = getClock().relativeTimeNanos(); if (alternating && barrier.getCheckpointOptions().isTimeoutable()) { registerAlignmentTimer(barrier); @@ -432,8 +436,8 @@ public void triggerGlobalCheckpoint(CheckpointBarrier checkpointBarrier) public boolean isTimedOut(CheckpointBarrier barrier) { return barrier.getCheckpointOptions().isTimeoutable() && barrier.getId() <= currentCheckpointId - && barrier.getCheckpointOptions().getAlignmentTimeout() * 1_000_000 - < (getClock().relativeTimeNanos() - firstBarrierArrivalTime); + && barrier.getCheckpointOptions().getAlignmentTimeout() + < (getClock().absoluteTimeMillis() - barrier.getTimestamp()); } @Override diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCheckpointsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCheckpointsTest.java index d4acc1f4876b7..456e8af72ee1b 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCheckpointsTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCheckpointsTest.java @@ -353,6 +353,30 @@ numChannels, getTestBarrierHandlerFactory(target)) assertEquals(1, target.getTriggeredCheckpointCounter()); } + @Test + public void testTimeoutAlignmentBeforeFirstBarrier() throws Exception { + // given: Local channels. + int numChannels = 2; + ValidatingCheckpointHandler target = new ValidatingCheckpointHandler(); + CheckpointedInputGate gate = + new TestCheckpointedInputGateBuilder( + numChannels, getTestBarrierHandlerFactory(target)) + .withTestChannels() + .withMailboxExecutor() + .build(); + + long alignedCheckpointTimeout = 100; + // when: Aligned checkpoint timeout expired before the first barrier received. + Buffer checkpointBarrier = withTimeout(1, alignedCheckpointTimeout); + clock.advanceTime(alignedCheckpointTimeout + 1, TimeUnit.MILLISECONDS); + + ((TestInputChannel) gate.getChannel(0)).read(checkpointBarrier.retainBuffer()); + + // then: The UC is triggered as soon as the first barrier is received. + assertBarrier(gate); + assertEquals(1, target.getTriggeredCheckpointCounter()); + } + @Test public void testTimeoutAlignmentWhenLocalBarrierFirst() throws Exception { // given: Gate with remote and local channels. @@ -736,6 +760,48 @@ numChannels, getTestBarrierHandlerFactory(target)) } } + @Test + public void testActiveTimeoutBeforeFirstAnnouncementPassiveTimeout() throws Exception { + // given: Two barriers from two channels. + int numChannels = 2; + ValidatingCheckpointHandler target = new ValidatingCheckpointHandler(); + try (CheckpointedInputGate gate = + new TestCheckpointedInputGateBuilder( + numChannels, getTestBarrierHandlerFactory(target)) + .withRemoteChannels() + .withMailboxExecutor() + .build()) { + long alignmentCheckpointTimeout = 10; + Buffer checkpointBarrier = withTimeout(alignmentCheckpointTimeout); + + getChannel(gate, 0).onBuffer(dataBuffer(), 0, 0); + getChannel(gate, 0).onBuffer(checkpointBarrier.retainBuffer(), 1, 0); + getChannel(gate, 1).onBuffer(dataBuffer(), 0, 0); + getChannel(gate, 1).onBuffer(checkpointBarrier.retainBuffer(), 1, 0); + + assertEquals(0, target.getTriggeredCheckpointCounter()); + + // when: The receiving of the first announcement is delayed on more than alignment + // checkpoint timeout. + clock.advanceTimeWithoutRunningCallables( + alignmentCheckpointTimeout + 1, TimeUnit.MILLISECONDS); + + assertAnnouncement(gate); + // we simulate active time out firing after the passive one + clock.executeCallables(); + + // then: Barriers should be reprioritized and the UC should be triggered. + assertAnnouncement(gate); + assertBarrier(gate); + assertBarrier(gate); + assertEquals(1, target.getTriggeredCheckpointCounter()); + assertThat(target.getTriggeredCheckpointOptions(), contains(unaligned(getDefault()))); + // Followed by overtaken buffers + assertData(gate); + assertData(gate); + } + } + @Test public void testActiveTimeoutAfterBarrierPassiveTimeout() throws Exception { int numChannels = 2; @@ -1112,15 +1178,15 @@ private void send(Buffer buffer, InputChannel channel, CheckpointedInputGate che while (checkpointedGate.pollNext().isPresent()) {} } - private Buffer withTimeout(long alignmentTimeout) throws IOException { - return withTimeout(1, alignmentTimeout); + private Buffer withTimeout(long alignedCheckpointTimeout) throws IOException { + return withTimeout(1, alignedCheckpointTimeout); } - private Buffer withTimeout(int checkpointId, long alignmentTimeout) throws IOException { + private Buffer withTimeout(int checkpointId, long alignedCheckpointTimeout) throws IOException { return barrier( checkpointId, clock.relativeTimeMillis(), - alignedWithTimeout(getDefault(), alignmentTimeout)); + alignedWithTimeout(getDefault(), alignedCheckpointTimeout)); } private Buffer barrier(long barrierId, long barrierTimestamp, CheckpointOptions options)