Skip to content

Commit

Permalink
[FLINK-15416][network] Retry connection to the upstream task
Browse files Browse the repository at this point in the history
  • Loading branch information
HuangZhenQiu authored and zhijiangW committed Jul 3, 2020
1 parent 01da0ea commit 0175bf5
Show file tree
Hide file tree
Showing 8 changed files with 233 additions and 156 deletions.
6 changes: 6 additions & 0 deletions docs/_includes/generated/all_taskmanager_network_section.html
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@
<td>Integer</td>
<td>The number of Netty client threads.</td>
</tr>
<tr>
<td><h5>taskmanager.network.retries</h5></td>
<td style="word-wrap: break-word;">0</td>
<td>Integer</td>
<td>The number of retry attempts for network communication. Currently it's only used for establishing input/output channel connections</td>
</tr>
<tr>
<td><h5>taskmanager.network.netty.num-arenas</h5></td>
<td style="word-wrap: break-word;">-1</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@
<td>Integer</td>
<td>The number of Netty client threads.</td>
</tr>
<tr>
<td><h5>taskmanager.network.retries</h5></td>
<td style="word-wrap: break-word;">0</td>
<td>Integer</td>
<td>The number of retry attempts for network communication. Currently it's only used for establishing input/output channel connections</td>
</tr>
<tr>
<td><h5>taskmanager.network.netty.num-arenas</h5></td>
<td style="word-wrap: break-word;">-1</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,14 @@ public class NettyShuffleEnvironmentOptions {
.withDeprecatedKeys("taskmanager.net.client.connectTimeoutSec")
.withDescription("The Netty client connection timeout.");

@Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
public static final ConfigOption<Integer> NETWORK_RETRIES =
key("taskmanager.network.retries")
.defaultValue(0)
.withDeprecatedKeys("taskmanager.network.retries")
.withDescription("The number of retry attempts for network communication." +
" Currently it's only used for establishing input/output channel connections");

@Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
public static final ConfigOption<Integer> SEND_RECEIVE_BUFFER_SIZE =
key("taskmanager.network.netty.sendReceiveBufferSize")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ public int getClientConnectTimeoutSeconds() {
return config.getInteger(NettyShuffleEnvironmentOptions.CLIENT_CONNECT_TIMEOUT_SECONDS);
}

public int getNetworkRetries() {
return config.getInteger(NettyShuffleEnvironmentOptions.NETWORK_RETRIES);
}

public int getSendAndReceiveBufferSize() {
return config.getInteger(NettyShuffleEnvironmentOptions.SEND_RECEIVE_BUFFER_SIZE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public NettyConnectionManager(
this.client = new NettyClient(nettyConfig);
this.bufferPool = new NettyBufferPool(nettyConfig.getNumberOfArenas());

this.partitionRequestClientFactory = new PartitionRequestClientFactory(client);
this.partitionRequestClientFactory = new PartitionRequestClientFactory(client, nettyConfig.getNetworkRetries());

this.nettyProtocol = new NettyProtocol(checkNotNull(partitionProvider), checkNotNull(taskEventPublisher));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,21 @@
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.NetworkClientHandler;
import org.apache.flink.runtime.io.network.PartitionRequestClient;
import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;

import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Factory for {@link NettyPartitionRequestClient} instances.
Expand All @@ -40,83 +44,92 @@
* instances.
*/
class PartitionRequestClientFactory {
private static final Logger LOG = LoggerFactory.getLogger(PartitionRequestClientFactory.class);

private final NettyClient nettyClient;

private final ConcurrentMap<ConnectionID, Object> clients = new ConcurrentHashMap<ConnectionID, Object>();
private final int retryNumber;

private final ConcurrentMap<ConnectionID, CompletableFuture<NettyPartitionRequestClient>> clients = new ConcurrentHashMap<>();

PartitionRequestClientFactory(NettyClient nettyClient) {
this(nettyClient, 0);
}

PartitionRequestClientFactory(NettyClient nettyClient, int retryNumber) {
this.nettyClient = nettyClient;
this.retryNumber = retryNumber;
}

/**
* Atomically establishes a TCP connection to the given remote address and
* creates a {@link NettyPartitionRequestClient} instance for this connection.
*/
NettyPartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) throws IOException, InterruptedException {
Object entry;
NettyPartitionRequestClient client = null;

while (client == null) {
entry = clients.get(connectionId);

if (entry != null) {
// Existing channel or connecting channel
if (entry instanceof NettyPartitionRequestClient) {
client = (NettyPartitionRequestClient) entry;
}
else {
ConnectingChannel future = (ConnectingChannel) entry;
client = future.waitForChannel();

clients.replace(connectionId, future, client);
}
while (true) {
AtomicBoolean isTheFirstOne = new AtomicBoolean(false);
CompletableFuture<NettyPartitionRequestClient> clientFuture = clients.computeIfAbsent(connectionId, unused -> {
isTheFirstOne.set(true);
return new CompletableFuture<>();
});
if (isTheFirstOne.get()) {
clientFuture.complete(connectWithRetries(connectionId));
}
else {
// No channel yet. Create one, but watch out for a race.
// We create a "connecting future" and atomically add it to the map.
// Only the thread that really added it establishes the channel.
// The others need to wait on that original establisher's future.
ConnectingChannel connectingChannel = new ConnectingChannel(connectionId, this);
Object old = clients.putIfAbsent(connectionId, connectingChannel);

if (old == null) {
nettyClient.connect(connectionId.getAddress()).addListener(connectingChannel);

client = connectingChannel.waitForChannel();

clients.replace(connectionId, connectingChannel, client);
}
else if (old instanceof ConnectingChannel) {
client = ((ConnectingChannel) old).waitForChannel();

clients.replace(connectionId, old, client);
}
else {
client = (NettyPartitionRequestClient) old;
}
final NettyPartitionRequestClient client;
try {
client = clientFuture.get();
} catch (ExecutionException e) {
throw new IOException(e);
}

// Make sure to increment the reference count before handing a client
// out to ensure correct bookkeeping for channel closing.
if (!client.incrementReferenceCounter()) {
if (client.incrementReferenceCounter()) {
return client;
} else {
destroyPartitionRequestClient(connectionId, client);
client = null;
}
}
}

return client;
private NettyPartitionRequestClient connectWithRetries(ConnectionID connectionId) {
int tried = 0;
while (true) {
try {
return connect(connectionId);
} catch (RemoteTransportException e) {
tried++;
LOG.error("Failed {} times to connect to {}", tried, connectionId.getAddress(), e);
if (tried > retryNumber) {
throw new CompletionException(e);
}
}
}
}

public void closeOpenChannelConnections(ConnectionID connectionId) {
Object entry = clients.get(connectionId);
private NettyPartitionRequestClient connect(ConnectionID connectionId) throws RemoteTransportException {
try {
Channel channel = nettyClient.connect(connectionId.getAddress()).await().channel();
NetworkClientHandler clientHandler = channel.pipeline().get(NetworkClientHandler.class);
return new NettyPartitionRequestClient(channel, clientHandler, connectionId, this);
} catch (Exception e) {
throw new RemoteTransportException(
"Connecting to remote task manager '" + connectionId.getAddress() +
"' has failed. This might indicate that the remote task " +
"manager has been lost.",
connectionId.getAddress(), e);
}
}

if (entry instanceof ConnectingChannel) {
ConnectingChannel channel = (ConnectingChannel) entry;
void closeOpenChannelConnections(ConnectionID connectionId) {
CompletableFuture<NettyPartitionRequestClient> entry = clients.get(connectionId);

if (channel.dispose()) {
clients.remove(connectionId, channel);
}
if (entry != null && !entry.isDone()) {
entry.thenAccept(client -> {
if (client.disposeIfNotUsed()) {
clients.remove(connectionId, entry);
}
});
}
}

Expand All @@ -128,104 +141,14 @@ int getNumberOfActiveClients() {
* Removes the client for the given {@link ConnectionID}.
*/
void destroyPartitionRequestClient(ConnectionID connectionId, PartitionRequestClient client) {
clients.remove(connectionId, client);
}

private static final class ConnectingChannel implements ChannelFutureListener {

private final Object connectLock = new Object();

private final ConnectionID connectionId;

private final PartitionRequestClientFactory clientFactory;

private boolean disposeRequestClient = false;

public ConnectingChannel(ConnectionID connectionId, PartitionRequestClientFactory clientFactory) {
this.connectionId = connectionId;
this.clientFactory = clientFactory;
}

private boolean dispose() {
boolean result;
synchronized (connectLock) {
if (partitionRequestClient != null) {
result = partitionRequestClient.disposeIfNotUsed();
}
else {
disposeRequestClient = true;
result = true;
}

connectLock.notifyAll();
}

return result;
}

private void handInChannel(Channel channel) {
synchronized (connectLock) {
try {
NetworkClientHandler clientHandler = channel.pipeline().get(NetworkClientHandler.class);
partitionRequestClient = new NettyPartitionRequestClient(
channel, clientHandler, connectionId, clientFactory);

if (disposeRequestClient) {
partitionRequestClient.disposeIfNotUsed();
}

connectLock.notifyAll();
}
catch (Throwable t) {
notifyOfError(t);
final CompletableFuture<NettyPartitionRequestClient> future = clients.get(connectionId);
if (future != null && future.isDone()) {
future.thenAccept(futureClient -> {
if (client.equals(futureClient)) {
clients.remove(connectionId, future);
}
}
}

private volatile NettyPartitionRequestClient partitionRequestClient;

private volatile Throwable error;

private NettyPartitionRequestClient waitForChannel() throws IOException, InterruptedException {
synchronized (connectLock) {
while (error == null && partitionRequestClient == null) {
connectLock.wait(2000);
}
}

if (error != null) {
throw new IOException("Connecting the channel failed: " + error.getMessage(), error);
}

return partitionRequestClient;
}

private void notifyOfError(Throwable error) {
synchronized (connectLock) {
this.error = error;
connectLock.notifyAll();
}
}

@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
handInChannel(future.channel());
}
else if (future.cause() != null) {
notifyOfError(new RemoteTransportException(
"Connecting to remote task manager + '" + connectionId.getAddress() +
"' has failed. This might indicate that the remote task " +
"manager has been lost.",
connectionId.getAddress(), future.cause()));
}
else {
notifyOfError(new LocalTransportException(
String.format(
"Connecting to remote task manager '%s' has been cancelled.",
connectionId.getAddress()),
null));
}
});
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
package org.apache.flink.runtime.io.network.netty;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
import org.apache.flink.util.NetUtils;

import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;

import java.net.InetAddress;
import java.net.InetSocketAddress;
Expand Down
Loading

0 comments on commit 0175bf5

Please sign in to comment.