Skip to content

Commit

Permalink
[FLINK-28695][refactor][network] Refactor the nettyServerAndClient sh…
Browse files Browse the repository at this point in the history
…utdown and some lambdas
  • Loading branch information
1996fanrui authored and pnowojski committed Nov 21, 2022
1 parent 4b10f42 commit d4a50a7
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -131,29 +131,26 @@ public void requestSubpartition(
inputChannel.getInitialCredit());

final ChannelFutureListener listener =
new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (!future.isSuccess()) {
clientHandler.removeInputChannel(inputChannel);
inputChannel.onError(
new LocalTransportException(
String.format(
"Sending the partition request to '%s [%s] (#%d)' failed.",
connectionId.getAddress(),
connectionId
.getResourceID()
.getStringWithMetadata(),
connectionId.getConnectionIndex()),
future.channel().localAddress(),
future.cause()));
sendToChannel(
new ConnectionErrorMessage(
future.cause() == null
? new RuntimeException(
"Cannot send partition request.")
: future.cause()));
}
future -> {
if (!future.isSuccess()) {
clientHandler.removeInputChannel(inputChannel);
inputChannel.onError(
new LocalTransportException(
String.format(
"Sending the partition request to '%s [%s] (#%d)' failed.",
connectionId.getAddress(),
connectionId
.getResourceID()
.getStringWithMetadata(),
connectionId.getConnectionIndex()),
future.channel().localAddress(),
future.cause()));
sendToChannel(
new ConnectionErrorMessage(
future.cause() == null
? new RuntimeException(
"Cannot send partition request.")
: future.cause()));
}
};

Expand All @@ -165,12 +162,9 @@ public void operationComplete(ChannelFuture future) {
tcpChannel
.eventLoop()
.schedule(
new Runnable() {
@Override
public void run() {
f[0] = tcpChannel.writeAndFlush(request);
f[0].addListener(listener);
}
() -> {
f[0] = tcpChannel.writeAndFlush(request);
f[0].addListener(listener);
},
delayMs,
TimeUnit.MILLISECONDS);
Expand All @@ -194,30 +188,28 @@ public void sendTaskEvent(
.writeAndFlush(
new TaskEventRequest(event, partitionId, inputChannel.getInputChannelId()))
.addListener(
new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (!future.isSuccess()) {
inputChannel.onError(
new LocalTransportException(
String.format(
"Sending the task event to '%s [%s] (#%d)' failed.",
connectionId.getAddress(),
connectionId
.getResourceID()
.getStringWithMetadata(),
connectionId.getConnectionIndex()),
future.channel().localAddress(),
future.cause()));
sendToChannel(
new ConnectionErrorMessage(
future.cause() == null
? new RuntimeException(
"Cannot send task event.")
: future.cause()));
}
}
});
(ChannelFutureListener)
future -> {
if (!future.isSuccess()) {
inputChannel.onError(
new LocalTransportException(
String.format(
"Sending the task event to '%s [%s] (#%d)' failed.",
connectionId.getAddress(),
connectionId
.getResourceID()
.getStringWithMetadata(),
connectionId.getConnectionIndex()),
future.channel().localAddress(),
future.cause()));
sendToChannel(
new ConnectionErrorMessage(
future.cause() == null
? new RuntimeException(
"Cannot send task event.")
: future.cause()));
}
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.shutdown;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.Assertions.fail;
Expand Down Expand Up @@ -81,8 +82,7 @@ void testInterruptsNotCached() throws Exception {
factory.createPartitionRequestClient(
nettyServerAndClient.getConnectionID(RESOURCE_ID, 0));
} finally {
nettyServerAndClient.client().shutdown();
nettyServerAndClient.server().shutdown();
shutdown(nettyServerAndClient);
}
}

Expand Down Expand Up @@ -125,8 +125,7 @@ void testExceptionsAreNotCached() throws Exception {

factory.createPartitionRequestClient(connectionID);
} finally {
nettyServerAndClient.client().shutdown();
nettyServerAndClient.server().shutdown();
shutdown(nettyServerAndClient);
}
}

Expand All @@ -139,8 +138,7 @@ void testReuseNettyPartitionRequestClient() throws Exception {
checkReuseNettyPartitionRequestClient(nettyServerAndClient, 5);
checkReuseNettyPartitionRequestClient(nettyServerAndClient, 10);
} finally {
nettyServerAndClient.client().shutdown();
nettyServerAndClient.server().shutdown();
shutdown(nettyServerAndClient);
}
}

Expand Down Expand Up @@ -176,8 +174,7 @@ void testNettyClientConnectRetry() throws Exception {

factory.createPartitionRequestClient(serverAndClient.getConnectionID(RESOURCE_ID, 0));

serverAndClient.client().shutdown();
serverAndClient.server().shutdown();
shutdown(serverAndClient);
}

// see https://issues.apache.org/jira/browse/FLINK-18821
Expand Down Expand Up @@ -218,14 +215,12 @@ void testNettyClientConnectRetryFailure() throws Exception {
unstableNettyClient, 2, 1, connectionReuseEnabled);

assertThatThrownBy(
() -> {
factory.createPartitionRequestClient(
serverAndClient.getConnectionID(RESOURCE_ID, 0));
})
() ->
factory.createPartitionRequestClient(
serverAndClient.getConnectionID(RESOURCE_ID, 0)))
.isInstanceOf(IOException.class);
} finally {
serverAndClient.client().shutdown();
serverAndClient.server().shutdown();
shutdown(serverAndClient);
}
}

Expand Down Expand Up @@ -274,8 +269,7 @@ void testNettyClientConnectRetryMultipleThread() throws Exception {
});

threadPoolExecutor.shutdown();
serverAndClient.client().shutdown();
serverAndClient.server().shutdown();
shutdown(serverAndClient);
}

private NettyTestUtil.NettyServerAndClient createNettyServerAndClient() throws Exception {
Expand Down

0 comments on commit d4a50a7

Please sign in to comment.