Skip to content

Commit

Permalink
[CELEBORN-1024] Thread factory should set UncaughtExceptionHandler to…
Browse files Browse the repository at this point in the history
… handle uncaught exception

### What changes were proposed in this pull request?

`batchHandleChangePartitionExecutors` could not handle uncaught exception in `ChangePartitionRequest`, which causes that the uncaught exception of the thread could not get for troubleshooting. Thread factory should set `UncaughtExceptionHandler` to handle uncaught exception.

### Why are the changes needed?

Thread factory sets `UncaughtExceptionHandler` to handle uncaught exception in `ThreadUtils`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit tests.

Closes apache#1962 from SteNicholas/CELEBORN-1024.

Authored-by: SteNicholas <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
  • Loading branch information
SteNicholas authored and pan3793 committed Oct 9, 2023
1 parent b2412d0 commit 56276e9
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,27 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.shuffle.*;
import org.apache.flink.runtime.shuffle.JobShuffleContext;
import org.apache.flink.runtime.shuffle.PartitionDescriptor;
import org.apache.flink.runtime.shuffle.ProducerDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.shuffle.ShuffleMasterContext;
import org.apache.flink.runtime.shuffle.TaskInputsOutputsDescriptor;
import org.apache.flink.util.ExecutorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.celeborn.client.LifecycleManager;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.util.JavaUtils;
import org.apache.celeborn.common.util.ThreadUtils;
import org.apache.celeborn.plugin.flink.utils.FlinkUtils;
import org.apache.celeborn.plugin.flink.utils.ThreadUtils;

public class RemoteShuffleMaster implements ShuffleMaster<RemoteShuffleDescriptor> {
private static final Logger LOG = LoggerFactory.getLogger(RemoteShuffleMaster.class);
Expand All @@ -47,9 +55,7 @@ public class RemoteShuffleMaster implements ShuffleMaster<RemoteShuffleDescripto
private ShuffleResourceTracker shuffleResourceTracker;
private final ScheduledThreadPoolExecutor executor =
new ScheduledThreadPoolExecutor(
1,
ThreadUtils.createFactoryWithDefaultExceptionHandler(
"remote-shuffle-master-executor", LOG));
1, ThreadUtils.namedThreadFactory("remote-shuffle-master-executor"));
private final ResultPartitionAdapter resultPartitionDelegation;
private final long lifecycleManagerTimestamp;

Expand Down Expand Up @@ -230,6 +236,6 @@ public void close() throws Exception {
LOG.warn("Encounter exception when shutdown: {}", e.getMessage(), e);
}

ThreadUtils.shutdownExecutors(10, executor);
ExecutorUtils.gracefulShutdown(10, TimeUnit.SECONDS, executor);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.celeborn.common.protocol.PbStreamHandler;
import org.apache.celeborn.common.protocol.StreamType;
import org.apache.celeborn.common.util.ShuffleBlockInfoUtils;
import org.apache.celeborn.common.util.ThreadExceptionHandler;
import org.apache.celeborn.common.util.Utils;

public class DfsPartitionReader implements PartitionReader {
Expand Down Expand Up @@ -166,13 +167,7 @@ public DfsPartitionReader(
logger.debug("fetch {} is done.", location.getStorageInfo().getFilePath());
},
"Dfs-fetch-thread" + location.getStorageInfo().getFilePath());
fetchThread.setUncaughtExceptionHandler(
new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
logger.error("thread {} failed with exception {}", t, e);
}
});
fetchThread.setUncaughtExceptionHandler(new ThreadExceptionHandler(fetchThread.getName()));
logger.debug("Start dfs read on location {}", location);
ShuffleClient.incrementTotalReadCounter();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import scala.util.control.NonFatal
import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}

import org.apache.celeborn.common.exception.CelebornException
import org.apache.celeborn.common.internal.Logging

