From 0cbd5e3115833ecb88b43ce7d9c9ef776f61e281 Mon Sep 17 00:00:00 2001 From: Zhijiang Date: Thu, 4 Jun 2020 05:10:09 +0200 Subject: [PATCH] [FLINK-18063][checkpointing][refactoring] Implement default #isBlocked method in CheckpointBarrierHandler Simplify the implementations of CheckpointBarrierTracker and CheckpointBarrierUnaligner to reuse the parent default implementation. This closes #12460. --- .../streaming/runtime/io/CheckpointBarrierHandler.java | 4 +++- .../streaming/runtime/io/CheckpointBarrierTracker.java | 5 ----- .../streaming/runtime/io/CheckpointBarrierUnaligner.java | 8 -------- 3 files changed, 3 insertions(+), 14 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java index 952af24aebbd1..b2dd24578b45b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java @@ -60,7 +60,9 @@ public CheckpointBarrierHandler(AbstractInvokable toNotifyOnCheckpoint) { * @param channelIndex The channel index to check. * @return True if the channel is blocked, false if not. */ - public abstract boolean isBlocked(int channelIndex); + public boolean isBlocked(int channelIndex) { + return false; + } @Override public void close() throws IOException { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierTracker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierTracker.java index 9a8f387414ea2..faf5a37bf8b93 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierTracker.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierTracker.java @@ -79,11 +79,6 @@ public CheckpointBarrierTracker(int totalNumberOfInputChannels, AbstractInvokabl public void releaseBlocksAndResetBarriers() { } - @Override - public boolean isBlocked(int channelIndex) { - return false; - } - @Override public void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception { final long barrierId = receivedBarrier.getId(); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java index e89016b6113c1..91ec64ac8cc77 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java @@ -129,14 +129,6 @@ public void releaseBlocksAndResetBarriers() { threadSafeUnaligner.resetReceivedBarriers(currentConsumedCheckpointId); } - /** - * For unaligned checkpoint, it never blocks processing from the task aspect. - */ - @Override - public boolean isBlocked(int channelIndex) { - return false; - } - /** * We still need to trigger checkpoint via {@link ThreadSafeUnaligner#notifyBarrierReceived(CheckpointBarrier, InputChannelInfo)} * while reading the first barrier from one channel, because this might happen