Skip to content

Commit

Permalink
[FLINK-11103][runtime] Set a configurable uncaught exception handler for all entrypoints
Browse files Browse the repository at this point in the history

[FLINK-11103][docs] Generate docs for ClusterOptions changes

[FLINK-11103][tests] Add integration test for ClusterUncaughtExceptionHandler to check exit behaviour

This closes apache#15938.
  • Loading branch information
ashwinkolhatkar authored and tillrohrmann committed Jul 12, 2021
1 parent 4fddcb2 commit 4a3e6d6
Show file tree
Hide file tree
Showing 7 changed files with 229 additions and 0 deletions.
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/expert_cluster_section.html
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,11 @@
<td>Boolean</td>
<td>Whether processes should halt on fatal errors instead of performing a graceful shutdown. In some environments (e.g. Java 8 with the G1 garbage collector), a regular graceful shutdown can lead to a JVM deadlock. See <a href="https://issues.apache.org/jira/browse/FLINK-16510">FLINK-16510</a> for details.</td>
</tr>
<tr>
<td><h5>cluster.uncaught-exception-handling</h5></td>
<td style="word-wrap: break-word;">LOG</td>
<td><p>Enum</p>Possible values: [LOG, FAIL]</td>
<td>Defines whether cluster will handle any uncaught exceptions by just logging them (LOG mode), or by failing job (FAIL mode)</td>
</tr>
</tbody>
</table>
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,18 @@ public class ClusterOptions {
.withDescription(
"Whether to convert all PIPELINE edges to BLOCKING when apply fine-grained resource management in batch jobs.");

/**
 * Selects how uncaught exceptions are handled: either they are only logged, or the failure mode
 * is used. The default is {@link UncaughtExceptionHandleMode#LOG}.
 */
@Documentation.Section(Documentation.Sections.EXPERT_CLUSTER)
public static final ConfigOption<UncaughtExceptionHandleMode> UNCAUGHT_EXCEPTION_HANDLING =
        ConfigOptions.key("cluster.uncaught-exception-handling")
                .enumType(UncaughtExceptionHandleMode.class)
                .defaultValue(UncaughtExceptionHandleMode.LOG)
                .withDescription(
                        // The mode names are spliced in from the enum so the text cannot
                        // drift from the actual constant names.
                        "Defines whether cluster will handle any uncaught exceptions "
                                + "by just logging them ("
                                + UncaughtExceptionHandleMode.LOG.name()
                                + " mode), or by failing job ("
                                + UncaughtExceptionHandleMode.FAIL.name()
                                + " mode)");

public static JobManagerOptions.SchedulerType getSchedulerType(Configuration configuration) {
if (isAdaptiveSchedulerEnabled(configuration) || isReactiveModeEnabled(configuration)) {
return JobManagerOptions.SchedulerType.Adaptive;
Expand Down Expand Up @@ -196,4 +208,10 @@ public String getDescription() {
return description;
}
}

/** @see ClusterOptions#UNCAUGHT_EXCEPTION_HANDLING */
public enum UncaughtExceptionHandleMode {
LOG,
FAIL
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ public void startCluster() throws ClusterEntrypointException {

SecurityContext securityContext = installSecurityContext(configuration);

ClusterEntrypointUtils.configureUncaughtExceptionHandler(configuration);
securityContext.runSecured(
(Callable<Void>)
() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
import org.apache.flink.runtime.entrypoint.parser.ParserResultFactory;
import org.apache.flink.runtime.util.ClusterUncaughtExceptionHandler;
import org.apache.flink.runtime.util.Hardware;
import org.apache.flink.util.Preconditions;

Expand Down Expand Up @@ -116,4 +117,15 @@ public static int getPoolSize(Configuration config) {
poolSize, ClusterOptions.CLUSTER_IO_EXECUTOR_POOL_SIZE.key()));
return poolSize;
}

/**
 * Installs a {@link ClusterUncaughtExceptionHandler} as the default uncaught exception handler
 * (see {@link Thread#setDefaultUncaughtExceptionHandler}), i.e. the handler used for threads
 * that do not have a more specific handler of their own. The handle mode is read from {@link
 * ClusterOptions#UNCAUGHT_EXCEPTION_HANDLING}.
 *
 * @param config the configuration to read.
 */
public static void configureUncaughtExceptionHandler(Configuration config) {
    final ClusterOptions.UncaughtExceptionHandleMode handleMode =
            config.get(ClusterOptions.UNCAUGHT_EXCEPTION_HANDLING);
    final Thread.UncaughtExceptionHandler handler =
            new ClusterUncaughtExceptionHandler(handleMode);
    Thread.setDefaultUncaughtExceptionHandler(handler);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.entrypoint.ClusterEntrypointUtils;
import org.apache.flink.runtime.entrypoint.FlinkParseException;
import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.externalresource.ExternalResourceUtils;
Expand Down Expand Up @@ -417,6 +418,7 @@ public static void runTaskManagerProcessSecurely(Configuration configuration) {
int exitCode;
Throwable throwable = null;

ClusterEntrypointUtils.configureUncaughtExceptionHandler(configuration);
try {
SecurityUtils.install(new SecurityConfiguration(configuration));

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.util;

import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.util.FatalExitExceptionHandler;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Utility for handling any uncaught exceptions.
 *
 * <p>Handles any uncaught exceptions according to the handle mode configured via {@link
 * ClusterOptions#UNCAUGHT_EXCEPTION_HANDLING}: in {@code LOG} mode the exception is only logged
 * (including its stack trace); in any other mode handling is delegated to {@link
 * FatalExitExceptionHandler}.
 */
public class ClusterUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
    private static final Logger LOG =
            LoggerFactory.getLogger(ClusterUncaughtExceptionHandler.class);

    /** The configured reaction to uncaught exceptions. */
    private final ClusterOptions.UncaughtExceptionHandleMode handleMode;

    /**
     * Creates a handler that reacts to uncaught exceptions according to the given mode.
     *
     * @param handleMode the mode deciding between logging only and fatal-exit handling.
     */
    public ClusterUncaughtExceptionHandler(ClusterOptions.UncaughtExceptionHandleMode handleMode) {
        this.handleMode = handleMode;
    }

    /**
     * Logs the exception in {@code LOG} mode; otherwise forwards it to {@link
     * FatalExitExceptionHandler#INSTANCE}.
     *
     * @param t the thread that terminated due to the exception.
     * @param e the uncaught exception.
     */
    @Override
    public void uncaughtException(Thread t, Throwable e) {
        if (handleMode == ClusterOptions.UncaughtExceptionHandleMode.LOG) {
            // The throwable is passed as the trailing argument (after the two placeholder
            // arguments) so that SLF4J logs it with its stack trace instead of dropping it.
            LOG.error(
                    "WARNING: Thread '{}' produced an uncaught exception. If you want to fail on uncaught exceptions, then configure {} accordingly",
                    t.getName(),
                    ClusterOptions.UNCAUGHT_EXCEPTION_HANDLING.key(),
                    e);
        } else {
            // Any mode other than LOG (i.e. FAIL): delegate to the fatal-exit handler.
            FatalExitExceptionHandler.INSTANCE.uncaughtException(t, e);
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.entrypoint;

import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.runtime.testutils.TestJvmProcess;
import org.apache.flink.runtime.util.ClusterUncaughtExceptionHandler;
import org.apache.flink.util.FatalExitExceptionHandler;
import org.apache.flink.util.OperatingSystem;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.ScheduledExecutor;

import org.junit.Before;
import org.junit.Test;

import java.io.IOException;

import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assume.assumeTrue;

/**
 * Integration test to check exit behaviour for the {@link ClusterUncaughtExceptionHandler}.
 *
 * <p>The test launches a separate process for {@link ClusterTestingEntrypoint}, configured with
 * the {@code FAIL} handle mode, and compares the exit code of that process with the exit code of
 * {@link FatalExitExceptionHandler}.
 */
public class ClusterUncaughtExceptionHandlerITCase extends TestLogger {

    /** Skips the test on platforms other than Linux and Mac. */
    @Before
    public void ensureSupportedOS() {
        // based on the assumption in JvmExitOnFatalErrorTest, and manual testing on Mac, we do not
        // support all platforms (in particular not Windows)
        final boolean supportedOs = OperatingSystem.isLinux() || OperatingSystem.isMac();
        assumeTrue(supportedOs);
    }

    /**
     * Starts the testing entrypoint in its own process and asserts that the process exit code
     * matches {@link FatalExitExceptionHandler#EXIT_CODE}, reduced to its low 8 bits.
     */
    @Test
    public void testExitDueToUncaughtException() throws Exception {
        final ForcedJVMExitProcess childJvm =
                new ForcedJVMExitProcess(ClusterTestingEntrypoint.class);

        childJvm.startProcess();
        try {
            childJvm.waitFor();
            // for FAIL mode, exit is done using this handler.
            final int handlerExitCode = FatalExitExceptionHandler.EXIT_CODE;
            // Keep only the low 8 bits, i.e. the exit code read as an unsigned byte value.
            final int expectedExitCode = handlerExitCode & 0xFF;
            assertThat(childJvm.exitCode(), is(expectedExitCode));
        } finally {
            childJvm.destroy();
        }
    }

    /**
     * Cluster entrypoint whose component-factory creation spawns a thread that dies with an
     * uncaught exception, so that the installed uncaught exception handler gets triggered.
     */
    private static class ClusterTestingEntrypoint extends ClusterEntrypoint {

        protected ClusterTestingEntrypoint(Configuration configuration) {
            super(configuration);
        }

        /**
         * Starts a thread that throws a {@link RuntimeException}, waits up to one second for it
         * to finish and then always throws an {@link ExpectedTestException}; it never returns a
         * factory.
         */
        @Override
        protected DispatcherResourceManagerComponentFactory
                createDispatcherResourceManagerComponentFactory(Configuration configuration)
                        throws IOException {
            final Runnable failingTask =
                    () -> {
                        throw new RuntimeException("Test exception");
                    };
            final Thread failingThread = new Thread(failingTask);
            failingThread.start();
            try {
                failingThread.join(1000L);
            } catch (InterruptedException e) {
                throw new ExpectedTestException("this line should not be reached");
            }
            // Reaching this point means the process was not terminated while waiting above.
            throw new ExpectedTestException("this line should not be reached.");
        }

        /** Not needed by this test; always returns {@code null}. */
        @Override
        protected ExecutionGraphInfoStore createSerializableExecutionGraphStore(
                Configuration configuration, ScheduledExecutor scheduledExecutor)
                throws IOException {
            return null;
        }

        /**
         * Entry point of the spawned process: starts the cluster with the {@code FAIL} handle
         * mode and exits with code 1 if anything is thrown out of the startup.
         */
        public static void main(String[] args) {
            try {
                final Configuration failOnUncaught = new Configuration();
                failOnUncaught.set(
                        ClusterOptions.UNCAUGHT_EXCEPTION_HANDLING,
                        ClusterOptions.UncaughtExceptionHandleMode.FAIL);
                new ClusterTestingEntrypoint(failOnUncaught).startCluster();
            } catch (Throwable t) {
                System.exit(1);
            }
        }
    }

    /** {@link TestJvmProcess} that reports the given class as its entry point and name. */
    private static final class ForcedJVMExitProcess extends TestJvmProcess {
        private final Class<?> entryPointClass;

        private ForcedJVMExitProcess(Class<?> entryPointClass) throws Exception {
            this.entryPointClass = entryPointClass;
        }

        @Override
        public String getName() {
            return getEntryPointClassName();
        }

        @Override
        public String[] getJvmArgs() {
            // No additional JVM arguments.
            return new String[0];
        }

        @Override
        public String getEntryPointClassName() {
            return entryPointClass.getName();
        }
    }
}

0 comments on commit 4a3e6d6

Please sign in to comment.