Skip to content

Commit

Permalink
[FLINK-10052][ha] Tolerate temporarily suspended ZooKeeper connections
Browse files Browse the repository at this point in the history
Signed-off-by: tison <[email protected]>
  • Loading branch information
tisonkun authored and tillrohrmann committed Aug 15, 2021
1 parent 71caf5e commit 9635083
Show file tree
Hide file tree
Showing 11 changed files with 260 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@
<td>Integer</td>
<td>Defines the session timeout for the ZooKeeper session in ms.</td>
</tr>
<tr>
<td><h5>high-availability.zookeeper.client.tolerate-suspended-connections</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Defines whether a suspended ZooKeeper connection will be treated as an error that causes the leader information to be invalidated or not. In case you set this option to <code class="highlighter-rouge">true</code>, Flink will wait until a ZooKeeper connection is marked as lost before it revokes the leadership of components. This has the effect that Flink is more resilient against temporary connection instabilities at the cost of running more likely into timing issues with ZooKeeper.</td>
</tr>
<tr>
<td><h5>high-availability.zookeeper.path.jobgraphs</h5></td>
<td style="word-wrap: break-word;">"/jobgraphs"</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@
<td>Integer</td>
<td>Defines the session timeout for the ZooKeeper session in ms.</td>
</tr>
<tr>
<td><h5>high-availability.zookeeper.client.tolerate-suspended-connections</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Defines whether a suspended ZooKeeper connection will be treated as an error that causes the leader information to be invalidated or not. In case you set this option to <code class="highlighter-rouge">true</code>, Flink will wait until a ZooKeeper connection is marked as lost before it revokes the leadership of components. This has the effect that Flink is more resilient against temporary connection instabilities at the cost of running more likely into timing issues with ZooKeeper.</td>
</tr>
<tr>
<td><h5>high-availability.zookeeper.path.jobgraphs</h5></td>
<td style="word-wrap: break-word;">"/jobgraphs"</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.flink.configuration;

import org.apache.flink.annotation.docs.Documentation;
import org.apache.flink.configuration.description.Description;
import org.apache.flink.configuration.description.TextElement;

import static org.apache.flink.configuration.ConfigOptions.key;

Expand Down Expand Up @@ -169,6 +171,22 @@ public class HighAvailabilityOptions {
+ " set to “creator” if the ZooKeeper server configuration has the “authProvider” property mapped to use"
+ " SASLAuthenticationProvider and the cluster is configured to run in secure mode (Kerberos).");

/**
 * Boolean option (default {@code false}) that decides whether a suspended ZooKeeper connection
 * is tolerated. When {@code false}, a suspended connection already invalidates the leader
 * information; when {@code true}, the leader information is only invalidated once the
 * connection is marked as lost.
 */
@Documentation.Section(Documentation.Sections.EXPERT_ZOOKEEPER_HIGH_AVAILABILITY)
public static final ConfigOption<Boolean> ZOOKEEPER_TOLERATE_SUSPENDED_CONNECTIONS =
key("high-availability.zookeeper.client.tolerate-suspended-connections")
.booleanType()
.defaultValue(false)
.withDescription(
Description.builder()
.text(
"Defines whether a suspended ZooKeeper connection will be treated as an error that causes the leader "
+ "information to be invalidated or not. In case you set this option to %s, Flink will wait until a "
+ "ZooKeeper connection is marked as lost before it revokes the leadership of components. This has the "
+ "effect that Flink is more resilient against temporary connection instabilities at the cost of running "
+ "more likely into timing issues with ZooKeeper.",
// substituted for the %s placeholder above, rendered as inline code
TextElement.code("true"))
.build());

