Skip to content

Commit

Permalink
[FLINK-18348] RemoteInputChannel should checkError before checking pa…
Browse files Browse the repository at this point in the history
…rtitionRequestClient
  • Loading branch information
Jiayi-Liao authored and pnowojski committed Jun 23, 2020
1 parent 1414bac commit a32c99d
Show file tree
Hide file tree
Showing 11 changed files with 43 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,11 @@ public void recycle(MemorySegment segment) {
}
}

inputChannel.notifyBufferAvailable(numAddedBuffers);
try {
inputChannel.notifyBufferAvailable(numAddedBuffers);
} catch (Throwable t) {
ExceptionUtils.rethrow(t);
}
}

void releaseFloatingBuffers() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public ResultPartitionID getPartitionId() {
* exactly-once mode, the upstream will be blocked and become unavailable. This method
* tries to unblock the corresponding upstream and resume data consumption.
*/
public abstract void resumeConsumption();
public abstract void resumeConsumption() throws IOException;

/**
* Notifies the owning {@link SingleInputGate} that this channel became non-empty.
Expand All @@ -154,7 +154,7 @@ protected void notifyChannelNonEmpty() {
public void spillInflightBuffers(long checkpointId, ChannelStateWriter channelStateWriter) throws IOException {
}

protected void notifyBufferAvailable(int numAvailableBuffers) {
protected void notifyBufferAvailable(int numAvailableBuffers) throws IOException {
}

// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public CompletableFuture<?> getAvailableFuture() {
return availabilityHelper.getAvailableFuture();
}

public abstract void resumeConsumption(int channelIndex);
public abstract void resumeConsumption(int channelIndex) throws IOException;

/**
* Returns the channel of this gate.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ public void requestSubpartition(int subpartitionIndex) throws IOException, Inter
* Retriggers a remote subpartition request.
*/
void retriggerSubpartitionRequest(int subpartitionIndex) throws IOException {
checkState(partitionRequestClient != null, "Missing initial subpartition request.");
checkPartitionRequestQueueInitialized();

if (increaseBackoff()) {
partitionRequestClient.requestSubpartition(
Expand All @@ -169,9 +169,7 @@ void retriggerSubpartitionRequest(int subpartitionIndex) throws IOException {

@Override
Optional<BufferAndAvailability> getNextBuffer() throws IOException {
checkState(partitionRequestClient != null, "Queried for a buffer before requesting a queue.");

checkError();
checkPartitionRequestQueueInitialized();

final Buffer next;
final boolean moreAvailable;
Expand Down Expand Up @@ -227,9 +225,7 @@ public void spillInflightBuffers(long checkpointId, ChannelStateWriter channelSt
@Override
void sendTaskEvent(TaskEvent event) throws IOException {
checkState(!isReleased.get(), "Tried to send task event to producer after channel has been released.");
checkState(partitionRequestClient != null, "Tried to send task event to producer before requesting a queue.");

checkError();
checkPartitionRequestQueueInitialized();

partitionRequestClient.sendTaskEvent(partitionId, event, this);
}
Expand Down Expand Up @@ -283,8 +279,8 @@ public String toString() {
/**
* Enqueue this input channel in the pipeline for notifying the producer of unannounced credit.
*/
private void notifyCreditAvailable() {
checkState(partitionRequestClient != null, "Tried to send task event to producer before requesting a queue.");
private void notifyCreditAvailable() throws IOException {
checkPartitionRequestQueueInitialized();

partitionRequestClient.notifyCreditAvailable(this);
}
Expand Down Expand Up @@ -330,16 +326,16 @@ PartitionRequestClient getPartitionRequestClient() {
* increased credit to the producer.
*/
@Override
public void notifyBufferAvailable(int numAvailableBuffers) {
public void notifyBufferAvailable(int numAvailableBuffers) throws IOException {
if (numAvailableBuffers > 0 && unannouncedCredit.getAndAdd(numAvailableBuffers) == 0) {
notifyCreditAvailable();
}
}

@Override
public void resumeConsumption() {
public void resumeConsumption() throws IOException {
checkState(!isReleased.get(), "Channel released.");
checkState(partitionRequestClient != null, "Trying to send event to producer before requesting a queue.");
checkPartitionRequestQueueInitialized();

// notifies the producer that this channel is ready to
// unblock from checkpoint and resume data consumption
Expand Down Expand Up @@ -519,6 +515,12 @@ public void onError(Throwable cause) {
setError(cause);
}

private void checkPartitionRequestQueueInitialized() throws IOException {
checkError();
checkState(partitionRequestClient != null,
"Bug: partitionRequestClient is not initialized before processing data and no error is detected.");
}

private static class BufferReorderingException extends IOException {

private static final long serialVersionUID = -888282210356266816L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -737,7 +737,7 @@ public void sendTaskEvent(TaskEvent event) throws IOException {
}

@Override
public void resumeConsumption(int channelIndex) {
public void resumeConsumption(int channelIndex) throws IOException {
// BEWARE: consumption resumption only happens for streaming jobs in which all slots
// are allocated together so there should be no UnknownInputChannel. As a result, it
// is safe to not synchronize the requestLock here. We will refactor the code to not
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ public void sendTaskEvent(TaskEvent event) throws IOException {
}

@Override
public void resumeConsumption(int channelIndex) {
public void resumeConsumption(int channelIndex) throws IOException {
// BEWARE: consumption resumption only happens for streaming jobs in which all
// slots are allocated together so there should be no UnknownInputChannel. We
// will refactor the code to not rely on this assumption in the future.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public CompletableFuture<?> getAvailableFuture() {
}

@Override
public void resumeConsumption(int channelIndex) {
public void resumeConsumption(int channelIndex) throws IOException {
inputGate.resumeConsumption(channelIndex);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,18 @@ public void testProducerFailedException() throws Exception {
ch.getNextBuffer();
}

@Test(expected = PartitionConnectionException.class)
public void testPartitionConnectionException() throws IOException {
final ConnectionManager connManager = new TestingExceptionConnectionManager();
final SingleInputGate gate = createSingleInputGate(1);
final RemoteInputChannel ch = InputChannelTestUtils.createRemoteInputChannel(gate, 0, connManager);
gate.setInputChannels(ch);

gate.requestPartitions();

ch.getNextBuffer();
}

/**
* Tests to verify the behaviours of three different processes if the number of available
* buffers is less than required buffers.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class AlternatingCheckpointBarrierHandler extends CheckpointBarrierHandler {
}

@Override
public void releaseBlocksAndResetBarriers() {
public void releaseBlocksAndResetBarriers() throws IOException {
activeHandler.releaseBlocksAndResetBarriers();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,15 +100,15 @@ public void abortPendingCheckpoint(long checkpointId, CheckpointException except
}

@Override
public void releaseBlocksAndResetBarriers() {
public void releaseBlocksAndResetBarriers() throws IOException {
LOG.debug("{}: End of stream alignment, feeding buffered data back.", taskName);

blockedChannels.entrySet().forEach(blockedChannel -> {
for (Map.Entry<InputChannelInfo, Boolean> blockedChannel : blockedChannels.entrySet()) {
if (blockedChannel.getValue()) {
resumeConsumption(blockedChannel.getKey());
}
blockedChannel.setValue(false);
});
}

// the next barrier that comes must assume it is the first
numBarriersReceived = 0;
Expand Down Expand Up @@ -338,7 +338,7 @@ protected boolean isCheckpointPending() {
return numBarriersReceived > 0;
}

private void resumeConsumption(InputChannelInfo channelInfo) {
private void resumeConsumption(InputChannelInfo channelInfo) throws IOException {
InputGate inputGate = inputGates[channelInfo.getGateIdx()];
checkState(!inputGate.isFinished(), "InputGate already finished.");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public CheckpointBarrierHandler(AbstractInvokable toNotifyOnCheckpoint) {
this.toNotifyOnCheckpoint = checkNotNull(toNotifyOnCheckpoint);
}

public void releaseBlocksAndResetBarriers() {
public void releaseBlocksAndResetBarriers() throws IOException {
}

/**
Expand Down

0 comments on commit a32c99d

Please sign in to comment.