Skip to content

Commit

Permalink
[FLINK-9766][network][tests] fix cleanup in RemoteInputChannelTest
Browse files Browse the repository at this point in the history
If an assertion in the test fails and as a result the cleanup fails, in most
tests the original assertion was swallowed making it hard to debug.

Furthermore, #testConcurrentRecycleAndRelease2() does even not clean up at all
if successful.

This closes apache#6271
  • Loading branch information
Nico Kruber committed Jul 12, 2018
1 parent a154dd5 commit 598e460
Showing 1 changed file with 70 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@

import org.junit.Test;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
Expand All @@ -53,7 +55,6 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
Expand Down Expand Up @@ -329,6 +330,7 @@ public void testAvailableBuffersLessThanRequiredBuffers() throws Exception {
final SingleInputGate inputGate = createSingleInputGate();
final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel);
Throwable thrown = null;
try {
final BufferPool bufferPool = spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers));
inputGate.setBufferPool(bufferPool);
Expand Down Expand Up @@ -447,13 +449,10 @@ public void testAvailableBuffersLessThanRequiredBuffers() throws Exception {
assertEquals("There should be 0 buffers available in local pool",
0, bufferPool.getNumberOfAvailableMemorySegments());
assertTrue(inputChannel.isWaitingForFloatingBuffers());

} catch (Throwable t) {
thrown = t;
} finally {
// Release all the buffer resources
inputChannel.releaseAllResources();

networkBufferPool.destroyAllBufferPools();
networkBufferPool.destroy();
cleanup(networkBufferPool, null, thrown, inputChannel);
}
}

Expand All @@ -471,6 +470,7 @@ public void testAvailableBuffersEqualToRequiredBuffers() throws Exception {
final SingleInputGate inputGate = createSingleInputGate();
final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel);
Throwable thrown = null;
try {
final BufferPool bufferPool = spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers));
inputGate.setBufferPool(bufferPool);
Expand Down Expand Up @@ -525,13 +525,10 @@ public void testAvailableBuffersEqualToRequiredBuffers() throws Exception {
14, inputChannel.getNumberOfRequiredBuffers());
assertEquals("There should be 2 buffers available in local pool",
2, bufferPool.getNumberOfAvailableMemorySegments());

} catch (Throwable t) {
thrown = t;
} finally {
// Release all the buffer resources
inputChannel.releaseAllResources();

networkBufferPool.destroyAllBufferPools();
networkBufferPool.destroy();
cleanup(networkBufferPool, null, thrown, inputChannel);
}
}

Expand All @@ -549,6 +546,7 @@ public void testAvailableBuffersMoreThanRequiredBuffers() throws Exception {
final SingleInputGate inputGate = createSingleInputGate();
final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel);
Throwable thrown = null;
try {
final BufferPool bufferPool = spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers));
inputGate.setBufferPool(bufferPool);
Expand Down Expand Up @@ -617,13 +615,10 @@ public void testAvailableBuffersMoreThanRequiredBuffers() throws Exception {
12, inputChannel.getNumberOfRequiredBuffers());
assertEquals("There should be 2 buffers available in local pool",
2, bufferPool.getNumberOfAvailableMemorySegments());

} catch (Throwable t) {
thrown = t;
} finally {
// Release all the buffer resources
inputChannel.releaseAllResources();

networkBufferPool.destroyAllBufferPools();
networkBufferPool.destroy();
cleanup(networkBufferPool, null, thrown, inputChannel);
}
}

Expand All @@ -645,6 +640,7 @@ public void testFairDistributionFloatingBuffers() throws Exception {
inputGate.setInputChannel(channel1.partitionId.getPartitionId(), channel1);
inputGate.setInputChannel(channel2.partitionId.getPartitionId(), channel2);
inputGate.setInputChannel(channel3.partitionId.getPartitionId(), channel3);
Throwable thrown = null;
try {
final BufferPool bufferPool = spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers));
inputGate.setBufferPool(bufferPool);
Expand Down Expand Up @@ -688,15 +684,10 @@ public void testFairDistributionFloatingBuffers() throws Exception {
assertEquals("There should be 3 buffers available in the channel", 3, channel1.getNumberOfAvailableBuffers());
assertEquals("There should be 3 buffers available in the channel", 3, channel2.getNumberOfAvailableBuffers());
assertEquals("There should be 3 buffers available in the channel", 3, channel3.getNumberOfAvailableBuffers());

} catch (Throwable t) {
thrown = t;
} finally {
// Release all the buffer resources
channel1.releaseAllResources();
channel2.releaseAllResources();
channel3.releaseAllResources();

networkBufferPool.destroyAllBufferPools();
networkBufferPool.destroy();
cleanup(networkBufferPool, null, thrown, channel1, channel2, channel3);
}
}

