Skip to content

Commit

Permalink
[FLINK-15455][network] Enabled tcp connection reuse across multi jobs.
Browse files Browse the repository at this point in the history
This closes apache#18417.
  • Loading branch information
KarmaGYZ committed Feb 16, 2022
1 parent c0cca2e commit 5be7d48
Show file tree
Hide file tree
Showing 17 changed files with 269 additions and 282 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -152,5 +152,11 @@
<td>Integer</td>
<td>Parallelism threshold to switch between sort-based blocking shuffle and hash-based blocking shuffle, which means for batch jobs of smaller parallelism, hash-shuffle will be used and for batch jobs of larger or equal parallelism, sort-shuffle will be used. The value 1 means that sort-shuffle is the default option. Note: For production usage, you may also need to tune 'taskmanager.network.sort-shuffle.min-buffers' and 'taskmanager.memory.framework.off-heap.batch-shuffle.size' for better performance.</td>
</tr>
<tr>
<td><h5>taskmanager.network.tcp-connection.enable-reuse-across-jobs</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Whether to reuse tcp connections across multi jobs. If set to true, tcp connections will not be released after job finishes. The subsequent jobs will be free from the overhead of the connection re-establish. However, this may lead to an increase in the total number of connections on your machine. When it reaches the upper limit, you can set it to false to release idle connections. Note that to avoid connection leak, you must set taskmanager.network.max-num-tcp-connections to a smaller value before you enable tcp connection reuse.</td>
</tr>
</tbody>
</table>
Original file line number Diff line number Diff line change
Expand Up @@ -140,5 +140,11 @@
<td>Integer</td>
<td>Parallelism threshold to switch between sort-based blocking shuffle and hash-based blocking shuffle, which means for batch jobs of smaller parallelism, hash-shuffle will be used and for batch jobs of larger or equal parallelism, sort-shuffle will be used. The value 1 means that sort-shuffle is the default option. Note: For production usage, you may also need to tune 'taskmanager.network.sort-shuffle.min-buffers' and 'taskmanager.memory.framework.off-heap.batch-shuffle.size' for better performance.</td>
</tr>
<tr>
<td><h5>taskmanager.network.tcp-connection.enable-reuse-across-jobs</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Whether to reuse tcp connections across multi jobs. If set to true, tcp connections will not be released after job finishes. The subsequent jobs will be free from the overhead of the connection re-establish. However, this may lead to an increase in the total number of connections on your machine. When it reaches the upper limit, you can set it to false to release idle connections. Note that to avoid connection leak, you must set taskmanager.network.max-num-tcp-connections to a smaller value before you enable tcp connection reuse.</td>
</tr>
</tbody>
</table>
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,32 @@ public class NettyShuffleEnvironmentOptions {
+ " by configured memory limits, but some resource frameworks like yarn would track this memory usage and kill the container once"
+ " memory exceeding some threshold. Also note that this option is experimental and might be changed future.");

/**
* Whether to reuse TCP connections across multiple jobs. Enabled by default.
*
* <p>If set to true, TCP connections are not released after a job finishes, so subsequent jobs
* are spared the overhead of re-establishing them. However, this may increase the total number
* of connections on the machine. When that number reaches the upper limit, this option can be
* set to false so that idle connections are released.
*
* <p>Note: To avoid connection leaks, you must set {@link #MAX_NUM_TCP_CONNECTIONS} to a smaller
* value before you enable TCP connection reuse.
*/
@Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
public static final ConfigOption<Boolean> TCP_CONNECTION_REUSE_ACROSS_JOBS_ENABLED =
key("taskmanager.network.tcp-connection.enable-reuse-across-jobs")
.booleanType()
.defaultValue(true)
.withDescription(
"Whether to reuse tcp connections across multi jobs. If set to true, tcp "
+ "connections will not be released after job finishes. The subsequent "
+ "jobs will be free from the overhead of the connection re-establish. "
+ "However, this may lead to an increase in the total number of connections "
+ "on your machine. When it reaches the upper limit, you can set it to false "
+ "to release idle connections. Note that to avoid connection leak, you must set "
+ MAX_NUM_TCP_CONNECTIONS.key()
+ " to a smaller value before you "
+ "enable tcp connection reuse.");

// ------------------------------------------------------------------------
// Netty Options
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ static NettyShuffleEnvironment createNettyShuffleEnvironment(
resultPartitionManager,
taskEventPublisher,
nettyConfig,
config.getMaxNumberOfConnections())
config.getMaxNumberOfConnections(),
config.isConnectionReuseEnabled())
: new LocalConnectionManager();

