Skip to content

Commit

Permalink
[CELEBORN-1084] Initialize workerSource member to prevent `NullPoin…
Browse files Browse the repository at this point in the history
…tException`

### What changes were proposed in this pull request?

As title

### Why are the changes needed?

This PR addresses a NPE issue that occurs when the `workerSource` member is accessed before it is initialized. To resolve this issue, we initialize the `workerSource` member when the handlers are created.

```
23/10/24 16:27:03,363 ERROR [fetch-server-11-1] TransportChannelHandler: Exception from request handler while channel is active
java.lang.NullPointerException
        at org.apache.celeborn.service.deploy.worker.FetchHandler.channelActive(FetchHandler.scala:412)
        at org.apache.celeborn.common.network.server.TransportRequestHandler.channelActive(TransportRequestHandler.java:66)
        at org.apache.celeborn.common.network.server.TransportChannelHandler.channelActive(TransportChannelHandler.java:120)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:262)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:238)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelActive(AbstractChannelHandlerContext.java:231)
        at io.netty.channel.ChannelInboundHandlerAdapter.channelActive(ChannelInboundHandlerAdapter.java:69)
        at io.netty.handler.timeout.IdleStateHandler.channelActive(IdleStateHandler.java:271)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:260)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:238)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelActive(AbstractChannelHandlerContext.java:231)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelActive(DefaultChannelPipeline.java:1398)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:258)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:238)
        at io.netty.channel.DefaultChannelPipeline.fireChannelActive(DefaultChannelPipeline.java:895)
        at io.netty.channel.AbstractChannel$AbstractUnsafe.register0(AbstractChannel.java:522)
        at io.netty.channel.AbstractChannel$AbstractUnsafe.access$200(AbstractChannel.java:429)
        at io.netty.channel.AbstractChannel$AbstractUnsafe$1.run(AbstractChannel.java:486)
        at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)
...
23/10/24 16:27:03,423 INFO [main] Worker: Starting Worker <ip>:<port1>:<port2>:<port3> with {/data1=DiskInfo(maxSlots: 0, committed shuffles 0 shuffleAllocations: Map(), mountPoint: /data1, usableSpace: 250.0 GiB, avgFlushTime: 0 ns, avgFetchTime: 0 ns, activeSlots: 0) status: HEALTHY dirs /data1/celeborn/worker/celeborn-worker/shuffle_data, /data=DiskInfo(maxSlots: 0, committed shuffles 0 shuffleAllocations: Map(), mountPoint: /data, usableSpace: 250.0 GiB, avgFlushTime: 0 ns, avgFetchTime: 0 ns, activeSlots: 0) status: HEALTHY dirs /data/celeborn/worker/celeborn-worker/shuffle_data} slots.
...
```

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Pass GA

Closes apache#2034 from cfmcgrady/fix-start-worker-npe.

Authored-by: Fu Chen <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
  • Loading branch information
