diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java
index 764d55851902d..429ccb7dbda92 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/BufferManager.java
@@ -198,7 +198,11 @@ public void recycle(MemorySegment segment) {
 			}
 		}
 
-		inputChannel.notifyBufferAvailable(numAddedBuffers);
+		try {
+			inputChannel.notifyBufferAvailable(numAddedBuffers);
+		} catch (Throwable t) {
+			ExceptionUtils.rethrow(t);
+		}
 	}
 
 	void releaseFloatingBuffers() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
index 0c47272da0e30..975fa5baf06f3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
@@ -133,7 +133,7 @@ public ResultPartitionID getPartitionId() {
 	 * exactly-once mode, the upstream will be blocked and become unavailable. This method
 	 * tries to unblock the corresponding upstream and resume data consumption.
 	 */
-	public abstract void resumeConsumption();
+	public abstract void resumeConsumption() throws IOException;
 
 	/**
 	 * Notifies the owning {@link SingleInputGate} that this channel became non-empty.
@@ -154,7 +154,7 @@ protected void notifyChannelNonEmpty() {
 	public void spillInflightBuffers(long checkpointId, ChannelStateWriter channelStateWriter) throws IOException {
 	}
 
-	protected void notifyBufferAvailable(int numAvailableBuffers) {
+	protected void notifyBufferAvailable(int numAvailableBuffers) throws IOException {
 	}
 
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
index 0489fdee8673d..ff4691349ee93 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
@@ -114,7 +114,7 @@ public CompletableFuture getAvailableFuture() {
 		return availabilityHelper.getAvailableFuture();
 	}
 
-	public abstract void resumeConsumption(int channelIndex);
+	public abstract void resumeConsumption(int channelIndex) throws IOException;
 
 	/**
 	 * Returns the channel of this gate.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index ba8fc11797b21..dfbbc6b41ee13 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -157,7 +157,7 @@ public void requestSubpartition(int subpartitionIndex) throws IOException, Inter
 	 * Retriggers a remote subpartition request.
 	 */
 	void retriggerSubpartitionRequest(int subpartitionIndex) throws IOException {
-		checkState(partitionRequestClient != null, "Missing initial subpartition request.");
+		checkPartitionRequestQueueInitialized();
 
 		if (increaseBackoff()) {
 			partitionRequestClient.requestSubpartition(
@@ -169,9 +169,7 @@ void retriggerSubpartitionRequest(int subpartitionIndex) throws IOException {
 
 	@Override
 	Optional getNextBuffer() throws IOException {
-		checkState(partitionRequestClient != null, "Queried for a buffer before requesting a queue.");
-
-		checkError();
+		checkPartitionRequestQueueInitialized();
 
 		final Buffer next;
 		final boolean moreAvailable;
@@ -227,9 +225,7 @@ public void spillInflightBuffers(long checkpointId, ChannelStateWriter channelSt
 	@Override
 	void sendTaskEvent(TaskEvent event) throws IOException {
 		checkState(!isReleased.get(), "Tried to send task event to producer after channel has been released.");
-		checkState(partitionRequestClient != null, "Tried to send task event to producer before requesting a queue.");
-
-		checkError();
+		checkPartitionRequestQueueInitialized();
 
 		partitionRequestClient.sendTaskEvent(partitionId, event, this);
 	}
@@ -283,8 +279,8 @@ public String toString() {
 	/**
 	 * Enqueue this input channel in the pipeline for notifying the producer of unannounced credit.
 	 */
-	private void notifyCreditAvailable() {
-		checkState(partitionRequestClient != null, "Tried to send task event to producer before requesting a queue.");
+	private void notifyCreditAvailable() throws IOException {
+		checkPartitionRequestQueueInitialized();
 
 		partitionRequestClient.notifyCreditAvailable(this);
 	}
@@ -330,16 +326,16 @@ PartitionRequestClient getPartitionRequestClient() {
 	 * increased credit to the producer.
 	 */
 	@Override
-	public void notifyBufferAvailable(int numAvailableBuffers) {
+	public void notifyBufferAvailable(int numAvailableBuffers) throws IOException {
 		if (numAvailableBuffers > 0 && unannouncedCredit.getAndAdd(numAvailableBuffers) == 0) {
 			notifyCreditAvailable();
 		}
 	}
 
 	@Override
-	public void resumeConsumption() {
+	public void resumeConsumption() throws IOException {
 		checkState(!isReleased.get(), "Channel released.");
-		checkState(partitionRequestClient != null, "Trying to send event to producer before requesting a queue.");
+		checkPartitionRequestQueueInitialized();
 
 		// notifies the producer that this channel is ready to
 		// unblock from checkpoint and resume data consumption
@@ -519,6 +515,12 @@ public void onError(Throwable cause) {
 		setError(cause);
 	}
 
+	private void checkPartitionRequestQueueInitialized() throws IOException {
+		checkError();
+		checkState(partitionRequestClient != null,
+				"Bug: partitionRequestClient is not initialized before processing data and no error is detected.");
+	}
+
 	private static class BufferReorderingException extends IOException {
 
 		private static final long serialVersionUID = -888282210356266816L;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 0bd06c05e0edd..dcb4e2eb7e016 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -737,7 +737,7 @@ public void sendTaskEvent(TaskEvent event) throws IOException {
 	}
 
 	@Override
-	public void resumeConsumption(int channelIndex) {
+	public void resumeConsumption(int channelIndex) throws IOException {
 		// BEWARE: consumption resumption only happens for streaming jobs in which all slots
 		// are allocated together so there should be no UnknownInputChannel. As a result, it
 		// is safe to not synchronize the requestLock here. We will refactor the code to not
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index c05eef75be2b6..ef56a7b05134f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -250,7 +250,7 @@ public void sendTaskEvent(TaskEvent event) throws IOException {
 	}
 
 	@Override
-	public void resumeConsumption(int channelIndex) {
+	public void resumeConsumption(int channelIndex) throws IOException {
 		// BEWARE: consumption resumption only happens for streaming jobs in which all
 		// slots are allocated together so there should be no UnknownInputChannel. We
 		// will refactor the code to not rely on this assumption in the future.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
index fc6b63583af97..806a4613e6fba 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
@@ -56,7 +56,7 @@ public CompletableFuture getAvailableFuture() {
 	}
 
 	@Override
-	public void resumeConsumption(int channelIndex) {
+	public void resumeConsumption(int channelIndex) throws IOException {
 		inputGate.resumeConsumption(channelIndex);
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index 3984dde8b6961..a2e123613edfa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -335,6 +335,18 @@ public void testProducerFailedException() throws Exception {
 		ch.getNextBuffer();
 	}
 
+	@Test(expected = PartitionConnectionException.class)
+	public void testPartitionConnectionException() throws IOException {
+		final ConnectionManager connManager = new TestingExceptionConnectionManager();
+		final SingleInputGate gate = createSingleInputGate(1);
+		final RemoteInputChannel ch = InputChannelTestUtils.createRemoteInputChannel(gate, 0, connManager);
+		gate.setInputChannels(ch);
+
+		gate.requestPartitions();
+
+		ch.getNextBuffer();
+	}
+
 	/**
 	 * Tests to verify the behaviours of three different processes if the number of available
 	 * buffers is less than required buffers.
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandler.java
index 2fb6e72baaa19..b287f1e5f4c32 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandler.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandler.java
@@ -46,7 +46,7 @@ class AlternatingCheckpointBarrierHandler extends CheckpointBarrierHandler {
 	}
 
 	@Override
-	public void releaseBlocksAndResetBarriers() {
+	public void releaseBlocksAndResetBarriers() throws IOException {
 		activeHandler.releaseBlocksAndResetBarriers();
 	}
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAligner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAligner.java
index a052a70107cae..6ecfb4271b253 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAligner.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAligner.java
@@ -100,15 +100,15 @@ public void abortPendingCheckpoint(long checkpointId, CheckpointException except
 	}
 
 	@Override
-	public void releaseBlocksAndResetBarriers() {
+	public void releaseBlocksAndResetBarriers() throws IOException {
 		LOG.debug("{}: End of stream alignment, feeding buffered data back.", taskName);
 
-		blockedChannels.entrySet().forEach(blockedChannel -> {
+		for (Map.Entry<InputChannelInfo, Boolean> blockedChannel : blockedChannels.entrySet()) {
 			if (blockedChannel.getValue()) {
 				resumeConsumption(blockedChannel.getKey());
 			}
 			blockedChannel.setValue(false);
-		});
+		}
 
 		// the next barrier that comes must assume it is the first
 		numBarriersReceived = 0;
@@ -338,7 +338,7 @@ protected boolean isCheckpointPending() {
 		return numBarriersReceived > 0;
 	}
 
-	private void resumeConsumption(InputChannelInfo channelInfo) {
+	private void resumeConsumption(InputChannelInfo channelInfo) throws IOException {
 		InputGate inputGate = inputGates[channelInfo.getGateIdx()];
 		checkState(!inputGate.isFinished(), "InputGate already finished.");
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
index fb0a319cd1235..c6d530b4331af 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
@@ -52,7 +52,7 @@ public CheckpointBarrierHandler(AbstractInvokable toNotifyOnCheckpoint) {
 		this.toNotifyOnCheckpoint = checkNotNull(toNotifyOnCheckpoint);
 	}
 
-	public void releaseBlocksAndResetBarriers() {
+	public void releaseBlocksAndResetBarriers() throws IOException {
 	}
 
 	/**