Skip to content

Commit

Permalink
[FLINK-32180][runtime] Moves error handling from DefaultMultipleComponentLeaderElectionService into the MultipleComponentLeaderElectionDriver implementations (apache#22656)
Browse files Browse the repository at this point in the history

The error handling needs to be moved into the driver to allow errors that were reported to the
k8s watcher to be forwarded to the LeaderContender's error handling.

Signed-off-by: Matthias Pohl <[email protected]>
  • Loading branch information
XComp authored Jun 20, 2023
1 parent 8119411 commit 6a41089
Show file tree
Hide file tree
Showing 13 changed files with 133 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
Expand Down Expand Up @@ -137,15 +138,18 @@ public boolean hasLeadership() {
}

@Override
public void publishLeaderInformation(String componentId, LeaderInformation leaderInformation)
throws Exception {
public void publishLeaderInformation(String componentId, LeaderInformation leaderInformation) {
Preconditions.checkState(running.get());

kubeClient
.checkAndUpdateConfigMap(
configMapName,
updateConfigMapWithLeaderInformation(componentId, leaderInformation))
.get();
try {
kubeClient
.checkAndUpdateConfigMap(
configMapName,
updateConfigMapWithLeaderInformation(componentId, leaderInformation))
.get();
} catch (InterruptedException | ExecutionException e) {
fatalErrorHandler.onFatalError(e);
}

LOG.debug(
"Successfully wrote leader information {} for leader {} into the config map {}.",
Expand All @@ -155,7 +159,7 @@ public void publishLeaderInformation(String componentId, LeaderInformation leade
}

@Override
public void deleteLeaderInformation(String componentId) throws Exception {
public void deleteLeaderInformation(String componentId) {
publishLeaderInformation(componentId, LeaderInformation.empty());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,25 +39,22 @@ public class KubernetesMultipleComponentLeaderElectionDriverFactory

private final Executor watchExecutor;

private final FatalErrorHandler fatalErrorHandler;

public KubernetesMultipleComponentLeaderElectionDriverFactory(
FlinkKubeClient kubeClient,
KubernetesLeaderElectionConfiguration kubernetesLeaderElectionConfiguration,
KubernetesConfigMapSharedWatcher configMapSharedWatcher,
Executor watchExecutor,
FatalErrorHandler fatalErrorHandler) {
Executor watchExecutor) {
this.kubeClient = Preconditions.checkNotNull(kubeClient);
this.kubernetesLeaderElectionConfiguration =
Preconditions.checkNotNull(kubernetesLeaderElectionConfiguration);
this.configMapSharedWatcher = Preconditions.checkNotNull(configMapSharedWatcher);
this.watchExecutor = Preconditions.checkNotNull(watchExecutor);
this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler);
}

@Override
public KubernetesMultipleComponentLeaderElectionDriver create(
MultipleComponentLeaderElectionDriver.Listener leaderElectionListener)
MultipleComponentLeaderElectionDriver.Listener leaderElectionListener,
FatalErrorHandler fatalErrorHandler)
throws Exception {
return new KubernetesMultipleComponentLeaderElectionDriver(
kubernetesLeaderElectionConfiguration,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,7 @@ protected LeaderElectionDriverFactory createLeaderElectionDriverFactory(String l
kubeClient,
leaderElectionConfiguration,
configMapSharedWatcher,
watchExecutorService,
fatalErrorHandler));
watchExecutorService));
} catch (Exception e) {
throw new FlinkRuntimeException(
"Could not initialize the default single leader election service.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;

Expand Down Expand Up @@ -53,8 +52,6 @@ public class DefaultMultipleComponentLeaderElectionService

private final MultipleComponentLeaderElectionDriver multipleComponentLeaderElectionDriver;

private final FatalErrorHandler fatalErrorHandler;

@GuardedBy("lock")
private final ExecutorService leadershipOperationExecutor;

Expand All @@ -75,14 +72,12 @@ public class DefaultMultipleComponentLeaderElectionService
multipleComponentLeaderElectionDriverFactory,
ExecutorService leadershipOperationExecutor)
throws Exception {
this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler);

this.leadershipOperationExecutor = Preconditions.checkNotNull(leadershipOperationExecutor);

leaderElectionEventHandlers = new HashMap<>();

multipleComponentLeaderElectionDriver =
multipleComponentLeaderElectionDriverFactory.create(this);
multipleComponentLeaderElectionDriverFactory.create(this, fatalErrorHandler);
}

public DefaultMultipleComponentLeaderElectionService(
Expand Down Expand Up @@ -120,17 +115,8 @@ public LeaderElectionDriverFactory createDriverFactory(String componentId) {

@Override
public void publishLeaderInformation(String componentId, LeaderInformation leaderInformation) {
try {
multipleComponentLeaderElectionDriver.publishLeaderInformation(
componentId, leaderInformation);
} catch (Exception e) {
fatalErrorHandler.onFatalError(
new FlinkException(
String.format(
"Could not write leader information %s for leader %s.",
leaderInformation, componentId),
e));
}
multipleComponentLeaderElectionDriver.publishLeaderInformation(
componentId, leaderInformation);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,15 @@ public interface MultipleComponentLeaderElectionDriver {
*
* @param componentId identifying the component for which to publish the leader information
* @param leaderInformation leader information of the respective component
* @throws Exception if publishing fails
*/
void publishLeaderInformation(String componentId, LeaderInformation leaderInformation)
throws Exception;
void publishLeaderInformation(String componentId, LeaderInformation leaderInformation);

/**
* Deletes the leader information for the given component.
*
* @param componentId identifying the component for which to delete the leader information
* @throws Exception if deleting fails
*/
void deleteLeaderInformation(String componentId) throws Exception;
void deleteLeaderInformation(String componentId);

/**
* Listener interface for state changes of the {@link MultipleComponentLeaderElectionDriver}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.flink.runtime.leaderelection;

import org.apache.flink.runtime.rpc.FatalErrorHandler;

/** Factory for {@link MultipleComponentLeaderElectionDriver}. */
public interface MultipleComponentLeaderElectionDriverFactory {

Expand All @@ -27,9 +29,12 @@ public interface MultipleComponentLeaderElectionDriverFactory {
*
* @param leaderElectionListener listener for the callbacks of the {@link
* MultipleComponentLeaderElectionDriver}
* @param fatalErrorHandler component for handling fatal errors.
* @return created {@link MultipleComponentLeaderElectionDriver} instance
* @throws Exception if the creation fails
*/
MultipleComponentLeaderElectionDriver create(
MultipleComponentLeaderElectionDriver.Listener leaderElectionListener) throws Exception;
MultipleComponentLeaderElectionDriver.Listener leaderElectionListener,
FatalErrorHandler fatalErrorHandler)
throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.leaderelection;

import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
Expand Down Expand Up @@ -48,6 +49,8 @@ public class ZooKeeperMultipleComponentLeaderElectionDriver

private final MultipleComponentLeaderElectionDriver.Listener leaderElectionListener;

private final FatalErrorHandler fatalErrorHandler;

private final LeaderLatch leaderLatch;

private final TreeCache treeCache;
Expand All @@ -59,10 +62,12 @@ public class ZooKeeperMultipleComponentLeaderElectionDriver

public ZooKeeperMultipleComponentLeaderElectionDriver(
CuratorFramework curatorFramework,
MultipleComponentLeaderElectionDriver.Listener leaderElectionListener)
MultipleComponentLeaderElectionDriver.Listener leaderElectionListener,
FatalErrorHandler fatalErrorHandler)
throws Exception {
this.curatorFramework = Preconditions.checkNotNull(curatorFramework);
this.leaderElectionListener = Preconditions.checkNotNull(leaderElectionListener);
this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler);

this.leaderLatch = new LeaderLatch(curatorFramework, ZooKeeperUtils.getLeaderLatchPath());
this.treeCache =
Expand Down Expand Up @@ -130,8 +135,7 @@ public boolean hasLeadership() {
}

@Override
public void publishLeaderInformation(String componentId, LeaderInformation leaderInformation)
throws Exception {
public void publishLeaderInformation(String componentId, LeaderInformation leaderInformation) {
Preconditions.checkState(running.get());

if (LOG.isDebugEnabled()) {
Expand All @@ -145,17 +149,25 @@ public void publishLeaderInformation(String componentId, LeaderInformation leade
final String connectionInformationPath =
ZooKeeperUtils.generateConnectionInformationPath(componentId);

ZooKeeperUtils.writeLeaderInformationToZooKeeper(
leaderInformation,
curatorFramework,
leaderLatch::hasLeadership,
connectionInformationPath);
try {
ZooKeeperUtils.writeLeaderInformationToZooKeeper(
leaderInformation,
curatorFramework,
leaderLatch::hasLeadership,
connectionInformationPath);
} catch (Exception e) {
fatalErrorHandler.onFatalError(e);
}
}

@Override
public void deleteLeaderInformation(String leaderName) throws Exception {
ZooKeeperUtils.deleteZNode(
curatorFramework, ZooKeeperUtils.generateZookeeperPath(leaderName));
public void deleteLeaderInformation(String leaderName) {
try {
ZooKeeperUtils.deleteZNode(
curatorFramework, ZooKeeperUtils.generateZookeeperPath(leaderName));
} catch (Exception e) {
fatalErrorHandler.onFatalError(e);
}
}

private void handleStateChange(ConnectionState newState) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.leaderelection;

import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.util.Preconditions;

import org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework;
Expand All @@ -35,9 +36,10 @@ public ZooKeeperMultipleComponentLeaderElectionDriverFactory(

@Override
public ZooKeeperMultipleComponentLeaderElectionDriver create(
MultipleComponentLeaderElectionDriver.Listener leaderElectionListener)
MultipleComponentLeaderElectionDriver.Listener leaderElectionListener,
FatalErrorHandler fatalErrorHandler)
throws Exception {
return new ZooKeeperMultipleComponentLeaderElectionDriver(
curatorFramework, leaderElectionListener);
curatorFramework, leaderElectionListener, fatalErrorHandler);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.leaderelection;

import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.util.TestingFatalErrorHandlerExtension;
import org.apache.flink.util.ExceptionUtils;
Expand Down Expand Up @@ -117,6 +118,32 @@ void notLeaderInformsAllRegisteredLeaderElectionEventHandlers() throws Exception
}
}

/**
 * Checks that an error triggered on the testing driver ends up in the testing fatal error
 * handler: the handler's error future is expected to complete with exactly the error instance
 * that was passed to the driver's error handling.
 */
@Test
void handleFatalError() throws Exception {
    final TestingMultipleComponentLeaderElectionDriver driver =
            TestingMultipleComponentLeaderElectionDriver.newBuilder().build();
    final DefaultMultipleComponentLeaderElectionService service =
            createDefaultMultiplexingLeaderElectionService(driver);

    // The very same instance must come back out of the fatal error handler.
    final Throwable simulatedError =
            new Exception("Expected exception simulating a fatal error.");

    try {
        driver.triggerErrorHandling(simulatedError);

        FlinkAssertions.assertThatFuture(
                        fatalErrorHandlerExtension.getTestingFatalErrorHandler().getErrorFuture())
                .eventuallySucceeds()
                .isEqualTo(simulatedError);
    } finally {
        // Close the service first, then reset the recorded error on the shared handler.
        service.close();
        fatalErrorHandlerExtension.getTestingFatalErrorHandler().clearError();
    }
}

@Test
void unregisteredEventHandlersAreNotNotified() throws Exception {
final TestingMultipleComponentLeaderElectionDriver leaderElectionDriver =
Expand Down
Loading

0 comments on commit 6a41089

Please sign in to comment.