cfmcgrady authored and waitinfuture committed Oct 24, 2023
1 parent 447c243 commit 875ad1f
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ import org.apache.celeborn.service.deploy.worker.storage.{FileWriter, MapPartiti
private[deploy] class Controller(
override val rpcEnv: RpcEnv,
val conf: CelebornConf,
val metricsSystem: MetricsSystem)
val metricsSystem: MetricsSystem,
val workerSource: WorkerSource)
extends RpcEndpoint with Logging {

var workerSource: WorkerSource = _
var storageManager: StorageManager = _
var shuffleMapperAttempts: ConcurrentHashMap[String, AtomicIntegerArray] = _
// shuffleKey -> (epoch -> CommitInfo)
Expand All @@ -65,7 +65,6 @@ private[deploy] class Controller(
val testRetryCommitFiles = conf.testRetryCommitFiles

def init(worker: Worker): Unit = {
workerSource = worker.workerSource
storageManager = worker.storageManager
shufflePartitionType = worker.shufflePartitionType
shufflePushDataTimeout = worker.shufflePushDataTimeout
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ import org.apache.celeborn.common.protocol.{MessageType, PartitionType, PbBuffer
import org.apache.celeborn.common.util.{ExceptionUtils, Utils}
import org.apache.celeborn.service.deploy.worker.storage.{ChunkStreamManager, CreditStreamManager, PartitionFilesSorter, StorageManager}

class FetchHandler(val conf: CelebornConf, val transportConf: TransportConf)
class FetchHandler(
val conf: CelebornConf,
val transportConf: TransportConf,
val workerSource: WorkerSource)
extends BaseMessageHandler with Logging {

val chunkStreamManager = new ChunkStreamManager()
Expand All @@ -52,13 +55,11 @@ class FetchHandler(val conf: CelebornConf, val transportConf: TransportConf)
conf.partitionReadBuffersMax,
conf.creditStreamThreadsPerMountpoint,
conf.readBuffersToTriggerReadMin)
var workerSource: WorkerSource = _
var storageManager: StorageManager = _
var partitionsSorter: PartitionFilesSorter = _
var registered: AtomicBoolean = new AtomicBoolean(false)

def init(worker: Worker): Unit = {
this.workerSource = worker.workerSource

workerSource.addGauge(WorkerSource.CREDIT_STREAM_COUNT) { () =>
creditStreamManager.getStreamsCount
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,8 @@ import org.apache.celeborn.common.util.Utils
import org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionController
import org.apache.celeborn.service.deploy.worker.storage.{FileWriter, HdfsFlusher, LocalFlusher, MapPartitionFileWriter, StorageManager}

class PushDataHandler extends BaseMessageHandler with Logging {
class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler with Logging {

private var workerSource: WorkerSource = _
private var partitionLocationInfo: WorkerPartitionLocationInfo = _
private var shuffleMapperAttempts: ConcurrentHashMap[String, AtomicIntegerArray] = _
private var shufflePartitionType: ConcurrentHashMap[String, PartitionType] = _
Expand All @@ -66,7 +65,6 @@ class PushDataHandler extends BaseMessageHandler with Logging {
private var testPushReplicaDataTimeout: Boolean = _

def init(worker: Worker): Unit = {
workerSource = worker.workerSource
partitionLocationInfo = worker.partitionLocationInfo
shufflePartitionType = worker.shufflePartitionType
shufflePushDataTimeout = worker.shufflePushDataTimeout
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,10 @@ private[celeborn] class Worker(
conf.workerCongestionControlUserInactiveIntervalMs)
}

var controller = new Controller(rpcEnv, conf, metricsSystem)
var controller = new Controller(rpcEnv, conf, metricsSystem, workerSource)
rpcEnv.setupEndpoint(RpcNameConstants.WORKER_EP, controller)

val pushDataHandler = new PushDataHandler()
val pushDataHandler = new PushDataHandler(workerSource)
val (pushServer, pushClientFactory) = {
val closeIdleConnections = conf.workerCloseIdleConnections
val numThreads = conf.workerPushIoThreads.getOrElse(storageManager.totalFlusherThread)
Expand All @@ -154,7 +154,7 @@ private[celeborn] class Worker(
transportContext.createClientFactory())
}

val replicateHandler = new PushDataHandler()
val replicateHandler = new PushDataHandler(workerSource)
private val replicateServer = {
val closeIdleConnections = conf.workerCloseIdleConnections
val numThreads =
Expand All @@ -179,7 +179,7 @@ private[celeborn] class Worker(
val numThreads = conf.workerFetchIoThreads.getOrElse(storageManager.totalFlusherThread)
val transportConf =
Utils.fromCelebornConf(conf, TransportModuleConstants.FETCH_MODULE, numThreads)
fetchHandler = new FetchHandler(conf, transportConf)
fetchHandler = new FetchHandler(conf, transportConf, workerSource)
val transportContext: TransportContext =
new TransportContext(
transportConf,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,7 @@ public void testFetchSortFile() throws IOException {
TransportClient client = new TransportClient(channel, mock(TransportResponseHandler.class));
FetchHandler fetchHandler = mockFetchHandler(fileInfo);

PbStreamHandler streamHandler =
openStreamAndCheck(client, channel, fetchHandler, 0, Integer.MAX_VALUE);
PbStreamHandler streamHandler = openStreamAndCheck(client, channel, fetchHandler, 5, 10);

fetchChunkAndCheck(client, channel, fetchHandler, streamHandler);
} finally {
Expand Down Expand Up @@ -257,11 +256,11 @@ public void testDoNotDeleteOriginalFileWhenNonRangeReadWorkInProgress() throws I
}

private FetchHandler mockFetchHandler(FileInfo fileInfo) {
WorkerSource workerSource = mock(WorkerSource.class);
TransportConf transportConf =
Utils.fromCelebornConf(conf, TransportModuleConstants.FETCH_MODULE, 4);
FetchHandler fetchHandler0 = new FetchHandler(conf, transportConf);
FetchHandler fetchHandler0 = new FetchHandler(conf, transportConf, workerSource);
Worker worker = mock(Worker.class);
WorkerSource workerSource = mock(WorkerSource.class);
PartitionFilesSorter partitionFilesSorter =
new PartitionFilesSorter(MemoryManager.instance(), conf, workerSource);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ public class FileWriterSuiteJ {
private static final CelebornConf CONF = new CelebornConf();
public static final Long SPLIT_THRESHOLD = 256 * 1024 * 1024L;
public static final PartitionSplitMode splitMode = PartitionSplitMode.HARD;
public static final PartitionType partitionType = PartitionType.REDUCE;

private static File tempDir = null;
private static LocalFlusher localFlusher = null;
Expand Down Expand Up @@ -139,7 +138,7 @@ public static void beforeAll() {

public static void setupChunkServer(FileInfo info) throws IOException {
FetchHandler handler =
new FetchHandler(transConf.getCelebornConf(), transConf) {
new FetchHandler(transConf.getCelebornConf(), transConf, mock(WorkerSource.class)) {
@Override
public StorageManager storageManager() {
return new StorageManager(CONF, source);
Expand Down

0 comments on commit 875ad1f

Please sign in to comment.