Skip to content

Commit

Permalink
[hotfix][runtime] Check style ServerTransportErrorHandlingTest and Cl…
Browse files Browse the repository at this point in the history
…ientTransportErrorHandlingTest fix
  • Loading branch information
akalash authored and pnowojski committed Jan 27, 2023
1 parent 399bcef commit 952c783
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;

import org.junit.jupiter.api.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import java.io.IOException;
Expand All @@ -55,8 +54,8 @@
import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.shutdown;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatNoException;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.isA;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -102,9 +101,7 @@ public ChannelHandler[] getServerChannelHandlers() {

@Override
public void write(
ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {

ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (writeNum >= 1) {
throw new RuntimeException("Expected test exception.");
}
Expand All @@ -124,17 +121,15 @@ public void write(

final CountDownLatch sync = new CountDownLatch(1);

// Do this with explicit synchronization. Otherwise this is not robust against slow timings
// Do this with explicit synchronization. Otherwise, this is not robust against slow timings
// of the callback (e.g. we cannot just verify that it was called once, because there is
// a chance that we do this too early).
doAnswer(
new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
sync.countDown();
return null;
}
})
(Answer<Void>)
invocation -> {
sync.countDown();
return null;
})
.when(rich[1])
.onError(isA(LocalTransportException.class));

Expand All @@ -143,7 +138,7 @@ public Void answer(InvocationOnMock invocation) throws Throwable {

// Second request is *not* successful
requestClient.requestSubpartition(new ResultPartitionID(), 0, rich[1], 0);
// Wait for the notification and it could confirm all the request operations are done
// Wait for the notification, and it could confirm all the request operations are done
assertThat(sync.await(TestingUtils.TESTING_DURATION.toMillis(), TimeUnit.MILLISECONDS))
.withFailMessage(
"Timed out after waiting for "
Expand Down Expand Up @@ -227,9 +222,7 @@ public ChannelHandler[] getServerChannelHandlers() {
// Close on read
new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {

public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.channel().close();
}
}
Expand All @@ -250,12 +243,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg)
final CountDownLatch sync = new CountDownLatch(rich.length);

Answer<Void> countDownLatch =
new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
sync.countDown();
return null;
}
invocation -> {
sync.countDown();
return null;
};

for (RemoteInputChannel r : rich) {
Expand Down Expand Up @@ -329,26 +319,27 @@ void testConnectionResetByPeer() throws Throwable {

// Verify the Exception
doAnswer(
new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
Throwable cause = (Throwable) invocation.getArguments()[0];

try {
assertThat(cause).isInstanceOf(RemoteTransportException.class);
assertThat(cause)
.hasMessageNotContaining("Connection reset by peer");

assertThat(cause.getCause()).isInstanceOf(IOException.class);
assertThat(cause.getCause())
.hasMessage("Connection reset by peer");
} catch (Throwable t) {
error[0] = t;
}

return null;
}
})
(Answer<Void>)
invocation -> {
Throwable cause = (Throwable) invocation.getArguments()[0];

try {
assertThat(cause)
.isInstanceOf(RemoteTransportException.class);
assertThat(cause)
.hasMessageNotContaining(
"Connection reset by peer");

assertThat(cause.getCause())
.isInstanceOf(IOException.class);
assertThat(cause.getCause())
.hasMessage("Connection reset by peer");
} catch (Throwable t) {
error[0] = t;
}

return null;
})
.when(rich)
.onError(any(Throwable.class));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.io.network.netty;

import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.netty.CancelPartitionRequestTest.InfiniteSubpartitionView;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
Expand All @@ -33,7 +34,6 @@
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;

import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import java.util.concurrent.CountDownLatch;
Expand All @@ -43,8 +43,8 @@
import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.initServerAndClient;
import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.shutdown;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand All @@ -64,18 +64,14 @@ public void testRemoteClose() throws Exception {
anyInt(),
any(BufferAvailabilityListener.class)))
.thenAnswer(
new Answer<ResultSubpartitionView>() {
@Override
public ResultSubpartitionView answer(InvocationOnMock invocationOnMock)
throws Throwable {
BufferAvailabilityListener listener =
(BufferAvailabilityListener)
invocationOnMock.getArguments()[2];
listener.notifyDataAvailable();
return new CancelPartitionRequestTest.InfiniteSubpartitionView(
outboundBuffers, sync);
}
});
(Answer<ResultSubpartitionView>)
invocationOnMock -> {
BufferAvailabilityListener listener =
(BufferAvailabilityListener)
invocationOnMock.getArguments()[2];
listener.notifyDataAvailable();
return new InfiniteSubpartitionView(outboundBuffers, sync);
});

NettyProtocol protocol =
new NettyProtocol(partitionManager, mock(TaskEventDispatcher.class)) {
Expand All @@ -87,9 +83,7 @@ public ChannelHandler[] getClientChannelHandlers() {
// Close on read
new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {

public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.channel().close();
}
}
Expand Down

0 comments on commit 952c783

Please sign in to comment.