object ThreadUtils {

Expand All @@ -44,17 +45,31 @@ object ThreadUtils {
/**
* Create a thread factory that names threads with a prefix and also sets the threads to daemon.
*/
def namedThreadFactory(prefix: String): ThreadFactory = {
new ThreadFactoryBuilder().setDaemon(true).setNameFormat(prefix + "-%d").build()
def namedThreadFactory(threadNamePrefix: String): ThreadFactory = {
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat(s"$threadNamePrefix-%d")
.setUncaughtExceptionHandler(new ThreadExceptionHandler(threadNamePrefix))
.build()
}

/**
* Create a thread factory that names threads with thread name and also sets the threads to daemon.
*/
def namedSingleThreadFactory(threadName: String): ThreadFactory = {
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat(threadName)
.setUncaughtExceptionHandler(new ThreadExceptionHandler(threadName))
.build()
}

/**
* Wrapper over newCachedThreadPool. Thread names are formatted as prefix-ID, where ID is a
* unique, sequentially assigned integer.
*/
def newDaemonCachedThreadPool(prefix: String): ThreadPoolExecutor = {
val threadFactory = namedThreadFactory(prefix)
Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
Executors.newCachedThreadPool(namedThreadFactory(prefix)).asInstanceOf[ThreadPoolExecutor]
}

/**
Expand All @@ -65,14 +80,13 @@ object ThreadUtils {
prefix: String,
maxThreadNumber: Int,
keepAliveSeconds: Int = 60): ThreadPoolExecutor = {
val threadFactory = namedThreadFactory(prefix)
val threadPool = new ThreadPoolExecutor(
maxThreadNumber, // corePoolSize: the max number of threads to create before queuing the tasks
maxThreadNumber, // maximumPoolSize: because we use LinkedBlockingDeque, this one is not used
keepAliveSeconds,
TimeUnit.SECONDS,
new LinkedBlockingQueue[Runnable],
threadFactory)
namedThreadFactory(prefix))
threadPool.allowCoreThreadTimeOut(true)
threadPool
}
Expand All @@ -82,24 +96,22 @@ object ThreadUtils {
* unique, sequentially assigned integer.
*/
def newDaemonFixedThreadPool(nThreads: Int, prefix: String): ThreadPoolExecutor = {
val threadFactory = namedThreadFactory(prefix)
Executors.newFixedThreadPool(nThreads, threadFactory).asInstanceOf[ThreadPoolExecutor]
Executors.newFixedThreadPool(nThreads, namedThreadFactory(prefix))
.asInstanceOf[ThreadPoolExecutor]
}

/**
* Wrapper over newSingleThreadExecutor.
*/
def newDaemonSingleThreadExecutor(threadName: String): ExecutorService = {
val threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build()
Executors.newSingleThreadExecutor(threadFactory)
Executors.newSingleThreadExecutor(namedSingleThreadFactory(threadName))
}

/**
* Wrapper over ScheduledThreadPoolExecutor.
*/
def newDaemonSingleThreadScheduledExecutor(threadName: String): ScheduledExecutorService = {
val threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build()
val executor = new ScheduledThreadPoolExecutor(1, threadFactory)
val executor = new ScheduledThreadPoolExecutor(1, namedSingleThreadFactory(threadName))
// By default, a cancelled task is not automatically removed from the work queue until its delay
// elapses. We have to enable it manually.
executor.setRemoveOnCancelPolicy(true)
Expand All @@ -112,11 +124,7 @@ object ThreadUtils {
def newDaemonThreadPoolScheduledExecutor(
threadNamePrefix: String,
numThreads: Int): ScheduledExecutorService = {
val threadFactory = new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat(s"$threadNamePrefix-%d")
.build()
val executor = new ScheduledThreadPoolExecutor(numThreads, threadFactory)
val executor = new ScheduledThreadPoolExecutor(numThreads, namedThreadFactory(threadNamePrefix))
// By default, a cancelled task is not automatically removed from the work queue until its delay
// elapses. We have to enable it manually.
executor.setRemoveOnCancelPolicy(true)
Expand Down Expand Up @@ -299,3 +307,10 @@ object ThreadUtils {
}
}
}

class ThreadExceptionHandler(executorService: String)
extends Thread.UncaughtExceptionHandler with Logging {

override def uncaughtException(t: Thread, e: Throwable): Unit =
logError(s"Uncaught exception in executor service $executorService, thread $t", e)
}

0 comments on commit 56276e9

Please sign in to comment.