Skip to content

Commit

Permalink
[FLINK-26082][runtime] Refactor ServerTransportErrorHandlingTest and …
Browse files Browse the repository at this point in the history
…ClientTransportErrorHandlingTest according to `initServerAndClient` method changes.
  • Loading branch information
akalash authored and pnowojski committed Jan 27, 2023

Verified

This commit was signed with the committer’s verified signature.
devinbileck Devin Bileck
1 parent 2b5fe30 commit 399bcef
Showing 2 changed files with 3 additions and 28 deletions.
Original file line number Diff line number Diff line change
@@ -51,7 +51,6 @@
import java.util.concurrent.TimeUnit;

import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.connect;
import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.createConfig;
import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.initServerAndClient;
import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.shutdown;
import static org.assertj.core.api.Assertions.assertThat;
@@ -89,7 +88,7 @@ public ChannelHandler[] getServerChannelHandlers() {

// We need a real server and client in this test, because Netty's EmbeddedChannel is
// not failing the ChannelPromise of failed writes.
NettyServerAndClient serverAndClient = initServerAndClient(protocol, createConfig());
NettyServerAndClient serverAndClient = initServerAndClient(protocol);

Channel ch = connect(serverAndClient);

@@ -238,7 +237,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg)
}
};

NettyServerAndClient serverAndClient = initServerAndClient(protocol, createConfig());
NettyServerAndClient serverAndClient = initServerAndClient(protocol);

Channel ch = connect(serverAndClient);

Original file line number Diff line number Diff line change
@@ -26,7 +26,6 @@
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.util.ExceptionUtils;

import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
@@ -37,12 +36,10 @@
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import java.net.BindException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.connect;
import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.createConfig;
import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.initServerAndClient;
import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.shutdown;
import static org.junit.Assert.fail;
@@ -52,7 +49,6 @@
import static org.mockito.Mockito.when;

public class ServerTransportErrorHandlingTest {
private static final int NETTY_INIT_MAX_RETRY_TIMES = 20;

/** Verifies remote closes trigger the release of all resources. */
@Test
@@ -104,27 +100,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg)
NettyTestUtil.NettyServerAndClient serverAndClient = null;

try {
for (int retry = 0; retry < NETTY_INIT_MAX_RETRY_TIMES; retry++) {
try {
serverAndClient = initServerAndClient(protocol, createConfig());
break;
} catch (Exception e) {
if (retry >= NETTY_INIT_MAX_RETRY_TIMES - 1) {
throw new RuntimeException(
"Failed to initialize netty server and client, retried "
+ retry
+ " times.",
e);
}
if (e instanceof BindException
|| ExceptionUtils.findThrowableWithMessage(e, "Address already in use")
.isPresent()) {
continue;
}
throw e;
}
}

serverAndClient = initServerAndClient(protocol);
Channel ch = connect(serverAndClient);

// Write something to trigger close by server

0 comments on commit 399bcef

Please sign in to comment.