Expand All @@ -717,6 +708,7 @@ public void testConcurrentOnSenderBacklogAndRelease() throws Exception {
final SingleInputGate inputGate = createSingleInputGate();
final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel);
Throwable thrown = null;
try {
final BufferPool bufferPool = networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers);
inputGate.setBufferPool(bufferPool);
Expand Down Expand Up @@ -754,17 +746,10 @@ public Void call() throws Exception {
0, inputChannel.getNumberOfAvailableBuffers());
assertEquals("There should be 130 buffers available in local pool.",
130, bufferPool.getNumberOfAvailableMemorySegments() + networkBufferPool.getNumberOfAvailableMemorySegments());

} catch (Throwable t) {
thrown = t;
} finally {
// Release all the buffer resources once exception
if (!inputChannel.isReleased()) {
inputChannel.releaseAllResources();
}

networkBufferPool.destroyAllBufferPools();
networkBufferPool.destroy();

executor.shutdown();
cleanup(networkBufferPool, executor, thrown, inputChannel);
}
}

Expand All @@ -786,6 +771,7 @@ public void testConcurrentOnSenderBacklogAndRecycle() throws Exception {
final SingleInputGate inputGate = createSingleInputGate();
final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel);
Throwable thrown = null;
try {
final BufferPool bufferPool = networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers);
inputGate.setBufferPool(bufferPool);
Expand Down Expand Up @@ -813,15 +799,10 @@ public Void call() throws Exception {
inputChannel.getNumberOfRequiredBuffers(), inputChannel.getNumberOfAvailableBuffers());
assertEquals("There should be no buffers available in local pool.",
0, bufferPool.getNumberOfAvailableMemorySegments());

} catch (Throwable t) {
thrown = t;
} finally {
// Release all the buffer resources
inputChannel.releaseAllResources();

networkBufferPool.destroyAllBufferPools();
networkBufferPool.destroy();

executor.shutdown();
cleanup(networkBufferPool, executor, thrown, inputChannel);
}
}

Expand All @@ -842,6 +823,7 @@ public void testConcurrentRecycleAndRelease() throws Exception {
final SingleInputGate inputGate = createSingleInputGate();
final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel);
Throwable thrown = null;
try {
final BufferPool bufferPool = networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers);
inputGate.setBufferPool(bufferPool);
Expand Down Expand Up @@ -869,17 +851,10 @@ public Void call() throws Exception {
numFloatingBuffers, bufferPool.getNumberOfAvailableMemorySegments());
assertEquals("There should be " + numExclusiveSegments + " buffers available in global pool.",
numExclusiveSegments, networkBufferPool.getNumberOfAvailableMemorySegments());

} catch (Throwable t) {
thrown = t;
} finally {
// Release all the buffer resources once exception
if (!inputChannel.isReleased()) {
inputChannel.releaseAllResources();
}

networkBufferPool.destroyAllBufferPools();
networkBufferPool.destroy();

executor.shutdown();
cleanup(networkBufferPool, executor, thrown, inputChannel);
}
}

Expand All @@ -903,6 +878,7 @@ public void testConcurrentRecycleAndRelease2() throws Exception {
final SingleInputGate inputGate = createSingleInputGate();
final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel);
Throwable thrown = null;
try {
final BufferPool bufferPool = networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers);
inputGate.setBufferPool(bufferPool);
Expand Down Expand Up @@ -958,17 +934,9 @@ public void testConcurrentRecycleAndRelease2() throws Exception {
submitTasksAndWaitForResults(executor,
new Callable[] {bufferPoolInteractionsTask, channelInteractionsTask});
} catch (Throwable t) {
inputChannel.releaseAllResources();

try {
networkBufferPool.destroyAllBufferPools();
} catch (Throwable tInner) {
t.addSuppressed(tInner);
}

networkBufferPool.destroy();
executor.shutdown();
ExceptionUtils.rethrowException(t);
thrown = t;
} finally {
cleanup(networkBufferPool, executor, thrown, inputChannel);
}
}

Expand Down Expand Up @@ -1089,4 +1057,40 @@ private void submitTasksAndWaitForResults(ExecutorService executor, Callable[] t
result.get();
}
}

/**
* Helper code to ease cleanup handling with suppressed exceptions.
*/
private void cleanup(
NetworkBufferPool networkBufferPool,
@Nullable ExecutorService executor,
@Nullable Throwable throwable,
InputChannel... inputChannels) throws Exception {
for (InputChannel inputChannel : inputChannels) {
try {
inputChannel.releaseAllResources();
} catch (Throwable tInner) {
throwable = ExceptionUtils.firstOrSuppressed(tInner, throwable);
}
}

try {
networkBufferPool.destroyAllBufferPools();
} catch (Throwable tInner) {
throwable = ExceptionUtils.firstOrSuppressed(tInner, throwable);
}

try {
networkBufferPool.destroy();
} catch (Throwable tInner) {
throwable = ExceptionUtils.firstOrSuppressed(tInner, throwable);
}

if (executor != null) {
executor.shutdown();
}
if (throwable != null) {
ExceptionUtils.rethrowException(throwable);
}
}
}

0 comments on commit 598e460

Please sign in to comment.