NetworkBufferPool networkBufferPool =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,11 @@ public interface NetworkClientHandler extends ChannelHandler {
RemoteInputChannel getInputChannel(InputChannelID inputChannelId);

void cancelRequestFor(InputChannelID inputChannelId);

/**
* Returns whether this handler has encountered a channel error.
*
* @return {@code true} if a channel error has occurred, {@code false} otherwise
*/
boolean hasChannelError();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.io.network.netty;

/**
 * Message indicating that an error occurred on a connection.
 *
 * <p>The message carries the {@link Throwable} that describes the failure. The cause is required
 * to be non-null so that a receiver recording it can always distinguish "an error happened" from
 * "no error so far".
 */
public class ConnectionErrorMessage {

    /** The failure that triggered this message; never {@code null}. */
    private final Throwable cause;

    /**
     * Creates a new error message.
     *
     * @param cause the failure to report, must not be {@code null}
     * @throws NullPointerException if {@code cause} is {@code null}
     */
    public ConnectionErrorMessage(Throwable cause) {
        if (cause == null) {
            throw new NullPointerException("cause must not be null");
        }
        this.cause = cause;
    }

    /**
     * Returns the failure carried by this message.
     *
     * @return the non-null cause passed at construction
     */
    public Throwable getCause() {
        return cause;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -200,17 +200,25 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object msg) throws Exc
if (triggerWrite) {
writeAndFlushNextMessageIfPossible(ctx.channel());
}
} else if (msg instanceof ConnectionErrorMessage) {
notifyAllChannelsOfErrorAndClose(((ConnectionErrorMessage) msg).getCause());
} else {
ctx.fireUserEventTriggered(msg);
}
}

/** Reports whether a non-null error is currently stored in {@code channelError}. */
@Override
public boolean hasChannelError() {
    final boolean errorRecorded = channelError.get() != null;
    return errorRecorded;
}

/**
* Tries to write and flush the next pending message on the channel of the given context.
*
* @param ctx context whose channel is handed to the write attempt
*/
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
writeAndFlushNextMessageIfPossible(ctx.channel());
}