// ------------------------------------------------------------------------
// Deprecated options
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public static ClientHighAvailabilityServices createClientHAService(Configuration
return new StandaloneClientHAServices(webMonitorAddress);
case ZOOKEEPER:
final CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration);
return new ZooKeeperClientHAServices(client);
return new ZooKeeperClientHAServices(client, configuration);
case FACTORY_CLASS:
return createCustomClientHAServices(configuration);
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.highavailability.zookeeper;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.highavailability.ClientHighAvailabilityServices;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.util.ZooKeeperUtils;
Expand All @@ -31,14 +32,18 @@ public class ZooKeeperClientHAServices implements ClientHighAvailabilityServices

private final CuratorFramework client;

public ZooKeeperClientHAServices(@Nonnull CuratorFramework client) {
private final Configuration configuration;

/**
 * Creates the client-side ZooKeeper HA services.
 *
 * @param client Curator client used to talk to ZooKeeper
 * @param configuration configuration that is handed on when the leader retrieval service is
 *     created
 */
public ZooKeeperClientHAServices(
@Nonnull CuratorFramework client, @Nonnull Configuration configuration) {
this.client = client;
this.configuration = configuration;
}

@Override
public LeaderRetrievalService getClusterRestEndpointLeaderRetriever() {
return ZooKeeperUtils.createLeaderRetrievalService(
client, ZooKeeperUtils.getLeaderPathForRestServer());
client, ZooKeeperUtils.getLeaderPathForRestServer(), configuration);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ protected LeaderElectionService createLeaderElectionService(String leaderPath) {

@Override
protected LeaderRetrievalService createLeaderRetrievalService(String leaderPath) {
return ZooKeeperUtils.createLeaderRetrievalService(client, leaderPath);
return ZooKeeperUtils.createLeaderRetrievalService(client, leaderPath, configuration);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,10 +271,7 @@ private void handleStateChange(ConnectionState newState) {
LOG.debug("Connected to ZooKeeper quorum. Leader election can start.");
break;
case SUSPENDED:
LOG.warn(
"Connection to ZooKeeper suspended. The contender "
+ leaderContenderDescription
+ " no longer participates in the leader election.");
LOG.warn("Connection to ZooKeeper suspended, waiting for reconnection.");
break;
case RECONNECTED:
LOG.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ public class ZooKeeperLeaderRetrievalDriver

private final LeaderRetrievalEventHandler leaderRetrievalEventHandler;

private final LeaderInformationClearancePolicy leaderInformationClearancePolicy;

private final FatalErrorHandler fatalErrorHandler;

private volatile boolean running;
Expand All @@ -74,12 +76,15 @@ public class ZooKeeperLeaderRetrievalDriver
* @param client Client which constitutes the connection to the ZooKeeper quorum
* @param path Path of the ZooKeeper node which contains the leader information
* @param leaderRetrievalEventHandler Handler to notify the leader changes.
* @param leaderInformationClearancePolicy Policy that controls when the leader information is
* cleared and the listener is notified about it
* @param fatalErrorHandler Fatal error handler
*/
public ZooKeeperLeaderRetrievalDriver(
CuratorFramework client,
String path,
LeaderRetrievalEventHandler leaderRetrievalEventHandler,
LeaderInformationClearancePolicy leaderInformationClearancePolicy,
FatalErrorHandler fatalErrorHandler)
throws Exception {
this.client = checkNotNull(client, "CuratorFramework client");
Expand All @@ -91,6 +96,7 @@ public ZooKeeperLeaderRetrievalDriver(
this::retrieveLeaderInformationFromZooKeeper);

this.leaderRetrievalEventHandler = checkNotNull(leaderRetrievalEventHandler);
this.leaderInformationClearancePolicy = leaderInformationClearancePolicy;
this.fatalErrorHandler = checkNotNull(fatalErrorHandler);

client.getUnhandledErrorListenable().addListener(this);
Expand Down Expand Up @@ -136,7 +142,7 @@ private void retrieveLeaderInformationFromZooKeeper() {
return;
}
}
leaderRetrievalEventHandler.notifyLeaderAddress(LeaderInformation.empty());
notifyNoLeader();
} catch (Exception e) {
fatalErrorHandler.onFatalError(
new LeaderRetrievalException("Could not handle node changed event.", e));
Expand All @@ -150,10 +156,11 @@ private void handleStateChange(ConnectionState newState) {
LOG.debug("Connected to ZooKeeper quorum. Leader retrieval can start.");
break;
case SUSPENDED:
LOG.warn(
"Connection to ZooKeeper suspended. Can no longer retrieve the leader from "
+ "ZooKeeper.");
leaderRetrievalEventHandler.notifyLeaderAddress(LeaderInformation.empty());
LOG.warn("Connection to ZooKeeper suspended, waiting for reconnection.");
if (leaderInformationClearancePolicy
== LeaderInformationClearancePolicy.ON_SUSPENDED_CONNECTION) {
notifyNoLeader();
}
break;
case RECONNECTED:
LOG.info(
Expand All @@ -164,11 +171,15 @@ private void handleStateChange(ConnectionState newState) {
LOG.warn(
"Connection to ZooKeeper lost. Can no longer retrieve the leader from "
+ "ZooKeeper.");
leaderRetrievalEventHandler.notifyLeaderAddress(LeaderInformation.empty());
notifyNoLeader();
break;
}
}

/**
 * Tells the {@link LeaderRetrievalEventHandler} that no leader is currently known by passing
 * it empty leader information.
 */
private void notifyNoLeader() {
leaderRetrievalEventHandler.notifyLeaderAddress(LeaderInformation.empty());
}

private void onReconnectedConnectionState() {
// check whether we find some new leader information in ZooKeeper
retrieveLeaderInformationFromZooKeeper();
Expand All @@ -194,4 +205,13 @@ public String toString() {
public String getConnectionInformationPath() {
return connectionInformationPath;
}

/**
 * Policy that determines when the leader information is cleared and the listener is notified
 * about it.
 */
public enum LeaderInformationClearancePolicy {
/** Clear the leader information as soon as the ZK connection is suspended. */
ON_SUSPENDED_CONNECTION,

/** Clear the leader information only once the ZK connection is lost. */
ON_LOST_CONNECTION
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,28 @@ public class ZooKeeperLeaderRetrievalDriverFactory implements LeaderRetrievalDri

private final String retrievalPath;

public ZooKeeperLeaderRetrievalDriverFactory(CuratorFramework client, String retrievalPath) {
private final ZooKeeperLeaderRetrievalDriver.LeaderInformationClearancePolicy
leaderInformationClearancePolicy;

/**
 * Creates a factory for {@link ZooKeeperLeaderRetrievalDriver} instances.
 *
 * @param client Curator client handed to every created driver
 * @param retrievalPath path handed to every created driver as its retrieval path
 * @param leaderInformationClearancePolicy policy handed to every created driver; it controls
 *     when the driver clears the leader information
 */
public ZooKeeperLeaderRetrievalDriverFactory(
CuratorFramework client,
String retrievalPath,
ZooKeeperLeaderRetrievalDriver.LeaderInformationClearancePolicy
leaderInformationClearancePolicy) {
this.client = client;
this.retrievalPath = retrievalPath;
this.leaderInformationClearancePolicy = leaderInformationClearancePolicy;
}

@Override
public ZooKeeperLeaderRetrievalDriver createLeaderRetrievalDriver(
LeaderRetrievalEventHandler leaderEventHandler, FatalErrorHandler fatalErrorHandler)
throws Exception {
return new ZooKeeperLeaderRetrievalDriver(
client, retrievalPath, leaderEventHandler, fatalErrorHandler);
client,
retrievalPath,
leaderEventHandler,
leaderInformationClearancePolicy,
fatalErrorHandler);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.TreeCacheSelector;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.state.SessionConnectionStateErrorPolicy;
import org.apache.flink.shaded.curator4.org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooDefs;
import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.data.ACL;
Expand Down Expand Up @@ -207,7 +208,7 @@ public static CuratorFramework startCuratorFramework(Configuration configuration

LOG.info("Using '{}' as Zookeeper namespace.", rootWithNamespace);

CuratorFramework cf =
final CuratorFrameworkFactory.Builder curatorFrameworkBuilder =
CuratorFrameworkFactory.builder()
.connectString(zkQuorum)
.sessionTimeoutMs(sessionTimeout)
Expand All @@ -216,8 +217,14 @@ public static CuratorFramework startCuratorFramework(Configuration configuration
// Curator prepends a '/' manually and throws an Exception if the
// namespace starts with a '/'.
.namespace(trimStartingSlash(rootWithNamespace))
.aclProvider(aclProvider)
.build();
.aclProvider(aclProvider);

if (configuration.get(HighAvailabilityOptions.ZOOKEEPER_TOLERATE_SUSPENDED_CONNECTIONS)) {
curatorFrameworkBuilder.connectionStateErrorPolicy(
new SessionConnectionStateErrorPolicy());
}

CuratorFramework cf = curatorFrameworkBuilder.build();

cf.start();

Expand Down Expand Up @@ -257,7 +264,7 @@ public static String getZooKeeperEnsemble(Configuration flinkConf)
*/
public static DefaultLeaderRetrievalService createLeaderRetrievalService(
final CuratorFramework client) {
return createLeaderRetrievalService(client, "");
return createLeaderRetrievalService(client, "", new Configuration());
}

/**
Expand All @@ -266,11 +273,13 @@ public static DefaultLeaderRetrievalService createLeaderRetrievalService(
*
* @param client The {@link CuratorFramework} ZooKeeper client to use
* @param path The path for the leader retrieval
* @param configuration configuration for further config options
* @return {@link DefaultLeaderRetrievalService} instance.
*/
public static DefaultLeaderRetrievalService createLeaderRetrievalService(
final CuratorFramework client, final String path) {
return new DefaultLeaderRetrievalService(createLeaderRetrievalDriverFactory(client, path));
final CuratorFramework client, final String path, final Configuration configuration) {
return new DefaultLeaderRetrievalService(
createLeaderRetrievalDriverFactory(client, path, configuration));
}

/**
Expand All @@ -281,19 +290,34 @@ public static DefaultLeaderRetrievalService createLeaderRetrievalService(
*/
public static ZooKeeperLeaderRetrievalDriverFactory createLeaderRetrievalDriverFactory(
final CuratorFramework client) {
return createLeaderRetrievalDriverFactory(client, "");
return createLeaderRetrievalDriverFactory(client, "", new Configuration());
}

/**
* Creates a {@link LeaderRetrievalDriverFactory} implemented by ZooKeeper.
*
* @param client The {@link CuratorFramework} ZooKeeper client to use
* @param path The path for the leader zNode
* @param configuration configuration for further config options
* @return {@link LeaderRetrievalDriverFactory} instance.
*/
public static ZooKeeperLeaderRetrievalDriverFactory createLeaderRetrievalDriverFactory(
final CuratorFramework client, final String path) {
return new ZooKeeperLeaderRetrievalDriverFactory(client, path);
final CuratorFramework client, final String path, final Configuration configuration) {
final ZooKeeperLeaderRetrievalDriver.LeaderInformationClearancePolicy
leaderInformationClearancePolicy;

if (configuration.get(HighAvailabilityOptions.ZOOKEEPER_TOLERATE_SUSPENDED_CONNECTIONS)) {
leaderInformationClearancePolicy =
ZooKeeperLeaderRetrievalDriver.LeaderInformationClearancePolicy
.ON_LOST_CONNECTION;
} else {
leaderInformationClearancePolicy =
ZooKeeperLeaderRetrievalDriver.LeaderInformationClearancePolicy
.ON_SUSPENDED_CONNECTION;
}

return new ZooKeeperLeaderRetrievalDriverFactory(
client, path, leaderInformationClearancePolicy);
}

/**
Expand Down
Loading

0 comments on commit 9635083

Please sign in to comment.