private void notifyAllChannelsOfErrorAndClose(Throwable cause) {
@VisibleForTesting
void notifyAllChannelsOfErrorAndClose(Throwable cause) {
if (channelError.compareAndSet(null, cause)) {
try {
for (RemoteInputChannel inputChannel : inputChannels.values()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,19 @@ public NettyConnectionManager(
ResultPartitionProvider partitionProvider,
TaskEventPublisher taskEventPublisher,
NettyConfig nettyConfig,
int maxNumberOfConnections) {
int maxNumberOfConnections,
boolean connectionReuseEnabled) {

this.server = new NettyServer(nettyConfig);
this.client = new NettyClient(nettyConfig);
this.bufferPool = new NettyBufferPool(nettyConfig.getNumberOfArenas());

this.partitionRequestClientFactory =
new PartitionRequestClientFactory(
client, nettyConfig.getNetworkRetries(), maxNumberOfConnections);
client,
nettyConfig.getNetworkRetries(),
maxNumberOfConnections,
connectionReuseEnabled);

this.nettyProtocol =
new NettyProtocol(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.util.AtomicDisposableReferenceCounter;
import org.apache.flink.util.Preconditions;

import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
Expand All @@ -37,6 +37,8 @@
import java.io.IOException;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.flink.runtime.io.network.netty.NettyMessage.PartitionRequest;
import static org.apache.flink.runtime.io.network.netty.NettyMessage.TaskEventRequest;
Expand All @@ -61,8 +63,9 @@ public class NettyPartitionRequestClient implements PartitionRequestClient {
private final PartitionRequestClientFactory clientFactory;

/** If zero, the underlying TCP channel can be safely closed. */
private final AtomicDisposableReferenceCounter closeReferenceCounter =
new AtomicDisposableReferenceCounter();
private final AtomicInteger closeReferenceCounter = new AtomicInteger(0);

private final AtomicBoolean closed = new AtomicBoolean(false);

NettyPartitionRequestClient(
Channel tcpChannel,
Expand All @@ -76,18 +79,23 @@ public class NettyPartitionRequestClient implements PartitionRequestClient {
this.clientFactory = checkNotNull(clientFactory);
}

boolean disposeIfNotUsed() {
return closeReferenceCounter.disposeIfNotUsed();
boolean canBeDisposed() {
return closeReferenceCounter.get() == 0 && !canBeReused();
}

/**
* Increments the reference counter.
* Validate the client and increment the reference counter.
*
* <p>Note: the reference counter has to be incremented before returning the instance of this
* client to ensure correct closing logic.
*
* @return whether this client can be used.
*/
boolean incrementReferenceCounter() {
return closeReferenceCounter.increment();
boolean validateClientAndIncrementReferenceCounter() {
if (!clientHandler.hasChannelError()) {
return closeReferenceCounter.incrementAndGet() > 0;
}
return false;
}

/**
Expand Down Expand Up @@ -135,6 +143,12 @@ public void operationComplete(ChannelFuture future) throws Exception {
connectionId.getConnectionIndex()),
future.channel().localAddress(),
future.cause()));
sendToChannel(
new ConnectionErrorMessage(
future.cause() == null
? new RuntimeException(
"Cannot send partition request.")
: future.cause()));
}
}
};
Expand Down Expand Up @@ -188,6 +202,12 @@ public void operationComplete(ChannelFuture future) throws Exception {
connectionId.getConnectionIndex()),
future.channel().localAddress(),
future.cause()));
sendToChannel(
new ConnectionErrorMessage(
future.cause() == null
? new RuntimeException(
"Cannot send task event.")
: future.cause()));
}
}
});
Expand All @@ -213,7 +233,7 @@ public void acknowledgeAllRecordsProcessed(RemoteInputChannel inputChannel) {
sendToChannel(new AcknowledgeAllRecordsProcessedMessage(inputChannel));
}

private void sendToChannel(ClientOutboundMessage message) {
private void sendToChannel(Object message) {
tcpChannel.eventLoop().execute(() -> tcpChannel.pipeline().fireUserEventTriggered(message));
}

Expand All @@ -222,22 +242,36 @@ public void close(RemoteInputChannel inputChannel) throws IOException {

clientHandler.removeInputChannel(inputChannel);

if (closeReferenceCounter.decrement()) {
// Close the TCP connection. Send a close request msg to ensure
// that outstanding backwards task events are not discarded.
tcpChannel
.writeAndFlush(new NettyMessage.CloseRequest())
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);

// Make sure to remove the client from the factory
clientFactory.destroyPartitionRequestClient(connectionId, this);
if (closeReferenceCounter.updateAndGet(count -> Math.max(count - 1, 0)) == 0
&& !canBeReused()) {
closeConnection();
} else {
clientHandler.cancelRequestFor(inputChannel.getInputChannelId());
}
}

/**
 * Closes the underlying TCP connection and removes this client from its factory.
 *
 * <p>The client must be disposable when this method is called; otherwise the state check
 * fails. If the connection has already been closed, nothing further is done after that check.
 */
public void closeConnection() {
    Preconditions.checkState(
            canBeDisposed(), "The connection should not be closed before disposed.");

    // Only the caller that flips the flag proceeds, so the connection is not closed repeatedly.
    if (!closed.compareAndSet(false, true)) {
        return;
    }

    // Send a close request msg along with closing the TCP connection to ensure
    // that outstanding backwards task events are not discarded.
    tcpChannel
            .writeAndFlush(new NettyMessage.CloseRequest())
            .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);

    // Make sure to remove the client from the factory.
    clientFactory.destroyPartitionRequestClient(connectionId, this);
}

/**
 * Tells whether this client may be kept for reuse: the factory must have connection reuse
 * enabled and the client handler must not report a channel error.
 */
private boolean canBeReused() {
    if (!clientFactory.isConnectionReuseEnabled()) {
        return false;
    }
    return !clientHandler.hasChannelError();
}

private void checkNotClosed() throws IOException {
if (closeReferenceCounter.isDisposed()) {
if (closed.get()) {
final SocketAddress localAddr = tcpChannel.localAddress();
final SocketAddress remoteAddr = tcpChannel.remoteAddress();
throw new LocalTransportException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,19 +54,21 @@ class PartitionRequestClientFactory {
private final ConcurrentMap<ConnectionID, CompletableFuture<NettyPartitionRequestClient>>
clients = new ConcurrentHashMap<>();

PartitionRequestClientFactory(NettyClient nettyClient) {
this(nettyClient, 0, 1);
}
private final boolean connectionReuseEnabled;

PartitionRequestClientFactory(NettyClient nettyClient, int retryNumber) {
this(nettyClient, retryNumber, 1);
PartitionRequestClientFactory(NettyClient nettyClient, boolean connectionReuseEnabled) {
this(nettyClient, 0, 1, connectionReuseEnabled);
}

PartitionRequestClientFactory(
NettyClient nettyClient, int retryNumber, int maxNumberOfConnections) {
NettyClient nettyClient,
int retryNumber,
int maxNumberOfConnections,
boolean connectionReuseEnabled) {
this.nettyClient = nettyClient;
this.retryNumber = retryNumber;
this.maxNumberOfConnections = maxNumberOfConnections;
this.connectionReuseEnabled = connectionReuseEnabled;
}

/**
Expand Down Expand Up @@ -111,14 +113,20 @@ NettyPartitionRequestClient createPartitionRequestClient(ConnectionID connection

// Make sure to increment the reference count before handing a client
// out to ensure correct bookkeeping for channel closing.
if (client.incrementReferenceCounter()) {
if (client.validateClientAndIncrementReferenceCounter()) {
return client;
} else if (client.canBeDisposed()) {
client.closeConnection();
} else {
destroyPartitionRequestClient(connectionId, client);
}
}
}

/**
 * Returns the connection-reuse flag of this factory.
 *
 * @return the value of {@code connectionReuseEnabled}
 */
public boolean isConnectionReuseEnabled() {
    return this.connectionReuseEnabled;
}

private NettyPartitionRequestClient connectWithRetries(ConnectionID connectionId)
throws InterruptedException, RemoteTransportException {
int tried = 0;
Expand Down Expand Up @@ -169,7 +177,7 @@ void closeOpenChannelConnections(ConnectionID connectionId) {
if (entry != null && !entry.isDone()) {
entry.thenAccept(
client -> {
if (client.disposeIfNotUsed()) {
if (client.canBeDisposed()) {
clients.remove(connectionId, entry);
}
});
Expand Down
Loading

0 comments on commit 5be7d48

Please sign in to comment.